From d908d13f8ffeb80a84326e5577a32dd7cde10ffe Mon Sep 17 00:00:00 2001 From: Azrenbeth <77782548+Azrenbeth@users.noreply.github.com> Date: Wed, 8 Sep 2021 11:39:57 +0100 Subject: [PATCH] Add option to commit changes to the database automatically (#53) --- README.md | 5 + compressor_integration_tests/src/lib.rs | 231 +++++++++++++++++- .../src/map_builder.rs | 126 ++++++++++ .../tests/compressor_config_tests.rs | 74 +++++- src/database.rs | 50 ++++ src/lib.rs | 172 ++++++++----- 6 files changed, 601 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 6f6520c..e1f5816 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,11 @@ If this flag is set then output the node and edge information for the state_grou directed graph built up from the predecessor state_group links. These can be looked at in something like Gephi (https://gephi.org) +- -c +If this flag is set then the changes the compressor makes will be committed to the +database. This should be safe to use while synapse is running as it assumes by default +that the transactions flag is set + ## Using as python library The compressor can also be built into a python library as it uses PyO3. It can be diff --git a/compressor_integration_tests/src/lib.rs b/compressor_integration_tests/src/lib.rs index f969967..2d460bc 100644 --- a/compressor_integration_tests/src/lib.rs +++ b/compressor_integration_tests/src/lib.rs @@ -1,8 +1,10 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; -use postgres::Client; +use postgres::{fallible_iterator::FallibleIterator, Client}; use postgres_openssl::MakeTlsConnector; use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use state_map::StateMap; use std::{borrow::Cow, collections::BTreeMap, fmt}; +use string_cache::DefaultAtom as Atom; use synapse_compress_state::StateGroupEntry; @@ -105,3 +107,230 @@ impl<'a> fmt::Display for PGEscape<'a> { write!(f, "{}{}{}", delim, self.0, delim) } } + +/// Checks whether the state at each state group is the same as what the map thinks it should be +/// +/// i.e. when synapse tries to work out the state for a given state group by looking at +/// the database. Will the state it gets be the same as what the map thinks it should be +pub fn database_collapsed_states_match_map( + state_group_map: &BTreeMap, +) -> bool { + for sg in state_group_map.keys() { + let map_state = collapse_state_with_map(state_group_map, *sg); + let database_state = collapse_state_with_database(*sg); + if map_state != database_state { + println!("database state {} doesn't match", sg); + println!("expected {:?}", map_state); + println!("but found {:?}", database_state); + return false; + } + } + true +} + +/// Gets the full state for a given group from the map (of deltas) +fn collapse_state_with_map( + map: &BTreeMap, + state_group: i64, +) -> StateMap { + let mut entry = &map[&state_group]; + let mut state_map = StateMap::new(); + + let mut stack = vec![state_group]; + + while let Some(prev_state_group) = entry.prev_state_group { + stack.push(prev_state_group); + if !map.contains_key(&prev_state_group) { + panic!("Missing {}", prev_state_group); + } + entry = &map[&prev_state_group]; + } + + for sg in stack.iter().rev() { + state_map.extend( + map[sg] + .state_map + .iter() + .map(|((t, s), e)| ((t, s), e.clone())), + ); + } + + state_map +} + +fn collapse_state_with_database(state_group: i64) -> StateMap { + // connect to the database + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_verify(SslVerifyMode::NONE); + let connector = MakeTlsConnector::new(builder.build()); + + let mut client = Client::connect(DB_URL, connector).unwrap(); + + // Gets the delta for a specific state group + let query_deltas = r#" + SELECT m.id, type, state_key, s.event_id + FROM state_groups AS m + LEFT JOIN state_groups_state AS s ON (m.id = s.state_group) + WHERE m.id = $1 + "#; + + // If there is no delta for that specific state group, then we still want to find + // the predecessor (so have split this into a different query) + let query_pred = r#" + SELECT prev_state_group + FROM state_group_edges + WHERE state_group = $1 + "#; + + let mut state_map: StateMap = StateMap::new(); + + let mut next_group = Some(state_group); + + while let Some(sg) = next_group { + // get predecessor from state_group_edges + let mut pred = client.query_raw(query_pred, &[sg]).unwrap(); + + // set next_group to predecessor + next_group = match pred.next().unwrap() { + Some(p) => p.get(0), + None => None, + }; + + // if there was a predecessor then assert that it is unique + if next_group.is_some() { + assert!(pred.next().unwrap().is_none()); + } + drop(pred); + + let mut rows = client.query_raw(query_deltas, &[sg]).unwrap(); + + while let Some(row) = rows.next().unwrap() { + // Copy the single delta from the predecessor stored in this row + if let Some(etype) = row.get::<_, Option>(1) { + let key = &row.get::<_, String>(2); + + // only insert if not overriding existing entry + // this is because the newer delta is found FIRST + if state_map.get(&etype, key).is_none() { + state_map.insert(&etype, key, row.get::<_, String>(3).into()); + } + } + } + } + + state_map +} + +/// Check whether predecessors and deltas stored in the database are the same as in the map +pub fn database_structure_matches_map(state_group_map: &BTreeMap) -> bool { + // connect to the database + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_verify(SslVerifyMode::NONE); + let connector = MakeTlsConnector::new(builder.build()); + + let mut client = Client::connect(DB_URL, connector).unwrap(); + + // Gets the delta for a specific state group + let query_deltas = r#" + SELECT m.id, type, state_key, s.event_id + FROM state_groups AS m + LEFT JOIN state_groups_state AS s ON (m.id = s.state_group) + WHERE m.id = $1 + "#; + + // If there is no delta for that specific state group, then we still want to find + // the predecessor (so have split this into a different query) + let query_pred = r#" + SELECT prev_state_group + FROM state_group_edges + WHERE state_group = $1 + "#; + + for (sg, entry) in state_group_map { + // get predecessor from state_group_edges + let mut pred_iter = client.query_raw(query_pred, &[sg]).unwrap(); + + // read out the predecessor value from the database + let database_pred = match pred_iter.next().unwrap() { + Some(p) => p.get(0), + None => None, + }; + + // if there was a predecessor then assert that it is unique + if database_pred.is_some() { + assert!(pred_iter.next().unwrap().is_none()); + } + + // check if it matches map + if database_pred != entry.prev_state_group { + println!( + "ERROR: predecessor for {} was {:?} (expected {:?})", + sg, database_pred, entry.prev_state_group + ); + return false; + } + // needed so that can create another query + drop(pred_iter); + + // Now check that deltas are the same + let mut state_map: StateMap = StateMap::new(); + + // Get delta from state_groups_state + let mut rows = client.query_raw(query_deltas, &[sg]).unwrap(); + + while let Some(row) = rows.next().unwrap() { + // Copy the single delta from the predecessor stored in this row + if let Some(etype) = row.get::<_, Option>(1) { + state_map.insert( + &etype, + &row.get::<_, String>(2), + row.get::<_, String>(3).into(), + ); + } + } + + // Check that the delta matches the map + if state_map != entry.state_map { + println!("ERROR: delta for {} didn't match", sg); + println!("Expected: {:?}", entry.state_map); + println!("Actual: {:?}", state_map); + return false; + } + } + true +} + +#[test] +fn functions_are_self_consistent() { + let mut initial: BTreeMap = BTreeMap::new(); + let mut prev = None; + + // This starts with the following structure + // + // 0-1-2-3-4-5-6-7-8-9-10-11-12-13 + // + // Each group i has state: + // ('node','is', i) + // ('group', j, 'seen') - for all j less than i + for i in 0i64..=13i64 { + let mut entry = StateGroupEntry { + in_range: true, + prev_state_group: prev, + state_map: StateMap::new(), + }; + entry + .state_map + .insert("group", &i.to_string(), "seen".into()); + entry.state_map.insert("node", "is", i.to_string().into()); + + initial.insert(i, entry); + + prev = Some(i) + } + + empty_database(); + add_contents_to_database("room1", &initial); + + assert!(database_collapsed_states_match_map(&initial)); + assert!(database_structure_matches_map(&initial)); +} diff --git a/compressor_integration_tests/src/map_builder.rs b/compressor_integration_tests/src/map_builder.rs index 810e06b..9069e8b 100644 --- a/compressor_integration_tests/src/map_builder.rs +++ b/compressor_integration_tests/src/map_builder.rs @@ -34,3 +34,129 @@ pub fn line_with_state(start: i64, end: i64) -> BTreeMap { initial } + +/// Generates line segments in a chain of state groups each with state deltas +/// +/// If called wiht start=0, end=13 this would build the following: +/// +/// 0-1-2 3-4-5 6-7-8 9-10-11 12-13 +/// +/// Where each group i has state: +/// ('node','is', i) +/// ('group', j, 'seen') - for all j less than i +pub fn line_segments_with_state(start: i64, end: i64) -> BTreeMap { + let mut initial: BTreeMap = BTreeMap::new(); + let mut prev = None; + + for i in start..=end { + // if the state is a snapshot then set its predecessor to NONE + if (i - start) % 3 == 0 { + prev = None; + } + + // create a blank entry for it + let mut entry = StateGroupEntry { + in_range: true, + prev_state_group: prev, + state_map: StateMap::new(), + }; + + // if it's a snapshot then add in all previous state + if prev.is_none() { + for j in start..i { + entry + .state_map + .insert("group", &j.to_string(), "seen".into()); + } + } + + // add in the new state for this state group + entry + .state_map + .insert("group", &i.to_string(), "seen".into()); + entry.state_map.insert("node", "is", i.to_string().into()); + + // put it into the initial map + initial.insert(i, entry); + + // set this group as the predecessor for the next + prev = Some(i) + } + initial +} + +/// This generates the correct compressed structure with 3,3 levels +/// +/// Note: only correct structure when no impossible predecessors +/// +/// Structure generated: +/// +/// 0 3\ 12 +/// 1 4 6\ 13 +/// 2 5 7 9 +/// 8 10 +/// 11 +/// Where each group i has state: +/// ('node','is', i) +/// ('group', j, 'seen') - for all j less than i +pub fn compressed_3_3_from_0_to_13_with_state() -> BTreeMap { + let expected_edges: BTreeMap = vec![ + (1, 0), + (2, 1), + (4, 3), + (5, 4), + (6, 3), + (7, 6), + (8, 7), + (9, 6), + (10, 9), + (11, 10), + (13, 12), + ] + .into_iter() + .collect(); + + let mut expected: BTreeMap = BTreeMap::new(); + + // Each group i has state: + // ('node','is', i) + // ('group', j, 'seen') - for all j less than i + for i in 0i64..=13i64 { + let prev = expected_edges.get(&i); + + //change from Option<&i64> to Option + let prev = prev.copied(); + + // create a blank entry for it + let mut entry = StateGroupEntry { + in_range: true, + prev_state_group: prev, + state_map: StateMap::new(), + }; + + // Add in all state between predecessor and now (non inclusive) + if let Some(p) = prev { + for j in (p + 1)..i { + entry + .state_map + .insert("group", &j.to_string(), "seen".into()); + } + } else { + for j in 0i64..i { + entry + .state_map + .insert("group", &j.to_string(), "seen".into()); + } + } + + // add in the new state for this state group + entry + .state_map + .insert("group", &i.to_string(), "seen".into()); + entry.state_map.insert("node", "is", i.to_string().into()); + + // put it into the expected map + expected.insert(i, entry); + } + expected +} diff --git a/compressor_integration_tests/tests/compressor_config_tests.rs b/compressor_integration_tests/tests/compressor_config_tests.rs index 866c362..b99eccd 100644 --- a/compressor_integration_tests/tests/compressor_config_tests.rs +++ b/compressor_integration_tests/tests/compressor_config_tests.rs @@ -1,5 +1,10 @@ use compressor_integration_tests::{ - add_contents_to_database, empty_database, map_builder::line_with_state, DB_URL, + add_contents_to_database, database_collapsed_states_match_map, database_structure_matches_map, + empty_database, + map_builder::{ + compressed_3_3_from_0_to_13_with_state, line_segments_with_state, line_with_state, + }, + DB_URL, }; use serial_test::serial; use synapse_compress_state::{run, Config}; @@ -36,6 +41,7 @@ fn run_succeeds_without_crashing() { let level_sizes = "3,3".to_string(); let transactions = true; let graphs = false; + let commit_changes = false; let config = Config::new( db_url.clone(), @@ -48,8 +54,74 @@ fn run_succeeds_without_crashing() { level_sizes, transactions, graphs, + commit_changes, ) .unwrap(); run(config); } + +#[test] +#[serial(db)] +fn changes_commited_if_no_min_saved_rows() { + // This starts with the following structure + // + // 0-1-2 3-4-5 6-7-8 9-10-11 12-13 + // + // Each group i has state: + // ('node','is', i) + // ('group', j, 'seen') - for all j less than i + let initial = line_segments_with_state(0, 13); + + // Place this initial state into an empty database + empty_database(); + add_contents_to_database("room1", &initial); + + // set up the config options + let db_url = DB_URL.to_string(); + let room_id = "room1".to_string(); + let output_file = Some("./tests/tmp/changes_commited_if_no_min_saved_rows.sql".to_string()); + let min_state_group = None; + let min_saved_rows = None; + let groups_to_compress = None; + let max_state_group = None; + let level_sizes = "3,3".to_string(); + let transactions = true; + let graphs = false; + let commit_changes = true; + + let config = Config::new( + db_url, + room_id, + output_file, + min_state_group, + groups_to_compress, + min_saved_rows, + max_state_group, + level_sizes, + transactions, + graphs, + commit_changes, + ) + .unwrap(); + + // Run the compressor with those settings + run(config); + + // This should have created the following structure in the database + // i.e. groups 6 and 9 should have changed from before + // N.B. this saves 11 rows + // + // 0 3\ 12 + // 1 4 6\ 13 + // 2 5 7 9 + // 8 10 + // 11 + let expected = compressed_3_3_from_0_to_13_with_state(); + + // Check that the database still gives correct states for each group! + assert!(database_collapsed_states_match_map(&initial)); + + // Check that the structure of the database matches the expected structure + assert!(database_structure_matches_map(&expected)) +} diff --git a/src/database.rs b/src/database.rs index 2813c5b..6ecd1e3 100644 --- a/src/database.rs +++ b/src/database.rs @@ -19,6 +19,8 @@ use postgres_openssl::MakeTlsConnector; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use std::{borrow::Cow, collections::BTreeMap, fmt}; +use crate::{generate_sql, Config}; + use super::StateGroupEntry; /// Fetch the entries in state_groups_state (and their prev groups) for a @@ -339,3 +341,51 @@ fn test_pg_escape() { assert_eq!(&s[0..1], "$"); assert_eq!(&s[start_pos - 1..start_pos], "$"); } + +/// Note that currently ignores config.transactions and wraps every state +/// group in it's own transaction (i.e. as if config.transactions was true) +/// +/// # Arguments +/// +/// * `config` - A Config struct that contains information +/// about the run (e.g. room_id and database url) +/// * `old_map` - The state group data originally in the database +/// * `new_map` - The state group data generated by the compressor to +/// replace replace the old contents +pub fn send_changes_to_db( + config: &Config, + old_map: &BTreeMap, + new_map: &BTreeMap, +) { + // connect to the database + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_verify(SslVerifyMode::NONE); + let connector = MakeTlsConnector::new(builder.build()); + + let mut client = Client::connect(&config.db_url, connector).unwrap(); + + println!("Writing changes..."); + + // setup the progress bar + let pb = ProgressBar::new(old_map.len() as u64); + pb.set_style( + ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"), + ); + pb.set_message("state groups"); + pb.enable_steady_tick(100); + + for sql_transaction in generate_sql(old_map, new_map, &config.room_id) { + // commit this change to the database + // N.B. this is a synchronous library so will wait until finished before continueing... + // if want to speed up compressor then this might be a good place to start! + let mut single_group_transaction = client.transaction().unwrap(); + single_group_transaction + .batch_execute(&sql_transaction) + .unwrap(); + single_group_transaction.commit().unwrap(); + + pb.inc(1); + } + + pb.finish(); +} diff --git a/src/lib.rs b/src/lib.rs index a310ddd..334e024 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,6 +107,9 @@ pub struct Config { // Whether or not to output before and after directed graphs (these can be // visualised in somthing like Gephi) graphs: bool, + // Whether or not to commit changes to the database automatically + // N.B. currently assumes transactions is true (to be on the safe side) + commit_changes: bool, } impl Config { @@ -209,7 +212,14 @@ impl Config { .help("Output before and after graphs") .long_help(concat!("If this flag is set then output the node and edge information for", " the state_group directed graph built up from the predecessor state_group links.", - " These can be looked at in something like Gephi (https://gephi.org)")) + " These can be looked at in something like Gephi (https://gephi.org)")), + ).arg( + Arg::with_name("commit_changes") + .short("c") + .help("Commit changes to the database") + .long_help(concat!("If this flag is set then the changes the compressor makes will", + " be committed to the database. This should be safe to use while synapse is running", + " as it assumes by default that the transactions flag is set")), ).get_matches(); let db_url = matches @@ -247,6 +257,8 @@ impl Config { let graphs = matches.is_present("graphs"); + let commit_changes = matches.is_present("commit_changes"); + Config { db_url: String::from(db_url), output_file, @@ -258,6 +270,7 @@ impl Config { level_sizes, transactions, graphs, + commit_changes, } } } @@ -358,11 +371,95 @@ pub fn run(mut config: Config) { output_sql(&mut config, &state_group_map, new_state_group_map); + // If commit_changes is set then commit the changes to the database + if config.commit_changes { + database::send_changes_to_db(&config, &state_group_map, new_state_group_map); + } + if config.graphs { graphing::make_graphs(&state_group_map, new_state_group_map); } } +/// Produce SQL code to carry out changes to database. +/// +/// It returns an iterator where each call to `next()` will +/// return the SQL to alter a single state group in the database +/// +/// # Arguments +/// +/// * `old_map` - An iterator through the state group data originally +/// in the database +/// * `new_map` - The state group data generated by the compressor to +/// replace replace the old contents +/// * `room_id` - The room_id that the compressor was working on +fn generate_sql<'a>( + old_map: &'a BTreeMap, + new_map: &'a BTreeMap, + room_id: &'a str, +) -> impl Iterator + 'a { + old_map.iter().map(move |(sg,old_entry)| { + + let new_entry = &new_map[sg]; + + // Check if the new map has a different entry for this state group + // N.B. also checks if in_range fields agree + if old_entry != new_entry { + // the sql commands that will carry out these changes + let mut sql = String::new(); + + // remove the current edge + sql.push_str(&format!( + "DELETE FROM state_group_edges WHERE state_group = {};\n", + sg + )); + + // if the new entry has a predecessor then put that into state_group_edges + if let Some(prev_sg) = new_entry.prev_state_group { + sql.push_str(&format!("INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n", sg, prev_sg)); + } + + // remove the current deltas for this state group + sql.push_str(&format!( + "DELETE FROM state_groups_state WHERE state_group = {};\n", + sg + )); + + if !new_entry.state_map.is_empty() { + // place all the deltas for the state group in the new map into state_groups_state + sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES\n"); + + let mut first = true; + for ((t, s), e) in new_entry.state_map.iter() { + // Add a comma at the start if not the first row to be inserted + if first { + sql.push_str(" "); + first = false; + } else { + sql.push_str(" ,"); + } + + // write the row to be insterted of the form: + // (state_group, room_id, type, state_key, event_id) + sql.push_str(&format!( + "({}, {}, {}, {}, {})", + sg, + PGEscape(room_id), + PGEscape(t), + PGEscape(s), + PGEscape(e) + )); + } + sql.push_str(";\n"); + } + + sql + } else { + String::new() + } + }) +} + /// Produces SQL code to carry out changes and saves it to file /// /// # Arguments @@ -393,61 +490,15 @@ fn output_sql( pb.enable_steady_tick(100); if let Some(output) = &mut config.output_file { - for (sg, old_entry) in old_map { - let new_entry = &new_map[sg]; - - // N.B. also checks if in_range fields agree - if old_entry != new_entry { - if config.transactions { - writeln!(output, "BEGIN;").unwrap(); - } - writeln!( - output, - "DELETE FROM state_group_edges WHERE state_group = {};", - sg - ) - .unwrap(); - - if let Some(prev_sg) = new_entry.prev_state_group { - writeln!(output, "INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});", sg, prev_sg).unwrap(); - } - - writeln!( - output, - "DELETE FROM state_groups_state WHERE state_group = {};", - sg - ) - .unwrap(); - if !new_entry.state_map.is_empty() { - writeln!(output, "INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES").unwrap(); - let mut first = true; - for ((t, s), e) in new_entry.state_map.iter() { - if first { - write!(output, " ").unwrap(); - first = false; - } else { - write!(output, " ,").unwrap(); - } - writeln!( - output, - "({}, {}, {}, {}, {})", - sg, - PGEscape(&config.room_id), - PGEscape(t), - PGEscape(s), - PGEscape(e) - ) - .unwrap(); - } - writeln!(output, ";").unwrap(); - } - - if config.transactions { - writeln!(output, "COMMIT;").unwrap(); - } - writeln!(output).unwrap(); + for mut sql_transaction in generate_sql(old_map, new_map, &config.room_id) { + if config.transactions { + sql_transaction.insert_str(0, "BEGIN;\n"); + sql_transaction.push_str("COMMIT;") } + write!(output, "{}", sql_transaction) + .expect("Something went wrong while writing SQL to file"); + pb.inc(1); } } @@ -557,6 +608,7 @@ impl Config { level_sizes: String, transactions: bool, graphs: bool, + commit_changes: bool, ) -> Result { let mut output: Option = None; if let Some(file) = output_file { @@ -583,6 +635,7 @@ impl Config { level_sizes, transactions, graphs, + commit_changes, }) } } @@ -605,7 +658,8 @@ impl Config { // have this default to true as is much worse to not have it if you need it // than to have it and not need it transactions = true, - graphs = false + graphs = false, + commit_changes = false, )] fn run_compression( db_url: String, @@ -618,6 +672,7 @@ fn run_compression( level_sizes: String, transactions: bool, graphs: bool, + commit_changes: bool, ) -> PyResult<()> { let config = Config::new( db_url, @@ -630,6 +685,7 @@ fn run_compression( level_sizes, transactions, graphs, + commit_changes, ); match config { Err(e) => Err(PyErr::new::(e)), @@ -1071,6 +1127,7 @@ mod pyo3_tests { let level_sizes = "100,50,25".to_string(); let transactions = false; let graphs = false; + let commit_changes = false; let config = Config::new( db_url.clone(), @@ -1083,6 +1140,7 @@ mod pyo3_tests { level_sizes, transactions, graphs, + commit_changes, ) .unwrap(); @@ -1099,6 +1157,7 @@ mod pyo3_tests { ); assert_eq!(config.transactions, transactions); assert_eq!(config.graphs, graphs); + assert_eq!(config.commit_changes, commit_changes); } #[test] @@ -1114,6 +1173,7 @@ mod pyo3_tests { let level_sizes = "128,64,32".to_string(); let transactions = true; let graphs = true; + let commit_changes = true; let config = Config::new( db_url.clone(), @@ -1126,6 +1186,7 @@ mod pyo3_tests { level_sizes, transactions, graphs, + commit_changes, ) .unwrap(); @@ -1142,5 +1203,6 @@ mod pyo3_tests { ); assert_eq!(config.transactions, transactions); assert_eq!(config.graphs, graphs); + assert_eq!(config.commit_changes, commit_changes); } }