Don't panic in continue_run if no groups found within range (#62)

This commit is contained in:
Azrenbeth
2021-09-14 17:29:48 +01:00
committed by GitHub
parent a409cdbd8e
commit 55ee83ce13
3 changed files with 85 additions and 78 deletions

View File

@@ -30,7 +30,7 @@ fn continue_run_called_twice_same_as_run() {
let room_id = "room1".to_string(); let room_id = "room1".to_string();
// will run the compression in two batches // will run the compression in two batches
let start = -1; let start = None;
let chunk_size = 7; let chunk_size = 7;
// compress in 3,3 level sizes // compress in 3,3 level sizes
@@ -38,7 +38,7 @@ fn continue_run_called_twice_same_as_run() {
let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)]; let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];
// Run the compressor with those settings // Run the compressor with those settings
let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info); let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();
// Assert that it stopped at 6 (i.e. after the 7 groups 0...6) // Assert that it stopped at 6 (i.e. after the 7 groups 0...6)
assert_eq!(chunk_stats_1.last_compressed_group, 6); assert_eq!(chunk_stats_1.last_compressed_group, 6);
@@ -53,12 +53,12 @@ fn continue_run_called_twice_same_as_run() {
vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))] vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))]
); );
let start = 6; let start = Some(6);
let chunk_size = 7; let chunk_size = 7;
let level_info = chunk_stats_1.new_level_info.clone(); let level_info = chunk_stats_1.new_level_info.clone();
// Run the compressor with those settings // Run the compressor with those settings
let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info); let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();
// Assert that it stopped at 7 // Assert that it stopped at 7
assert_eq!(chunk_stats_2.last_compressed_group, 13); assert_eq!(chunk_stats_2.last_compressed_group, 13);

View File

@@ -27,6 +27,7 @@ use super::StateGroupEntry;
/// specific room. /// specific room.
/// ///
/// Returns with the state_group map and the id of the last group that was used /// Returns with the state_group map and the id of the last group that was used
/// Or None if there are no state groups within the range given
/// ///
/// # Arguments /// # Arguments
/// ///
@@ -36,6 +37,8 @@ use super::StateGroupEntry;
/// * `min_state_group` - If specified, then only fetch the entries for state /// * `min_state_group` - If specified, then only fetch the entries for state
/// groups greater than (but not equal) to this number. It /// groups greater than (but not equal) to this number. It
/// also requires groups_to_compress to be specified /// also requires groups_to_compress to be specified
/// * `max_state_group` - If specified, then only fetch the entries for state
/// groups lower than or equal to this number.
/// * 'groups_to_compress' - The number of groups to get from the database before stopping /// * 'groups_to_compress' - The number of groups to get from the database before stopping
pub fn get_data_from_db( pub fn get_data_from_db(
@@ -44,7 +47,7 @@ pub fn get_data_from_db(
min_state_group: Option<i64>, min_state_group: Option<i64>,
groups_to_compress: Option<i64>, groups_to_compress: Option<i64>,
max_state_group: Option<i64>, max_state_group: Option<i64>,
) -> (BTreeMap<i64, StateGroupEntry>, i64) { ) -> Option<(BTreeMap<i64, StateGroupEntry>, i64)> {
// connect to the database // connect to the database
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE); builder.set_verify(SslVerifyMode::NONE);
@@ -53,16 +56,27 @@ pub fn get_data_from_db(
let mut client = Client::connect(db_url, connector) let mut client = Client::connect(db_url, connector)
.unwrap_or_else(|e| panic!("Error connecting to the database: {}", e)); .unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
let state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new(); // Search for the group id of the groups_to_compress'th group after min_state_group
// If this is saved, then the compressor can continue by having min_state_group being
// set to this maximum. If no such group can be found then return None.
load_map_from_db( let max_group_found = find_max_group(
&mut client, &mut client,
room_id, room_id,
min_state_group, min_state_group,
groups_to_compress, groups_to_compress,
max_state_group, max_state_group,
)?;
let state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
Some(load_map_from_db(
&mut client,
room_id,
min_state_group,
max_group_found,
state_group_map, state_group_map,
) ))
} }
/// Fetch the entries in state_groups_state (and their prev groups) for a /// Fetch the entries in state_groups_state (and their prev groups) for a
@@ -71,6 +85,7 @@ pub fn get_data_from_db(
/// of each of the levels (as they were at the end of the last run of the compressor) /// of each of the levels (as they were at the end of the last run of the compressor)
/// ///
/// Returns with the state_group map and the id of the last group that was used /// Returns with the state_group map and the id of the last group that was used
/// Or None if there are no state groups within the range given
/// ///
/// # Arguments /// # Arguments
/// ///
@@ -81,8 +96,6 @@ pub fn get_data_from_db(
/// groups greater than (but not equal) to this number. It /// groups greater than (but not equal) to this number. It
/// also requires groups_to_compress to be specified /// also requires groups_to_compress to be specified
/// * 'groups_to_compress' - The number of groups to get from the database before stopping /// * 'groups_to_compress' - The number of groups to get from the database before stopping
/// * `max_state_group` - If specified, then only fetch the entries for state
/// groups lower than or equal to this number.
/// * 'level_info' - The maximum size, current length and current head for each /// * 'level_info' - The maximum size, current length and current head for each
/// level (as it was when the compressor last finished for this /// level (as it was when the compressor last finished for this
/// room) /// room)
@@ -92,7 +105,7 @@ pub fn reload_data_from_db(
min_state_group: Option<i64>, min_state_group: Option<i64>,
groups_to_compress: Option<i64>, groups_to_compress: Option<i64>,
level_info: &[Level], level_info: &[Level],
) -> (BTreeMap<i64, StateGroupEntry>, i64) { ) -> Option<(BTreeMap<i64, StateGroupEntry>, i64)> {
// connect to the database // connect to the database
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE); builder.set_verify(SslVerifyMode::NONE);
@@ -101,20 +114,30 @@ pub fn reload_data_from_db(
let mut client = Client::connect(db_url, connector) let mut client = Client::connect(db_url, connector)
.unwrap_or_else(|e| panic!("Error connecting to the database: {}", e)); .unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
// load just the state_groups at the head of each level // Search for the group id of the groups_to_compress'th group after min_state_group
// this doesn't load their predecessors as that will be done at the end of // If this is saved, then the compressor can continue by having min_state_group being
// load_map_from_db() // set to this maximum. If no such group can be found then return None.
let state_group_map: BTreeMap<i64, StateGroupEntry> = load_level_heads(&mut client, level_info); let max_group_found = find_max_group(
load_map_from_db(
&mut client, &mut client,
room_id, room_id,
min_state_group, min_state_group,
groups_to_compress, groups_to_compress,
// max state group not used when saving and loading // max state group not used when saving and loading
None, None,
)?;
// load just the state_groups at the head of each level
// this doesn't load their predecessors as that will be done at the end of
// load_map_from_db()
let state_group_map: BTreeMap<i64, StateGroupEntry> = load_level_heads(&mut client, level_info);
Some(load_map_from_db(
&mut client,
room_id,
min_state_group,
max_group_found,
state_group_map, state_group_map,
) ))
} }
/// Finds the state_groups that are at the head of each compressor level /// Finds the state_groups that are at the head of each compressor level
@@ -181,28 +204,16 @@ fn load_level_heads(client: &mut Client, level_info: &[Level]) -> BTreeMap<i64,
/// * `min_state_group` - If specified, then only fetch the entries for state /// * `min_state_group` - If specified, then only fetch the entries for state
/// groups greater than (but not equal) to this number. It /// groups greater than (but not equal) to this number. It
/// also requires groups_to_compress to be specified /// also requires groups_to_compress to be specified
/// * 'groups_to_compress' - The number of groups to get from the database before stopping /// * 'max_group_found' - The last group to get from the database before stopping
/// * 'state_group_map' - The map to populate with the entries from the database /// * 'state_group_map' - The map to populate with the entries from the database
fn load_map_from_db( fn load_map_from_db(
client: &mut Client, client: &mut Client,
room_id: &str, room_id: &str,
min_state_group: Option<i64>, min_state_group: Option<i64>,
groups_to_compress: Option<i64>, max_group_found: i64,
max_state_group: Option<i64>,
mut state_group_map: BTreeMap<i64, StateGroupEntry>, mut state_group_map: BTreeMap<i64, StateGroupEntry>,
) -> (BTreeMap<i64, StateGroupEntry>, i64) { ) -> (BTreeMap<i64, StateGroupEntry>, i64) {
// Search for the group id of the groups_to_compress'th group after min_state_group
// If this is saved, then the compressor can continue by having min_state_group being
// set to this maximum
let max_group_found = find_max_group(
client,
room_id,
min_state_group,
groups_to_compress,
max_state_group,
);
state_group_map.append(&mut get_initial_data_from_db( state_group_map.append(&mut get_initial_data_from_db(
client, client,
room_id, room_id,
@@ -261,7 +272,8 @@ fn load_map_from_db(
/// Returns the group ID of the last group to be compressed /// Returns the group ID of the last group to be compressed
/// ///
/// This can be saved so that future runs of the compressor only /// This can be saved so that future runs of the compressor only
/// continue from after this point /// continue from after this point. If no groups can be found in
/// the range specified it returns None.
/// ///
/// # Arguments /// # Arguments
/// ///
@@ -276,7 +288,7 @@ fn find_max_group(
min_state_group: Option<i64>, min_state_group: Option<i64>,
groups_to_compress: Option<i64>, groups_to_compress: Option<i64>,
max_state_group: Option<i64>, max_state_group: Option<i64>,
) -> i64 { ) -> Option<i64> {
// Get list of state_id's in a certain room // Get list of state_id's in a certain room
let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string(); let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string();
let params: Vec<&(dyn ToSql + Sync)>; let params: Vec<&(dyn ToSql + Sync)>;
@@ -285,22 +297,33 @@ fn find_max_group(
query_chunk_of_ids = format!("{} AND id <= {}", query_chunk_of_ids, max) query_chunk_of_ids = format!("{} AND id <= {}", query_chunk_of_ids, max)
} }
// Adds additional constraint if a groups_to_compress has been specified // Adds additional constraint if a groups_to_compress or min_state_group have been specified
// Note a min state group is only used if groups_to_compress also is
if min_state_group.is_some() && groups_to_compress.is_some() { if min_state_group.is_some() && groups_to_compress.is_some() {
params = vec![&room_id, &min_state_group, &groups_to_compress]; params = vec![&room_id, &min_state_group, &groups_to_compress];
query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids); query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids);
} else if groups_to_compress.is_some() {
params = vec![&room_id, &groups_to_compress];
query_chunk_of_ids = format!(r"{} LIMIT $2", query_chunk_of_ids);
} else { } else {
params = vec![&room_id]; params = vec![&room_id];
query_chunk_of_ids = format!(r"{} ORDER BY id DESC LIMIT 1", query_chunk_of_ids);
} }
let sql_query = format!( let sql_query = format!(
"SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1", "SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1",
query_chunk_of_ids query_chunk_of_ids
); );
let final_row = client.query(sql_query.as_str(), &params).unwrap();
final_row.last().unwrap().get(0) // This vector should have length 0 or 1
let rows = client
.query(sql_query.as_str(), &params)
.expect("Something went wrong while querying the database");
// If no row can be found then return None
let final_row = rows.last()?;
// Else return the id of the group found
Some(final_row.get::<_, i64>(0))
} }
/// Fetch the entries in state_groups_state and immediate predecessors for /// Fetch the entries in state_groups_state and immediate predecessors for
@@ -330,22 +353,18 @@ fn get_initial_data_from_db(
FROM state_groups AS m FROM state_groups AS m
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group) LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
LEFT JOIN state_group_edges AS e ON (m.id = e.state_group) LEFT JOIN state_group_edges AS e ON (m.id = e.state_group)
WHERE m.room_id = $1 WHERE m.room_id = $1 AND m.id <= $2
"#; "#;
// Adds additional constraint if minimum state_group has been specified. // Adds additional constraint if minimum state_group has been specified.
// note that the maximum group only affects queries if there is also a minimum
// otherwise it is assumed that ALL groups should be fetched
let mut rows = if let Some(min) = min_state_group { let mut rows = if let Some(min) = min_state_group {
let params: Vec<&dyn ToSql> = vec![&room_id, &min, &max_group_found]; let params: Vec<&dyn ToSql> = vec![&room_id, &max_group_found, &min];
client.query_raw( client.query_raw(format!(r"{} AND m.id > $3", sql).as_str(), params)
format!(r"{} AND m.id > $2 AND m.id <= $3", sql).as_str(),
params,
)
} else { } else {
client.query_raw(sql, &[room_id]) let params: Vec<&dyn ToSql> = vec![&room_id, &max_group_found];
client.query_raw(sql, params)
} }
.unwrap(); .expect("Something went wrong while querying the database");
// Copy the data from the database into a map // Copy the data from the database into a map
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new(); let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
@@ -481,13 +500,16 @@ fn test_pg_escape() {
assert_eq!(&s[start_pos - 1..start_pos], "$"); assert_eq!(&s[start_pos - 1..start_pos], "$");
} }
/// Send changes to the database
///
/// Note that currently ignores config.transactions and wraps every state /// Note that currently ignores config.transactions and wraps every state
/// group in it's own transaction (i.e. as if config.transactions was true) /// group in it's own transaction (i.e. as if config.transactions was true)
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `config` - A Config struct that contains information /// * `db_url` - The URL of a Postgres database. This should be of the
/// about the run (e.g. room_id and database url) /// form: "postgresql://user:pass@domain:port/database"
/// * `room_id` - The ID of the room in the database
/// * `old_map` - The state group data originally in the database /// * `old_map` - The state group data originally in the database
/// * `new_map` - The state group data generated by the compressor to /// * `new_map` - The state group data generated by the compressor to
/// replace the old contents /// replace the old contents

View File

@@ -298,7 +298,9 @@ pub fn run(mut config: Config) {
config.min_state_group, config.min_state_group,
config.groups_to_compress, config.groups_to_compress,
config.max_state_group, config.max_state_group,
); )
.unwrap_or_else(|| panic!("No state groups found within this range"));
println!("Fetched state groups up to {}", max_group_found); println!("Fetched state groups up to {}", max_group_found);
println!("Number of state groups: {}", state_group_map.len()); println!("Number of state groups: {}", state_group_map.len());
@@ -510,6 +512,7 @@ fn output_sql(
} }
/// Information about what the compressor did to the chunk that it was run on /// Information about what the compressor did to the chunk that it was run on
#[derive(Debug)]
pub struct ChunkStats { pub struct ChunkStats {
// The state of each of the levels of the compressor when it stopped // The state of each of the levels of the compressor when it stopped
pub new_level_info: Vec<Level>, pub new_level_info: Vec<Level>,
@@ -524,16 +527,18 @@ pub struct ChunkStats {
pub commited: bool, pub commited: bool,
} }
/// Loads a compressor state, runs it on a room and then returns info on how it got on
pub fn continue_run( pub fn continue_run(
start: i64, start: Option<i64>,
chunk_size: i64, chunk_size: i64,
db_url: &str, db_url: &str,
room_id: &str, room_id: &str,
level_info: &[Level], level_info: &[Level],
) -> ChunkStats { ) -> Option<ChunkStats> {
// First we need to get the current state groups // First we need to get the current state groups
// If nothing was found then return None
let (state_group_map, max_group_found) = let (state_group_map, max_group_found) =
database::reload_data_from_db(db_url, room_id, Some(start), Some(chunk_size), level_info); database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?;
let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum(); let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
@@ -548,48 +553,28 @@ pub fn continue_run(
let ratio = (new_num_rows as f64) / (original_num_rows as f64); let ratio = (new_num_rows as f64) / (original_num_rows as f64);
println!(
"Number of rows after compression: {} ({:.2}%)",
new_num_rows,
ratio * 100.
);
println!("Compression Statistics:");
println!(
" Number of forced resets due to lacking prev: {}",
compressor.stats.resets_no_suitable_prev
);
println!(
" Number of compressed rows caused by the above: {}",
compressor.stats.resets_no_suitable_prev_size
);
println!(
" Number of state groups changed: {}",
compressor.stats.state_groups_changed
);
if ratio > 1.0 { if ratio > 1.0 {
println!("This compression would not remove any rows. Aborting."); println!("This compression would not remove any rows. Aborting.");
return ChunkStats { return Some(ChunkStats {
new_level_info: compressor.get_level_info(), new_level_info: compressor.get_level_info(),
last_compressed_group: max_group_found, last_compressed_group: max_group_found,
original_num_rows, original_num_rows,
new_num_rows, new_num_rows,
commited: false, commited: false,
}; });
} }
check_that_maps_match(&state_group_map, new_state_group_map); check_that_maps_match(&state_group_map, new_state_group_map);
database::send_changes_to_db(db_url, room_id, &state_group_map, new_state_group_map); database::send_changes_to_db(db_url, room_id, &state_group_map, new_state_group_map);
ChunkStats { Some(ChunkStats {
new_level_info: compressor.get_level_info(), new_level_info: compressor.get_level_info(),
last_compressed_group: max_group_found, last_compressed_group: max_group_found,
original_num_rows, original_num_rows,
new_num_rows, new_num_rows,
commited: true, commited: true,
} })
} }
/// Compares two sets of state groups /// Compares two sets of state groups