diff --git a/src/database.rs b/src/database.rs index 292b41c..9bb9fc5 100644 --- a/src/database.rs +++ b/src/database.rs @@ -21,8 +21,21 @@ use std::{borrow::Cow, collections::BTreeMap, fmt}; use super::StateGroupEntry; -/// Fetch the entries in state_groups_state (and their prev groups) for the -/// given `room_id` by connecting to the postgres database at `db_url`. +/// Fetch the entries in state_groups_state (and their prev groups) for a +/// specific room. +/// +/// - Connects to the database +/// - Fetches rows with group id lower than max +/// - Recursively searches for missing predecessors and adds those +/// +/// # Arguments +/// +/// * `room_id` - The ID of the room in the database +/// * `db_url` - The URL of a Postgres database. This should be of the +/// form: "postgresql://user:pass@domain:port/database" +/// * `max_state_group` - If specified, then only fetch the entries for state +/// groups lower than or equal to this number. (N.B. all +/// predecessors are also fetched) pub fn get_data_from_db( db_url: &str, room_id: &str, @@ -43,6 +56,9 @@ pub fn get_data_from_db( // in our DB queries, so we have to fetch any missing groups explicitly. // Since the returned groups may themselves reference groups we don't have, // we need to do this recursively until we don't find any more missing. + // + // N.B. This does NOT currently fetch the deltas for the missing groups! + // By carefully chosen max_state_group this might cause issues...? loop { let mut missing_sgs: Vec<_> = state_group_map .iter() @@ -76,13 +92,25 @@ pub fn get_data_from_db( state_group_map } -/// Fetch the entries in state_groups_state (and their prev groups) for the -/// given `room_id` by fetching all state with the given `room_id`. +/// Fetch the entries in state_groups_state and immediate predecessors for +/// a specific room. +/// +/// - Fetches rows with group id lower than max +/// - Stores the group id, predecessor id and deltas into a map +/// +/// # Arguments +/// +/// * `client` - A Postgres client to make requests with +/// * `room_id` - The ID of the room in the database +/// * `max_state_group` - If specified, then only fetch the entries for state +/// groups lower than or equal to this number. (N.B. doesn't +/// fetch IMMEDIATE predecessors if ID is above this number) fn get_initial_data_from_db( client: &mut Client, room_id: &str, max_state_group: Option, ) -> BTreeMap { + // Query to get id, predecessor and delta for each state group let sql = r#" SELECT m.id, prev_state_group, type, state_key, s.event_id FROM state_groups AS m @@ -91,6 +119,8 @@ fn get_initial_data_from_db( WHERE m.room_id = $1 "#; + // Adds additional constraint if a max_state_group has been specified + // Then sends query to the datatbase let mut rows = if let Some(s) = max_state_group { let params: Vec<&dyn ToSql> = vec![&room_id, &s]; client.query_raw(format!(r"{} AND m.id <= $2", sql).as_str(), params) @@ -99,6 +129,8 @@ fn get_initial_data_from_db( } .unwrap(); + // Copy the data from the database into a map + let mut state_group_map: BTreeMap = BTreeMap::new(); let pb = ProgressBar::new_spinner(); @@ -108,10 +140,13 @@ fn get_initial_data_from_db( pb.enable_steady_tick(100); while let Some(row) = rows.next().unwrap() { + // The row in the map to copy the data to let entry = state_group_map.entry(row.get(0)).or_default(); + // Save the predecessor (this may already be there) entry.prev_state_group = row.get(1); + // Copy the single delta from the predecessor stored in this row if let Some(etype) = row.get::<_, Option>(2) { entry.state_map.insert( &etype, @@ -129,7 +164,14 @@ fn get_initial_data_from_db( state_group_map } -/// Get any missing state groups from the database +/// Finds the predecessors of missing state groups +/// +/// N.B. this does NOT find their deltas +/// +/// # Arguments +/// +/// * `client` - A Postgres client to make requests with +/// * `missing_sgs` - An array of missing state_group ids fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap { let mut rows = client .query_raw( diff --git a/src/lib.rs b/src/lib.rs index 4288288..4a7306b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,15 +16,9 @@ //! Synapse instance's database. Specifically, it aims to reduce the number of //! rows that a given room takes up in the `state_groups_state` table. -mod compressor; -mod database; - #[global_allocator] static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; -use compressor::Compressor; -use database::PGEscape; - use clap::{ crate_authors, crate_description, crate_name, crate_version, value_t_or_exit, App, Arg, }; @@ -34,6 +28,12 @@ use state_map::StateMap; use std::{collections::BTreeMap, fs::File, io::Write, str::FromStr}; use string_cache::DefaultAtom as Atom; +mod compressor; +mod database; + +use compressor::Compressor; +use database::PGEscape; + /// An entry for a state group. Consists of an (optional) previous group and the /// delta from that previous group (or the full state if no previous group) #[derive(Default, Debug, Clone, PartialEq, Eq)] @@ -42,33 +42,6 @@ pub struct StateGroupEntry { state_map: StateMap, } -/// Gets the full state for a given group from the map (of deltas) -fn collapse_state_maps(map: &BTreeMap, state_group: i64) -> StateMap { - let mut entry = &map[&state_group]; - let mut state_map = StateMap::new(); - - let mut stack = vec![state_group]; - - while let Some(prev_state_group) = entry.prev_state_group { - stack.push(prev_state_group); - if !map.contains_key(&prev_state_group) { - panic!("Missing {}", prev_state_group); - } - entry = &map[&prev_state_group]; - } - - for sg in stack.iter().rev() { - state_map.extend( - map[&sg] - .state_map - .iter() - .map(|((t, s), e)| ((t, s), e.clone())), - ); - } - - state_map -} - /// Helper struct for parsing the `level_sizes` argument. struct LevelSizes(Vec); @@ -89,6 +62,7 @@ impl FromStr for LevelSizes { } } +/// Contains configuration information for this run of the compressor pub struct Config { db_url: String, output_file: Option, @@ -100,6 +74,7 @@ pub struct Config { } impl Config { + /// Build up config from command line arguments pub fn parse_arguments() -> Config { let matches = App::new(crate_name!()) .version(crate_version!()) @@ -199,9 +174,22 @@ impl Config { } } -pub fn run(mut config: Config) { - // let mut config = Config::parse_arguments(); +/// Runs through the steps of the compression: +/// +/// - Fetches current state groups for a room and their predecessors +/// - Outputs #state groups and #lines in table they occupy +/// - Runs the compressor to produce a new predecessor mapping +/// - Outputs #lines in table that the new mapping would occupy +/// - Outputs info about how the compressor got on +/// - Checks that number of lines saved is greater than threshold +/// - Ensures new mapping doesn't affect actual state contents +/// - Produces SQL code to carry out changes and saves it to file +/// +/// # Arguments +/// +/// * `config: Config` - A Config struct that controlls the run +pub fn run(mut config: Config) { // First we need to get the current state groups println!("Fetching state from DB for room '{}'...", config.room_id); @@ -272,6 +260,17 @@ pub fn run(mut config: Config) { output_sql(&mut config, &state_group_map, &new_state_group_map); } +/// Produces SQL code to carry out changes and saves it to file +/// +/// # Arguments +/// +/// * `config` - A Config struct that contains information +/// about the run. It's mutable because it contains +/// the pointer to the output file (which needs to +/// be mutable for the file to be written to) +/// * `old_map` - The state group data originally in the database +/// * `new_map` - The state group data generated by the compressor to +/// replace replace the old contents fn output_sql( config: &mut Config, old_map: &BTreeMap, @@ -353,6 +352,20 @@ fn output_sql( pb.finish(); } +/// Compares two sets of state groups +/// +/// A state group entry contains a predecessor state group and a delta. +/// The complete contents of a certain state group can be calculated by +/// following this chain of predecessors back to some empty state and +/// combining all the deltas together. This is called "collapsing". +/// +/// This function confirms that two state groups mappings lead to the +/// exact same entries for each state group after collapsing them down. +/// +/// # Arguments +/// * `old_map` - The state group data currently in the database +/// * `new_map` - The state group data that the old_map is being compared +/// to fn check_that_maps_match( old_map: &BTreeMap, new_map: &BTreeMap, @@ -391,3 +404,30 @@ fn check_that_maps_match( println!("New state map matches old one"); } + +/// Gets the full state for a given group from the map (of deltas) +fn collapse_state_maps(map: &BTreeMap, state_group: i64) -> StateMap { + let mut entry = &map[&state_group]; + let mut state_map = StateMap::new(); + + let mut stack = vec![state_group]; + + while let Some(prev_state_group) = entry.prev_state_group { + stack.push(prev_state_group); + if !map.contains_key(&prev_state_group) { + panic!("Missing {}", prev_state_group); + } + entry = &map[&prev_state_group]; + } + + for sg in stack.iter().rev() { + state_map.extend( + map[&sg] + .state_map + .iter() + .map(|((t, s), e)| ((t, s), e.clone())), + ); + } + + state_map +}