diff --git a/Cargo.lock b/Cargo.lock index 963cd92..3cfc6fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,7 +60,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "env_logger", + "env_logger 0.9.0 (git+https://github.com/TilCreator/env_logger?branch=fix_pipe)", "jemallocator", "log", "log-panics", @@ -145,6 +145,8 @@ name = "compressor_integration_tests" version = "0.1.0" dependencies = [ "auto_compressor", + "env_logger 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log", "openssl", "postgres", "postgres-openssl", @@ -252,6 +254,19 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "env_logger" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "env_logger" version = "0.9.0" diff --git a/auto_compressor/src/lib.rs b/auto_compressor/src/lib.rs index 9069430..4e28941 100644 --- a/auto_compressor/src/lib.rs +++ b/auto_compressor/src/lib.rs @@ -6,4 +6,5 @@ //! to the database and uses these to enable it to incrementally work //! on space reductions +pub mod manager; pub mod state_saving; diff --git a/auto_compressor/src/manager.rs b/auto_compressor/src/manager.rs new file mode 100644 index 0000000..e1796ac --- /dev/null +++ b/auto_compressor/src/manager.rs @@ -0,0 +1,112 @@ +// This module contains functions that carry out different types +// of compression on the database. 
+ +use crate::state_saving::{ + connect_to_database, read_room_compressor_state, write_room_compressor_state, +}; +use anyhow::{Context, Result}; +use log::{debug, warn}; +use synapse_compress_state::{continue_run, ChunkStats, Level}; + +/// Runs the compressor on a chunk of the room +/// +/// Returns `Some(chunk_stats)` if the compressor has progressed +/// and `None` if it had already got to the end of the room +/// +/// # Arguments +/// +/// * `db_url` - The URL of the postgres database that synapse is using. +/// e.g. "postgresql://user:password@domain.com/synapse" +/// +/// * `room_id` - The id of the room to run the compressor on. Note this +/// is the id as stored in the database and will look like +/// "!aasdfasdfafdsdsa:matrix.org" instead of the common +/// name +/// +/// * `chunk_size` - The number of state_groups to work on. All of the entries +/// from state_groups_state are requested from the database +/// for state groups that are worked on. Therefore small +/// chunk sizes may be needed on machines with low memory. +/// (Note: if the compressor fails to find space savings on the +/// chunk as a whole (which may well happen in rooms with lots +/// of backfill in) then the entire chunk is skipped.) +/// +/// * `default_levels` - If the compressor has never been run on this room before +/// then we need to provide the compressor with some information +/// on what sort of compression structure we want. 
The default that +/// the library suggests is `vec![Level::new(100), Level::new(50), Level::new(25)]` +pub fn run_compressor_on_room_chunk( + db_url: &str, + room_id: &str, + chunk_size: i64, + default_levels: &[Level], +) -> Result<Option<ChunkStats>> { + // connect to the database + let mut client = + connect_to_database(db_url).with_context(|| format!("Failed to connect to {}", db_url))?; + + // Access the database to find out where the compressor last got up to + let retrieved_state = read_room_compressor_state(&mut client, room_id) + .with_context(|| format!("Failed to read compressor state for room {}", room_id,))?; + + // If the database didn't contain any information, then use the default state + let (start, level_info) = match retrieved_state { + Some((s, l)) => (Some(s), l), + None => (None, default_levels.to_vec()), + }; + + // run the compressor on this chunk + let option_chunk_stats = continue_run(start, chunk_size, db_url, room_id, &level_info); + + if option_chunk_stats.is_none() { + debug!("No work to do on this room..."); + return Ok(None); + } + + // Ok to unwrap because have checked that it's not None + let chunk_stats = option_chunk_stats.unwrap(); + + debug!("{:?}", chunk_stats); + + // Check to see whether the compressor sent its changes to the database + if !chunk_stats.commited { + if chunk_stats.new_num_rows - chunk_stats.original_num_rows != 0 { + warn!( + "The compressor tried to increase the number of rows in {} between {:?} and {}. 
Skipping...", + room_id, start, chunk_stats.last_compressed_group, + ); + } + + // Skip over the failed chunk and set the level info to the default (empty) state + write_room_compressor_state( + &mut client, + room_id, + default_levels, + chunk_stats.last_compressed_group, + ) + .with_context(|| { + format!( + "Failed to skip chunk in room {} between {:?} and {}", + room_id, start, chunk_stats.last_compressed_group + ) + })?; + + return Ok(Some(chunk_stats)); + } + + // Save where we got up to after this successful commit + write_room_compressor_state( + &mut client, + room_id, + &chunk_stats.new_level_info, + chunk_stats.last_compressed_group, + ) + .with_context(|| { + format!( + "Failed to save state after compressing chunk in room {} between {:?} and {}", + room_id, start, chunk_stats.last_compressed_group + ) + })?; + + Ok(Some(chunk_stats)) +} diff --git a/compressor_integration_tests/Cargo.toml b/compressor_integration_tests/Cargo.toml index d7098ea..89b30e1 100644 --- a/compressor_integration_tests/Cargo.toml +++ b/compressor_integration_tests/Cargo.toml @@ -14,6 +14,8 @@ postgres-openssl = "0.5.0" rand = "0.8.0" synapse_compress_state = { path = "../" } auto_compressor = { path = "../auto_compressor/" } +env_logger = "0.9.0" +log = "0.4.14" [dependencies.state-map] git = "https://github.com/matrix-org/rust-matrix-state-map" \ No newline at end of file diff --git a/compressor_integration_tests/src/lib.rs b/compressor_integration_tests/src/lib.rs index a9e4788..181ffef 100644 --- a/compressor_integration_tests/src/lib.rs +++ b/compressor_integration_tests/src/lib.rs @@ -1,9 +1,10 @@ +use log::LevelFilter; use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use postgres::{fallible_iterator::FallibleIterator, Client}; use postgres_openssl::MakeTlsConnector; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use state_map::StateMap; -use std::{borrow::Cow, collections::BTreeMap, fmt}; +use std::{borrow::Cow, collections::BTreeMap, env, fmt}; use 
string_cache::DefaultAtom as Atom; use synapse_compress_state::StateGroupEntry; @@ -352,3 +353,27 @@ fn functions_are_self_consistent() { assert!(database_collapsed_states_match_map(&initial)); assert!(database_structure_matches_map(&initial)); } + +pub fn setup_logger() { + // setup the logger for the auto_compressor + // The default can be overwritten with COMPRESSOR_LOG_LEVEL + // see the README for more information <--- TODO + if env::var("COMPRESSOR_LOG_LEVEL").is_err() { + let mut log_builder = env_logger::builder(); + // set is_test(true) so that the output is hidden by cargo test (unless the test fails) + log_builder.is_test(true); + // default to printing the debug information for both packages being tested + // (Note that just setting the global level to debug will log every sql transaction) + log_builder.filter_module("synapse_compress_state", LevelFilter::Debug); + log_builder.filter_module("auto_compressor", LevelFilter::Debug); + // use try_init() in case the logger has been setup by some previous test + let _ = log_builder.try_init(); + } else { + // If COMPRESSOR_LOG_LEVEL was set then use that + let mut log_builder = env_logger::Builder::from_env("COMPRESSOR_LOG_LEVEL"); + // set is_test(true) so that the output is hidden by cargo test (unless the test fails) + log_builder.is_test(true); + // use try_init() in case the logger has been setup by some previous test + let _ = log_builder.try_init(); + } +} diff --git a/compressor_integration_tests/tests/auto_compressor_manager_tests.rs b/compressor_integration_tests/tests/auto_compressor_manager_tests.rs new file mode 100644 index 0000000..eb59b5c --- /dev/null +++ b/compressor_integration_tests/tests/auto_compressor_manager_tests.rs @@ -0,0 +1,65 @@ +use auto_compressor::{ + manager::run_compressor_on_room_chunk, + state_saving::{connect_to_database, create_tables_if_needed}, +}; +use compressor_integration_tests::{ + add_contents_to_database, clear_compressor_state, 
database_collapsed_states_match_map, + database_structure_matches_map, empty_database, + map_builder::{compressed_3_3_from_0_to_13_with_state, line_segments_with_state}, + setup_logger, DB_URL, +}; +use serial_test::serial; +use synapse_compress_state::Level; + +#[test] +#[serial(db)] +fn run_compressor_on_room_chunk_works() { + setup_logger(); + // This starts with the following structure + // + // 0-1-2 3-4-5 6-7-8 9-10-11 12-13 + // + // Each group i has state: + // ('node','is', i) + // ('group', j, 'seen') - for all j less than i + let initial = line_segments_with_state(0, 13); + empty_database(); + add_contents_to_database("room1", &initial); + + let mut client = connect_to_database(DB_URL).unwrap(); + create_tables_if_needed(&mut client).unwrap(); + clear_compressor_state(); + + // compress in 3,3 level sizes by default + let default_levels = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)]; + + // compress the first 7 groups in the room + // structure should be the following afterwards + // (NOTE: only including compressed groups) + // + // 0 3\ + // 1 4 6 + // 2 5 + run_compressor_on_room_chunk(DB_URL, "room1", 7, &default_levels).unwrap(); + + // compress the next 7 groups + + run_compressor_on_room_chunk(DB_URL, "room1", 7, &default_levels).unwrap(); + + // This should have created the following structure in the database + // i.e. groups 6 and 9 should have changed from before + // N.B. this saves 11 rows + // + // 0 3\ 12 + // 1 4 6\ 13 + // 2 5 7 9 + // 8 10 + // 11 + let expected = compressed_3_3_from_0_to_13_with_state(); + + // Check that the database still gives correct states for each group! 
+ assert!(database_collapsed_states_match_map(&initial)); + + // Check that the structure of the database matches the expected structure + assert!(database_structure_matches_map(&expected)); +} diff --git a/compressor_integration_tests/tests/compressor_continue_run_tests.rs b/compressor_integration_tests/tests/compressor_continue_run_tests.rs index a09ac13..5f44610 100644 --- a/compressor_integration_tests/tests/compressor_continue_run_tests.rs +++ b/compressor_integration_tests/tests/compressor_continue_run_tests.rs @@ -35,7 +35,7 @@ fn continue_run_called_twice_same_as_run() { // compress in 3,3 level sizes // since the compressor hasn't been run before they are empty - let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)]; + let level_info = vec![Level::new(3), Level::new(3)]; // Run the compressor with those settings let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();