diff --git a/.gitignore b/.gitignore
index c01caa1..d96511f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@
 *.data
 *.old
 **.sql
-*.csv
\ No newline at end of file
+*.csv
+**.log
\ No newline at end of file
diff --git a/auto_compressor/src/lib.rs b/auto_compressor/src/lib.rs
index 4e28941..3be0c10 100644
--- a/auto_compressor/src/lib.rs
+++ b/auto_compressor/src/lib.rs
@@ -6,5 +6,47 @@
 //! to the database and uses these to enable it to incrementally work
 //! on space reductions
 
+use std::str::FromStr;
+
+use anyhow::Result;
+use synapse_compress_state::Level;
+
 pub mod manager;
 pub mod state_saving;
+
+/// Helper struct for parsing the `default_levels` argument.
+///
+/// The compressor keeps track of a number of Levels, each of which has a maximum length,
+/// a current length, and an optional current head (None if the level is empty, Some if a
+/// head exists).
+///
+/// This wrapper is needed since FromStr cannot be implemented for Vec<Level>,
+/// as neither the trait nor the type is defined in this crate.
+#[derive(PartialEq, Debug)]
+pub struct LevelInfo(pub Vec<Level>);
+
+// Implement FromStr so that an argument of the form "100,50,25"
+// can be used to create a vector of levels with max sizes 100, 50 and 25.
+// For more info see the Level documentation in synapse_compress_state.
+impl FromStr for LevelInfo {
+    type Err = &'static str;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        // Stores the max sizes of each level
+        let mut level_info: Vec<Level> = Vec::new();
+
+        // Split the string up at each comma
+        for size_str in s.split(',') {
+            // Try to convert each section into a number,
+            // returning an error if that fails
+            let size: usize = size_str
+                .parse()
+                .map_err(|_| "Not a comma separated list of numbers")?;
+            // Add a level with this parsed maximum size
+            level_info.push(Level::new(size));
+        }
+
+        // Return the built-up vector inside a LevelInfo struct
+        Ok(LevelInfo(level_info))
+    }
+}
diff --git a/auto_compressor/src/main.rs b/auto_compressor/src/main.rs
new file mode 100644
index 0000000..7ece373
--- /dev/null
+++ b/auto_compressor/src/main.rs
@@ -0,0 +1,153 @@
+//! This is a tool that uses the synapse_compress_state library to
+//! reduce the size of the synapse state_groups_state table in a postgres
+//! database.
+//!
+//! It adds the tables state_compressor_state and state_compressor_progress
+//! to the database and uses these to enable it to incrementally work
+//! on space reductions.
+//!
+//! This binary calls manager::compress_chunks_of_database() with the
+//! arguments provided. That is, it works through the rows of the
+//! state_groups_state table in chunks of state groups, compressing
+//! each chunk in turn.
+//!
+//! After each chunk, the rows processed are marked as "compressed" (using
+//! the state_compressor_progress table), and the program state is saved into
+//! the state_compressor_state table so that the compressor can seamlessly
+//! continue from where it left off.
+
+#[global_allocator]
+static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
+
+use auto_compressor::{manager, state_saving, LevelInfo};
+use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
+use log::LevelFilter;
+use std::{env, fs::OpenOptions};
+
+/// Execution starts here
+fn main() {
+    // set up the logger for the auto_compressor
+    // The default can be overwritten with RUST_LOG
+    // see the README for more information
+    let log_file = OpenOptions::new()
+        .append(true)
+        .create(true)
+        .open("auto_compressor.log")
+        .unwrap_or_else(|e| panic!("Error occurred while opening the log file: {}", e));
+
+    if env::var("RUST_LOG").is_err() {
+        let mut log_builder = env_logger::builder();
+        log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
+        // Ensure panics still come through
+        log_builder.filter_module("panic", LevelFilter::Error);
+        // Only output errors from the synapse_compress_state library
+        log_builder.filter_module("synapse_compress_state", LevelFilter::Error);
+        // Output log levels info and above from auto_compressor
+        log_builder.filter_module("auto_compressor", LevelFilter::Info);
+        log_builder.init();
+    } else {
+        // If RUST_LOG was set then use that
+        let mut log_builder = env_logger::Builder::from_env("RUST_LOG");
+        log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
+        // Ensure panics still come through
+        log_builder.filter_module("panic", LevelFilter::Error);
+        log_builder.init();
+    }
+    log_panics::init();
+    // Announce the start of the program to the logs
+    log::info!("auto_compressor started");
+
+    // parse the command line arguments using the clap crate
+    let arguments = App::new(crate_name!())
+        .version(crate_version!())
+        .author(crate_authors!("\n"))
+        .about(crate_description!())
+        .arg(
+            Arg::with_name("postgres-url")
+                .short("p")
+                .value_name("URL")
+                .help("The url for connecting to the postgres database.")
+                .long_help(concat!(
+                    "The url for connecting to the postgres database. This should be of",
+                    " the form \"postgresql://username:password@mydomain.com/database\""))
+                .takes_value(true)
+                .required(true),
+        ).arg(
+            Arg::with_name("chunk_size")
+                .short("c")
+                .value_name("COUNT")
+                .help("The maximum number of state groups to load into memory at once")
+                .long_help(concat!(
+                    "The number of state_groups to work on at once. All of the entries",
+                    " from state_groups_state are requested from the database",
+                    " for state groups that are worked on. Therefore small",
+                    " chunk sizes may be needed on machines with low memory.",
+                    " (Note: if the compressor fails to find space savings on the",
+                    " chunk as a whole (which may well happen in rooms with lots",
+                    " of backfill in) then the entire chunk is skipped.)",
+                ))
+                .takes_value(true)
+                .required(true),
+        ).arg(
+            Arg::with_name("default_levels")
+                .short("l")
+                .value_name("LEVELS")
+                .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
+                .long_help(concat!(
+                    "Sizes of each new level in the compression algorithm, as a comma separated list.",
+                    " The first entry in the list is for the lowest, most granular level,",
+                    " with each subsequent entry being for the next highest level.",
+                    " The number of entries in the list determines the number of levels",
+                    " that will be used.",
+                    "\nThe sum of the sizes of the levels affects the performance of fetching the state",
+                    " from the database, as the sum of the sizes is the upper bound on the number of",
+                    " iterations needed to fetch a given set of state.",
+                ))
+                .default_value("100,50,25")
+                .takes_value(true)
+                .required(false),
+        ).arg(
+            Arg::with_name("number_of_chunks")
+                .short("n")
+                .value_name("CHUNKS_TO_COMPRESS")
+                .help("The number of chunks to compress")
+                .long_help("This many chunks of the database will be compressed.")
+                .takes_value(true)
+                .required(true),
+        ).get_matches();
+
+    // The URL of the database
+    let db_url = arguments
+        .value_of("postgres-url")
+        .expect("A database url is required");
+
+    // The number of state groups to work on at once
+    let chunk_size = arguments
+        .value_of("chunk_size")
+        .map(|s| s.parse().expect("chunk_size must be an integer"))
+        .expect("A chunk size is required");
+
+    // The default level structure to use when compressing
+    let default_levels = value_t!(arguments, "default_levels", LevelInfo)
+        .unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
+
+    // The number of chunks to compress with this tool
+    let number_of_chunks = arguments
+        .value_of("number_of_chunks")
+        .map(|s| s.parse().expect("number_of_chunks must be an integer"))
+        .expect("number_of_chunks is required");
+
+    // Connect to the database and create the 2 tables this tool needs
+    // (Note: if they already exist then this does nothing)
+    let mut client = state_saving::connect_to_database(db_url)
+        .unwrap_or_else(|e| panic!("Error occurred while connecting to {}: {}", db_url, e));
+    state_saving::create_tables_if_needed(&mut client)
+        .unwrap_or_else(|e| panic!("Error occurred while creating tables in database: {}", e));
+
+    // Call compress_chunks_of_database with the arguments supplied,
+    // panicking if an error is produced
+    manager::compress_chunks_of_database(db_url, chunk_size, &default_levels.0, number_of_chunks)
+        .unwrap();
+
+    log::info!("auto_compressor finished");
+}
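
Reviewer note: since `LevelInfo` implements `FromStr`, the `default_levels` parsing can be exercised directly with `str::parse`, independently of clap. A minimal sketch of such tests, assuming only the patch above; the test names and placement are illustrative and not part of this diff:

```rust
use auto_compressor::LevelInfo;
use synapse_compress_state::Level;

#[test]
fn level_info_parses_comma_separated_sizes() {
    // "100,50,25" should become three levels with max sizes 100, 50 and 25
    let parsed: LevelInfo = "100,50,25".parse().unwrap();
    assert_eq!(
        parsed,
        LevelInfo(vec![Level::new(100), Level::new(50), Level::new(25)])
    );
}

#[test]
fn level_info_rejects_non_numeric_input() {
    // Any segment that fails to parse as a usize yields the error string
    let err = "100,fifty,25".parse::<LevelInfo>().unwrap_err();
    assert_eq!(err, "Not a comma separated list of numbers");
}
```

Note that an empty segment also fails the `usize` parse, so inputs such as `"100,50,"` or `""` are rejected with the same error rather than being silently ignored.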