Add method that compresses next chunk of room (#64)
Cargo.lock (generated, 17 lines changed)
@@ -60,7 +60,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "clap",
- "env_logger",
+ "env_logger 0.9.0 (git+https://github.com/TilCreator/env_logger?branch=fix_pipe)",
  "jemallocator",
  "log",
  "log-panics",
@@ -145,6 +145,8 @@ name = "compressor_integration_tests"
 version = "0.1.0"
 dependencies = [
  "auto_compressor",
+ "env_logger 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log",
  "openssl",
  "postgres",
  "postgres-openssl",
@@ -252,6 +254,19 @@ version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
+
+[[package]]
+name = "env_logger"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
+dependencies = [
+ "atty",
+ "humantime",
+ "log",
+ "regex",
+ "termcolor",
+]

 [[package]]
 name = "env_logger"
 version = "0.9.0"
auto_compressor/src/lib.rs

@@ -6,4 +6,5 @@
 //! to the database and uses these to enable it to incrementally work
 //! on space reductions

+pub mod manager;
 pub mod state_saving;
auto_compressor/src/manager.rs (new file, 112 lines)
// This module contains functions that carry out different types
// of compression on the database.

use crate::state_saving::{
    connect_to_database, read_room_compressor_state, write_room_compressor_state,
};
use anyhow::{Context, Result};
use log::{debug, warn};
use synapse_compress_state::{continue_run, ChunkStats, Level};

/// Runs the compressor on a chunk of the room
///
/// Returns `Some(chunk_stats)` if the compressor has progressed
/// and `None` if it had already got to the end of the room
///
/// # Arguments
///
/// * `db_url`         - The URL of the postgres database that synapse is using.
///                      e.g. "postgresql://user:password@domain.com/synapse"
///
/// * `room_id`        - The id of the room to run the compressor on. Note this
///                      is the id as stored in the database and will look like
///                      "!aasdfasdfafdsdsa:matrix.org" instead of the common
///                      name
///
/// * `chunk_size`     - The number of state_groups to work on. All of the entries
///                      from state_groups_state are requested from the database
///                      for state groups that are worked on. Therefore small
///                      chunk sizes may be needed on machines with low memory.
///                      (Note: if the compressor fails to find space savings on the
///                      chunk as a whole (which may well happen in rooms with lots
///                      of backfill in) then the entire chunk is skipped.)
///
/// * `default_levels` - If the compressor has never been run on this room before
///                      then we need to provide the compressor with some information
///                      on what sort of compression structure we want. The default
///                      that the library suggests is
///                      `vec![Level::new(100), Level::new(50), Level::new(25)]`
pub fn run_compressor_on_room_chunk(
    db_url: &str,
    room_id: &str,
    chunk_size: i64,
    default_levels: &[Level],
) -> Result<Option<ChunkStats>> {
    // connect to the database
    let mut client =
        connect_to_database(db_url).with_context(|| format!("Failed to connect to {}", db_url))?;

    // Access the database to find out where the compressor last got up to
    let retrieved_state = read_room_compressor_state(&mut client, room_id)
        .with_context(|| format!("Failed to read compressor state for room {}", room_id))?;

    // If the database didn't contain any information, then use the default state
    let (start, level_info) = match retrieved_state {
        Some((s, l)) => (Some(s), l),
        None => (None, default_levels.to_vec()),
    };

    // run the compressor on this chunk
    let option_chunk_stats = continue_run(start, chunk_size, db_url, room_id, &level_info);

    if option_chunk_stats.is_none() {
        debug!("No work to do on this room...");
        return Ok(None);
    }

    // Ok to unwrap because we have just checked that it's not None
    let chunk_stats = option_chunk_stats.unwrap();

    debug!("{:?}", chunk_stats);

    // Check to see whether the compressor sent its changes to the database
    if !chunk_stats.commited {
        if chunk_stats.new_num_rows - chunk_stats.original_num_rows != 0 {
            warn!(
                "The compressor tried to increase the number of rows in {} between {:?} and {}. Skipping...",
                room_id, start, chunk_stats.last_compressed_group,
            );
        }

        // Skip over the failed chunk and set the level info to the default (empty) state
        write_room_compressor_state(
            &mut client,
            room_id,
            default_levels,
            chunk_stats.last_compressed_group,
        )
        .with_context(|| {
            format!(
                "Failed to skip chunk in room {} between {:?} and {}",
                room_id, start, chunk_stats.last_compressed_group
            )
        })?;

        return Ok(Some(chunk_stats));
    }

    // Save where we got up to after this successful commit
    write_room_compressor_state(
        &mut client,
        room_id,
        &chunk_stats.new_level_info,
        chunk_stats.last_compressed_group,
    )
    .with_context(|| {
        format!(
            "Failed to save state after compressing chunk in room {} between {:?} and {}",
            room_id, start, chunk_stats.last_compressed_group
        )
    })?;

    Ok(Some(chunk_stats))
}
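Since `run_compressor_on_room_chunk` saves its position after every chunk and returns `Ok(None)` once it reaches the end of the room, a caller can compress a whole room incrementally by looping until it returns `None`. A minimal sketch using only the API added above; the function name `compress_whole_room` and the chunk size of 500 are illustrative placeholders, not part of this commit:

use anyhow::Result;
use auto_compressor::manager::run_compressor_on_room_chunk;
use synapse_compress_state::Level;

fn compress_whole_room(db_url: &str, room_id: &str) -> Result<()> {
    // The default levels suggested in the doc comment above.
    let default_levels = vec![Level::new(100), Level::new(50), Level::new(25)];

    // Each call (committed or skipped) persists its position, so this
    // loop can be interrupted and resumed without redoing earlier chunks.
    while let Some(stats) =
        run_compressor_on_room_chunk(db_url, room_id, 500, &default_levels)?
    {
        println!(
            "compressed up to group {}: {} -> {} rows",
            stats.last_compressed_group, stats.original_num_rows, stats.new_num_rows
        );
    }
    // `None` means the compressor had already reached the end of the room.
    Ok(())
}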
compressor_integration_tests/Cargo.toml

@@ -14,6 +14,8 @@ postgres-openssl = "0.5.0"
 rand = "0.8.0"
 synapse_compress_state = { path = "../" }
 auto_compressor = { path = "../auto_compressor/" }
+env_logger = "0.9.0"
+log = "0.4.14"

 [dependencies.state-map]
 git = "https://github.com/matrix-org/rust-matrix-state-map"
compressor_integration_tests/src/lib.rs

@@ -1,9 +1,10 @@
+use log::LevelFilter;
 use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
 use postgres::{fallible_iterator::FallibleIterator, Client};
 use postgres_openssl::MakeTlsConnector;
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use state_map::StateMap;
-use std::{borrow::Cow, collections::BTreeMap, fmt};
+use std::{borrow::Cow, collections::BTreeMap, env, fmt};
 use string_cache::DefaultAtom as Atom;

 use synapse_compress_state::StateGroupEntry;
@@ -352,3 +353,27 @@ fn functions_are_self_consistent() {
     assert!(database_collapsed_states_match_map(&initial));
     assert!(database_structure_matches_map(&initial));
 }
+
+pub fn setup_logger() {
+    // Set up the logger for the auto_compressor.
+    // The default can be overwritten with COMPRESSOR_LOG_LEVEL
+    // see the README for more information <--- TODO
+    if env::var("COMPRESSOR_LOG_LEVEL").is_err() {
+        let mut log_builder = env_logger::builder();
+        // set is_test(true) so that the output is hidden by cargo test (unless the test fails)
+        log_builder.is_test(true);
+        // default to printing the debug information for both packages being tested
+        // (Note that just setting the global level to debug will log every sql transaction)
+        log_builder.filter_module("synapse_compress_state", LevelFilter::Debug);
+        log_builder.filter_module("auto_compressor", LevelFilter::Debug);
+        // use try_init() in case the logger has been set up by some previous test
+        let _ = log_builder.try_init();
+    } else {
+        // If COMPRESSOR_LOG_LEVEL was set then use that
+        let mut log_builder = env_logger::Builder::from_env("COMPRESSOR_LOG_LEVEL");
+        // set is_test(true) so that the output is hidden by cargo test (unless the test fails)
+        log_builder.is_test(true);
+        // use try_init() in case the logger has been set up by some previous test
+        let _ = log_builder.try_init();
+    }
+}
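Because the `else` branch only runs when `COMPRESSOR_LOG_LEVEL` is set, a test can opt out of the per-module Debug default by exporting the variable before `setup_logger()` is called. A small sketch; the "info" value is just an example filter string:

// Any filter string understood by env_logger works here (assumed usage,
// not from this commit).
std::env::set_var("COMPRESSOR_LOG_LEVEL", "info");
setup_logger(); // now takes the Builder::from_env("COMPRESSOR_LOG_LEVEL") path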
New test file (65 lines)

use auto_compressor::{
    manager::run_compressor_on_room_chunk,
    state_saving::{connect_to_database, create_tables_if_needed},
};
use compressor_integration_tests::{
    add_contents_to_database, clear_compressor_state, database_collapsed_states_match_map,
    database_structure_matches_map, empty_database,
    map_builder::{compressed_3_3_from_0_to_13_with_state, line_segments_with_state},
    setup_logger, DB_URL,
};
use serial_test::serial;
use synapse_compress_state::Level;

#[test]
#[serial(db)]
fn run_compressor_on_room_chunk_works() {
    setup_logger();
    // This starts with the following structure
    //
    // 0-1-2 3-4-5 6-7-8 9-10-11 12-13
    //
    // Each group i has state:
    //     ('node','is',      i)
    //     ('group',  j, 'seen') - for all j less than i
    let initial = line_segments_with_state(0, 13);
    empty_database();
    add_contents_to_database("room1", &initial);

    let mut client = connect_to_database(DB_URL).unwrap();
    create_tables_if_needed(&mut client).unwrap();
    clear_compressor_state();

    // compress in 3,3 level sizes by default
    let default_levels = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];

    // compress the first 7 groups in the room
    // structure should be the following afterwards
    // (NOTE: only including compressed groups)
    //
    // 0  3\
    // 1  4 6
    // 2  5
    run_compressor_on_room_chunk(DB_URL, "room1", 7, &default_levels).unwrap();

    // compress the next 7 groups
    run_compressor_on_room_chunk(DB_URL, "room1", 7, &default_levels).unwrap();

    // This should have created the following structure in the database
    // i.e. groups 6 and 9 should have changed from before
    // N.B. this saves 11 rows
    //
    // 0  3\       12
    // 1  4 6\     13
    // 2  5 7 9
    //      8 10
    //        11
    let expected = compressed_3_3_from_0_to_13_with_state();

    // Check that the database still gives correct states for each group!
    assert!(database_collapsed_states_match_map(&initial));

    // Check that the structure of the database matches the expected structure
    assert!(database_structure_matches_map(&expected));
}
@@ -35,7 +35,7 @@ fn continue_run_called_twice_same_as_run() {

     // compress in 3,3 level sizes
     // since the compressor hasn't been run before they are empty
-    let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];
+    let level_info = vec![Level::new(3), Level::new(3)];

     // Run the compressor with those settings
     let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();
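On that last change: the adjacent comment says the levels "are empty", so `Level::new(3)` reads as shorthand for the `Level::restore(3, 0, None)` form used elsewhere in this commit. A sketch of the presumed equivalence; the parameter names in the comment are guesses, not taken from this commit:

// Presumably equivalent ways to build an empty level of maximum size 3:
let _explicit = Level::restore(3, 0, None); // likely (max_size, current_length, current_head)
let _shorthand = Level::new(3);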