diff --git a/Cargo.lock b/Cargo.lock index 6232c5b..963cd92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -11,6 +20,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "anyhow" +version = "1.0.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" + +[[package]] +name = "arc-swap" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5ab7d9e73059c86c36473f459b52adbd99c3554a4fec492caef460806006f00" + [[package]] name = "async-trait" version = "0.1.50" @@ -33,6 +54,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "auto_compressor" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "env_logger", + "jemallocator", + "log", + "log-panics", + "openssl", + "postgres", + "postgres-openssl", + "pyo3", + "pyo3-log", + "rand", + "serial_test", + "synapse_compress_state", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -103,6 +144,7 @@ dependencies = [ name = "compressor_integration_tests" version = "0.1.0" dependencies = [ + "auto_compressor", "openssl", "postgres", "postgres-openssl", @@ -210,6 +252,18 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "env_logger" +version = "0.9.0" +source = "git+https://github.com/TilCreator/env_logger?branch=fix_pipe#3d09e0d824d9301cf1c0d4a9f148f8cfeb216329" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -371,6 +425,12 @@ dependencies = [ "digest", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "indicatif" version = "0.16.2" @@ -466,6 +526,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "log-panics" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae0136257df209261daa18d6c16394757c63e032e27aafd8b07788b051082bef" +dependencies = [ + "log", +] + [[package]] name = "matches" version = "0.1.8" @@ -790,6 +859,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "pyo3-log" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d42fe53a9ba733c9dd4289f73389dafa35b6ad0e2e07fbb480e5a111b83749" +dependencies = [ + "arc-swap", + "log", + "pyo3", +] + [[package]] name = "pyo3-macros" version = "0.14.1" @@ -902,6 +982,8 @@ version = "1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" dependencies = [ + "aho-corasick", + "memchr", "regex-syntax", ] @@ -1054,6 +1136,15 @@ dependencies = [ "string_cache", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + [[package]] name = "terminal_size" version = "0.1.17" @@ -1234,6 +1325,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 99f9e3a..493509d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["compressor_integration_tests"] +members = ["auto_compressor", "compressor_integration_tests"] [package] authors = ["Erik Johnston"] diff --git a/auto_compressor/Cargo.toml b/auto_compressor/Cargo.toml new file mode 100644 index 0000000..0f35a5a --- /dev/null +++ b/auto_compressor/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "auto_compressor" +authors = ["William Ashton"] +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = "2.33.0" +openssl = "0.10.32" +postgres = "0.19.0" +postgres-openssl = "0.5.0" +jemallocator = "0.3.2" +rand = "0.8.0" +serial_test = "0.5.1" +synapse_compress_state = { path = "../" } +env_logger = { version = "0.9.0", git = "https://github.com/TilCreator/env_logger", branch = "fix_pipe" } +log = "0.4.14" +log-panics = "2.0.0" +anyhow = "1.0.42" +pyo3-log = "0.4.0" + +# Needed for pyo3 support +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies.pyo3] +version = "0.14.1" +features = ["extension-module","abi3-py36"] diff --git a/auto_compressor/src/lib.rs b/auto_compressor/src/lib.rs new file mode 100644 index 0000000..9069430 --- /dev/null +++ b/auto_compressor/src/lib.rs @@ -0,0 +1,9 @@ +//! This is a tool that uses the synapse_compress_state library to +//! reduce the size of the synapse state_groups_state table in a postgres +//! database. +//! +//! It adds the tables state_compressor_state and state_compressor_progress +//! to the database and uses these to enable it to incrementally work +//! on space reductions + +pub mod state_saving; diff --git a/auto_compressor/src/state_saving.rs b/auto_compressor/src/state_saving.rs new file mode 100644 index 0000000..988b111 --- /dev/null +++ b/auto_compressor/src/state_saving.rs @@ -0,0 +1,251 @@ +// This module contains functions to communicate with the database + +use anyhow::{bail, Result}; +use synapse_compress_state::Level; + +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; +use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client}; +use postgres_openssl::MakeTlsConnector; + +/// Connects to the database and returns a postgres client +/// +/// # Arguments +/// +/// * `db_url` - The URL of the postgres database that synapse is using. +/// e.g. "postgresql://user:password@domain.com/synapse" +pub fn connect_to_database(db_url: &str) -> Result { + let mut builder = SslConnector::builder(SslMethod::tls())?; + builder.set_verify(SslVerifyMode::NONE); + let connector = MakeTlsConnector::new(builder.build()); + + let client = Client::connect(db_url, connector)?; + Ok(client) +} + +/// Creates the state_compressor_state and state_compressor progress tables +/// +/// If these tables already exist then this function does nothing +/// +/// # Arguments +/// +/// * `client` - A postgres client used to send the requests to the database +pub fn create_tables_if_needed(client: &mut Client) -> Result<()> { + let create_state_table = r#" + CREATE TABLE IF NOT EXISTS state_compressor_state ( + room_id TEXT NOT NULL, + level_num INT NOT NULL, + max_size INT NOT NULL, + current_length INT NOT NULL, + current_head BIGINT, + UNIQUE (room_id, level_num) + )"#; + + client.execute(create_state_table, &[])?; + + let create_state_table_indexes = r#" + CREATE INDEX IF NOT EXISTS state_compressor_state_index ON state_compressor_state (room_id)"#; + + client.execute(create_state_table_indexes, &[])?; + + let create_progress_table = r#" + CREATE TABLE IF NOT EXISTS state_compressor_progress ( + room_id TEXT PRIMARY KEY, + last_compressed BIGINT NOT NULL + )"#; + + client.execute(create_progress_table, &[])?; + + Ok(()) +} + +/// Retrieve the level info so we can restart the compressor +/// +/// # Arguments +/// +/// * `client` - A postgres client used to send the requests to the database +/// * `room_id` - The room who's saved compressor state we want to load +pub fn read_room_compressor_state( + client: &mut Client, + room_id: &str, +) -> Result)>> { + // Query to retrieve all levels from state_compressor_state + // Ordered by ascending level_number + let sql = r#" + SELECT level_num, max_size, current_length, current_head, last_compressed + FROM state_compressor_state + LEFT JOIN state_compressor_progress USING (room_id) + WHERE room_id = $1 + ORDER BY level_num ASC + "#; + + // send the query to the database + let mut levels = client.query_raw(sql, &[room_id])?; + + // Needed to ensure that the rows are for unique consecutive levels + // starting from 1 (i.e of form [1,2,3] not [0,1,2] or [1,1,2,2,3]) + let mut prev_seen = 0; + + // The vector to store the level info from the database in + let mut level_info: Vec = Vec::new(); + + // Where the last compressor run stopped + let mut last_compressed = None; + // Used to only read last_compressed value once + let mut first_row = true; + + // Loop through all the rows retrieved by that query + while let Some(l) = levels.next()? { + // Read out the fields into variables + // + // Some of these are `usize` as they may be used to index vectors, but stored as Postgres + // type `INT` which is the same as`i32`. + // + // Since usize is unlikely to be ess than 32 bits wide, this conversion should be safe + let level_num: usize = l.get::<_, i32>("level_num") as usize; + let max_size: usize = l.get::<_, i32>("max_size") as usize; + let current_length: usize = l.get::<_, i32>("current_length") as usize; + let current_head: Option = l.get("current_head"); + + // Only read the last compressed column once since is the same for each row + if first_row { + last_compressed = l.get("last_compressed"); // might be NULL if corrupted + if last_compressed.is_none() { + bail!( + "No entry in state_compressor_progress for room {} but entries in state_compressor_state were found", + room_id + ) + } + first_row = false; + } + + // Check that there aren't multiple entries for the same level number + // in the database. (Should be impossible due to unique key constraint) + if prev_seen == level_num { + bail!( + "The level {} occurs twice in state_compressor_state for room {}", + level_num, + room_id, + ); + } + + // Check that there is no missing level in the database + // e.g. if the previous row retrieved was for level 1 and this + // row is for level 3 then since the SQL query orders the results + // in ascenting level numbers, there was no level 2 found! + if prev_seen != level_num - 1 { + bail!("Levels between {} and {} are missing", prev_seen, level_num,); + } + + // if the level is not empty, then it must have a head! + if current_head.is_none() && current_length != 0 { + bail!( + "Level {} has no head but current length is {} in room {}", + level_num, + current_length, + room_id, + ); + } + + // If the level has more groups in than the maximum then something is wrong! + if current_length > max_size { + bail!( + "Level {} has length {} but max size {} in room {}", + level_num, + current_length, + max_size, + room_id, + ); + } + + // Add this level to the level_info vector + level_info.push(Level::restore(max_size, current_length, current_head)); + // Mark the previous level_number seen as the current one + prev_seen = level_num; + } + + // If we didn't retrieve anything from the database then there is no saved state + // in the database! + if level_info.is_empty() { + return Ok(None); + } + + // Return the compressor state we retrieved + // last_compressed cannot be None at this point, so safe to unwrap + Ok(Some((last_compressed.unwrap(), level_info))) +} + +/// Save the level info so it can be loaded by the next run of the compressor +/// +/// # Arguments +/// +/// * `client` - A postgres client used to send the requests to the database +/// * `room_id` - The room who's saved compressor state we want to save +/// * `level_info` - The state that can be used to restore the compressor later +/// * `last_compressed` - The last state_group that was compressed. This is needed +/// so that the compressor knows where to start from next +pub fn write_room_compressor_state( + client: &mut Client, + room_id: &str, + level_info: &[Level], + last_compressed: i64, +) -> Result<()> { + // Wrap all the changes to the state for this room in a transaction + // This prevents accidentally having malformed compressor start info + let mut write_transaction = client.transaction()?; + + // Go through every level that the compressor is using + for (level_num, level) in level_info.iter().enumerate() { + // the 1st level is level 1 not level 0, but enumerate starts at 0 + // so need to add 1 to get correct number + let level_num = level_num + 1; + + // bring the level info out of the Level struct + let (max_size, current_len, current_head) = ( + level.get_max_length(), + level.get_current_length(), + level.get_head(), + ); + + // Update the database with this compressor state information + // + // Some of these are `usize` as they may be used to index vectors, but stored as Postgres + // type `INT` which is the same as`i32`. + // + // Since these values should always be small, this conversion should be safe. + let (level_num, max_size, current_len) = + (level_num as i32, max_size as i32, current_len as i32); + let params: Vec<&(dyn ToSql + Sync)> = + vec![&room_id, &level_num, &max_size, ¤t_len, ¤t_head]; + + write_transaction.execute( + r#" + INSERT INTO state_compressor_state + (room_id, level_num, max_size, current_length, current_head) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (room_id, level_num) + DO UPDATE SET + max_size = excluded.max_size, + current_length = excluded.current_length, + current_head= excluded.current_head; + "#, + ¶ms, + )?; + } + + // Update the database with this progress information + let params: Vec<&(dyn ToSql + Sync)> = vec![&room_id, &last_compressed]; + write_transaction.execute( + r#" + INSERT INTO state_compressor_progress (room_id, last_compressed) + VALUES ($1, $2) + ON CONFLICT (room_id) + DO UPDATE SET last_compressed = excluded.last_compressed; + "#, + ¶ms, + )?; + + // Commit the transaction (otherwise changes never happen) + write_transaction.commit()?; + + Ok(()) +} diff --git a/compressor_integration_tests/Cargo.toml b/compressor_integration_tests/Cargo.toml index c647043..d7098ea 100644 --- a/compressor_integration_tests/Cargo.toml +++ b/compressor_integration_tests/Cargo.toml @@ -13,6 +13,7 @@ postgres = "0.19.0" postgres-openssl = "0.5.0" rand = "0.8.0" synapse_compress_state = { path = "../" } +auto_compressor = { path = "../auto_compressor/" } [dependencies.state-map] git = "https://github.com/matrix-org/rust-matrix-state-map" \ No newline at end of file diff --git a/compressor_integration_tests/src/lib.rs b/compressor_integration_tests/src/lib.rs index 2d460bc..a9e4788 100644 --- a/compressor_integration_tests/src/lib.rs +++ b/compressor_integration_tests/src/lib.rs @@ -69,7 +69,7 @@ pub fn add_contents_to_database(room_id: &str, state_group_map: &BTreeMap = BTreeMap::new(); diff --git a/compressor_integration_tests/tests/auto_compressor_state_saving_tests.rs b/compressor_integration_tests/tests/auto_compressor_state_saving_tests.rs new file mode 100644 index 0000000..b5ae98c --- /dev/null +++ b/compressor_integration_tests/tests/auto_compressor_state_saving_tests.rs @@ -0,0 +1,28 @@ +use auto_compressor::state_saving::{ + connect_to_database, create_tables_if_needed, read_room_compressor_state, + write_room_compressor_state, +}; +use compressor_integration_tests::{clear_compressor_state, DB_URL}; +use serial_test::serial; +use synapse_compress_state::Level; + +#[test] +#[serial(db)] +fn write_then_read_state_gives_correct_results() { + let mut client = connect_to_database(DB_URL).unwrap(); + create_tables_if_needed(&mut client).unwrap(); + clear_compressor_state(); + + let room_id = "room1"; + let written_info: Vec = + vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))]; + let written_num = 53; + write_room_compressor_state(&mut client, room_id, &written_info, written_num).unwrap(); + + let (read_num, read_info) = read_room_compressor_state(&mut client, room_id) + .unwrap() + .unwrap(); + + assert_eq!(written_info, read_info); + assert_eq!(written_num, read_num); +}