diff --git a/src/compressor.rs b/src/compressor.rs
index 77aa189..07d7dec 100644
--- a/src/compressor.rs
+++ b/src/compressor.rs
@@ -141,6 +141,23 @@ impl<'a> Compressor<'a> {
         pb.enable_steady_tick(100);
 
         for (&state_group, entry) in self.original_state_map {
+            // Check whether this entry is in_range or is just present in the map due to being
+            // a predecessor of a group that IS in_range for compression
+            if !entry.in_range {
+                let new_entry = StateGroupEntry {
+                    // in_range is kept the same so that the new entry is equal to the old entry,
+                    // otherwise it might trigger a useless database transaction
+                    in_range: entry.in_range,
+                    prev_state_group: entry.prev_state_group,
+                    state_map: entry.state_map.clone(),
+                };
+                // Paranoidly assert that we are not making changes to this entry
+                // (could probably be removed...)
+                assert!(new_entry == *entry);
+                self.new_state_group_map.insert(state_group, new_entry);
+
+                continue;
+            }
             let mut prev_state_group = None;
             for level in &mut self.levels {
                 if level.has_space() {
@@ -162,6 +179,7 @@ impl<'a> Compressor<'a> {
                 self.new_state_group_map.insert(
                     state_group,
                     StateGroupEntry {
+                        in_range: true,
                         prev_state_group,
                         state_map: delta,
                     },
@@ -239,6 +257,7 @@ fn test_new_map() {
         initial.insert(
             i,
             StateGroupEntry {
+                in_range: true,
                 prev_state_group: prev,
                 state_map: StateMap::new(),
            },
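Note on the early `continue`: entries that are only in the map as predecessors are copied into the new map untouched, so the later `old_entry != new_entry` comparison in `output_sql` produces no SQL for them. A minimal standalone sketch of that property, using a simplified stand-in for `StateGroupEntry` (the `Entry` type here is illustrative only; the real struct also carries a `StateMap` of deltas):

// Standalone sketch, not part of the patch.
#[derive(Clone, PartialEq, Debug)]
struct Entry {
    in_range: bool,
    prev_state_group: Option<i64>,
}

fn main() {
    let old = Entry { in_range: false, prev_state_group: Some(42) };
    // What the early-continue branch effectively does for out-of-range entries:
    let new = old.clone();
    // Equal entries mean no DELETE/INSERT statements are generated for this group later.
    assert_eq!(old, new);
}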
diff --git a/src/database.rs b/src/database.rs
index 9bb9fc5..2813c5b 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -25,40 +25,62 @@ use super::StateGroupEntry;
 /// specific room.
 ///
 /// - Connects to the database
-/// - Fetches rows with group id lower than max
+/// - Fetches the first [groups_to_compress] rows with group id after [min_state_group]
 /// - Recursively searches for missing predecessors and adds those
 ///
+/// Returns the state_group map and the id of the last group that was used
+///
 /// # Arguments
 ///
-/// * `room_id` - The ID of the room in the database
-/// * `db_url` - The URL of a Postgres database. This should be of the
-///             form: "postgresql://user:pass@domain:port/database"
-/// * `max_state_group` - If specified, then only fetch the entries for state
-///                       groups lower than or equal to this number. (N.B. all
-///                       predecessors are also fetched)
+/// * `room_id` - The ID of the room in the database
+/// * `db_url` - The URL of a Postgres database. This should be of the
+///              form: "postgresql://user:pass@domain:port/database"
+/// * `min_state_group` - If specified, then only fetch the entries for state
+///                       groups greater than (but not equal to) this number. It
+///                       also requires `groups_to_compress` to be specified
+/// * `groups_to_compress` - The number of groups to get from the database before stopping
+/// * `max_state_group` - If specified, then only fetch the entries for state
+///                       groups lower than or equal to this number.
 pub fn get_data_from_db(
     db_url: &str,
     room_id: &str,
+    min_state_group: Option<i64>,
+    groups_to_compress: Option<i64>,
     max_state_group: Option<i64>,
-) -> BTreeMap<i64, StateGroupEntry> {
+) -> (BTreeMap<i64, StateGroupEntry>, i64) {
+    // connect to the database
     let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
     builder.set_verify(SslVerifyMode::NONE);
     let connector = MakeTlsConnector::new(builder.build());
 
     let mut client = Client::connect(db_url, connector).unwrap();
 
-    let mut state_group_map = get_initial_data_from_db(&mut client, room_id, max_state_group);
+    // Search for the group id of the groups_to_compress'th group after min_state_group.
+    // If this is saved, then the compressor can continue by having min_state_group
+    // set to this maximum
+    let max_group_found = find_max_group(
+        &mut client,
+        room_id,
+        min_state_group,
+        groups_to_compress,
+        max_state_group,
+    );
+
+    let mut state_group_map =
+        get_initial_data_from_db(&mut client, room_id, min_state_group, max_group_found);
 
     println!("Got initial state from database. Checking for any missing state groups...");
 
     // Due to reasons some of the state groups appear in the edges table, but
-    // not in the state_groups_state table. This means they don't get included
-    // in our DB queries, so we have to fetch any missing groups explicitly.
+    // not in the state_groups_state table.
+    //
+    // Also it is likely that the predecessor of a node will not be within the
+    // chunk that was specified by min_state_group and groups_to_compress.
+    // This means they don't get included in our DB queries, so we have to fetch
+    // any missing groups explicitly.
+    //
     // Since the returned groups may themselves reference groups we don't have,
     // we need to do this recursively until we don't find any more missing.
-    //
-    // N.B. This does NOT currently fetch the deltas for the missing groups!
-    //      By carefully chosen max_state_group this might cause issues...?
     loop {
         let mut missing_sgs: Vec<_> = state_group_map
             .iter()
@@ -76,41 +98,92 @@ pub fn get_data_from_db(
             .collect();
 
         if missing_sgs.is_empty() {
-            println!("No missing state groups");
+            // println!("No missing state groups");
             break;
         }
 
         missing_sgs.sort_unstable();
         missing_sgs.dedup();
 
-        println!("Missing {} state groups", missing_sgs.len());
+        // println!("Missing {} state groups", missing_sgs.len());
 
-        let map = get_missing_from_db(&mut client, &missing_sgs);
-        state_group_map.extend(map.into_iter());
+        // find state groups not picked up already and add them to the map
+        let map = get_missing_from_db(&mut client, &missing_sgs, min_state_group, max_group_found);
+        for (k, v) in map {
+            state_group_map.entry(k).or_insert(v);
+        }
     }
 
-    state_group_map
+    (state_group_map, max_group_found)
+}
+
+/// Returns the group ID of the last group to be compressed
+///
+/// This can be saved so that future runs of the compressor only
+/// continue from after this point
+///
+/// # Arguments
+///
+/// * `client` - A Postgres client to make requests with
+/// * `room_id` - The ID of the room in the database
+/// * `min_state_group` - The lower limit (non inclusive) of group ids to compress
+/// * `groups_to_compress` - How many groups to compress
+/// * `max_state_group` - The upper bound on what this method can return
+fn find_max_group(
+    client: &mut Client,
+    room_id: &str,
+    min_state_group: Option<i64>,
+    groups_to_compress: Option<i64>,
+    max_state_group: Option<i64>,
+) -> i64 {
+    // Get the list of state group ids in a certain room
+    let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string();
+    let params: Vec<&(dyn ToSql + Sync)>;
+
+    if let Some(max) = max_state_group {
+        query_chunk_of_ids = format!("{} AND id <= {}", query_chunk_of_ids, max)
+    }
+
+    // Adds additional constraints if min_state_group and groups_to_compress have been specified
+    if min_state_group.is_some() && groups_to_compress.is_some() {
+        params = vec![&room_id, &min_state_group, &groups_to_compress];
+        query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids);
+    } else {
+        params = vec![&room_id];
+        query_chunk_of_ids = format!(r"{} ORDER BY id DESC LIMIT 1", query_chunk_of_ids);
+    }
+
+    let sql_query = format!(
+        "SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1",
+        query_chunk_of_ids
+    );
+    let final_row = client.query(sql_query.as_str(), &params).unwrap();
+
+    final_row.last().unwrap().get(0)
 }
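For reference, a small standalone sketch of the query string that find_max_group ends up building in the chunked case (min_state_group and groups_to_compress both supplied, no max_state_group). It only reproduces the string composition above and does not touch a database:

// Standalone sketch, not part of the patch.
fn main() {
    let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string();
    // chunked case: only ids after $2, and at most $3 of them
    query_chunk_of_ids = format!("{} AND id > $2 LIMIT $3", query_chunk_of_ids);
    // the outer query then picks the highest id inside that chunk
    let sql_query = format!(
        "SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1",
        query_chunk_of_ids
    );
    println!("{}", sql_query);
    // SELECT id FROM (SELECT id FROM state_groups WHERE room_id = $1 AND id > $2 LIMIT $3)
    //     AS ids ORDER BY ids.id DESC LIMIT 1
}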
 /// Fetch the entries in state_groups_state and immediate predecessors for
 /// a specific room.
 ///
-/// - Fetches rows with group id lower than max
+/// - Fetches the first [groups_to_compress] rows with group id higher than min
 /// - Stores the group id, predecessor id and deltas into a map
+/// - Returns the map and the maximum row that was considered
 ///
 /// # Arguments
 ///
 /// * `client` - A Postgres client to make requests with
 /// * `room_id` - The ID of the room in the database
-/// * `max_state_group` - If specified, then only fetch the entries for state
-///                       groups lower than or equal to this number. (N.B. doesn't
-///                       fetch IMMEDIATE predecessors if ID is above this number)
+/// * `min_state_group` - If specified, then only fetch the entries for state
+///                       groups greater than (but not equal to) this number. It
+///                       also requires `groups_to_compress` to be specified
+/// * `max_group_found` - The upper limit on state_groups ids to get from the database
 fn get_initial_data_from_db(
     client: &mut Client,
     room_id: &str,
-    max_state_group: Option<i64>,
+    min_state_group: Option<i64>,
+    max_group_found: i64,
 ) -> BTreeMap<i64, StateGroupEntry> {
-    // Query to get id, predecessor and delta for each state group
+    // Query to get id, predecessor and deltas for each state group
     let sql = r#"
         SELECT m.id, prev_state_group, type, state_key, s.event_id
         FROM state_groups AS m
@@ -119,18 +192,21 @@ fn get_initial_data_from_db(
         WHERE m.room_id = $1
     "#;
 
-    // Adds additional constraint if a max_state_group has been specified
-    // Then sends query to the datatbase
-    let mut rows = if let Some(s) = max_state_group {
-        let params: Vec<&dyn ToSql> = vec![&room_id, &s];
-        client.query_raw(format!(r"{} AND m.id <= $2", sql).as_str(), params)
+    // Adds additional constraint if a minimum state_group has been specified.
+    // Note that the maximum group only affects queries if there is also a minimum;
+    // otherwise it is assumed that ALL groups should be fetched
+    let mut rows = if let Some(min) = min_state_group {
+        let params: Vec<&dyn ToSql> = vec![&room_id, &min, &max_group_found];
+        client.query_raw(
+            format!(r"{} AND m.id > $2 AND m.id <= $3", sql).as_str(),
+            params,
+        )
     } else {
         client.query_raw(sql, &[room_id])
     }
     .unwrap();
 
     // Copy the data from the database into a map
-
     let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
 
     let pb = ProgressBar::new_spinner();
@@ -143,8 +219,10 @@ fn get_initial_data_from_db(
         // The row in the map to copy the data to
         let entry = state_group_map.entry(row.get(0)).or_default();
 
-        // Save the predecessor (this may already be there)
+        // Save the predecessor and mark for compression (this may already be there)
+        // TODO: slightly fewer redundant rewrites
        entry.prev_state_group = row.get(1);
+        entry.in_range = true;
 
         // Copy the single delta from the predecessor stored in this row
         if let Some(etype) = row.get::<_, Option<String>>(2) {
@@ -172,34 +250,57 @@ fn get_initial_data_from_db(
 ///
 /// * `client` - A Postgres client to make requests with
 /// * `missing_sgs` - An array of missing state_group ids
-fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
-    let mut rows = client
-        .query_raw(
-            r#"
-                SELECT state_group, prev_state_group
-                FROM state_group_edges
-                WHERE state_group = ANY($1)
-            "#,
-            &[missing_sgs],
-        )
-        .unwrap();
+/// * `min_state_group` - Minimum state_group id to mark as in range
+/// * `max_group_found` - Maximum state_group id to mark as in range
+fn get_missing_from_db(
+    client: &mut Client,
+    missing_sgs: &[i64],
+    min_state_group: Option<i64>,
+    max_group_found: i64,
+) -> BTreeMap<i64, StateGroupEntry> {
+    // "Due to reasons" it is possible that some states only appear in the edges table,
+    // and not in the state_groups table. Since we know the IDs we're looking for (they
+    // are the missing predecessors), we can find them by left joining onto the edges
+    // table (instead of the state_groups table!)
+    let sql = r#"
+        SELECT target.prev_state_group, source.prev_state_group, state.type, state.state_key, state.event_id
+        FROM state_group_edges AS target
+        LEFT JOIN state_group_edges AS source ON (target.prev_state_group = source.state_group)
+        LEFT JOIN state_groups_state AS state ON (target.prev_state_group = state.state_group)
+        WHERE target.prev_state_group = ANY($1)
+    "#;
 
-    // initialise the map with empty entries (the missing group may not
-    // have a prev_state_group either)
-    let mut state_group_map: BTreeMap<i64, StateGroupEntry> = missing_sgs
-        .iter()
-        .map(|sg| (*sg, StateGroupEntry::default()))
-        .collect();
+    let mut rows = client.query_raw(sql, &[missing_sgs]).unwrap();
+
+    let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
 
     while let Some(row) = rows.next().unwrap() {
-        let state_group = row.get(0);
-        let entry = state_group_map.get_mut(&state_group).unwrap();
+        let id = row.get(0);
+        // The row in the map to copy the data to
+        let entry = state_group_map.entry(id).or_default();
+
+        // Save the predecessor and mark for compression (this may already be there)
+        // Also, it may well not exist!
         entry.prev_state_group = row.get(1);
+        if let Some(min) = min_state_group {
+            if min < id && id <= max_group_found {
+                entry.in_range = true
+            }
+        }
+
+        // Copy the single delta from the predecessor stored in this row
+        if let Some(etype) = row.get::<_, Option<String>>(2) {
+            entry.state_map.insert(
+                &etype,
+                &row.get::<_, String>(3),
+                row.get::<_, String>(4).into(),
+            );
+        }
     }
 
     state_group_map
 }
 
+// TODO: find a library that has an existing safe postgres escape function
 /// Helper function that escapes the wrapped text when writing SQL
 pub struct PGEscape<'a>(pub &'a str);
diff --git a/src/graphing.rs b/src/graphing.rs
index 0d1a30d..4985c15 100644
--- a/src/graphing.rs
+++ b/src/graphing.rs
@@ -57,7 +57,7 @@ fn output_csv(groups: &Graph, edges_output: &mut File, nodes_output: &mut File)
 /// * `after` - A map from state group ids to StateGroupEntries
 ///             the information from this map goes into after_edges.csv
 ///             and after_nodes.csv
-pub fn make_graphs(before: Graph, after: Graph) {
+pub fn make_graphs(before: &Graph, after: &Graph) {
     // Open all the files to output to
     let mut before_edges_file = File::create("before_edges.csv").unwrap();
     let mut before_nodes_file = File::create("before_nodes.csv").unwrap();
@@ -65,7 +65,7 @@ pub fn make_graphs(before: Graph, after: Graph) {
     let mut after_nodes_file = File::create("after_nodes.csv").unwrap();
 
     // Write before's information to before_edges and before_nodes
-    output_csv(&before, &mut before_edges_file, &mut before_nodes_file);
+    output_csv(before, &mut before_edges_file, &mut before_nodes_file);
     // Write afters's information to after_edges and after_nodes
-    output_csv(&after, &mut after_edges_file, &mut after_nodes_file);
+    output_csv(after, &mut after_edges_file, &mut after_nodes_file);
 }
diff --git a/src/lib.rs b/src/lib.rs
index c1d80b5..028c416 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -44,6 +44,7 @@ use database::PGEscape;
 /// delta from that previous group (or the full state if no previous group)
 #[derive(Default, Debug, Clone, PartialEq, Eq)]
 pub struct StateGroupEntry {
+    in_range: bool,
     prev_state_group: Option<i64>,
     state_map: StateMap<Atom>,
 }
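The new in_range flag records whether a group falls inside the requested chunk: get_initial_data_from_db marks everything it fetches, while get_missing_from_db only marks a recursively fetched predecessor when min_state_group < id <= max_group_found. A standalone sketch of that predicate (the helper function is hypothetical, written only to illustrate the condition above):

// Standalone sketch, not part of the patch: mirrors the in_range condition
// used in get_missing_from_db.
fn in_range(id: i64, min_state_group: Option<i64>, max_group_found: i64) -> bool {
    match min_state_group {
        Some(min) => min < id && id <= max_group_found,
        // with no lower bound, groups reached only via the missing-predecessor
        // fetch keep the default of false (i.e. they are left uncompressed)
        None => false,
    }
}

fn main() {
    // chunk requested: groups 1001..=1500
    assert!(!in_range(900, Some(1000), 1500)); // predecessor before the chunk: copied, not rewritten
    assert!(in_range(1200, Some(1000), 1500)); // inside the chunk: eligible for compression
}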
carry out + // the compression that get's calculated output_file: Option, + // The ID of the room who's state is being compressed room_id: String, - max_state_group: Option, + // The group to start compressing from + // N.B. THIS STATE ITSELF IS NOT COMPRESSED!!! + // Note there is no state 0 so if want to compress all then can enter 0 + // (this is the same as leaving it blank) + min_state_group: Option, + // How many groups to do the compression on + // Note: State groups within the range specified will get compressed + // if they are in the state_groups table. States that only appear in + // the edges table MIGHT NOT get compressed - it is assumed that these + // groups have no associated state. (Note that this was also an assumption + // in previous versions of the state compressor, and would only be a problem + // if the database was in a bad way already...) + groups_to_compress: Option, + // If the compressor results in less than this many rows being saved then + // it will abort min_saved_rows: Option, - transactions: bool, + // If a max_state_group is specified then only state groups with id's lower + // than this number are able to be compressed. + max_state_group: Option, + // The sizes of the different levels in the new state_group tree being built level_sizes: LevelSizes, + // Whether or not to wrap each change to an individual state_group in a transaction + // This is very much reccomended when running the compression when synapse is live + transactions: bool, + // Whether or not to output before and after directed graphs (these can be + // visualised in somthing like Gephi) graphs: bool, } @@ -102,17 +130,24 @@ impl Config { .takes_value(true) .required(true), ).arg( - Arg::with_name("max_state_group") - .short("s") - .value_name("MAX_STATE_GROUP") - .help("The maximum state group to process up to") + Arg::with_name("min_state_group") + .short("b") + .value_name("MIN_STATE_GROUP") + .help("The state group to start processing from (non inclusive)") .takes_value(true) .required(false), ).arg( Arg::with_name("min_saved_rows") .short("m") .value_name("COUNT") - .help("Suppress output if fewer than COUNT rows would be saved") + .help("Abort if fewer than COUNT rows would be saved") + .takes_value(true) + .required(false), + ).arg( + Arg::with_name("groups_to_compress") + .short("n") + .value_name("GROUPS_TO_COMPRESS") + .help("How many groups to load into memory to compress") .takes_value(true) .required(false), ).arg( @@ -122,10 +157,15 @@ impl Config { .help("File to output the changes to in SQL") .takes_value(true), ).arg( - Arg::with_name("transactions") - .short("t") - .help("Whether to wrap each state group change in a transaction") - .requires("output_file"), + Arg::with_name("max_state_group") + .short("s") + .value_name("MAX_STATE_GROUP") + .help("The maximum state group to process up to") + .long_help(concat!( + "If a max_state_group is specified then only state groups with id's lower", + " than this number are able to be compressed.")) + .takes_value(true) + .required(false), ).arg( Arg::with_name("level_sizes") .short("l") @@ -143,6 +183,11 @@ impl Config { )) .default_value("100,50,25") .takes_value(true), + ).arg( + Arg::with_name("transactions") + .short("t") + .help("Whether to wrap each state group change in a transaction") + .requires("output_file"), ).arg( Arg::with_name("graphs") .short("g") @@ -161,29 +206,39 @@ impl Config { .value_of("room_id") .expect("room_id should be required since no file"); - let max_state_group = matches - .value_of("max_state_group") 
-            .map(|s| s.parse().expect("max_state_group must be an integer"));
+        let min_state_group = matches
+            .value_of("min_state_group")
+            .map(|s| s.parse().expect("min_state_group must be an integer"));
+
+        let groups_to_compress = matches
+            .value_of("groups_to_compress")
+            .map(|s| s.parse().expect("groups_to_compress must be an integer"));
 
         let min_saved_rows = matches
             .value_of("min_saved_rows")
             .map(|v| v.parse().expect("COUNT must be an integer"));
 
-        let transactions = matches.is_present("transactions");
+        let max_state_group = matches
+            .value_of("max_state_group")
+            .map(|s| s.parse().expect("max_state_group must be an integer"));
 
         let level_sizes = value_t!(matches, "level_sizes", LevelSizes)
             .unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));
 
+        let transactions = matches.is_present("transactions");
+
         let graphs = matches.is_present("graphs");
 
         Config {
             db_url: String::from(db_url),
             output_file,
             room_id: String::from(room_id),
-            max_state_group,
+            min_state_group,
+            groups_to_compress,
             min_saved_rows,
-            transactions,
+            max_state_group,
             level_sizes,
+            transactions,
             graphs,
         }
     }
@@ -208,8 +263,14 @@ pub fn run(mut config: Config) {
     // First we need to get the current state groups
     println!("Fetching state from DB for room '{}'...", config.room_id);
 
-    let state_group_map =
-        database::get_data_from_db(&config.db_url, &config.room_id, config.max_state_group);
+    let (state_group_map, max_group_found) = database::get_data_from_db(
+        &config.db_url,
+        &config.room_id,
+        config.min_state_group,
+        config.groups_to_compress,
+        config.max_state_group,
+    );
+    println!("Fetched state groups up to {}", max_group_found);
 
     println!("Number of state groups: {}", state_group_map.len());
 
@@ -225,7 +286,7 @@ pub fn run(mut config: Config) {
     let compressor = Compressor::compress(&state_group_map, &config.level_sizes.0);
 
-    let new_state_group_map = compressor.new_state_group_map;
+    let new_state_group_map = &compressor.new_state_group_map;
 
     // Done! Now to print a bunch of stats.
 
@@ -266,16 +327,16 @@ pub fn run(mut config: Config) {
         }
     }
 
-    check_that_maps_match(&state_group_map, &new_state_group_map);
+    check_that_maps_match(&state_group_map, new_state_group_map);
 
     // If we are given an output file, we output the changes as SQL. If the
     // `transactions` argument is set we wrap each change to a state group in a
     // transaction.
-    output_sql(&mut config, &state_group_map, &new_state_group_map);
+    output_sql(&mut config, &state_group_map, new_state_group_map);
 
     if config.graphs {
-        graphing::make_graphs(state_group_map, new_state_group_map);
+        graphing::make_graphs(&state_group_map, new_state_group_map);
     }
 }
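run() now prints the highest group that was loaded ("Fetched state groups up to ..."), which is the value to feed back in as min_state_group on the next run. A rough sketch of how successive chunked runs could chain together, assuming code living inside this crate where the database module is visible (the driver function itself and the chunk size are hypothetical):

// Hypothetical driver, not part of the patch: loads a room in two chunks
// of up to 1000 state groups each.
fn compress_two_chunks(db_url: &str, room_id: &str) {
    // 0 means "start from the beginning" - there is no state group 0
    let (first_chunk, max1) =
        database::get_data_from_db(db_url, room_id, Some(0), Some(1000), None);
    // ... compress `first_chunk` and output/apply the changes ...

    // the next run picks up immediately after the last group of the first chunk
    let (second_chunk, max2) =
        database::get_data_from_db(db_url, room_id, Some(max1), Some(1000), None);
    // ... compress `second_chunk`, and so on, carrying max2 forward ...
    let _ = (first_chunk, second_chunk, max2);
}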
@@ -312,11 +373,11 @@ fn output_sql(
     for (sg, old_entry) in old_map {
         let new_entry = &new_map[sg];
 
+        // N.B. this also checks whether the in_range fields agree
         if old_entry != new_entry {
             if config.transactions {
                 writeln!(output, "BEGIN;").unwrap();
             }
-
             writeln!(
                 output,
                 "DELETE FROM state_group_edges WHERE state_group = {};",
@@ -462,10 +523,12 @@ impl Config {
         db_url: String,
         room_id: String,
         output_file: Option<String>,
-        max_state_group: Option<i64>,
+        min_state_group: Option<i64>,
+        groups_to_compress: Option<i64>,
         min_saved_rows: Option<i64>,
-        transactions: bool,
+        max_state_group: Option<i64>,
         level_sizes: String,
+        transactions: bool,
         graphs: bool,
     ) -> Result {
         let mut output: Option<File> = None;
@@ -486,10 +549,12 @@ impl Config {
             db_url,
             output_file,
             room_id,
-            max_state_group,
+            min_state_group,
+            groups_to_compress,
             min_saved_rows,
-            transactions,
+            max_state_group,
             level_sizes,
+            transactions,
             graphs,
         })
     }
@@ -505,30 +570,38 @@ impl Config {
    // db_url has no default
    // room_id has no default
    output_file = "None",
-    max_state_group = "None",
+    min_state_group = "None",
+    groups_to_compress = "None",
    min_saved_rows = "None",
-    transactions = false,
+    max_state_group = "None",
    level_sizes = "String::from(\"100,50,25\")",
+    // have this default to true, as it is much worse to not have it when you
+    // need it than to have it when you don't
+    transactions = true,
    graphs = false
 )]
 fn run_compression(
     db_url: String,
     room_id: String,
     output_file: Option<String>,
-    max_state_group: Option<i64>,
+    min_state_group: Option<i64>,
+    groups_to_compress: Option<i64>,
     min_saved_rows: Option<i64>,
-    transactions: bool,
+    max_state_group: Option<i64>,
     level_sizes: String,
+    transactions: bool,
     graphs: bool,
 ) -> PyResult<()> {
     let config = Config::new(
         db_url,
         room_id,
         output_file,
-        max_state_group,
+        min_state_group,
+        groups_to_compress,
         min_saved_rows,
-        transactions,
+        max_state_group,
         level_sizes,
+        transactions,
         graphs,
     );
     match config {
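Since Config::new and run_compression take a long list of positional arguments whose order changes here (max_state_group now comes after min_saved_rows, and transactions after level_sizes), here is a hedged example of constructing a Config with the new ordering from inside the crate; every value below is made up and the surrounding function is hypothetical:

// Hypothetical call, not part of the patch: illustrates the new argument order.
fn example() {
    if let Ok(config) = Config::new(
        "postgresql://user:pass@localhost:5432/synapse".to_string(), // db_url
        "!room:example.com".to_string(),                             // room_id
        Some("out.sql".to_string()),                                 // output_file
        Some(0),                 // min_state_group (0 = start from the beginning)
        Some(500),               // groups_to_compress
        None,                    // min_saved_rows
        None,                    // max_state_group
        "100,50,25".to_string(), // level_sizes
        true,                    // transactions
        false,                   // graphs
    ) {
        run(config);
    }
}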