1 Commits

Author SHA1 Message Date
Sean Quah
98f02f2667 Configure @matrix-org/synapse-core to be the code owner for the repo
Signed-off-by: Sean Quah <seanq@element.io>
2021-10-22 19:43:24 +01:00
16 changed files with 463 additions and 546 deletions

View File

@@ -1,3 +0,0 @@
.git
.github
/target

734
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,9 @@ version = "0.1.0"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
clap = "2.33.0"
indicatif = "0.16.0" indicatif = "0.16.0"
jemallocator = "0.3.2"
openssl = "0.10.32" openssl = "0.10.32"
postgres = "0.19.0" postgres = "0.19.0"
postgres-openssl = "0.5.0" postgres-openssl = "0.5.0"
@@ -18,7 +20,7 @@ rayon = "1.3.0"
string_cache = "0.8.0" string_cache = "0.8.0"
env_logger = "0.9.0" env_logger = "0.9.0"
log = "0.4.14" log = "0.4.14"
pyo3-log = "0.6.0" pyo3-log = "0.4.0"
log-panics = "2.0.0" log-panics = "2.0.0"
[dependencies.state-map] [dependencies.state-map]
@@ -28,19 +30,11 @@ git = "https://github.com/matrix-org/rust-matrix-state-map"
[lib] [lib]
crate-type = ["cdylib", "rlib"] crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "3.1.14"
features = ["cargo"]
[dependencies.pyo3] [dependencies.pyo3]
version = "0.16.4" version = "0.14.1"
features = ["extension-module"] features = ["extension-module","abi3-py36"]
[dependencies.tikv-jemallocator]
version = "0.5.0"
optional = true
[features] [features]
default = ["jemalloc"] default = ["jemalloc"]
jemalloc = ["tikv-jemallocator"] jemalloc = []
no-progress-bars = [] no-progress-bars = []

View File

@@ -1,22 +0,0 @@
FROM rust:alpine AS builder
RUN apk add python3 musl-dev pkgconfig openssl-dev make
ENV RUSTFLAGS="-C target-feature=-crt-static"
WORKDIR /opt/synapse-compressor/
COPY . .
RUN cargo build
WORKDIR /opt/synapse-compressor/synapse_auto_compressor/
RUN cargo build
FROM alpine
RUN apk add --no-cache libgcc
COPY --from=builder /opt/synapse-compressor/target/debug/synapse_compress_state /usr/local/bin/synapse_compress_state
COPY --from=builder /opt/synapse-compressor/target/debug/synapse_auto_compressor /usr/local/bin/synapse_auto_compressor

View File

@@ -197,7 +197,7 @@ $ docker-compose down
# Using the synapse_compress_state library # Using the synapse_compress_state library
If you want to use the compressor in another project, it is recomended that you If you want to use the compressor in another project, it is recomended that you
use jemalloc `https://github.com/tikv/jemallocator`. use jemalloc `https://github.com/gnzlbg/jemallocator`.
To prevent the progress bars from being shown, use the `no-progress-bars` feature. To prevent the progress bars from being shown, use the `no-progress-bars` feature.
(See `synapse_auto_compressor/Cargo.toml` for an example) (See `synapse_auto_compressor/Cargo.toml` for an example)

View File

@@ -46,11 +46,10 @@ fn run_succeeds_without_crashing() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = false; let commit_changes = false;
let verify = true;
let config = Config::new( let config = Config::new(
db_url, db_url.clone(),
room_id, room_id.clone(),
output_file, output_file,
min_state_group, min_state_group,
groups_to_compress, groups_to_compress,
@@ -60,7 +59,6 @@ fn run_succeeds_without_crashing() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -96,7 +94,6 @@ fn changes_commited_if_no_min_saved_rows() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config = Config::new( let config = Config::new(
db_url, db_url,
@@ -110,7 +107,6 @@ fn changes_commited_if_no_min_saved_rows() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -164,7 +160,6 @@ fn changes_commited_if_min_saved_rows_exceeded() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config = Config::new( let config = Config::new(
db_url, db_url,
@@ -178,7 +173,6 @@ fn changes_commited_if_min_saved_rows_exceeded() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -233,7 +227,6 @@ fn changes_not_commited_if_fewer_than_min_saved_rows() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config = Config::new( let config = Config::new(
db_url, db_url,
@@ -247,7 +240,6 @@ fn changes_not_commited_if_fewer_than_min_saved_rows() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -288,7 +280,6 @@ fn run_panics_if_invalid_db_url() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config = Config::new( let config = Config::new(
db_url, db_url,
@@ -302,7 +293,6 @@ fn run_panics_if_invalid_db_url() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -346,7 +336,6 @@ fn run_only_affects_given_room_id() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config = Config::new( let config = Config::new(
db_url, db_url,
@@ -360,7 +349,6 @@ fn run_only_affects_given_room_id() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -418,7 +406,6 @@ fn run_respects_groups_to_compress() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config = Config::new( let config = Config::new(
db_url, db_url,
@@ -432,7 +419,6 @@ fn run_respects_groups_to_compress() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -506,7 +492,6 @@ fn run_is_idempotent_when_run_on_whole_room() {
let transactions = true; let transactions = true;
let graphs = false; let graphs = false;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config1 = Config::new( let config1 = Config::new(
db_url.clone(), db_url.clone(),
@@ -520,23 +505,21 @@ fn run_is_idempotent_when_run_on_whole_room() {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
let config2 = Config::new( let config2 = Config::new(
db_url, db_url.clone(),
room_id, room_id.clone(),
output_file2, output_file2,
min_state_group, min_state_group,
groups_to_compress, groups_to_compress,
min_saved_rows, min_saved_rows,
max_state_group, max_state_group,
level_sizes, level_sizes.clone(),
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();

View File

@@ -56,7 +56,7 @@ fn continue_run_called_twice_same_as_run() {
let start = Some(6); let start = Some(6);
let chunk_size = 7; let chunk_size = 7;
let level_info = chunk_stats_1.new_level_info; let level_info = chunk_stats_1.new_level_info.clone();
// Run the compressor with those settings // Run the compressor with those settings
let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap(); let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();

View File

@@ -181,11 +181,12 @@ impl<'a> Compressor<'a> {
panic!("Can only call `create_new_tree` once"); panic!("Can only call `create_new_tree` once");
} }
let pb = if cfg!(feature = "no-progress-bars") { let pb: ProgressBar;
ProgressBar::hidden() if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else { } else {
ProgressBar::new(self.original_state_map.len() as u64) pb = ProgressBar::new(self.original_state_map.len() as u64);
}; }
pb.set_style( pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"), ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
); );

View File

@@ -96,7 +96,11 @@ fn create_new_tree_does_nothing_if_already_compressed() {
let pred_group = initial_edges.get(&i); let pred_group = initial_edges.get(&i);
// Need Option<i64> not Option<&i64> // Need Option<i64> not Option<&i64>
let prev = pred_group.copied(); let prev;
match pred_group {
Some(i) => prev = Some(*i),
None => prev = None,
}
// insert that edge into the initial map // insert that edge into the initial map
initial.insert( initial.insert(

View File

@@ -54,7 +54,7 @@ fn get_head_returns_head() {
#[test] #[test]
fn has_space_returns_true_if_empty() { fn has_space_returns_true_if_empty() {
let l = Level::new(15); let l = Level::new(15);
assert!(l.has_space()); assert_eq!(l.has_space(), true);
} }
#[test] #[test]
@@ -65,7 +65,7 @@ fn has_space_returns_true_if_part_full() {
l.update(1, true); l.update(1, true);
l.update(143, true); l.update(143, true);
l.update(15, true); l.update(15, true);
assert!(l.has_space()); assert_eq!(l.has_space(), true);
} }
#[test] #[test]
@@ -76,5 +76,5 @@ fn has_space_returns_false_if_full() {
l.update(3, true); l.update(3, true);
l.update(4, true); l.update(4, true);
l.update(5, true); l.update(5, true);
assert!(!l.has_space()); assert_eq!(l.has_space(), false);
} }

View File

@@ -145,7 +145,11 @@ fn stats_correct_if_no_changes() {
let pred_group = initial_edges.get(&i); let pred_group = initial_edges.get(&i);
// Need Option<i64> not Option<&i64> // Need Option<i64> not Option<&i64>
let prev = pred_group.copied(); let prev;
match pred_group {
Some(i) => prev = Some(*i),
None => prev = None,
}
// insert that edge into the initial map // insert that edge into the initial map
initial.insert( initial.insert(

View File

@@ -372,11 +372,12 @@ fn get_initial_data_from_db(
// Copy the data from the database into a map // Copy the data from the database into a map
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new(); let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let pb = if cfg!(feature = "no-progress-bars") { let pb: ProgressBar;
ProgressBar::hidden() if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else { } else {
ProgressBar::new_spinner() pb = ProgressBar::new_spinner();
}; }
pb.set_style( pb.set_style(
ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"), ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"),
); );
@@ -536,11 +537,12 @@ pub fn send_changes_to_db(
debug!("Writing changes..."); debug!("Writing changes...");
// setup the progress bar // setup the progress bar
let pb = if cfg!(feature = "no-progress-bars") { let pb: ProgressBar;
ProgressBar::hidden() if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else { } else {
ProgressBar::new(old_map.len() as u64) pb = ProgressBar::new(old_map.len() as u64);
}; }
pb.set_style( pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"), ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
); );

View File

@@ -23,7 +23,7 @@
use log::{info, warn, LevelFilter}; use log::{info, warn, LevelFilter};
use pyo3::{exceptions, prelude::*}; use pyo3::{exceptions, prelude::*};
use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command}; use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*; use rayon::prelude::*;
use state_map::StateMap; use state_map::StateMap;
@@ -109,21 +109,18 @@ pub struct Config {
// Whether or not to commit changes to the database automatically // Whether or not to commit changes to the database automatically
// N.B. currently assumes transactions is true (to be on the safe side) // N.B. currently assumes transactions is true (to be on the safe side)
commit_changes: bool, commit_changes: bool,
// Whether to verify the correctness of the compressed state groups by
// comparing them to the original groups
verify: bool,
} }
impl Config { impl Config {
/// Build up config from command line arguments /// Build up config from command line arguments
pub fn parse_arguments() -> Config { pub fn parse_arguments() -> Config {
let matches = Command::new(crate_name!()) let matches = App::new(crate_name!())
.version(crate_version!()) .version(crate_version!())
.author(crate_authors!("\n")) .author(crate_authors!("\n"))
.about(crate_description!()) .about(crate_description!())
.arg( .arg(
Arg::new("postgres-url") Arg::with_name("postgres-url")
.short('p') .short("p")
.value_name("POSTGRES_LOCATION") .value_name("POSTGRES_LOCATION")
.help("The configruation for connecting to the postgres database.") .help("The configruation for connecting to the postgres database.")
.long_help(concat!( .long_help(concat!(
@@ -136,8 +133,8 @@ impl Config {
.takes_value(true) .takes_value(true)
.required(true), .required(true),
).arg( ).arg(
Arg::new("room_id") Arg::with_name("room_id")
.short('r') .short("r")
.value_name("ROOM_ID") .value_name("ROOM_ID")
.help("The room to process") .help("The room to process")
.long_help(concat!( .long_help(concat!(
@@ -147,23 +144,23 @@ impl Config {
.takes_value(true) .takes_value(true)
.required(true), .required(true),
).arg( ).arg(
Arg::new("min_state_group") Arg::with_name("min_state_group")
.short('b') .short("b")
.value_name("MIN_STATE_GROUP") .value_name("MIN_STATE_GROUP")
.help("The state group to start processing from (non inclusive)") .help("The state group to start processing from (non inclusive)")
.takes_value(true) .takes_value(true)
.required(false), .required(false),
).arg( ).arg(
Arg::new("min_saved_rows") Arg::with_name("min_saved_rows")
.short('m') .short("m")
.value_name("COUNT") .value_name("COUNT")
.help("Abort if fewer than COUNT rows would be saved") .help("Abort if fewer than COUNT rows would be saved")
.long_help("If the compressor cannot save this many rows from the database then it will stop early") .long_help("If the compressor cannot save this many rows from the database then it will stop early")
.takes_value(true) .takes_value(true)
.required(false), .required(false),
).arg( ).arg(
Arg::new("groups_to_compress") Arg::with_name("groups_to_compress")
.short('n') .short("n")
.value_name("GROUPS_TO_COMPRESS") .value_name("GROUPS_TO_COMPRESS")
.help("How many groups to load into memory to compress") .help("How many groups to load into memory to compress")
.long_help(concat!( .long_help(concat!(
@@ -172,14 +169,14 @@ impl Config {
.takes_value(true) .takes_value(true)
.required(false), .required(false),
).arg( ).arg(
Arg::new("output_file") Arg::with_name("output_file")
.short('o') .short("o")
.value_name("FILE") .value_name("FILE")
.help("File to output the changes to in SQL") .help("File to output the changes to in SQL")
.takes_value(true), .takes_value(true),
).arg( ).arg(
Arg::new("max_state_group") Arg::with_name("max_state_group")
.short('s') .short("s")
.value_name("MAX_STATE_GROUP") .value_name("MAX_STATE_GROUP")
.help("The maximum state group to process up to") .help("The maximum state group to process up to")
.long_help(concat!( .long_help(concat!(
@@ -188,8 +185,8 @@ impl Config {
.takes_value(true) .takes_value(true)
.required(false), .required(false),
).arg( ).arg(
Arg::new("level_sizes") Arg::with_name("level_sizes")
.short('l') .short("l")
.value_name("LEVELS") .value_name("LEVELS")
.help("Sizes of each new level in the compression algorithm, as a comma separated list.") .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!( .long_help(concat!(
@@ -205,34 +202,27 @@ impl Config {
.default_value("100,50,25") .default_value("100,50,25")
.takes_value(true), .takes_value(true),
).arg( ).arg(
Arg::new("transactions") Arg::with_name("transactions")
.short('t') .short("t")
.help("Whether to wrap each state group change in a transaction") .help("Whether to wrap each state group change in a transaction")
.long_help(concat!("If this flag is set then then each change to a particular", .long_help(concat!("If this flag is set then then each change to a particular",
" state group is wrapped in a transaction. This should be done if you wish to", " state group is wrapped in a transaction. This should be done if you wish to",
" apply the changes while synapse is still running.")) " apply the changes while synapse is still running."))
.requires("output_file"), .requires("output_file"),
).arg( ).arg(
Arg::new("graphs") Arg::with_name("graphs")
.short('g') .short("g")
.help("Output before and after graphs") .help("Output before and after graphs")
.long_help(concat!("If this flag is set then output the node and edge information for", .long_help(concat!("If this flag is set then output the node and edge information for",
" the state_group directed graph built up from the predecessor state_group links.", " the state_group directed graph built up from the predecessor state_group links.",
" These can be looked at in something like Gephi (https://gephi.org)")), " These can be looked at in something like Gephi (https://gephi.org)")),
).arg( ).arg(
Arg::new("commit_changes") Arg::with_name("commit_changes")
.short('c') .short("c")
.help("Commit changes to the database") .help("Commit changes to the database")
.long_help(concat!("If this flag is set then the changes the compressor makes will", .long_help(concat!("If this flag is set then the changes the compressor makes will",
" be committed to the database. This should be safe to use while synapse is running", " be committed to the database. This should be safe to use while synapse is running",
" as it assumes by default that the transactions flag is set")), " as it assumes by default that the transactions flag is set")),
).arg(
Arg::new("no_verify")
.short('N')
.help("Do not double-check that the compression was performed correctly")
.long_help(concat!("If this flag is set then the verification of the compressed",
" state groups, which compares them to the original groups, is skipped. This",
" saves time at the cost of potentially generating mismatched state.")),
).get_matches(); ).get_matches();
let db_url = matches let db_url = matches
@@ -263,8 +253,7 @@ impl Config {
.value_of("max_state_group") .value_of("max_state_group")
.map(|s| s.parse().expect("max_state_group must be an integer")); .map(|s| s.parse().expect("max_state_group must be an integer"));
let level_sizes = matches let level_sizes = value_t!(matches, "level_sizes", LevelSizes)
.value_of_t::<LevelSizes>("level_sizes")
.unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e)); .unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));
let transactions = matches.is_present("transactions"); let transactions = matches.is_present("transactions");
@@ -273,8 +262,6 @@ impl Config {
let commit_changes = matches.is_present("commit_changes"); let commit_changes = matches.is_present("commit_changes");
let verify = !matches.is_present("no_verify");
Config { Config {
db_url: String::from(db_url), db_url: String::from(db_url),
output_file, output_file,
@@ -287,7 +274,6 @@ impl Config {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
} }
} }
} }
@@ -386,9 +372,7 @@ pub fn run(mut config: Config) {
} }
} }
if config.verify { check_that_maps_match(&state_group_map, new_state_group_map);
check_that_maps_match(&state_group_map, new_state_group_map);
}
// If we are given an output file, we output the changes as SQL. If the // If we are given an output file, we output the changes as SQL. If the
// `transactions` argument is set we wrap each change to a state group in a // `transactions` argument is set we wrap each change to a state group in a
@@ -508,11 +492,12 @@ fn output_sql(
info!("Writing changes..."); info!("Writing changes...");
let pb = if cfg!(feature = "no-progress-bars") { let pb: ProgressBar;
ProgressBar::hidden() if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else { } else {
ProgressBar::new(old_map.len() as u64) pb = ProgressBar::new(old_map.len() as u64);
}; }
pb.set_style( pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"), ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
); );
@@ -622,11 +607,12 @@ fn check_that_maps_match(
) { ) {
info!("Checking that state maps match..."); info!("Checking that state maps match...");
let pb = if cfg!(feature = "no-progress-bars") { let pb: ProgressBar;
ProgressBar::hidden() if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else { } else {
ProgressBar::new(old_map.len() as u64) pb = ProgressBar::new(old_map.len() as u64);
}; }
pb.set_style( pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"), ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
); );
@@ -709,7 +695,6 @@ impl Config {
transactions: bool, transactions: bool,
graphs: bool, graphs: bool,
commit_changes: bool, commit_changes: bool,
verify: bool,
) -> Result<Config, String> { ) -> Result<Config, String> {
let mut output: Option<File> = None; let mut output: Option<File> = None;
if let Some(file) = output_file { if let Some(file) = output_file {
@@ -737,7 +722,6 @@ impl Config {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
}) })
} }
} }
@@ -762,7 +746,6 @@ impl Config {
transactions = true, transactions = true,
graphs = false, graphs = false,
commit_changes = false, commit_changes = false,
verify = true,
)] )]
fn run_compression( fn run_compression(
db_url: String, db_url: String,
@@ -776,7 +759,6 @@ fn run_compression(
transactions: bool, transactions: bool,
graphs: bool, graphs: bool,
commit_changes: bool, commit_changes: bool,
verify: bool,
) -> PyResult<()> { ) -> PyResult<()> {
let config = Config::new( let config = Config::new(
db_url, db_url,
@@ -790,7 +772,6 @@ fn run_compression(
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
); );
match config { match config {
Err(e) => Err(PyErr::new::<exceptions::PyException, _>(e)), Err(e) => Err(PyErr::new::<exceptions::PyException, _>(e)),
@@ -974,6 +955,7 @@ mod lib_tests {
#[test] #[test]
fn check_that_maps_match_returns_if_both_empty() { fn check_that_maps_match_returns_if_both_empty() {
check_that_maps_match(&BTreeMap::new(), &BTreeMap::new()); check_that_maps_match(&BTreeMap::new(), &BTreeMap::new());
assert!(true);
} }
#[test] #[test]
@@ -1006,6 +988,7 @@ mod lib_tests {
} }
check_that_maps_match(&old_map, &BTreeMap::new()); check_that_maps_match(&old_map, &BTreeMap::new());
assert!(true);
} }
#[test] #[test]
@@ -1041,6 +1024,7 @@ mod lib_tests {
} }
check_that_maps_match(&BTreeMap::new(), &new_map); check_that_maps_match(&BTreeMap::new(), &new_map);
assert!(true);
} }
#[test] #[test]
@@ -1072,6 +1056,7 @@ mod lib_tests {
} }
check_that_maps_match(&BTreeMap::new(), &old_map.clone()); check_that_maps_match(&BTreeMap::new(), &old_map.clone());
assert!(true);
} }
#[test] #[test]
@@ -1134,6 +1119,7 @@ mod lib_tests {
} }
check_that_maps_match(&old_map, &new_map); check_that_maps_match(&old_map, &new_map);
assert!(true);
} }
#[test] #[test]
@@ -1215,6 +1201,7 @@ mod lib_tests {
); );
check_that_maps_match(&old_map, &new_map); check_that_maps_match(&old_map, &new_map);
assert!(true);
} }
//TODO: tests for correct SQL code produced by output_sql //TODO: tests for correct SQL code produced by output_sql
@@ -1237,7 +1224,6 @@ mod pyo3_tests {
let transactions = false; let transactions = false;
let graphs = false; let graphs = false;
let commit_changes = false; let commit_changes = false;
let verify = true;
let config = Config::new( let config = Config::new(
db_url.clone(), db_url.clone(),
@@ -1251,7 +1237,6 @@ mod pyo3_tests {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
@@ -1285,7 +1270,6 @@ mod pyo3_tests {
let transactions = true; let transactions = true;
let graphs = true; let graphs = true;
let commit_changes = true; let commit_changes = true;
let verify = true;
let config = Config::new( let config = Config::new(
db_url.clone(), db_url.clone(),
@@ -1299,12 +1283,11 @@ mod pyo3_tests {
transactions, transactions,
graphs, graphs,
commit_changes, commit_changes,
verify,
) )
.unwrap(); .unwrap();
assert_eq!(config.db_url, db_url); assert_eq!(config.db_url, db_url);
assert!(config.output_file.is_some()); assert!(!config.output_file.is_none());
assert_eq!(config.room_id, room_id); assert_eq!(config.room_id, room_id);
assert_eq!(config.min_state_group, Some(3225)); assert_eq!(config.min_state_group, Some(3225));
assert_eq!(config.groups_to_compress, Some(970)); assert_eq!(config.groups_to_compress, Some(970));

View File

@@ -18,7 +18,7 @@
#[cfg(feature = "jemalloc")] #[cfg(feature = "jemalloc")]
#[global_allocator] #[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
use log::LevelFilter; use log::LevelFilter;
use std::env; use std::env;

View File

@@ -1,11 +1,11 @@
[package] [package]
name = "synapse_auto_compressor" name = "synapse_auto_compressor"
authors = ["William Ashton"] authors = ["William Ashton"]
version = "0.1.3" version = "0.1.2"
edition = "2018" edition = "2018"
[package.metadata.maturin] [package.metadata.maturin]
requires-python = ">=3.7" requires-python = ">=3.6"
project-url = {Source = "https://github.com/matrix-org/rust-synapse-compress-state"} project-url = {Source = "https://github.com/matrix-org/rust-synapse-compress-state"}
classifier = [ classifier = [
"Development Status :: 4 - Beta", "Development Status :: 4 - Beta",
@@ -13,10 +13,11 @@ classifier = [
] ]
[dependencies] [dependencies]
clap = "2.33.0"
openssl = "0.10.32" openssl = "0.10.32"
postgres = "0.19.0" postgres = "0.19.0"
postgres-openssl = "0.5.0" postgres-openssl = "0.5.0"
tikv-jemallocator = "0.5.0" jemallocator = "0.3.2"
rand = "0.8.0" rand = "0.8.0"
serial_test = "0.5.1" serial_test = "0.5.1"
synapse_compress_state = { path = "../", features = ["no-progress-bars"] } synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
@@ -24,16 +25,12 @@ env_logger = "0.9.0"
log = "0.4.14" log = "0.4.14"
log-panics = "2.0.0" log-panics = "2.0.0"
anyhow = "1.0.42" anyhow = "1.0.42"
pyo3-log = "0.6.0" pyo3-log = "0.4.0"
# Needed for pyo3 support # Needed for pyo3 support
[lib] [lib]
crate-type = ["cdylib", "rlib"] crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "3.1.14"
features = ["cargo"]
[dependencies.pyo3] [dependencies.pyo3]
version = "0.16.4" version = "0.14.1"
features = ["extension-module"] features = ["extension-module","abi3-py36"]

View File

@@ -17,11 +17,11 @@
//! continue from where it left off. //! continue from where it left off.
#[global_allocator] #[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command}; use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
use log::LevelFilter; use log::LevelFilter;
use std::env; use std::{env, fs::OpenOptions};
use synapse_auto_compressor::{manager, state_saving, LevelInfo}; use synapse_auto_compressor::{manager, state_saving, LevelInfo};
/// Execution starts here /// Execution starts here
@@ -29,6 +29,12 @@ fn main() {
// setup the logger for the synapse_auto_compressor // setup the logger for the synapse_auto_compressor
// The default can be overwritten with RUST_LOG // The default can be overwritten with RUST_LOG
// see the README for more information // see the README for more information
let log_file = OpenOptions::new()
.append(true)
.create(true)
.open("synapse_auto_compressor.log")
.unwrap_or_else(|e| panic!("Error occured while opening the log file: {}", e));
if env::var("RUST_LOG").is_err() { if env::var("RUST_LOG").is_err() {
let mut log_builder = env_logger::builder(); let mut log_builder = env_logger::builder();
// Ensure panics still come through // Ensure panics still come through
@@ -41,6 +47,7 @@ fn main() {
} else { } else {
// If RUST_LOG was set then use that // If RUST_LOG was set then use that
let mut log_builder = env_logger::Builder::from_env("RUST_LOG"); let mut log_builder = env_logger::Builder::from_env("RUST_LOG");
log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
// Ensure panics still come through // Ensure panics still come through
log_builder.filter_module("panic", LevelFilter::Error); log_builder.filter_module("panic", LevelFilter::Error);
log_builder.init(); log_builder.init();
@@ -50,13 +57,13 @@ fn main() {
log::info!("synapse_auto_compressor started"); log::info!("synapse_auto_compressor started");
// parse the command line arguments using the clap crate // parse the command line arguments using the clap crate
let arguments = Command::new(crate_name!()) let arguments = App::new(crate_name!())
.version(crate_version!()) .version(crate_version!())
.author(crate_authors!("\n")) .author(crate_authors!("\n"))
.about(crate_description!()) .about(crate_description!())
.arg( .arg(
Arg::new("postgres-url") Arg::with_name("postgres-url")
.short('p') .short("p")
.value_name("POSTGRES_LOCATION") .value_name("POSTGRES_LOCATION")
.help("The configruation for connecting to the postgres database.") .help("The configruation for connecting to the postgres database.")
.long_help(concat!( .long_help(concat!(
@@ -69,8 +76,8 @@ fn main() {
.takes_value(true) .takes_value(true)
.required(true), .required(true),
).arg( ).arg(
Arg::new("chunk_size") Arg::with_name("chunk_size")
.short('c') .short("c")
.value_name("COUNT") .value_name("COUNT")
.help("The maximum number of state groups to load into memroy at once") .help("The maximum number of state groups to load into memroy at once")
.long_help(concat!( .long_help(concat!(
@@ -85,8 +92,8 @@ fn main() {
.takes_value(true) .takes_value(true)
.required(true), .required(true),
).arg( ).arg(
Arg::new("default_levels") Arg::with_name("default_levels")
.short('l') .short("l")
.value_name("LEVELS") .value_name("LEVELS")
.help("Sizes of each new level in the compression algorithm, as a comma separated list.") .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!( .long_help(concat!(
@@ -103,8 +110,8 @@ fn main() {
.takes_value(true) .takes_value(true)
.required(false), .required(false),
).arg( ).arg(
Arg::new("number_of_chunks") Arg::with_name("number_of_chunks")
.short('n') .short("n")
.value_name("CHUNKS_TO_COMPRESS") .value_name("CHUNKS_TO_COMPRESS")
.help("The number of chunks to compress") .help("The number of chunks to compress")
.long_help(concat!( .long_help(concat!(
@@ -127,8 +134,7 @@ fn main() {
.expect("A chunk size is required"); .expect("A chunk size is required");
// The default structure to use when compressing // The default structure to use when compressing
let default_levels = arguments let default_levels = value_t!(arguments, "default_levels", LevelInfo)
.value_of_t::<LevelInfo>("default_levels")
.unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e)); .unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
// The number of rooms to compress with this tool // The number of rooms to compress with this tool