17 Commits

Author SHA1 Message Date
Erik Johnston
1ffb727c28 Fmt 2023-03-27 11:18:36 +01:00
Erik Johnston
737d7adc48 Fix clippy
(This was fixed via `cargo clippy --fix`)
2023-03-27 11:16:33 +01:00
David Robertson
13882d7654 Merge pull request #113 from matrix-org/dependabot/cargo/tokio-1.25.0
Bump tokio from 1.24.1 to 1.25.0
2023-02-05 00:22:20 +00:00
dependabot[bot]
c0dac572c1 Bump tokio from 1.24.1 to 1.25.0
Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.24.1 to 1.25.0.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.24.1...tokio-1.25.0)

---
updated-dependencies:
- dependency-name: tokio
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-02-04 00:58:15 +00:00
David Robertson
856b799c53 Merge pull request #112 from matrix-org/dependabot/cargo/tokio-1.24.1
Bump tokio from 1.21.2 to 1.24.1
2023-01-09 14:24:56 +00:00
dependabot[bot]
aab4d37123 Bump tokio from 1.21.2 to 1.24.1
Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.21.2 to 1.24.1.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.21.2...tokio-1.24.1)

---
updated-dependencies:
- dependency-name: tokio
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-01-06 21:49:13 +00:00
Shay
fce2a7eee8 Merge pull request #111 from matrix-org/shay/rust_min_version
Update README.md with information about Rust minimum version
2022-12-13 11:03:56 -08:00
Shay
74bd719262 Update README.md
Co-authored-by: David Robertson <davidr@element.io>
2022-12-02 10:28:16 -08:00
Shay
e3075d1451 Update README.md with information about Rust minimum version 2022-12-02 10:19:49 -08:00
David Robertson
d22acc6906 Merge pull request #109 from kittykat/patch-2 2022-11-02 15:19:09 +00:00
Kat Gerasimova
88d97ea413 Add automation to move X-Needs-Info issues
When an issue is labelled with X-Needs-Info, it should move to the correct column on the issue triage board.
2022-11-02 15:05:04 +00:00
Jan Alexander Steffens
152808baca Fix clippy warnings, update dependencies (3) (#106) 2022-10-17 13:43:39 +01:00
Jelmer Vernooij
2596f25eea Qualify docker image name. (#104) 2022-10-05 10:45:08 +01:00
Kat Gerasimova
4d3049d3ed Add issue automation for triage (#103)
Move new issues to https://github.com/orgs/matrix-org/projects/67 for triage
2022-09-02 16:52:36 +01:00
Erik Johnston
9ff021f32e Add contributing guide (#102) 2022-08-03 15:18:54 +01:00
Landry Breuil
019b100521 make jemalloc dependency really optional (#101)
Signed-off-by: Sebastien Marie <semarie@online.fr>
2022-08-03 10:57:00 +01:00
Jan Alexander Steffens
da6271a331 Fix clippy warnings, update dependencies (again) (#100) 2022-08-03 10:52:47 +01:00
14 changed files with 532 additions and 381 deletions

.github/workflows/triage_incoming.yml (new file, 28 lines)

@@ -0,0 +1,28 @@
name: Move new issues into the issue triage board
on:
  issues:
    types: [ opened ]

jobs:
  add_new_issues:
    name: Add new issues to the triage board
    runs-on: ubuntu-latest
    steps:
      - uses: octokit/graphql-action@v2.x
        id: add_to_project
        with:
          headers: '{"GraphQL-Features": "projects_next_graphql"}'
          query: |
            mutation add_to_project($projectid:ID!,$contentid:ID!) {
              addProjectV2ItemById(input: {projectId: $projectid contentId: $contentid}) {
                item {
                  id
                }
              }
            }
          projectid: ${{ env.PROJECT_ID }}
          contentid: ${{ github.event.issue.node_id }}
        env:
          PROJECT_ID: "PVT_kwDOAIB0Bs4AFDdZ"
          GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}

.github/workflows/triage_labelled.yml (new file, 44 lines)

@@ -0,0 +1,44 @@
name: Move labelled issues to correct projects
on:
  issues:
    types: [ labeled ]

jobs:
  move_needs_info:
    name: Move X-Needs-Info on the triage board
    runs-on: ubuntu-latest
    if: >
      contains(github.event.issue.labels.*.name, 'X-Needs-Info')
    steps:
      - uses: actions/add-to-project@main
        id: add_project
        with:
          project-url: "https://github.com/orgs/matrix-org/projects/67"
          github-token: ${{ secrets.ELEMENT_BOT_TOKEN }}
      - name: Set status
        env:
          GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}
        run: |
          gh api graphql -f query='
            mutation(
              $project: ID!
              $item: ID!
              $fieldid: ID!
              $columnid: String!
            ) {
              updateProjectV2ItemFieldValue(
                input: {
                  projectId: $project
                  itemId: $item
                  fieldId: $fieldid
                  value: {
                    singleSelectOptionId: $columnid
                  }
                }
              ) {
                projectV2Item {
                  id
                }
              }
            }' -f project="PVT_kwDOAIB0Bs4AFDdZ" -f item=${{ steps.add_project.outputs.itemId }} -f fieldid="PVTSSF_lADOAIB0Bs4AFDdZzgC6ZA4" -f columnid=ba22e43c --silent

Cargo.lock (generated, 530 changed lines): diff suppressed because it is too large.

Cargo.toml

@@ -9,7 +9,7 @@ version = "0.1.0"
edition = "2018"
[dependencies]
indicatif = "0.16.0"
indicatif = "0.17.0"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
@@ -18,7 +18,7 @@ rayon = "1.3.0"
string_cache = "0.8.0"
env_logger = "0.9.0"
log = "0.4.14"
pyo3-log = "0.6.0"
pyo3-log = "0.7.0"
log-panics = "2.0.0"
[dependencies.state-map]
@@ -29,11 +29,11 @@ git = "https://github.com/matrix-org/rust-matrix-state-map"
crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "3.1.14"
version = "4.0.15"
features = ["cargo"]
[dependencies.pyo3]
version = "0.16.4"
version = "0.17.1"
features = ["extension-module"]
[dependencies.tikv-jemallocator]

Dockerfile

@@ -1,4 +1,4 @@
FROM rust:alpine AS builder
FROM docker.io/rust:alpine AS builder
RUN apk add python3 musl-dev pkgconfig openssl-dev make
@@ -14,7 +14,7 @@ WORKDIR /opt/synapse-compressor/synapse_auto_compressor/
RUN cargo build
FROM alpine
FROM docker.io/alpine
RUN apk add --no-cache libgcc

README.md

@@ -26,6 +26,9 @@ periodically.
This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
for instructions on how to do this.
This project follows the deprecation policy of [Synapse](https://matrix-org.github.io/synapse/latest/deprecation_policy.html)
on Rust and will assume a recent stable version of Rust and the ability to fetch a more recent one if necessary.
To build `synapse_auto_compressor`, clone this repository and navigate to the
`synapse_auto_compressor/` subdirectory. Then execute `cargo build`.


@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
string_cache = "0.8.0"
serial_test = "0.5.1"
serial_test = "0.9.0"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"


@@ -4,7 +4,12 @@ use postgres::{fallible_iterator::FallibleIterator, Client};
use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use state_map::StateMap;
use std::{borrow::Cow, collections::BTreeMap, env, fmt};
use std::{
borrow::Cow,
collections::BTreeMap,
env,
fmt::{self, Write as _},
};
use string_cache::DefaultAtom as Atom;
use synapse_compress_state::StateGroupEntry;
@@ -23,47 +28,48 @@ pub fn add_contents_to_database(room_id: &str, state_group_map: &BTreeMap<i64, S
let mut client = Client::connect(DB_URL, connector).unwrap();
// build up the query
let mut sql = "".to_string();
let mut sql = String::new();
let room_id = PGEscape(room_id);
let event_id = PGEscape("left_blank");
for (sg, entry) in state_group_map {
// create the entry for state_groups
sql.push_str(&format!(
"INSERT INTO state_groups (id, room_id, event_id) VALUES ({},{},{});\n",
sg,
PGEscape(room_id),
PGEscape("left_blank")
));
writeln!(
sql,
"INSERT INTO state_groups (id, room_id, event_id) \
VALUES ({sg}, {room_id}, {event_id});",
)
.expect("Writing to a String cannot fail");
// create the entry in state_group_edges IF exists
if let Some(prev_sg) = entry.prev_state_group {
sql.push_str(&format!(
"INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n",
sg, prev_sg
));
writeln!(
sql,
"INSERT INTO state_group_edges (state_group, prev_state_group) \
VALUES ({sg}, {prev_sg});",
)
.unwrap();
}
// write entry for each row in delta
if !entry.state_map.is_empty() {
sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES");
sql.push_str(
"INSERT INTO state_groups_state \
(state_group, room_id, type, state_key, event_id) \
VALUES\n",
);
let mut first = true;
for ((t, s), e) in entry.state_map.iter() {
if first {
sql.push_str(" ");
first = false;
} else {
sql.push_str(" ,");
let t = PGEscape(t);
let s = PGEscape(s);
let e = PGEscape(e);
writeln!(sql, " ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
}
sql.push_str(&format!(
"({}, {}, {}, {}, {})",
sg,
PGEscape(room_id),
PGEscape(t),
PGEscape(s),
PGEscape(e)
));
}
sql.push_str(";\n");
// Replace the last comma with a semicolon
sql.replace_range((sql.len() - 2).., ";\n");
}
}
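The change above swaps `sql.push_str(&format!(...))` for `writeln!`, which needs the `std::fmt::Write` trait in scope and appends formatted text directly into the `String`. A minimal sketch of the pattern, with hypothetical state group ids:

    use std::fmt::Write as _; // provides write!/writeln! targets for String

    fn main() {
        let mut sql = String::new();
        let (sg, prev_sg) = (42, 41); // hypothetical ids, for illustration only

        // Formatting into a String cannot fail, so the Result from writeln!
        // is only unwrapped to document that assumption.
        writeln!(
            sql,
            "INSERT INTO state_group_edges (state_group, prev_state_group) \
             VALUES ({sg}, {prev_sg});"
        )
        .expect("writing to a String cannot fail");

        print!("{sql}");
    }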


@@ -30,7 +30,7 @@
use indicatif::{ProgressBar, ProgressStyle};
use state_map::StateMap;
use std::collections::BTreeMap;
use std::{collections::BTreeMap, time::Duration};
use string_cache::DefaultAtom as Atom;
use super::{collapse_state_maps, StateGroupEntry};
@@ -156,7 +156,7 @@ impl<'a> Compressor<'a> {
) -> Compressor<'a> {
let levels = level_info
.iter()
.map(|l| Level::restore((*l).max_length, (*l).current_chain_length, (*l).head))
.map(|l| Level::restore(l.max_length, l.current_chain_length, l.head))
.collect();
let mut compressor = Compressor {
@@ -187,10 +187,12 @@ impl<'a> Compressor<'a> {
ProgressBar::new(self.original_state_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
for (&state_group, entry) in self.original_state_map {
// Check whether this entry is in_range or is just present in the map due to being
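For context: the indicatif 0.16 to 0.17 bump (visible in the Cargo.toml hunk earlier) changed both calls touched here. `ProgressStyle::template` now validates the format string and returns a `Result`, hence the added `.unwrap()`, and `enable_steady_tick` takes a `Duration` instead of a millisecond count. A sketch of the new API, assuming indicatif 0.17:

    use indicatif::{ProgressBar, ProgressStyle};
    use std::time::Duration;

    fn main() {
        let pb = ProgressBar::new(1000);
        pb.set_style(
            // In 0.17, template() returns Result<ProgressStyle, TemplateError>.
            ProgressStyle::default_bar()
                .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
                .unwrap(),
        );
        pb.set_message("state groups");
        // The steady tick interval is now an explicit Duration.
        pb.enable_steady_tick(Duration::from_millis(100));
        for _ in 0..1000 {
            pb.inc(1);
        }
        pb.finish();
    }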


@@ -18,7 +18,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client};
use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{borrow::Cow, collections::BTreeMap, fmt};
use std::{borrow::Cow, collections::BTreeMap, fmt, time::Duration};
use crate::{compressor::Level, generate_sql};
@@ -237,15 +237,9 @@ fn load_map_from_db(
let mut missing_sgs: Vec<_> = state_group_map
.iter()
.filter_map(|(_sg, entry)| {
if let Some(prev_sg) = entry.prev_state_group {
if state_group_map.contains_key(&prev_sg) {
None
} else {
Some(prev_sg)
}
} else {
None
}
entry
.prev_state_group
.filter(|&prev_sg| !state_group_map.contains_key(&prev_sg))
})
.collect();
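The refactor above collapses a nested `if let`/`else` into `Option::filter`, which keeps a `Some` value only when the predicate holds. A self-contained sketch with hypothetical data:

    use std::collections::BTreeMap;

    fn main() {
        // Hypothetical map: state group id -> optional predecessor id.
        let mut groups: BTreeMap<i64, Option<i64>> = BTreeMap::new();
        groups.insert(2, Some(1)); // predecessor 1 is absent from the map
        groups.insert(3, Some(2)); // predecessor 2 is present

        // Collect predecessors that are referenced but missing from the map.
        let missing: Vec<i64> = groups
            .values()
            .filter_map(|prev| prev.filter(|p| !groups.contains_key(p)))
            .collect();

        assert_eq!(missing, vec![1]);
    }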
@@ -378,9 +372,11 @@ fn get_initial_data_from_db(
ProgressBar::new_spinner()
};
pb.set_style(
ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"),
ProgressStyle::default_spinner()
.template("{spinner} [{elapsed}] {pos} rows retrieved")
.unwrap(),
);
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
while let Some(row) = rows.next().unwrap() {
// The row in the map to copy the data to
@@ -542,10 +538,12 @@ pub fn send_changes_to_db(
ProgressBar::new(old_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
for sql_transaction in generate_sql(old_map, new_map, room_id) {
if sql_transaction.is_empty() {


@@ -27,7 +27,10 @@ use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Com
use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*;
use state_map::StateMap;
use std::{collections::BTreeMap, convert::TryInto, fs::File, io::Write, str::FromStr};
use std::{
collections::BTreeMap, convert::TryInto, fmt::Write as _, fs::File, io::Write, str::FromStr,
time::Duration,
};
use string_cache::DefaultAtom as Atom;
mod compressor;
@@ -49,7 +52,7 @@ pub struct StateGroupEntry {
}
/// Helper struct for parsing the `level_sizes` argument.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
struct LevelSizes(Vec<usize>);
impl FromStr for LevelSizes {
@@ -133,7 +136,7 @@ impl Config {
"See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
"for the full details."
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::new("room_id")
@@ -144,53 +147,58 @@ impl Config {
"The room to process. This is the value found in the rooms table of the database",
" not the common name for the room - is should look like: \"!wOlkWNmgkAZFxbTaqj:matrix.org\""
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::new("min_state_group")
.short('b')
.value_name("MIN_STATE_GROUP")
.value_parser(clap::value_parser!(i64))
.help("The state group to start processing from (non inclusive)")
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::new("min_saved_rows")
.short('m')
.value_name("COUNT")
.value_parser(clap::value_parser!(i32))
.help("Abort if fewer than COUNT rows would be saved")
.long_help("If the compressor cannot save this many rows from the database then it will stop early")
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::new("groups_to_compress")
.short('n')
.value_name("GROUPS_TO_COMPRESS")
.value_parser(clap::value_parser!(i64))
.help("How many groups to load into memory to compress")
.long_help(concat!(
"How many groups to load into memory to compress (starting from",
" the 1st group in the room or the group specified by -s)"))
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::new("output_file")
.short('o')
.value_name("FILE")
.help("File to output the changes to in SQL")
.takes_value(true),
.num_args(1),
).arg(
Arg::new("max_state_group")
.short('s')
.value_name("MAX_STATE_GROUP")
.value_parser(clap::value_parser!(i64))
.help("The maximum state group to process up to")
.long_help(concat!(
"If a max_state_group is specified then only state groups with id's lower",
" than this number are able to be compressed."))
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::new("level_sizes")
.short('l')
.value_name("LEVELS")
.value_parser(clap::value_parser!(LevelSizes))
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!(
"Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -203,10 +211,11 @@ impl Config {
" iterations needed to fetch a given set of state.",
))
.default_value("100,50,25")
.takes_value(true),
.num_args(1),
).arg(
Arg::new("transactions")
.short('t')
.action(clap::ArgAction::SetTrue)
.help("Whether to wrap each state group change in a transaction")
.long_help(concat!("If this flag is set then then each change to a particular",
" state group is wrapped in a transaction. This should be done if you wish to",
@@ -215,6 +224,7 @@ impl Config {
).arg(
Arg::new("graphs")
.short('g')
.action(clap::ArgAction::SetTrue)
.help("Output before and after graphs")
.long_help(concat!("If this flag is set then output the node and edge information for",
" the state_group directed graph built up from the predecessor state_group links.",
@@ -222,6 +232,7 @@ impl Config {
).arg(
Arg::new("commit_changes")
.short('c')
.action(clap::ArgAction::SetTrue)
.help("Commit changes to the database")
.long_help(concat!("If this flag is set then the changes the compressor makes will",
" be committed to the database. This should be safe to use while synapse is running",
@@ -229,6 +240,7 @@ impl Config {
).arg(
Arg::new("no_verify")
.short('N')
.action(clap::ArgAction::SetTrue)
.help("Do not double-check that the compression was performed correctly")
.long_help(concat!("If this flag is set then the verification of the compressed",
" state groups, which compares them to the original groups, is skipped. This",
@@ -236,44 +248,27 @@ impl Config {
).get_matches();
let db_url = matches
.value_of("postgres-url")
.get_one::<String>("postgres-url")
.expect("db url should be required");
let output_file = matches.value_of("output_file").map(|path| {
let output_file = matches.get_one::<String>("output_file").map(|path| {
File::create(path).unwrap_or_else(|e| panic!("Unable to create output file: {}", e))
});
let room_id = matches
.value_of("room_id")
.get_one::<String>("room_id")
.expect("room_id should be required since no file");
let min_state_group = matches
.value_of("min_state_group")
.map(|s| s.parse().expect("min_state_group must be an integer"));
let min_state_group = matches.get_one("min_state_group").copied();
let groups_to_compress = matches.get_one("groups_to_compress").copied();
let min_saved_rows = matches.get_one("min_saved_rows").copied();
let max_state_group = matches.get_one("max_state_group").copied();
let level_sizes = matches.get_one("level_sizes").cloned().unwrap();
let groups_to_compress = matches
.value_of("groups_to_compress")
.map(|s| s.parse().expect("groups_to_compress must be an integer"));
let min_saved_rows = matches
.value_of("min_saved_rows")
.map(|v| v.parse().expect("COUNT must be an integer"));
let max_state_group = matches
.value_of("max_state_group")
.map(|s| s.parse().expect("max_state_group must be an integer"));
let level_sizes = matches
.value_of_t::<LevelSizes>("level_sizes")
.unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));
let transactions = matches.is_present("transactions");
let graphs = matches.is_present("graphs");
let commit_changes = matches.is_present("commit_changes");
let verify = !matches.is_present("no_verify");
let transactions = matches.get_flag("transactions");
let graphs = matches.get_flag("graphs");
let commit_changes = matches.get_flag("commit_changes");
let verify = !matches.get_flag("no_verify");
Config {
db_url: String::from(db_url),
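This hunk is the bulk of the clap 3 to 4 migration: `takes_value(true)` becomes `num_args(1)`, stringly-typed `value_of`/`value_of_t` lookups become typed `get_one::<T>()` calls backed by `value_parser`, and boolean flags gain an explicit `ArgAction::SetTrue` read back with `get_flag`. A condensed sketch of the new style, assuming clap 4:

    use clap::{Arg, ArgAction, Command};

    fn main() {
        let matches = Command::new("example")
            .arg(
                Arg::new("min_state_group")
                    .short('b')
                    .value_parser(clap::value_parser!(i64)) // typed parsing replaces manual .parse()
                    .num_args(1) // clap 4 spelling of takes_value(true)
                    .required(false),
            )
            .arg(
                Arg::new("transactions")
                    .short('t')
                    .action(ArgAction::SetTrue), // flags need an explicit action in clap 4
            )
            .get_matches();

        // get_one returns Option<&T>; copied() turns that into Option<i64>.
        let min_state_group: Option<i64> = matches.get_one("min_state_group").copied();
        // get_flag replaces is_present for SetTrue flags.
        let transactions = matches.get_flag("transactions");

        println!("min_state_group={min_state_group:?} transactions={transactions}");
    }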
@@ -424,8 +419,7 @@ fn generate_sql<'a>(
new_map: &'a BTreeMap<i64, StateGroupEntry>,
room_id: &'a str,
) -> impl Iterator<Item = String> + 'a {
old_map.iter().map(move |(sg,old_entry)| {
old_map.iter().map(move |(sg, old_entry)| {
let new_entry = &new_map[sg];
// Check if the new map has a different entry for this state group
@@ -435,48 +429,50 @@ fn generate_sql<'a>(
let mut sql = String::new();
// remove the current edge
sql.push_str(&format!(
"DELETE FROM state_group_edges WHERE state_group = {};\n",
sg
));
writeln!(
sql,
"DELETE FROM state_group_edges WHERE state_group = {sg};",
)
.expect("Writing to a String cannot fail");
// if the new entry has a predecessor then put that into state_group_edges
if let Some(prev_sg) = new_entry.prev_state_group {
sql.push_str(&format!("INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n", sg, prev_sg));
writeln!(
sql,
"INSERT INTO state_group_edges (state_group, prev_state_group) \
VALUES ({sg}, {prev_sg});",
)
.unwrap();
}
// remove the current deltas for this state group
sql.push_str(&format!(
"DELETE FROM state_groups_state WHERE state_group = {};\n",
sg
));
writeln!(
sql,
"DELETE FROM state_groups_state WHERE state_group = {sg};",
)
.unwrap();
if !new_entry.state_map.is_empty() {
// place all the deltas for the state group in the new map into state_groups_state
sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES\n");
sql.push_str(
"INSERT INTO state_groups_state \
(state_group, room_id, type, state_key, event_id) \
VALUES\n",
);
let mut first = true;
let room_id = PGEscape(room_id);
for ((t, s), e) in new_entry.state_map.iter() {
// Add a comma at the start if not the first row to be inserted
if first {
sql.push_str(" ");
first = false;
} else {
sql.push_str(" ,");
let t = PGEscape(t);
let s = PGEscape(s);
let e = PGEscape(e);
// write the row to be inserted of the form:
// (state_group, room_id, type, state_key, event_id)
writeln!(sql, " ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
}
// write the row to be insterted of the form:
// (state_group, room_id, type, state_key, event_id)
sql.push_str(&format!(
"({}, {}, {}, {}, {})",
sg,
PGEscape(room_id),
PGEscape(t),
PGEscape(s),
PGEscape(e)
));
}
sql.push_str(";\n");
// Replace the last comma with a semicolon
sql.replace_range((sql.len() - 2).., ";\n");
}
sql
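The rewritten loop above appends a trailing ",\n" after every row and then swaps the final comma for a semicolon with `replace_range`, instead of tracking a `first` flag. A tiny sketch of that trick on a hypothetical table:

    fn main() {
        let mut sql = String::from("INSERT INTO t (a) VALUES\n");
        for i in 0..3 {
            sql.push_str(&format!("    ({i}),\n"));
        }
        // The last two bytes are ",\n"; replace them with ";\n" to terminate
        // the statement, as the generate_sql hunk does.
        sql.replace_range((sql.len() - 2).., ";\n");
        print!("{sql}");
    }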
@@ -514,10 +510,12 @@ fn output_sql(
ProgressBar::new(old_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
if let Some(output) = &mut config.output_file {
for mut sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
@@ -565,7 +563,7 @@ pub fn continue_run(
let (state_group_map, max_group_found) =
database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?;
let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
let original_num_rows = state_group_map.values().map(|v| v.state_map.len()).sum();
// Now we actually call the compression algorithm.
let compressor = Compressor::compress_from_save(&state_group_map, level_info);
@@ -628,10 +626,12 @@ fn check_that_maps_match(
ProgressBar::new(old_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
// Now let's iterate through and assert that the state for each group
// matches between the two versions.


@@ -16,24 +16,31 @@ classifier = [
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
tikv-jemallocator = "0.5.0"
rand = "0.8.0"
serial_test = "0.5.1"
serial_test = "0.9.0"
synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
env_logger = "0.9.0"
log = "0.4.14"
log-panics = "2.0.0"
anyhow = "1.0.42"
pyo3-log = "0.6.0"
pyo3-log = "0.7.0"
# Needed for pyo3 support
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "3.1.14"
version = "4.0.15"
features = ["cargo"]
[dependencies.pyo3]
version = "0.16.4"
version = "0.17.1"
features = ["extension-module"]
[dependencies.tikv-jemallocator]
version = "0.5.0"
optional = true
[features]
default = ["jemalloc"]
jemalloc = ["tikv-jemallocator"]


@@ -26,7 +26,7 @@ pub mod state_saving;
///
/// This is needed since FromStr cannot be implemented for structs
/// that aren't defined in this scope
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct LevelInfo(pub Vec<Level>);
// Implement FromStr so that an argument of the form "100,50,25"


@@ -16,6 +16,7 @@
//! the state_compressor_state table so that the compressor can seamlessly
//! continue from where it left off.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
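Together with the `[features]` table added to the Cargo.toml above (`default = ["jemalloc"]`, `jemalloc = ["tikv-jemallocator"]`), this `#[cfg(feature = "jemalloc")]` gate makes the allocator swap opt-out: building with `cargo build --no-default-features` omits tikv-jemallocator entirely. A minimal sketch of the pattern, assuming the same optional dependency wiring:

    // Cargo.toml (excerpt, as in the diff above):
    //   [dependencies.tikv-jemallocator]
    //   version = "0.5.0"
    //   optional = true
    //
    //   [features]
    //   default = ["jemalloc"]
    //   jemalloc = ["tikv-jemallocator"]

    // Only swap the global allocator when the feature is enabled; otherwise
    // the binary falls back to the system allocator.
    #[cfg(feature = "jemalloc")]
    #[global_allocator]
    static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

    fn main() {
        let v: Vec<u8> = Vec::with_capacity(1024); // uses whichever allocator is active
        println!("allocated {} bytes", v.capacity());
    }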
@@ -66,12 +67,13 @@ fn main() {
"See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
"for the full details."
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::new("chunk_size")
.short('c')
.value_name("COUNT")
.value_parser(clap::value_parser!(i64))
.help("The maximum number of state groups to load into memroy at once")
.long_help(concat!(
"The number of state_groups to work on at once. All of the entries",
@@ -82,12 +84,13 @@ fn main() {
" chunk as a whole (which may well happen in rooms with lots",
" of backfill in) then the entire chunk is skipped.)",
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::new("default_levels")
.short('l')
.value_name("LEVELS")
.value_parser(clap::value_parser!(LevelInfo))
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!(
"Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -100,41 +103,43 @@ fn main() {
" iterations needed to fetch a given set of state.",
))
.default_value("100,50,25")
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::new("number_of_chunks")
.short('n')
.value_name("CHUNKS_TO_COMPRESS")
.value_parser(clap::value_parser!(i64))
.help("The number of chunks to compress")
.long_help(concat!(
"This many chunks of the database will be compressed. The higher this number is set to, ",
"the longer the compressor will run for."
))
.takes_value(true)
.num_args(1)
.required(true),
).get_matches();
// The URL of the database
let db_url = arguments
.value_of("postgres-url")
.get_one::<String>("postgres-url")
.expect("A database url is required");
// The number of state groups to work on at once
let chunk_size = arguments
.value_of("chunk_size")
.map(|s| s.parse().expect("chunk_size must be an integer"))
.get_one("chunk_size")
.copied()
.expect("A chunk size is required");
// The default structure to use when compressing
let default_levels = arguments
.value_of_t::<LevelInfo>("default_levels")
.unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
.get_one::<LevelInfo>("default_levels")
.cloned()
.unwrap();
// The number of rooms to compress with this tool
let number_of_chunks = arguments
.value_of("number_of_chunks")
.map(|s| s.parse().expect("number_of_chunks must be an integer"))
.get_one("number_of_chunks")
.copied()
.expect("number_of_chunks is required");
// Connect to the database and create the 2 tables this tool needs