Add option to commit changes to the database automatically (#53)

This commit is contained in:
Azrenbeth
2021-09-08 11:39:57 +01:00
committed by GitHub
parent 65861de06e
commit d908d13f8f
6 changed files with 601 additions and 57 deletions

View File

@@ -106,6 +106,11 @@ If this flag is set then output the node and edge information for the state_grou
directed graph built up from the predecessor state_group links. These can be looked
at in something like Gephi (https://gephi.org)
- -c
If this flag is set then the changes the compressor makes will be committed to the
database. This should be safe to use while synapse is running as it assumes by default
that the transactions flag is set
## Using as python library
The compressor can also be built into a python library as it uses PyO3. It can be

View File

@@ -1,8 +1,10 @@
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres::Client;
use postgres::{fallible_iterator::FallibleIterator, Client};
use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use state_map::StateMap;
use std::{borrow::Cow, collections::BTreeMap, fmt};
use string_cache::DefaultAtom as Atom;
use synapse_compress_state::StateGroupEntry;
@@ -105,3 +107,230 @@ impl<'a> fmt::Display for PGEscape<'a> {
write!(f, "{}{}{}", delim, self.0, delim)
}
}
/// Checks whether the state at each state group is the same as what the map thinks it should be
///
/// i.e. when synapse tries to work out the state for a given state group by looking at
/// the database, will the state it gets be the same as what the map thinks it should be?
pub fn database_collapsed_states_match_map(
    state_group_map: &BTreeMap<i64, StateGroupEntry>,
) -> bool {
    // `all` short-circuits on the first mismatch, exactly like the early
    // return in a manual loop would
    state_group_map.keys().all(|&sg| {
        let map_state = collapse_state_with_map(state_group_map, sg);
        let database_state = collapse_state_with_database(sg);
        if map_state == database_state {
            true
        } else {
            println!("database state {} doesn't match", sg);
            println!("expected {:?}", map_state);
            println!("but found {:?}", database_state);
            false
        }
    })
}
/// Gets the full state for a given group from the map (of deltas)
///
/// Walks the predecessor chain back to its root, then replays each group's
/// deltas from oldest to newest so that newer deltas override older ones.
///
/// # Panics
///
/// Panics if a predecessor referenced by the chain is missing from `map`.
fn collapse_state_with_map(
    map: &BTreeMap<i64, StateGroupEntry>,
    state_group: i64,
) -> StateMap<Atom> {
    let mut entry = &map[&state_group];
    let mut state_map = StateMap::new();

    // Collect the chain of groups, newest first
    let mut stack = vec![state_group];
    while let Some(prev_state_group) = entry.prev_state_group {
        stack.push(prev_state_group);
        // single lookup instead of contains_key followed by indexing
        entry = map
            .get(&prev_state_group)
            .unwrap_or_else(|| panic!("Missing {}", prev_state_group));
    }

    // Replay deltas from the oldest group forwards so newer entries win
    for sg in stack.iter().rev() {
        state_map.extend(
            map[sg]
                .state_map
                .iter()
                .map(|((t, s), e)| ((t, s), e.clone())),
        );
    }
    state_map
}
/// Gets the full state for a given state group by querying the database
///
/// Follows the predecessor chain stored in state_group_edges, replaying each
/// group's deltas from state_groups_state. The walk visits newer groups
/// first, so a key that is already present is never overwritten.
///
/// NOTE(review): panics (via unwrap/assert) on connection or query failure
/// and on duplicate predecessor edges - acceptable for test-support code.
fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
// connect to the database
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
// don't verify the server's TLS certificate (test environment)
builder.set_verify(SslVerifyMode::NONE);
let connector = MakeTlsConnector::new(builder.build());
let mut client = Client::connect(DB_URL, connector).unwrap();
// Gets the delta for a specific state group
// (LEFT JOIN so groups with no rows in state_groups_state still appear)
let query_deltas = r#"
SELECT m.id, type, state_key, s.event_id
FROM state_groups AS m
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
WHERE m.id = $1
"#;
// If there is no delta for that specific state group, then we still want to find
// the predecessor (so have split this into a different query)
let query_pred = r#"
SELECT prev_state_group
FROM state_group_edges
WHERE state_group = $1
"#;
let mut state_map: StateMap<Atom> = StateMap::new();
// walk from the requested group back through its predecessors
let mut next_group = Some(state_group);
while let Some(sg) = next_group {
// get predecessor from state_group_edges
let mut pred = client.query_raw(query_pred, &[sg]).unwrap();
// set next_group to predecessor (None terminates the walk)
next_group = match pred.next().unwrap() {
Some(p) => p.get(0),
None => None,
};
// if there was a predecessor then assert that it is unique
if next_group.is_some() {
assert!(pred.next().unwrap().is_none());
}
// finish with this query's row iterator so `client` can be reused below
drop(pred);
let mut rows = client.query_raw(query_deltas, &[sg]).unwrap();
while let Some(row) = rows.next().unwrap() {
// Copy the single delta from the predecessor stored in this row
// (the type column is NULL when the LEFT JOIN found no delta rows)
if let Some(etype) = row.get::<_, Option<String>>(1) {
let key = &row.get::<_, String>(2);
// only insert if not overriding existing entry
// this is because the newer delta is found FIRST
if state_map.get(&etype, key).is_none() {
state_map.insert(&etype, key, row.get::<_, String>(3).into());
}
}
}
}
state_map
}
/// Check whether predecessors and deltas stored in the database are the same as in the map
///
/// Unlike database_collapsed_states_match_map this compares the raw structure
/// (predecessor edges and per-group deltas) rather than the collapsed state,
/// so it detects structural differences even when the resulting state would
/// be identical.
pub fn database_structure_matches_map(state_group_map: &BTreeMap<i64, StateGroupEntry>) -> bool {
// connect to the database
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
// don't verify the server's TLS certificate (test environment)
builder.set_verify(SslVerifyMode::NONE);
let connector = MakeTlsConnector::new(builder.build());
let mut client = Client::connect(DB_URL, connector).unwrap();
// Gets the delta for a specific state group
let query_deltas = r#"
SELECT m.id, type, state_key, s.event_id
FROM state_groups AS m
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
WHERE m.id = $1
"#;
// If there is no delta for that specific state group, then we still want to find
// the predecessor (so have split this into a different query)
let query_pred = r#"
SELECT prev_state_group
FROM state_group_edges
WHERE state_group = $1
"#;
for (sg, entry) in state_group_map {
// get predecessor from state_group_edges
let mut pred_iter = client.query_raw(query_pred, &[sg]).unwrap();
// read out the predecessor value from the database
let database_pred = match pred_iter.next().unwrap() {
Some(p) => p.get(0),
None => None,
};
// if there was a predecessor then assert that it is unique
if database_pred.is_some() {
assert!(pred_iter.next().unwrap().is_none());
}
// check if it matches map
if database_pred != entry.prev_state_group {
println!(
"ERROR: predecessor for {} was {:?} (expected {:?})",
sg, database_pred, entry.prev_state_group
);
return false;
}
// needed so that can create another query with the same client
drop(pred_iter);
// Now check that deltas are the same
let mut state_map: StateMap<Atom> = StateMap::new();
// Get delta from state_groups_state
let mut rows = client.query_raw(query_deltas, &[sg]).unwrap();
while let Some(row) = rows.next().unwrap() {
// Copy the single delta from the predecessor stored in this row
// (type is NULL here when the group has no delta rows at all)
if let Some(etype) = row.get::<_, Option<String>>(1) {
state_map.insert(
&etype,
&row.get::<_, String>(2),
row.get::<_, String>(3).into(),
);
}
}
// Check that the delta matches the map
if state_map != entry.state_map {
println!("ERROR: delta for {} didn't match", sg);
println!("Expected: {:?}", entry.state_map);
println!("Actual: {:?}", state_map);
return false;
}
}
true
}
/// Self-consistency check for the helpers in this file: build a simple
/// 0-13 chain, load it into a fresh database, and confirm that both the
/// collapsed-state check and the structure check accept it unchanged.
#[test]
fn functions_are_self_consistent() {
let mut initial: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let mut prev = None;
// This starts with the following structure
//
// 0-1-2-3-4-5-6-7-8-9-10-11-12-13
//
// Each group i has state:
//     ('node','is', i)
//     ('group', j, 'seen') - for all j less than i
for i in 0i64..=13i64 {
let mut entry = StateGroupEntry {
in_range: true,
prev_state_group: prev,
state_map: StateMap::new(),
};
entry
.state_map
.insert("group", &i.to_string(), "seen".into());
entry.state_map.insert("node", "is", i.to_string().into());
initial.insert(i, entry);
// chain: each group's predecessor is the one built just before it
prev = Some(i)
}
// write the map into an empty database, then check both helpers agree
empty_database();
add_contents_to_database("room1", &initial);
assert!(database_collapsed_states_match_map(&initial));
assert!(database_structure_matches_map(&initial));
}

View File

@@ -34,3 +34,129 @@ pub fn line_with_state(start: i64, end: i64) -> BTreeMap<i64, StateGroupEntry> {
initial
}
/// Generates line segments in a chain of state groups each with state deltas
///
/// If called with start=0, end=13 this would build the following:
///
/// 0-1-2 3-4-5 6-7-8 9-10-11 12-13
///
/// Where each group i has state:
///     ('node','is', i)
///     ('group', j, 'seen') - for all j less than i
pub fn line_segments_with_state(start: i64, end: i64) -> BTreeMap<i64, StateGroupEntry> {
    let mut initial: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
    let mut prev = None;

    for i in start..=end {
        // every third group starts a new segment: it is a snapshot with no
        // predecessor
        let is_snapshot = (i - start) % 3 == 0;
        if is_snapshot {
            prev = None;
        }

        let mut entry = StateGroupEntry {
            in_range: true,
            prev_state_group: prev,
            state_map: StateMap::new(),
        };

        // snapshots carry the full accumulated state, not just a delta
        if prev.is_none() {
            for j in start..i {
                entry.state_map.insert("group", &j.to_string(), "seen".into());
            }
        }

        // state that is new to this group
        entry.state_map.insert("group", &i.to_string(), "seen".into());
        entry.state_map.insert("node", "is", i.to_string().into());

        initial.insert(i, entry);
        // this group becomes the predecessor of the next one
        prev = Some(i)
    }
    initial
}
/// This generates the correct compressed structure with 3,3 levels
///
/// Note: only correct structure when no impossible predecessors
///
/// Structure generated:
///
/// 0  3\      12
/// 1  4 6\    13
/// 2  5 7 9
///      8 10
///        11
///
/// Where each group i has state:
///     ('node','is', i)
///     ('group', j, 'seen') - for all j less than i
pub fn compressed_3_3_from_0_to_13_with_state() -> BTreeMap<i64, StateGroupEntry> {
    // predecessor links of the compressed structure
    // (groups 0, 3 and 12 deliberately have no entry: they have no predecessor)
    let expected_edges: BTreeMap<i64, i64> = vec![
        (1, 0),
        (2, 1),
        (4, 3),
        (5, 4),
        (6, 3),
        (7, 6),
        (8, 7),
        (9, 6),
        (10, 9),
        (11, 10),
        (13, 12),
    ]
    .into_iter()
    .collect();

    let mut expected: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();

    for i in 0i64..=13i64 {
        // Option<&i64> -> Option<i64>
        let prev = expected_edges.get(&i).copied();

        let mut entry = StateGroupEntry {
            in_range: true,
            prev_state_group: prev,
            state_map: StateMap::new(),
        };

        // the deltas cover everything strictly between the predecessor
        // (or the start of the chain, for snapshot groups) and this group
        let first_delta = prev.map_or(0, |p| p + 1);
        for j in first_delta..i {
            entry.state_map.insert("group", &j.to_string(), "seen".into());
        }

        // state that is new to this group
        entry.state_map.insert("group", &i.to_string(), "seen".into());
        entry.state_map.insert("node", "is", i.to_string().into());

        expected.insert(i, entry);
    }
    expected
}

View File

@@ -1,5 +1,10 @@
use compressor_integration_tests::{
add_contents_to_database, empty_database, map_builder::line_with_state, DB_URL,
add_contents_to_database, database_collapsed_states_match_map, database_structure_matches_map,
empty_database,
map_builder::{
compressed_3_3_from_0_to_13_with_state, line_segments_with_state, line_with_state,
},
DB_URL,
};
use serial_test::serial;
use synapse_compress_state::{run, Config};
@@ -36,6 +41,7 @@ fn run_succeeds_without_crashing() {
let level_sizes = "3,3".to_string();
let transactions = true;
let graphs = false;
let commit_changes = false;
let config = Config::new(
db_url.clone(),
@@ -48,8 +54,74 @@ fn run_succeeds_without_crashing() {
level_sizes,
transactions,
graphs,
commit_changes,
)
.unwrap();
run(config);
}
/// End-to-end test: with commit_changes set and no min_saved_rows threshold,
/// the compressor should write its changes straight back to the database.
#[test]
#[serial(db)]
fn changes_commited_if_no_min_saved_rows() {
// This starts with the following structure
//
// 0-1-2 3-4-5 6-7-8 9-10-11 12-13
//
// Each group i has state:
//     ('node','is', i)
//     ('group', j, 'seen') - for all j less than i
let initial = line_segments_with_state(0, 13);
// Place this initial state into an empty database
empty_database();
add_contents_to_database("room1", &initial);
// set up the config options
let db_url = DB_URL.to_string();
let room_id = "room1".to_string();
let output_file = Some("./tests/tmp/changes_commited_if_no_min_saved_rows.sql".to_string());
let min_state_group = None;
let min_saved_rows = None;
let groups_to_compress = None;
let max_state_group = None;
let level_sizes = "3,3".to_string();
let transactions = true;
let graphs = false;
// the option under test: commit the compressor's changes to the database
let commit_changes = true;
let config = Config::new(
db_url,
room_id,
output_file,
min_state_group,
groups_to_compress,
min_saved_rows,
max_state_group,
level_sizes,
transactions,
graphs,
commit_changes,
)
.unwrap();
// Run the compressor with those settings
run(config);
// This should have created the following structure in the database
// i.e. groups 6 and 9 should have changed from before
// N.B. this saves 11 rows
//
// 0  3\      12
// 1  4 6\    13
// 2  5 7 9
//      8 10
//        11
let expected = compressed_3_3_from_0_to_13_with_state();
// Check that the database still gives correct states for each group!
assert!(database_collapsed_states_match_map(&initial));
// Check that the structure of the database matches the expected structure
assert!(database_structure_matches_map(&expected))
}

View File

@@ -19,6 +19,8 @@ use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{borrow::Cow, collections::BTreeMap, fmt};
use crate::{generate_sql, Config};
use super::StateGroupEntry;
/// Fetch the entries in state_groups_state (and their prev groups) for a
@@ -339,3 +341,51 @@ fn test_pg_escape() {
assert_eq!(&s[0..1], "$");
assert_eq!(&s[start_pos - 1..start_pos], "$");
}
/// Commits the compressor's changes to the database
///
/// Note that currently ignores config.transactions and wraps every state
/// group in its own transaction (i.e. as if config.transactions was true)
///
/// # Arguments
///
/// * `config`  - A Config struct that contains information
///               about the run (e.g. room_id and database url)
/// * `old_map` - The state group data originally in the database
/// * `new_map` - The state group data generated by the compressor to
///               replace the old contents
pub fn send_changes_to_db(
config: &Config,
old_map: &BTreeMap<i64, StateGroupEntry>,
new_map: &BTreeMap<i64, StateGroupEntry>,
) {
// connect to the database
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
// don't verify the server's TLS certificate
builder.set_verify(SslVerifyMode::NONE);
let connector = MakeTlsConnector::new(builder.build());
let mut client = Client::connect(&config.db_url, connector).unwrap();
println!("Writing changes...");
// setup the progress bar (one tick per state group in the old map)
let pb = ProgressBar::new(old_map.len() as u64);
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
for sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
// commit this change to the database
// N.B. this is a synchronous library so will wait until finished before continuing...
// if want to speed up compressor then this might be a good place to start!
let mut single_group_transaction = client.transaction().unwrap();
single_group_transaction
.batch_execute(&sql_transaction)
.unwrap();
single_group_transaction.commit().unwrap();
pb.inc(1);
}
pb.finish();
}

View File

@@ -107,6 +107,9 @@ pub struct Config {
// Whether or not to output before and after directed graphs (these can be
// visualised in something like Gephi)
graphs: bool,
// Whether or not to commit changes to the database automatically
// N.B. currently assumes transactions is true (to be on the safe side)
commit_changes: bool,
}
impl Config {
@@ -209,7 +212,14 @@ impl Config {
.help("Output before and after graphs")
.long_help(concat!("If this flag is set then output the node and edge information for",
" the state_group directed graph built up from the predecessor state_group links.",
" These can be looked at in something like Gephi (https://gephi.org)"))
" These can be looked at in something like Gephi (https://gephi.org)")),
).arg(
Arg::with_name("commit_changes")
.short("c")
.help("Commit changes to the database")
.long_help(concat!("If this flag is set then the changes the compressor makes will",
" be committed to the database. This should be safe to use while synapse is running",
" as it assumes by default that the transactions flag is set")),
).get_matches();
let db_url = matches
@@ -247,6 +257,8 @@ impl Config {
let graphs = matches.is_present("graphs");
let commit_changes = matches.is_present("commit_changes");
Config {
db_url: String::from(db_url),
output_file,
@@ -258,6 +270,7 @@ impl Config {
level_sizes,
transactions,
graphs,
commit_changes,
}
}
}
@@ -358,11 +371,95 @@ pub fn run(mut config: Config) {
output_sql(&mut config, &state_group_map, new_state_group_map);
// If commit_changes is set then commit the changes to the database
if config.commit_changes {
database::send_changes_to_db(&config, &state_group_map, new_state_group_map);
}
if config.graphs {
graphing::make_graphs(&state_group_map, new_state_group_map);
}
}
/// Produce SQL code to carry out changes to database.
///
/// It returns an iterator where each call to `next()` will
/// return the SQL to alter a single state group in the database
/// (an empty string when the compressor left that group unchanged).
///
/// # Arguments
///
/// * `old_map` - An iterator through the state group data originally
///               in the database
/// * `new_map` - The state group data generated by the compressor to
///               replace the old contents
/// * `room_id` - The room_id that the compressor was working on
fn generate_sql<'a>(
    old_map: &'a BTreeMap<i64, StateGroupEntry>,
    new_map: &'a BTreeMap<i64, StateGroupEntry>,
    room_id: &'a str,
) -> impl Iterator<Item = String> + 'a {
    old_map.iter().map(move |(sg, old_entry)| {
        let new_entry = &new_map[sg];

        // Nothing to emit if the compressor didn't alter this state group
        // (the comparison also covers the in_range field)
        if old_entry == new_entry {
            return String::new();
        }

        // the sql commands that will carry out these changes
        let mut sql = String::new();

        // replace the edge leading out of this state group...
        sql.push_str(&format!(
            "DELETE FROM state_group_edges WHERE state_group = {};\n",
            sg
        ));
        // ...with an edge to the new predecessor (if there is one)
        if let Some(prev_sg) = new_entry.prev_state_group {
            sql.push_str(&format!("INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n", sg, prev_sg));
        }

        // replace the deltas stored for this state group...
        sql.push_str(&format!(
            "DELETE FROM state_groups_state WHERE state_group = {};\n",
            sg
        ));
        // ...with the deltas from the new map (if there are any)
        if !new_entry.state_map.is_empty() {
            sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES\n");
            for (idx, ((t, s), e)) in new_entry.state_map.iter().enumerate() {
                // every row after the first needs a leading comma
                sql.push_str(if idx == 0 { "     " } else { "    ," });
                // write the row to be inserted of the form:
                // (state_group, room_id, type, state_key, event_id)
                sql.push_str(&format!(
                    "({}, {}, {}, {}, {})",
                    sg,
                    PGEscape(room_id),
                    PGEscape(t),
                    PGEscape(s),
                    PGEscape(e)
                ));
            }
            sql.push_str(";\n");
        }
        sql
    })
}
/// Produces SQL code to carry out changes and saves it to file
///
/// # Arguments
@@ -393,61 +490,15 @@ fn output_sql(
pb.enable_steady_tick(100);
if let Some(output) = &mut config.output_file {
for (sg, old_entry) in old_map {
let new_entry = &new_map[sg];
// N.B. also checks if in_range fields agree
if old_entry != new_entry {
if config.transactions {
writeln!(output, "BEGIN;").unwrap();
}
writeln!(
output,
"DELETE FROM state_group_edges WHERE state_group = {};",
sg
)
.unwrap();
if let Some(prev_sg) = new_entry.prev_state_group {
writeln!(output, "INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});", sg, prev_sg).unwrap();
}
writeln!(
output,
"DELETE FROM state_groups_state WHERE state_group = {};",
sg
)
.unwrap();
if !new_entry.state_map.is_empty() {
writeln!(output, "INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES").unwrap();
let mut first = true;
for ((t, s), e) in new_entry.state_map.iter() {
if first {
write!(output, " ").unwrap();
first = false;
} else {
write!(output, " ,").unwrap();
}
writeln!(
output,
"({}, {}, {}, {}, {})",
sg,
PGEscape(&config.room_id),
PGEscape(t),
PGEscape(s),
PGEscape(e)
)
.unwrap();
}
writeln!(output, ";").unwrap();
}
if config.transactions {
writeln!(output, "COMMIT;").unwrap();
}
writeln!(output).unwrap();
for mut sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
if config.transactions {
sql_transaction.insert_str(0, "BEGIN;\n");
sql_transaction.push_str("COMMIT;")
}
write!(output, "{}", sql_transaction)
.expect("Something went wrong while writing SQL to file");
pb.inc(1);
}
}
@@ -557,6 +608,7 @@ impl Config {
level_sizes: String,
transactions: bool,
graphs: bool,
commit_changes: bool,
) -> Result<Config, String> {
let mut output: Option<File> = None;
if let Some(file) = output_file {
@@ -583,6 +635,7 @@ impl Config {
level_sizes,
transactions,
graphs,
commit_changes,
})
}
}
@@ -605,7 +658,8 @@ impl Config {
// have this default to true as is much worse to not have it if you need it
// than to have it and not need it
transactions = true,
graphs = false
graphs = false,
commit_changes = false,
)]
fn run_compression(
db_url: String,
@@ -618,6 +672,7 @@ fn run_compression(
level_sizes: String,
transactions: bool,
graphs: bool,
commit_changes: bool,
) -> PyResult<()> {
let config = Config::new(
db_url,
@@ -630,6 +685,7 @@ fn run_compression(
level_sizes,
transactions,
graphs,
commit_changes,
);
match config {
Err(e) => Err(PyErr::new::<exceptions::PyException, _>(e)),
@@ -1071,6 +1127,7 @@ mod pyo3_tests {
let level_sizes = "100,50,25".to_string();
let transactions = false;
let graphs = false;
let commit_changes = false;
let config = Config::new(
db_url.clone(),
@@ -1083,6 +1140,7 @@ mod pyo3_tests {
level_sizes,
transactions,
graphs,
commit_changes,
)
.unwrap();
@@ -1099,6 +1157,7 @@ mod pyo3_tests {
);
assert_eq!(config.transactions, transactions);
assert_eq!(config.graphs, graphs);
assert_eq!(config.commit_changes, commit_changes);
}
#[test]
@@ -1114,6 +1173,7 @@ mod pyo3_tests {
let level_sizes = "128,64,32".to_string();
let transactions = true;
let graphs = true;
let commit_changes = true;
let config = Config::new(
db_url.clone(),
@@ -1126,6 +1186,7 @@ mod pyo3_tests {
level_sizes,
transactions,
graphs,
commit_changes,
)
.unwrap();
@@ -1142,5 +1203,6 @@ mod pyo3_tests {
);
assert_eq!(config.transactions, transactions);
assert_eq!(config.graphs, graphs);
assert_eq!(config.commit_changes, commit_changes);
}
}