diff --git a/compressor_integration_tests/tests/compressor_continue_run_tests.rs b/compressor_integration_tests/tests/compressor_continue_run_tests.rs index 0936eb7..a09ac13 100644 --- a/compressor_integration_tests/tests/compressor_continue_run_tests.rs +++ b/compressor_integration_tests/tests/compressor_continue_run_tests.rs @@ -30,7 +30,7 @@ fn continue_run_called_twice_same_as_run() { let room_id = "room1".to_string(); // will run the compression in two batches - let start = -1; + let start = None; let chunk_size = 7; // compress in 3,3 level sizes @@ -38,7 +38,7 @@ fn continue_run_called_twice_same_as_run() { let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)]; // Run the compressor with those settings - let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info); + let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap(); // Assert that it stopped at 6 (i.e. after the 7 groups 0...6) assert_eq!(chunk_stats_1.last_compressed_group, 6); @@ -53,12 +53,12 @@ fn continue_run_called_twice_same_as_run() { vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))] ); - let start = 6; + let start = Some(6); let chunk_size = 7; let level_info = chunk_stats_1.new_level_info.clone(); // Run the compressor with those settings - let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info); + let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap(); // Assert that it stopped at 7 assert_eq!(chunk_stats_2.last_compressed_group, 13); diff --git a/src/database.rs b/src/database.rs index 03b9301..23a5836 100644 --- a/src/database.rs +++ b/src/database.rs @@ -27,6 +27,7 @@ use super::StateGroupEntry; /// specific room. 
/// /// Returns with the state_group map and the id of the last group that was used +/// Or None if there are no state groups within the range given /// /// # Arguments /// @@ -36,6 +37,8 @@ use super::StateGroupEntry; /// * `min_state_group` - If specified, then only fetch the entries for state /// groups greater than (but not equal) to this number. It /// also requires groups_to_compress to be specified +/// * `max_state_group` - If specified, then only fetch the entries for state +/// groups lower than or equal to this number. /// * 'groups_to_compress' - The number of groups to get from the database before stopping pub fn get_data_from_db( @@ -44,7 +47,7 @@ pub fn get_data_from_db( min_state_group: Option, groups_to_compress: Option, max_state_group: Option, -) -> (BTreeMap, i64) { +) -> Option<(BTreeMap, i64)> { // connect to the database let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); @@ -53,16 +56,27 @@ pub fn get_data_from_db( let mut client = Client::connect(db_url, connector) .unwrap_or_else(|e| panic!("Error connecting to the database: {}", e)); - let state_group_map: BTreeMap = BTreeMap::new(); + // Search for the group id of the groups_to_compress'th group after min_state_group + // If this is saved, then the compressor can continue by having min_state_group being + // set to this maximum. If no such group can be found then return None. 
- load_map_from_db( + let max_group_found = find_max_group( &mut client, room_id, min_state_group, groups_to_compress, max_state_group, + )?; + + let state_group_map: BTreeMap = BTreeMap::new(); + + Some(load_map_from_db( + &mut client, + room_id, + min_state_group, + max_group_found, state_group_map, - ) + )) } /// Fetch the entries in state_groups_state (and their prev groups) for a @@ -71,6 +85,7 @@ pub fn get_data_from_db( /// of each of the levels (as they were at the end of the last run of the compressor) /// /// Returns with the state_group map and the id of the last group that was used +/// Or None if there are no state groups within the range given /// /// # Arguments /// @@ -81,8 +96,6 @@ pub fn get_data_from_db( /// groups greater than (but not equal) to this number. It /// also requires groups_to_compress to be specified /// * 'groups_to_compress' - The number of groups to get from the database before stopping -/// * `max_state_group` - If specified, then only fetch the entries for state -/// groups lower than or equal to this number. 
/// * 'level_info' - The maximum size, current length and current head for each /// level (as it was when the compressor last finished for this /// room) @@ -92,7 +105,7 @@ pub fn reload_data_from_db( min_state_group: Option, groups_to_compress: Option, level_info: &[Level], -) -> (BTreeMap, i64) { +) -> Option<(BTreeMap, i64)> { // connect to the database let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); @@ -101,20 +114,30 @@ pub fn reload_data_from_db( let mut client = Client::connect(db_url, connector) .unwrap_or_else(|e| panic!("Error connecting to the database: {}", e)); - // load just the state_groups at the head of each level - // this doesn't load their predecessors as that will be done at the end of - // load_map_from_db() - let state_group_map: BTreeMap = load_level_heads(&mut client, level_info); - - load_map_from_db( + // Search for the group id of the groups_to_compress'th group after min_state_group + // If this is saved, then the compressor can continue by having min_state_group being + // set to this maximum. If no such group can be found then return None. 
+ let max_group_found = find_max_group( &mut client, room_id, min_state_group, groups_to_compress, // max state group not used when saving and loading None, + )?; + + // load just the state_groups at the head of each level + // this doesn't load their predecessors as that will be done at the end of + // load_map_from_db() + let state_group_map: BTreeMap = load_level_heads(&mut client, level_info); + + Some(load_map_from_db( + &mut client, + room_id, + min_state_group, + max_group_found, state_group_map, - ) + )) } /// Finds the state_groups that are at the head of each compressor level @@ -181,28 +204,16 @@ fn load_level_heads(client: &mut Client, level_info: &[Level]) -> BTreeMap, - groups_to_compress: Option, - max_state_group: Option, + max_group_found: i64, mut state_group_map: BTreeMap, ) -> (BTreeMap, i64) { - // Search for the group id of the groups_to_compress'th group after min_state_group - // If this is saved, then the compressor can continue by having min_state_group being - // set to this maximum - let max_group_found = find_max_group( - client, - room_id, - min_state_group, - groups_to_compress, - max_state_group, - ); - state_group_map.append(&mut get_initial_data_from_db( client, room_id, @@ -261,7 +272,8 @@ fn load_map_from_db( /// Returns the group ID of the last group to be compressed /// /// This can be saved so that future runs of the compressor only -/// continue from after this point +/// continue from after this point. If no groups can be found in +/// the range specified it returns None. 
/// /// # Arguments /// @@ -276,7 +288,7 @@ fn find_max_group( min_state_group: Option, groups_to_compress: Option, max_state_group: Option, -) -> i64 { +) -> Option { // Get list of state_id's in a certain room let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string(); let params: Vec<&(dyn ToSql + Sync)>; @@ -285,22 +297,33 @@ fn find_max_group( query_chunk_of_ids = format!("{} AND id <= {}", query_chunk_of_ids, max) } - // Adds additional constraint if a groups_to_compress has been specified + // Adds additional constraint if a groups_to_compress or min_state_group have been specified + // Note a min state group is only used if groups_to_compress also is if min_state_group.is_some() && groups_to_compress.is_some() { params = vec![&room_id, &min_state_group, &groups_to_compress]; query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids); + } else if groups_to_compress.is_some() { + params = vec![&room_id, &groups_to_compress]; + query_chunk_of_ids = format!(r"{} LIMIT $2", query_chunk_of_ids); } else { params = vec![&room_id]; - query_chunk_of_ids = format!(r"{} ORDER BY id DESC LIMIT 1", query_chunk_of_ids); } let sql_query = format!( "SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1", query_chunk_of_ids ); - let final_row = client.query(sql_query.as_str(), &params).unwrap(); - final_row.last().unwrap().get(0) + // This vector should have length 0 or 1 + let rows = client + .query(sql_query.as_str(), &params) + .expect("Something went wrong while querying the database"); + + // If no row can be found then return None + let final_row = rows.last()?; + + // Else return the id of the group found + Some(final_row.get::<_, i64>(0)) } /// Fetch the entries in state_groups_state and immediate predecessors for @@ -330,22 +353,18 @@ fn get_initial_data_from_db( FROM state_groups AS m LEFT JOIN state_groups_state AS s ON (m.id = s.state_group) LEFT JOIN state_group_edges AS e ON (m.id = e.state_group) - WHERE m.room_id = 
$1 + WHERE m.room_id = $1 AND m.id <= $2 "#; // Adds additional constraint if minimum state_group has been specified. - // note that the maximum group only affects queries if there is also a minimum - // otherwise it is assumed that ALL groups should be fetched let mut rows = if let Some(min) = min_state_group { - let params: Vec<&dyn ToSql> = vec![&room_id, &min, &max_group_found]; - client.query_raw( - format!(r"{} AND m.id > $2 AND m.id <= $3", sql).as_str(), - params, - ) + let params: Vec<&dyn ToSql> = vec![&room_id, &max_group_found, &min]; + client.query_raw(format!(r"{} AND m.id > $3", sql).as_str(), params) } else { - client.query_raw(sql, &[room_id]) + let params: Vec<&dyn ToSql> = vec![&room_id, &max_group_found]; + client.query_raw(sql, params) } - .unwrap(); + .expect("Something went wrong while querying the database"); // Copy the data from the database into a map let mut state_group_map: BTreeMap = BTreeMap::new(); @@ -481,13 +500,16 @@ fn test_pg_escape() { assert_eq!(&s[start_pos - 1..start_pos], "$"); } +/// Send changes to the database +/// /// Note that currently ignores config.transactions and wraps every state /// group in it's own transaction (i.e. as if config.transactions was true) /// /// # Arguments /// -/// * `config` - A Config struct that contains information -/// about the run (e.g. room_id and database url) +/// * `db_url` - The URL of a Postgres database. 
This should be of the +/// form: "postgresql://user:pass@domain:port/database" +/// * `room_id` - The ID of the room in the database /// * `old_map` - The state group data originally in the database /// * `new_map` - The state group data generated by the compressor to /// replace replace the old contents diff --git a/src/lib.rs b/src/lib.rs index 3a237a7..87d250b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -298,7 +298,9 @@ pub fn run(mut config: Config) { config.min_state_group, config.groups_to_compress, config.max_state_group, - ); + ) + .unwrap_or_else(|| panic!("No state groups found within this range")); + println!("Fetched state groups up to {}", max_group_found); println!("Number of state groups: {}", state_group_map.len()); @@ -510,6 +512,7 @@ fn output_sql( } /// Information about what compressor did to chunk that it was ran on +#[derive(Debug)] pub struct ChunkStats { // The state of each of the levels of the compressor when it stopped pub new_level_info: Vec, @@ -524,16 +527,18 @@ pub struct ChunkStats { pub commited: bool, } +/// Loads a compressor state, runs it on a room and then returns info on how it got on pub fn continue_run( - start: i64, + start: Option, chunk_size: i64, db_url: &str, room_id: &str, level_info: &[Level], -) -> ChunkStats { +) -> Option { // First we need to get the current state groups + // If nothing was found then return None let (state_group_map, max_group_found) = - database::reload_data_from_db(db_url, room_id, Some(start), Some(chunk_size), level_info); + database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?; let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum(); @@ -548,48 +553,28 @@ pub fn continue_run( let ratio = (new_num_rows as f64) / (original_num_rows as f64); - println!( - "Number of rows after compression: {} ({:.2}%)", - new_num_rows, - ratio * 100. 
- ); - - println!("Compression Statistics:"); - println!( - " Number of forced resets due to lacking prev: {}", - compressor.stats.resets_no_suitable_prev - ); - println!( - " Number of compressed rows caused by the above: {}", - compressor.stats.resets_no_suitable_prev_size - ); - println!( - " Number of state groups changed: {}", - compressor.stats.state_groups_changed - ); - if ratio > 1.0 { println!("This compression would not remove any rows. Aborting."); - return ChunkStats { + return Some(ChunkStats { new_level_info: compressor.get_level_info(), last_compressed_group: max_group_found, original_num_rows, new_num_rows, commited: false, - }; + }); } check_that_maps_match(&state_group_map, new_state_group_map); database::send_changes_to_db(db_url, room_id, &state_group_map, new_state_group_map); - ChunkStats { + Some(ChunkStats { new_level_info: compressor.get_level_info(), last_compressed_group: max_group_found, original_num_rows, new_num_rows, commited: true, - } + }) } /// Compares two sets of state groups