diff --git a/compressor_integration_tests/tests/compressor_continue_run_tests.rs b/compressor_integration_tests/tests/compressor_continue_run_tests.rs
new file mode 100644
index 0000000..0936eb7
--- /dev/null
+++ b/compressor_integration_tests/tests/compressor_continue_run_tests.rs
@@ -0,0 +1,82 @@
+use compressor_integration_tests::{
+    add_contents_to_database, database_collapsed_states_match_map, database_structure_matches_map,
+    empty_database,
+    map_builder::{compressed_3_3_from_0_to_13_with_state, line_segments_with_state},
+    DB_URL,
+};
+use serial_test::serial;
+use synapse_compress_state::{continue_run, Level};
+
+// Tests the saving and continuing functionality.
+// The compressor should produce the same results when run in one go
+// as when run in multiple stages.
+#[test]
+#[serial(db)]
+fn continue_run_called_twice_same_as_run() {
+    // This test starts with the following structure
+    //
+    // 0-1-2 3-4-5 6-7-8 9-10-11 12-13
+    //
+    // Each group i has state:
+    //     ('node', 'is', i)
+    //     ('group', j, 'seen') - for all j less than i
+    let initial = line_segments_with_state(0, 13);
+
+    // Place this initial state into an empty database
+    empty_database();
+    add_contents_to_database("room1", &initial);
+
+    let db_url = DB_URL.to_string();
+    let room_id = "room1".to_string();
+
+    // We will run the compression in two batches of 7 groups each
+    let start = -1;
+    let chunk_size = 7;
+
+    // Compress with two levels, each of maximum size 3.
+    // Since the compressor hasn't been run before, the levels start empty.
+    let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];
+
+    // Run the compressor with those settings
+    let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info);
+
+    // Assert that it stopped at 6 (i.e. after the 7 groups 0..=6)
+    assert_eq!(chunk_stats_1.last_compressed_group, 6);
+    // The structure should be the following at this point
+    // (NOTE: only including compressed groups)
+    //
+    // 0  3\
+    // 1  4 6
+    // 2  5
+    assert_eq!(
+        chunk_stats_1.new_level_info,
+        vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))]
+    );
+
+    let start = 6;
+    let chunk_size = 7;
+    let level_info = chunk_stats_1.new_level_info.clone();
+
+    // Run the compressor again, continuing from the saved state
+    let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info);
+
+    // Assert that it stopped at 13 (i.e. after the remaining groups 7..=13)
+    assert_eq!(chunk_stats_2.last_compressed_group, 13);
+
+    // This should have created the following structure in the database,
+    // i.e. groups 6 and 9 should have changed from before.
+    // N.B. this saves 11 rows
+    //
+    // 0  3\       12
+    // 1  4 6\     13
+    // 2  5 7 9
+    //       8 10
+    //         11
+    let expected = compressed_3_3_from_0_to_13_with_state();
+
+    // Check that the database still gives correct states for each group!
+    assert!(database_collapsed_states_match_map(&initial));
+
+    // Check that the structure of the database matches the expected structure
+    assert!(database_structure_matches_map(&expected));
+}
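The test above exercises one resume step by hand. For context, the save-and-continue API is meant to be driven in a loop; a minimal driver sketch follows. It is not part of this change: `compress_room` is a hypothetical wrapper, and the stopping condition (breaking once `last_compressed_group` stops advancing) is an assumption for illustration, not something this diff defines.

```rust
use synapse_compress_state::{continue_run, Level};

// Hypothetical driver: compress a room in chunks of 7 groups, feeding each
// chunk's saved level info and stopping point into the next call.
fn compress_room(db_url: &str, room_id: &str) {
    let mut start = -1_i64;
    let mut level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];

    loop {
        let stats = continue_run(start, 7, db_url, room_id, &level_info);
        if stats.last_compressed_group == start {
            break; // assumed: no new groups were found in this chunk
        }
        // Resume from where the compressor stopped, as the test does above
        start = stats.last_compressed_group;
        level_info = stats.new_level_info;
    }
}
```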
diff --git a/src/compressor.rs b/src/compressor.rs
index c33d7e9..26bacd2 100644
--- a/src/compressor.rs
+++ b/src/compressor.rs
@@ -36,8 +36,8 @@ use string_cache::DefaultAtom as Atom;
 use super::{collapse_state_maps, StateGroupEntry};
 
 /// Holds information about a particular level.
-#[derive(Debug)]
-struct Level {
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Level {
     /// The maximum size this level is allowed to be
     max_length: usize,
     /// The (approximate) current chain length of this level. This is equivalent
@@ -57,11 +57,20 @@ impl Level {
         }
     }
 
+    /// Creates a new level from stored state
+    pub fn restore(max_length: usize, current_chain_length: usize, current: Option<i64>) -> Level {
+        Level {
+            max_length,
+            current_chain_length,
+            current,
+        }
+    }
+
     /// Update the current head of this level. If delta is true then it means
     /// that the given state group will (probably) reference the previous head.
     ///
     /// Panics if `delta` is true and the level is already full.
-    pub fn update(&mut self, current: i64, delta: bool) {
+    fn update(&mut self, current: i64, delta: bool) {
         self.current = Some(current);
 
         if delta {
@@ -128,6 +137,35 @@ impl<'a> Compressor<'a> {
         compressor
     }
 
+    /// Creates a compressor and runs the compression algorithm.
+    /// Used when restoring compressor state from a previous run,
+    /// in which case the level heads are also known.
+    pub fn compress_from_save(
+        original_state_map: &'a BTreeMap<i64, StateGroupEntry>,
+        level_info: &[Level],
+    ) -> Compressor<'a> {
+        let levels = level_info
+            .iter()
+            .map(|l| Level::restore(l.max_length, l.current_chain_length, l.current))
+            .collect();
+
+        let mut compressor = Compressor {
+            original_state_map,
+            new_state_group_map: BTreeMap::new(),
+            levels,
+            stats: Stats::default(),
+        };
+
+        compressor.create_new_tree();
+        compressor
+    }
+
+    /// Returns all the state required to save the compressor so it can be continued later
+    pub fn get_level_info(&self) -> Vec<Level> {
+        self.levels.clone()
+    }
+
     /// Actually runs the compression algorithm
     fn create_new_tree(&mut self) {
         if !self.new_state_group_map.is_empty() {
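To make the three `restore` arguments concrete, here is a self-contained sketch (not the library's code) of the chain bookkeeping a level performs. The `delta` branch mirrors `update` as shown in the hunk above; the reset branch, which starts a fresh chain of length 1, is inferred since the hunk cuts off there.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
struct Level {
    max_length: usize,
    current_chain_length: usize,
    current: Option<i64>,
}

impl Level {
    fn restore(max_length: usize, current_chain_length: usize, current: Option<i64>) -> Level {
        Level { max_length, current_chain_length, current }
    }

    fn update(&mut self, current: i64, delta: bool) {
        self.current = Some(current);
        if delta {
            // A delta chains onto the previous head, so the level must not be full
            assert!(self.current_chain_length < self.max_length, "level is full");
            self.current_chain_length += 1;
        } else {
            // Inferred: a reset starts a fresh chain at this group
            self.current_chain_length = 1;
        }
    }
}

fn main() {
    let mut level = Level::restore(3, 0, None);
    for group in 0..5_i64 {
        let delta = level.current.is_some() && level.current_chain_length < level.max_length;
        level.update(group, delta);
    }
    // Groups 0,1,2 formed a full chain; 3 reset; 4 chained on: head 4, length 2
    assert_eq!(level, Level::restore(3, 2, Some(4)));
}
```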
diff --git a/src/database.rs b/src/database.rs
index 9d1bf8c..bb0da27 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -19,17 +19,13 @@ use postgres_openssl::MakeTlsConnector;
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use std::{borrow::Cow, collections::BTreeMap, fmt};
 
-use crate::{generate_sql, Config};
+use crate::{compressor::Level, generate_sql};
 
 use super::StateGroupEntry;
 
 /// Fetch the entries in state_groups_state (and their prev groups) for a
 /// specific room.
 ///
-/// - Connects to the database
-/// - Fetches the first [group] rows with group id after [min]
-/// - Recursively searches for missing predecessors and adds those
-///
 /// Returns with the state_group map and the id of the last group that was used
 ///
 /// # Arguments
@@ -41,8 +37,7 @@ use super::StateGroupEntry;
 ///                       groups greater than (but not equal to) this number. It
 ///                       also requires groups_to_compress to be specified
 /// * `groups_to_compress` - The number of groups to get from the database before stopping
-/// * `max_state_group` - If specified, then only fetch the entries for state
-///                       groups lower than or equal to this number.
+
 pub fn get_data_from_db(
     db_url: &str,
     room_id: &str,
@@ -58,19 +53,165 @@ pub fn get_data_from_db(
     let mut client = Client::connect(db_url, connector)
         .unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
 
+    let state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
+
+    load_map_from_db(
+        &mut client,
+        room_id,
+        min_state_group,
+        groups_to_compress,
+        max_state_group,
+        state_group_map,
+    )
+}
+
+/// Fetch the entries in state_groups_state (and their prev groups) for a
+/// specific room. This method should only be called when resuming the compressor
+/// from where it last finished, and as such it also loads in the state groups at
+/// the heads of each of the levels (as they were at the end of the last run of
+/// the compressor).
+///
+/// Returns with the state_group map and the id of the last group that was used
+///
+/// # Arguments
+///
+/// * `room_id` - The ID of the room in the database
+/// * `db_url` - The URL of a Postgres database. This should be of the
+///              form: "postgresql://user:pass@domain:port/database"
+/// * `min_state_group` - If specified, then only fetch the entries for state
+///                       groups greater than (but not equal to) this number. It
+///                       also requires groups_to_compress to be specified
+/// * `groups_to_compress` - The number of groups to get from the database before stopping
+/// * `level_info` - The maximum size, current length and current head for each
+///                  level (as they were when the compressor last finished for this
+///                  room)
+pub fn reload_data_from_db(
+    db_url: &str,
+    room_id: &str,
+    min_state_group: Option<i64>,
+    groups_to_compress: Option<i64>,
+    level_info: &[Level],
+) -> (BTreeMap<i64, StateGroupEntry>, i64) {
+    // Connect to the database
+    let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
+    builder.set_verify(SslVerifyMode::NONE);
+    let connector = MakeTlsConnector::new(builder.build());
+
+    let mut client = Client::connect(db_url, connector)
+        .unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
+
+    // Load just the state_groups at the head of each level.
+    // This doesn't load their predecessors, as that will be done at the end of
+    // load_map_from_db()
+    let state_group_map: BTreeMap<i64, StateGroupEntry> = load_level_heads(&mut client, level_info);
+
+    load_map_from_db(
+        &mut client,
+        room_id,
+        min_state_group,
+        groups_to_compress,
+        // max_state_group is not used when saving and loading
+        None,
+        state_group_map,
+    )
+}
+
+/// Finds the state groups that are at the head of each compressor level.
+/// NOTE: this does not also retrieve their predecessors.
+///
+/// # Arguments
+///
+/// * `client` - A Postgres client to make requests with
+/// * `level_info` - The levels whose heads are being requested
+fn load_level_heads(client: &mut Client, level_info: &[Level]) -> BTreeMap<i64, StateGroupEntry> {
+    // Obtain all of the heads that aren't None from level_info
+    let level_heads: Vec<i64> = level_info.iter().filter_map(|l| l.get_current()).collect();
+
+    // Query to get the id, predecessor and delta for each state group
+    let sql = r#"
+        SELECT m.id, prev_state_group, type, state_key, s.event_id
+        FROM state_groups AS m
+        LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
+        LEFT JOIN state_group_edges AS e ON (m.id = e.state_group)
+        WHERE m.id = ANY($1)
+    "#;
+
+    // Actually do the query
+    let mut rows = client.query_raw(sql, &[&level_heads]).unwrap();
+
+    // Copy the data from the database into a map
+    let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
+
+    while let Some(row) = rows.next().unwrap() {
+        // The row in the map to copy the data to.
+        // NOTE: the default StateGroupEntry has in_range set to false.
+        // This is what we want since, as a level head, this group has already
+        // been compressed by the previous run!
+        let entry = state_group_map.entry(row.get(0)).or_default();
+
+        // Save the predecessor (this may already be there)
+        entry.prev_state_group = row.get(1);
+
+        // Copy the single delta from the predecessor stored in this row
+        if let Some(etype) = row.get::<_, Option<String>>(2) {
+            entry.state_map.insert(
+                &etype,
+                &row.get::<_, String>(3),
+                row.get::<_, String>(4).into(),
+            );
+        }
+    }
+    state_group_map
+}
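As a concrete illustration of what this seeding produces, consider the second chunk of the integration test above: both level heads are 6, so `load_level_heads` would return a single-entry map. This fragment is illustrative only, not a compiling test (`load_level_heads` is private to this module, and a connected `client` is assumed):

```rust
let heads = load_level_heads(&mut client, &level_info);

// Both heads are 6, and duplicates collapse into one map entry
assert_eq!(heads.len(), 1);

// The head was compressed by the previous run, so it must not be
// re-compressed: in_range stays at its default of false.
assert!(!heads[&6].in_range);
// Per the structure diagram in the test, its prev_state_group would be Some(3)
```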
+/// Fetch the entries in state_groups_state (and their prev groups) for a
+/// specific room within a certain range. These are appended onto the provided
+/// map.
+///
+/// - Fetches the first [group] rows with group id after [min]
+/// - Recursively searches for missing predecessors and adds those
+///
+/// Returns with the state_group map and the id of the last group that was used
+///
+/// # Arguments
+///
+/// * `client` - A Postgres client to make requests with
+/// * `room_id` - The ID of the room in the database
+/// * `min_state_group` - If specified, then only fetch the entries for state
+///                       groups greater than (but not equal to) this number. It
+///                       also requires groups_to_compress to be specified
+/// * `groups_to_compress` - The number of groups to get from the database before stopping
+/// * `max_state_group` - If specified, then only fetch the entries for state
+///                       groups lower than or equal to this number
+/// * `state_group_map` - The map to populate with the entries from the database
+fn load_map_from_db(
+    client: &mut Client,
+    room_id: &str,
+    min_state_group: Option<i64>,
+    groups_to_compress: Option<i64>,
+    max_state_group: Option<i64>,
+    mut state_group_map: BTreeMap<i64, StateGroupEntry>,
+) -> (BTreeMap<i64, StateGroupEntry>, i64) {
     // Search for the group id of the groups_to_compress'th group after min_state_group.
     // If this is saved, then the compressor can continue by having min_state_group
     // set to this maximum
     let max_group_found = find_max_group(
-        &mut client,
+        client,
         room_id,
         min_state_group,
         groups_to_compress,
         max_state_group,
     );
 
-    let mut state_group_map =
-        get_initial_data_from_db(&mut client, room_id, min_state_group, max_group_found);
+    state_group_map.append(&mut get_initial_data_from_db(
+        client,
+        room_id,
+        min_state_group,
+        max_group_found,
+    ));
 
     println!("Got initial state from database. Checking for any missing state groups...");
@@ -111,7 +252,7 @@
     // println!("Missing {} state groups", missing_sgs.len());
 
     // find state groups not picked up already and add them to the map
-    let map = get_missing_from_db(&mut client, &missing_sgs, min_state_group, max_group_found);
+    let map = get_missing_from_db(client, &missing_sgs, min_state_group, max_group_found);
     for (k, v) in map {
         state_group_map.entry(k).or_insert(v);
     }
@@ -354,7 +495,8 @@ fn test_pg_escape() {
 /// * `new_map` - The state group data generated by the compressor to
 ///               replace the old contents
 pub fn send_changes_to_db(
-    config: &Config,
+    db_url: &str,
+    room_id: &str,
     old_map: &BTreeMap<i64, StateGroupEntry>,
     new_map: &BTreeMap<i64, StateGroupEntry>,
 ) {
@@ -363,7 +505,7 @@
     builder.set_verify(SslVerifyMode::NONE);
     let connector = MakeTlsConnector::new(builder.build());
 
-    let mut client = Client::connect(&config.db_url, connector).unwrap();
+    let mut client = Client::connect(db_url, connector).unwrap();
 
     println!("Writing changes...");
@@ -375,7 +517,7 @@
     pb.set_message("state groups");
     pb.enable_steady_tick(100);
 
-    for sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
+    for sql_transaction in generate_sql(old_map, new_map, room_id) {
         // Commit this change to the database.
         // N.B. this is a synchronous library so it will wait until finished before continuing...
         // If you want to speed up the compressor then this might be a good place to start!
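The signature change above decouples writing from `Config`: any in-crate caller holding the two maps can now commit them directly, which is what `continue_run` in the lib.rs changes below relies on. A sketch of such a call site (the connection string and room ID are placeholders):

```rust
// Hypothetical in-crate call site; placeholders throughout
database::send_changes_to_db(
    "postgresql://user:pass@localhost:5432/synapse",
    "!room:example.com",
    &old_map,
    &new_map,
);
```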
diff --git a/src/lib.rs b/src/lib.rs
index 334e024..33b9366 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -37,6 +37,8 @@ mod compressor;
 mod database;
 mod graphing;
 
+pub use compressor::Level;
+
 use compressor::Compressor;
 use database::PGEscape;
@@ -373,7 +375,12 @@ pub fn run(mut config: Config) {
     // If commit_changes is set then commit the changes to the database
     if config.commit_changes {
-        database::send_changes_to_db(&config, &state_group_map, new_state_group_map);
+        database::send_changes_to_db(
+            &config.db_url,
+            &config.room_id,
+            &state_group_map,
+            new_state_group_map,
+        );
     }
 
     if config.graphs {
@@ -506,6 +513,89 @@ fn output_sql(
     pb.finish();
 }
 
+/// Information about what the compressor did to the chunk that it was run on
+pub struct ChunkStats {
+    // The state of each of the levels of the compressor when it stopped
+    pub new_level_info: Vec<Level>,
+    // The last state_group that was compressed
+    // (to continue from where the compressor stopped, call with this as the 'start' value)
+    pub last_compressed_group: i64,
+    // The number of rows in the database for the current chunk of state_groups before compression
+    pub original_num_rows: usize,
+    // The number of rows in the database for the current chunk of state_groups after compression
+    pub new_num_rows: usize,
+    // Whether or not the changes were committed to the database
+    pub commited: bool,
+}
+
+pub fn continue_run(
+    start: i64,
+    chunk_size: i64,
+    db_url: &str,
+    room_id: &str,
+    level_info: &[Level],
+) -> ChunkStats {
+    // First we need to get the current state groups
+    let (state_group_map, max_group_found) =
+        database::reload_data_from_db(db_url, room_id, Some(start), Some(chunk_size), level_info);
+
+    let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
+
+    // Now we actually call the compression algorithm
+    let compressor = Compressor::compress_from_save(&state_group_map, level_info);
+    let new_state_group_map = &compressor.new_state_group_map;
+
+    // Done! Now to print a bunch of stats
+    let new_num_rows = new_state_group_map
+        .iter()
+        .fold(0, |acc, (_, v)| acc + v.state_map.len());
+
+    let ratio = (new_num_rows as f64) / (original_num_rows as f64);
+
+    println!(
+        "Number of rows after compression: {} ({:.2}%)",
+        new_num_rows,
+        ratio * 100.
+    );
+
+    println!("Compression Statistics:");
+    println!(
+        "  Number of forced resets due to lacking prev: {}",
+        compressor.stats.resets_no_suitable_prev
+    );
+    println!(
+        "  Number of compressed rows caused by the above: {}",
+        compressor.stats.resets_no_suitable_prev_size
+    );
+    println!(
+        "  Number of state groups changed: {}",
+        compressor.stats.state_groups_changed
+    );
+
+    if ratio > 1.0 {
+        println!("This compression would not remove any rows. Aborting.");
+        return ChunkStats {
+            new_level_info: compressor.get_level_info(),
+            last_compressed_group: max_group_found,
+            original_num_rows,
+            new_num_rows,
+            commited: false,
+        };
+    }
+
+    check_that_maps_match(&state_group_map, new_state_group_map);
+
+    database::send_changes_to_db(db_url, room_id, &state_group_map, new_state_group_map);
+
+    ChunkStats {
+        new_level_info: compressor.get_level_info(),
+        last_compressed_group: max_group_found,
+        original_num_rows,
+        new_num_rows,
+        commited: true,
+    }
+}
+
 /// Compares two sets of state groups
 ///
 /// A state group entry contains a predecessor state group and a delta.
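One subtlety of `continue_run` worth noting: when compression would grow the chunk (`ratio > 1.0`), nothing is written, but a `ChunkStats` is still returned so the caller can move on to the next chunk. A sketch of handling that case, building on the driver idea after the integration test above (the wrapper and its logging policy are invented for illustration; `commited` is spelled as in the `ChunkStats` field):

```rust
use synapse_compress_state::{continue_run, Level};

// Hypothetical wrapper around one chunk; the chunk size of 500 is arbitrary
fn compress_one_chunk(
    db_url: &str,
    room_id: &str,
    start: i64,
    level_info: &[Level],
) -> (i64, Vec<Level>) {
    let stats = continue_run(start, 500, db_url, room_id, level_info);

    if !stats.commited {
        // Nothing was written: compression would have grown this chunk
        eprintln!(
            "chunk ending at {} skipped: {} rows would have become {}",
            stats.last_compressed_group, stats.original_num_rows, stats.new_num_rows
        );
    }

    // Committed or not, the next chunk resumes from the reported position
    (stats.last_compressed_group, stats.new_level_info)
}
```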