20 Commits

Author SHA1 Message Date
Erik Johnston
1ffb727c28 Fmt 2023-03-27 11:18:36 +01:00
Erik Johnston
737d7adc48 Fix clippy
(This was fixed via `cargo clippy --fix`)
2023-03-27 11:16:33 +01:00
David Robertson
13882d7654 Merge pull request #113 from matrix-org/dependabot/cargo/tokio-1.25.0
Bump tokio from 1.24.1 to 1.25.0
2023-02-05 00:22:20 +00:00
dependabot[bot]
c0dac572c1 Bump tokio from 1.24.1 to 1.25.0
Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.24.1 to 1.25.0.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.24.1...tokio-1.25.0)

---
updated-dependencies:
- dependency-name: tokio
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-02-04 00:58:15 +00:00
David Robertson
856b799c53 Merge pull request #112 from matrix-org/dependabot/cargo/tokio-1.24.1
Bump tokio from 1.21.2 to 1.24.1
2023-01-09 14:24:56 +00:00
dependabot[bot]
aab4d37123 Bump tokio from 1.21.2 to 1.24.1
Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.21.2 to 1.24.1.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.21.2...tokio-1.24.1)

---
updated-dependencies:
- dependency-name: tokio
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-01-06 21:49:13 +00:00
Shay
fce2a7eee8 Merge pull request #111 from matrix-org/shay/rust_min_version
Update README.md with information about Rust minimum version
2022-12-13 11:03:56 -08:00
Shay
74bd719262 Update README.md
Co-authored-by: David Robertson <davidr@element.io>
2022-12-02 10:28:16 -08:00
Shay
e3075d1451 Update README.md with information about Rust minimum version 2022-12-02 10:19:49 -08:00
David Robertson
d22acc6906 Merge pull request #109 from kittykat/patch-2 2022-11-02 15:19:09 +00:00
Kat Gerasimova
88d97ea413 Add automation to move X-Needs-Info issues
When an issue is labelled with X-Needs-Info, it should move to the correct column on the issue triage board.
2022-11-02 15:05:04 +00:00
Jan Alexander Steffens
152808baca Fix clippy warnings, update dependencies (3) (#106) 2022-10-17 13:43:39 +01:00
Jelmer Vernooij
2596f25eea Qualify docker image name. (#104) 2022-10-05 10:45:08 +01:00
Kat Gerasimova
4d3049d3ed Add issue automation for triage (#103)
Move new issues to https://github.com/orgs/matrix-org/projects/67 for triage
2022-09-02 16:52:36 +01:00
Erik Johnston
9ff021f32e Add contributing guide (#102) 2022-08-03 15:18:54 +01:00
Landry Breuil
019b100521 make jemalloc dependency really optional (#101)
Signed-off-by: Sebastien Marie <semarie@online.fr>
2022-08-03 10:57:00 +01:00
Jan Alexander Steffens
da6271a331 Fix clippy warnings, update dependencies (again) (#100) 2022-08-03 10:52:47 +01:00
David Robertson
dd62afb3d5 Update lockfile; drop Python 3.6 support
Python 3.6 EOLed at the end of 2021, see https://endoflife.date/python.
(pyO3 was refusing to build against 3.6).
2022-07-07 19:23:33 +01:00
David Robertson
65ffce2362 Tag v0.1.3 2022-07-07 19:13:47 +01:00
Jan Alexander Steffens
b4f3d8adbd Fix clippy warnings, update dependencies (#91) 2022-06-06 10:34:07 +01:00
20 changed files with 844 additions and 593 deletions

.github/workflows/triage_incoming.yml (new file, +28)

@@ -0,0 +1,28 @@
name: Move new issues into the issue triage board
on:
issues:
types: [ opened ]
jobs:
add_new_issues:
name: Add new issues to the triage board
runs-on: ubuntu-latest
steps:
- uses: octokit/graphql-action@v2.x
id: add_to_project
with:
headers: '{"GraphQL-Features": "projects_next_graphql"}'
query: |
mutation add_to_project($projectid:ID!,$contentid:ID!) {
addProjectV2ItemById(input: {projectId: $projectid contentId: $contentid}) {
item {
id
}
}
}
projectid: ${{ env.PROJECT_ID }}
contentid: ${{ github.event.issue.node_id }}
env:
PROJECT_ID: "PVT_kwDOAIB0Bs4AFDdZ"
GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}

.github/workflows/triage_labelled.yml (new file, +44)

@@ -0,0 +1,44 @@
name: Move labelled issues to correct projects
on:
issues:
types: [ labeled ]
jobs:
move_needs_info:
name: Move X-Needs-Info on the triage board
runs-on: ubuntu-latest
if: >
contains(github.event.issue.labels.*.name, 'X-Needs-Info')
steps:
- uses: actions/add-to-project@main
id: add_project
with:
project-url: "https://github.com/orgs/matrix-org/projects/67"
github-token: ${{ secrets.ELEMENT_BOT_TOKEN }}
- name: Set status
env:
GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}
run: |
gh api graphql -f query='
mutation(
$project: ID!
$item: ID!
$fieldid: ID!
$columnid: String!
) {
updateProjectV2ItemFieldValue(
input: {
projectId: $project
itemId: $item
fieldId: $fieldid
value: {
singleSelectOptionId: $columnid
}
}
) {
projectV2Item {
id
}
}
}' -f project="PVT_kwDOAIB0Bs4AFDdZ" -f item=${{ steps.add_project.outputs.itemId }} -f fieldid="PVTSSF_lADOAIB0Bs4AFDdZzgC6ZA4" -f columnid=ba22e43c --silent

CONTRIBUTING.md (new file, +80)

@@ -0,0 +1,80 @@
# Contributing
## Sign off
In order to have a concrete record that your contribution is intentional
and you agree to license it under the same terms as the project's license, we've adopted the
same lightweight approach that the Linux Kernel
[submitting patches process](
https://www.kernel.org/doc/html/latest/process/submitting-patches.html#sign-your-work-the-developer-s-certificate-of-origin),
[Docker](https://github.com/docker/docker/blob/master/CONTRIBUTING.md), and many other
projects use: the DCO ([Developer Certificate of Origin](http://developercertificate.org/)).
This is a simple declaration that you wrote
the contribution or otherwise have the right to contribute it to Matrix:
```
Developer Certificate of Origin
Version 1.1
Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
660 York Street, Suite 102,
San Francisco, CA 94110 USA
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.
Developer's Certificate of Origin 1.1
By making a contribution to this project, I certify that:
(a) The contribution was created in whole or in part by me and I
have the right to submit it under the open source license
indicated in the file; or
(b) The contribution is based upon previous work that, to the best
of my knowledge, is covered under an appropriate open source
license and I have the right under that license to submit that
work with modifications, whether created in whole or in part
by me, under the same open source license (unless I am
permitted to submit under a different license), as indicated
in the file; or
(c) The contribution was provided directly to me by some other
person who certified (a), (b) or (c) and I have not modified
it.
(d) I understand and agree that this project and the contribution
are public and that a record of the contribution (including all
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.
```
If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment:
```
Signed-off-by: Your Name <your@email.example.org>
```
We accept contributions under a legally identifiable name, such as
your name on government documentation or common-law names (names
claimed by legitimate usage or repute). Unfortunately, we cannot
accept anonymous contributions at this time.
Git allows you to add this signoff automatically when using the `-s`
flag to `git commit`, which uses the name and email set in your
`user.name` and `user.email` git configs.
### Private Sign off
If you would like to provide your legal name privately to the Matrix.org
Foundation (instead of in a public commit or comment), you can do so
by emailing your legal name and a link to the pull request to
[dco@matrix.org](mailto:dco@matrix.org?subject=Private%20sign%20off).
It helps to include "sign off" or similar in the subject line. You will then
be instructed further.
Once private sign off is complete, doing so for future contributions will not
be required.

Cargo.lock (generated, 803 changed lines; diff suppressed because it is too large)

View File

@@ -9,8 +9,7 @@ version = "0.1.0"
edition = "2018"
[dependencies]
clap = "2.33.0"
indicatif = "0.16.0"
indicatif = "0.17.0"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
@@ -19,7 +18,7 @@ rayon = "1.3.0"
string_cache = "0.8.0"
env_logger = "0.9.0"
log = "0.4.14"
pyo3-log = "0.4.0"
pyo3-log = "0.7.0"
log-panics = "2.0.0"
[dependencies.state-map]
@@ -29,12 +28,16 @@ git = "https://github.com/matrix-org/rust-matrix-state-map"
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "4.0.15"
features = ["cargo"]
[dependencies.pyo3]
version = "0.14.1"
features = ["extension-module","abi3-py36"]
version = "0.17.1"
features = ["extension-module"]
[dependencies.tikv-jemallocator]
version = "0.4.1"
version = "0.5.0"
optional = true
[features]
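The Cargo.toml hunks above bump `tikv-jemallocator` to 0.5.0 and mark it `optional = true`, matching the "make jemalloc dependency really optional (#101)" commit in the list. A minimal sketch of how such a feature-gated allocator is typically wired up; the crate and `jemalloc` feature names come from these diffs, while the `main` body is purely illustrative:

```rust
// Use jemalloc as the global allocator only when the crate is built with
// `--features jemalloc`; otherwise the default system allocator is used.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

fn main() {
    // Illustrative body: any allocation below goes through jemalloc
    // when the feature is enabled.
    let v: Vec<u64> = (0..1_000).collect();
    println!("allocated {} integers", v.len());
}
```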

View File

@@ -1,4 +1,4 @@
FROM rust:alpine AS builder
FROM docker.io/rust:alpine AS builder
RUN apk add python3 musl-dev pkgconfig openssl-dev make
@@ -14,7 +14,7 @@ WORKDIR /opt/synapse-compressor/synapse_auto_compressor/
RUN cargo build
FROM alpine
FROM docker.io/alpine
RUN apk add --no-cache libgcc

View File

@@ -26,6 +26,9 @@ periodically.
This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
for instructions on how to do this.
This project follows the deprecation policy of [Synapse](https://matrix-org.github.io/synapse/latest/deprecation_policy.html)
on Rust and will assume a recent stable version of Rust and the ability to fetch a more recent one if necessary.
To build `synapse_auto_compressor`, clone this repository and navigate to the
`synapse_auto_compressor/` subdirectory. Then execute `cargo build`.

View File

@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
string_cache = "0.8.0"
serial_test = "0.5.1"
serial_test = "0.9.0"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"

View File

@@ -4,7 +4,12 @@ use postgres::{fallible_iterator::FallibleIterator, Client};
use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use state_map::StateMap;
use std::{borrow::Cow, collections::BTreeMap, env, fmt};
use std::{
borrow::Cow,
collections::BTreeMap,
env,
fmt::{self, Write as _},
};
use string_cache::DefaultAtom as Atom;
use synapse_compress_state::StateGroupEntry;
@@ -23,47 +28,48 @@ pub fn add_contents_to_database(room_id: &str, state_group_map: &BTreeMap<i64, S
let mut client = Client::connect(DB_URL, connector).unwrap();
// build up the query
let mut sql = "".to_string();
let mut sql = String::new();
let room_id = PGEscape(room_id);
let event_id = PGEscape("left_blank");
for (sg, entry) in state_group_map {
// create the entry for state_groups
sql.push_str(&format!(
"INSERT INTO state_groups (id, room_id, event_id) VALUES ({},{},{});\n",
sg,
PGEscape(room_id),
PGEscape("left_blank")
));
writeln!(
sql,
"INSERT INTO state_groups (id, room_id, event_id) \
VALUES ({sg}, {room_id}, {event_id});",
)
.expect("Writing to a String cannot fail");
// create the entry in state_group_edges IF exists
if let Some(prev_sg) = entry.prev_state_group {
sql.push_str(&format!(
"INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n",
sg, prev_sg
));
writeln!(
sql,
"INSERT INTO state_group_edges (state_group, prev_state_group) \
VALUES ({sg}, {prev_sg});",
)
.unwrap();
}
// write entry for each row in delta
if !entry.state_map.is_empty() {
sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES");
sql.push_str(
"INSERT INTO state_groups_state \
(state_group, room_id, type, state_key, event_id) \
VALUES\n",
);
let mut first = true;
for ((t, s), e) in entry.state_map.iter() {
if first {
sql.push_str(" ");
first = false;
} else {
sql.push_str(" ,");
}
sql.push_str(&format!(
"({}, {}, {}, {}, {})",
sg,
PGEscape(room_id),
PGEscape(t),
PGEscape(s),
PGEscape(e)
));
let t = PGEscape(t);
let s = PGEscape(s);
let e = PGEscape(e);
writeln!(sql, " ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
}
sql.push_str(";\n");
// Replace the last comma with a semicolon
sql.replace_range((sql.len() - 2).., ";\n");
}
}
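The hunk above swaps `sql.push_str(&format!(...))` for `writeln!(sql, ...)`, which works because `String` implements `std::fmt::Write` (hence the added `fmt::{self, Write as _}` import). A minimal sketch of the pattern, using made-up values rather than the real state-group data:

```rust
use std::fmt::Write as _; // bring the trait into scope so writeln! can target a String

fn main() {
    let mut sql = String::new();
    let (sg, room_id, event_id) = (42, "'!room:example.org'", "'left_blank'");

    // writeln! appends directly to the String; the only possible error is a
    // fmt::Error, which cannot occur when writing to a String.
    writeln!(
        sql,
        "INSERT INTO state_groups (id, room_id, event_id) VALUES ({sg}, {room_id}, {event_id});",
    )
    .expect("Writing to a String cannot fail");

    print!("{sql}");
}
```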

View File

@@ -49,8 +49,8 @@ fn run_succeeds_without_crashing() {
let verify = true;
let config = Config::new(
db_url.clone(),
room_id.clone(),
db_url,
room_id,
output_file,
min_state_group,
groups_to_compress,
@@ -525,14 +525,14 @@ fn run_is_idempotent_when_run_on_whole_room() {
.unwrap();
let config2 = Config::new(
db_url.clone(),
room_id.clone(),
db_url,
room_id,
output_file2,
min_state_group,
groups_to_compress,
min_saved_rows,
max_state_group,
level_sizes.clone(),
level_sizes,
transactions,
graphs,
commit_changes,

View File

@@ -56,7 +56,7 @@ fn continue_run_called_twice_same_as_run() {
let start = Some(6);
let chunk_size = 7;
let level_info = chunk_stats_1.new_level_info.clone();
let level_info = chunk_stats_1.new_level_info;
// Run the compressor with those settings
let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();

View File

@@ -30,7 +30,7 @@
use indicatif::{ProgressBar, ProgressStyle};
use state_map::StateMap;
use std::collections::BTreeMap;
use std::{collections::BTreeMap, time::Duration};
use string_cache::DefaultAtom as Atom;
use super::{collapse_state_maps, StateGroupEntry};
@@ -156,7 +156,7 @@ impl<'a> Compressor<'a> {
) -> Compressor<'a> {
let levels = level_info
.iter()
.map(|l| Level::restore((*l).max_length, (*l).current_chain_length, (*l).head))
.map(|l| Level::restore(l.max_length, l.current_chain_length, l.head))
.collect();
let mut compressor = Compressor {
@@ -181,17 +181,18 @@ impl<'a> Compressor<'a> {
panic!("Can only call `create_new_tree` once");
}
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
} else {
pb = ProgressBar::new(self.original_state_map.len() as u64);
}
ProgressBar::new(self.original_state_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
for (&state_group, entry) in self.original_state_map {
// Check whether this entry is in_range or is just present in the map due to being
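This hunk reflects the indicatif 0.16 to 0.17 bump shown in the Cargo.toml diff: `ProgressStyle::template` now returns a `Result` (hence the added `.unwrap()`), `enable_steady_tick` takes a `Duration` instead of a millisecond count, and a single `let pb = if ... else ...` expression replaces the deferred assignment. A minimal sketch of the 0.17-style setup, assuming indicatif 0.17 as a dependency:

```rust
use indicatif::{ProgressBar, ProgressStyle};
use std::time::Duration;

fn make_progress_bar(hidden: bool, len: u64) -> ProgressBar {
    // Pick the bar in one expression instead of declaring `let pb;` first.
    let pb = if hidden {
        ProgressBar::hidden()
    } else {
        ProgressBar::new(len)
    };
    pb.set_style(
        // In indicatif 0.17 `template` returns a Result, so it must be unwrapped.
        ProgressStyle::default_bar()
            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
            .unwrap(),
    );
    pb.set_message("state groups");
    // The tick interval is now a Duration rather than a bare millisecond count.
    pb.enable_steady_tick(Duration::from_millis(100));
    pb
}

fn main() {
    let pb = make_progress_bar(false, 10);
    for _ in 0..10 {
        pb.inc(1);
    }
    pb.finish();
}
```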

View File

@@ -96,11 +96,7 @@ fn create_new_tree_does_nothing_if_already_compressed() {
let pred_group = initial_edges.get(&i);
// Need Option<i64> not Option<&i64>
let prev;
match pred_group {
Some(i) => prev = Some(*i),
None => prev = None,
}
let prev = pred_group.copied();
// insert that edge into the initial map
initial.insert(
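The change above collapses a `match` that only dereferenced the inner value into `Option::copied`, which turns an `Option<&i64>` into an `Option<i64>`. A small illustration of the equivalence, using a throwaway map rather than the test's real edge data:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut initial_edges: BTreeMap<i64, i64> = BTreeMap::new();
    initial_edges.insert(5, 4);

    let pred_group: Option<&i64> = initial_edges.get(&5);

    // Old style: match and dereference by hand.
    let prev_old = match pred_group {
        Some(i) => Some(*i),
        None => None,
    };

    // New style: Option::copied does the same in one call.
    let prev_new = pred_group.copied();

    assert_eq!(prev_old, prev_new);
    println!("predecessor: {prev_new:?}");
}
```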

View File

@@ -54,7 +54,7 @@ fn get_head_returns_head() {
#[test]
fn has_space_returns_true_if_empty() {
let l = Level::new(15);
assert_eq!(l.has_space(), true);
assert!(l.has_space());
}
#[test]
@@ -65,7 +65,7 @@ fn has_space_returns_true_if_part_full() {
l.update(1, true);
l.update(143, true);
l.update(15, true);
assert_eq!(l.has_space(), true);
assert!(l.has_space());
}
#[test]
@@ -76,5 +76,5 @@ fn has_space_returns_false_if_full() {
l.update(3, true);
l.update(4, true);
l.update(5, true);
assert_eq!(l.has_space(), false);
assert!(!l.has_space());
}

View File

@@ -145,11 +145,7 @@ fn stats_correct_if_no_changes() {
let pred_group = initial_edges.get(&i);
// Need Option<i64> not Option<&i64>
let prev;
match pred_group {
Some(i) => prev = Some(*i),
None => prev = None,
}
let prev = pred_group.copied();
// insert that edge into the initial map
initial.insert(

View File

@@ -18,7 +18,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client};
use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{borrow::Cow, collections::BTreeMap, fmt};
use std::{borrow::Cow, collections::BTreeMap, fmt, time::Duration};
use crate::{compressor::Level, generate_sql};
@@ -237,15 +237,9 @@ fn load_map_from_db(
let mut missing_sgs: Vec<_> = state_group_map
.iter()
.filter_map(|(_sg, entry)| {
if let Some(prev_sg) = entry.prev_state_group {
if state_group_map.contains_key(&prev_sg) {
None
} else {
Some(prev_sg)
}
} else {
None
}
entry
.prev_state_group
.filter(|&prev_sg| !state_group_map.contains_key(&prev_sg))
})
.collect();
@@ -372,16 +366,17 @@ fn get_initial_data_from_db(
// Copy the data from the database into a map
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
} else {
pb = ProgressBar::new_spinner();
}
ProgressBar::new_spinner()
};
pb.set_style(
ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"),
ProgressStyle::default_spinner()
.template("{spinner} [{elapsed}] {pos} rows retrieved")
.unwrap(),
);
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
while let Some(row) = rows.next().unwrap() {
// The row in the map to copy the data to
@@ -537,17 +532,18 @@ pub fn send_changes_to_db(
debug!("Writing changes...");
// setup the progress bar
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
} else {
pb = ProgressBar::new(old_map.len() as u64);
}
ProgressBar::new(old_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
for sql_transaction in generate_sql(old_map, new_map, room_id) {
if sql_transaction.is_empty() {
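The first hunk in this file's diff replaces a nested `if let` inside `filter_map` with `Option::filter`, keeping a predecessor state group only when it is missing from the map. A minimal sketch of that pattern, with a hypothetical `Entry` struct standing in for `StateGroupEntry`:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for StateGroupEntry: only the field used here.
struct Entry {
    prev_state_group: Option<i64>,
}

fn main() {
    let mut state_group_map: BTreeMap<i64, Entry> = BTreeMap::new();
    state_group_map.insert(2, Entry { prev_state_group: Some(1) }); // 1 is not in the map
    state_group_map.insert(3, Entry { prev_state_group: Some(2) }); // 2 is in the map

    // Keep a predecessor only if it exists and is *not* already in the map.
    let missing_sgs: Vec<i64> = state_group_map
        .values()
        .filter_map(|entry| {
            entry
                .prev_state_group
                .filter(|&prev_sg| !state_group_map.contains_key(&prev_sg))
        })
        .collect();

    assert_eq!(missing_sgs, vec![1]);
    println!("missing predecessors: {missing_sgs:?}");
}
```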

View File

@@ -23,11 +23,14 @@
use log::{info, warn, LevelFilter};
use pyo3::{exceptions, prelude::*};
use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command};
use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*;
use state_map::StateMap;
use std::{collections::BTreeMap, convert::TryInto, fs::File, io::Write, str::FromStr};
use std::{
collections::BTreeMap, convert::TryInto, fmt::Write as _, fs::File, io::Write, str::FromStr,
time::Duration,
};
use string_cache::DefaultAtom as Atom;
mod compressor;
@@ -49,7 +52,7 @@ pub struct StateGroupEntry {
}
/// Helper struct for parsing the `level_sizes` argument.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
struct LevelSizes(Vec<usize>);
impl FromStr for LevelSizes {
@@ -117,13 +120,13 @@ pub struct Config {
impl Config {
/// Build up config from command line arguments
pub fn parse_arguments() -> Config {
let matches = App::new(crate_name!())
let matches = Command::new(crate_name!())
.version(crate_version!())
.author(crate_authors!("\n"))
.about(crate_description!())
.arg(
Arg::with_name("postgres-url")
.short("p")
Arg::new("postgres-url")
.short('p')
.value_name("POSTGRES_LOCATION")
.help("The configruation for connecting to the postgres database.")
.long_help(concat!(
@@ -133,64 +136,69 @@ impl Config {
"See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
"for the full details."
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::with_name("room_id")
.short("r")
Arg::new("room_id")
.short('r')
.value_name("ROOM_ID")
.help("The room to process")
.long_help(concat!(
"The room to process. This is the value found in the rooms table of the database",
" not the common name for the room - is should look like: \"!wOlkWNmgkAZFxbTaqj:matrix.org\""
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::with_name("min_state_group")
.short("b")
Arg::new("min_state_group")
.short('b')
.value_name("MIN_STATE_GROUP")
.value_parser(clap::value_parser!(i64))
.help("The state group to start processing from (non inclusive)")
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::with_name("min_saved_rows")
.short("m")
Arg::new("min_saved_rows")
.short('m')
.value_name("COUNT")
.value_parser(clap::value_parser!(i32))
.help("Abort if fewer than COUNT rows would be saved")
.long_help("If the compressor cannot save this many rows from the database then it will stop early")
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::with_name("groups_to_compress")
.short("n")
Arg::new("groups_to_compress")
.short('n')
.value_name("GROUPS_TO_COMPRESS")
.value_parser(clap::value_parser!(i64))
.help("How many groups to load into memory to compress")
.long_help(concat!(
"How many groups to load into memory to compress (starting from",
" the 1st group in the room or the group specified by -s)"))
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::with_name("output_file")
.short("o")
Arg::new("output_file")
.short('o')
.value_name("FILE")
.help("File to output the changes to in SQL")
.takes_value(true),
.num_args(1),
).arg(
Arg::with_name("max_state_group")
.short("s")
Arg::new("max_state_group")
.short('s')
.value_name("MAX_STATE_GROUP")
.value_parser(clap::value_parser!(i64))
.help("The maximum state group to process up to")
.long_help(concat!(
"If a max_state_group is specified then only state groups with id's lower",
" than this number are able to be compressed."))
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::with_name("level_sizes")
.short("l")
Arg::new("level_sizes")
.short('l')
.value_name("LEVELS")
.value_parser(clap::value_parser!(LevelSizes))
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!(
"Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -203,32 +211,36 @@ impl Config {
" iterations needed to fetch a given set of state.",
))
.default_value("100,50,25")
.takes_value(true),
.num_args(1),
).arg(
Arg::with_name("transactions")
.short("t")
Arg::new("transactions")
.short('t')
.action(clap::ArgAction::SetTrue)
.help("Whether to wrap each state group change in a transaction")
.long_help(concat!("If this flag is set then then each change to a particular",
" state group is wrapped in a transaction. This should be done if you wish to",
" apply the changes while synapse is still running."))
.requires("output_file"),
).arg(
Arg::with_name("graphs")
.short("g")
Arg::new("graphs")
.short('g')
.action(clap::ArgAction::SetTrue)
.help("Output before and after graphs")
.long_help(concat!("If this flag is set then output the node and edge information for",
" the state_group directed graph built up from the predecessor state_group links.",
" These can be looked at in something like Gephi (https://gephi.org)")),
).arg(
Arg::with_name("commit_changes")
.short("c")
Arg::new("commit_changes")
.short('c')
.action(clap::ArgAction::SetTrue)
.help("Commit changes to the database")
.long_help(concat!("If this flag is set then the changes the compressor makes will",
" be committed to the database. This should be safe to use while synapse is running",
" as it assumes by default that the transactions flag is set")),
).arg(
Arg::with_name("no_verify")
.short("N")
Arg::new("no_verify")
.short('N')
.action(clap::ArgAction::SetTrue)
.help("Do not double-check that the compression was performed correctly")
.long_help(concat!("If this flag is set then the verification of the compressed",
" state groups, which compares them to the original groups, is skipped. This",
@@ -236,43 +248,27 @@ impl Config {
).get_matches();
let db_url = matches
.value_of("postgres-url")
.get_one::<String>("postgres-url")
.expect("db url should be required");
let output_file = matches.value_of("output_file").map(|path| {
let output_file = matches.get_one::<String>("output_file").map(|path| {
File::create(path).unwrap_or_else(|e| panic!("Unable to create output file: {}", e))
});
let room_id = matches
.value_of("room_id")
.get_one::<String>("room_id")
.expect("room_id should be required since no file");
let min_state_group = matches
.value_of("min_state_group")
.map(|s| s.parse().expect("min_state_group must be an integer"));
let min_state_group = matches.get_one("min_state_group").copied();
let groups_to_compress = matches.get_one("groups_to_compress").copied();
let min_saved_rows = matches.get_one("min_saved_rows").copied();
let max_state_group = matches.get_one("max_state_group").copied();
let level_sizes = matches.get_one("level_sizes").cloned().unwrap();
let groups_to_compress = matches
.value_of("groups_to_compress")
.map(|s| s.parse().expect("groups_to_compress must be an integer"));
let min_saved_rows = matches
.value_of("min_saved_rows")
.map(|v| v.parse().expect("COUNT must be an integer"));
let max_state_group = matches
.value_of("max_state_group")
.map(|s| s.parse().expect("max_state_group must be an integer"));
let level_sizes = value_t!(matches, "level_sizes", LevelSizes)
.unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));
let transactions = matches.is_present("transactions");
let graphs = matches.is_present("graphs");
let commit_changes = matches.is_present("commit_changes");
let verify = !matches.is_present("no_verify");
let transactions = matches.get_flag("transactions");
let graphs = matches.get_flag("graphs");
let commit_changes = matches.get_flag("commit_changes");
let verify = !matches.get_flag("no_verify");
Config {
db_url: String::from(db_url),
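The hunks above are the clap 2 to clap 4 migration: `App` becomes `Command`, `Arg::with_name`/`.short("p")` become `Arg::new`/`.short('p')`, `.takes_value(true)` becomes `.num_args(1)`, typed values go through `.value_parser` plus `get_one`, and boolean flags use `ArgAction::SetTrue` with `get_flag`. A minimal sketch of the same pattern under clap 4, with a cut-down argument set rather than the compressor's full CLI:

```rust
use clap::{Arg, ArgAction, Command};

fn main() {
    let matches = Command::new("example")
        .arg(
            Arg::new("postgres-url")
                .short('p')
                .value_name("POSTGRES_LOCATION")
                .num_args(1) // replaces clap 2's .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::new("min_state_group")
                .short('b')
                .value_parser(clap::value_parser!(i64)) // typed parsing instead of manual .parse()
                .num_args(1)
                .required(false),
        )
        .arg(
            Arg::new("transactions")
                .short('t')
                .action(ArgAction::SetTrue), // boolean flag, read back with get_flag
        )
        .get_matches();

    // Strings come back as &String via get_one::<String>.
    let db_url = matches
        .get_one::<String>("postgres-url")
        .expect("postgres-url is required");

    // Typed values can be copied straight out of the matches.
    let min_state_group: Option<i64> = matches.get_one("min_state_group").copied();

    let transactions = matches.get_flag("transactions");

    println!("db_url={db_url} min_state_group={min_state_group:?} transactions={transactions}");
}
```

clap 4's `value_parser` machinery also requires the parsed type to be cloneable, which is presumably why this diff adds `Clone` to the `LevelSizes` derive.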
@@ -423,8 +419,7 @@ fn generate_sql<'a>(
new_map: &'a BTreeMap<i64, StateGroupEntry>,
room_id: &'a str,
) -> impl Iterator<Item = String> + 'a {
old_map.iter().map(move |(sg,old_entry)| {
old_map.iter().map(move |(sg, old_entry)| {
let new_entry = &new_map[sg];
// Check if the new map has a different entry for this state group
@@ -434,48 +429,50 @@ fn generate_sql<'a>(
let mut sql = String::new();
// remove the current edge
sql.push_str(&format!(
"DELETE FROM state_group_edges WHERE state_group = {};\n",
sg
));
writeln!(
sql,
"DELETE FROM state_group_edges WHERE state_group = {sg};",
)
.expect("Writing to a String cannot fail");
// if the new entry has a predecessor then put that into state_group_edges
if let Some(prev_sg) = new_entry.prev_state_group {
sql.push_str(&format!("INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n", sg, prev_sg));
writeln!(
sql,
"INSERT INTO state_group_edges (state_group, prev_state_group) \
VALUES ({sg}, {prev_sg});",
)
.unwrap();
}
// remove the current deltas for this state group
sql.push_str(&format!(
"DELETE FROM state_groups_state WHERE state_group = {};\n",
sg
));
writeln!(
sql,
"DELETE FROM state_groups_state WHERE state_group = {sg};",
)
.unwrap();
if !new_entry.state_map.is_empty() {
// place all the deltas for the state group in the new map into state_groups_state
sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES\n");
sql.push_str(
"INSERT INTO state_groups_state \
(state_group, room_id, type, state_key, event_id) \
VALUES\n",
);
let mut first = true;
let room_id = PGEscape(room_id);
for ((t, s), e) in new_entry.state_map.iter() {
// Add a comma at the start if not the first row to be inserted
if first {
sql.push_str(" ");
first = false;
} else {
sql.push_str(" ,");
}
let t = PGEscape(t);
let s = PGEscape(s);
let e = PGEscape(e);
// write the row to be insterted of the form:
// write the row to be inserted of the form:
// (state_group, room_id, type, state_key, event_id)
sql.push_str(&format!(
"({}, {}, {}, {}, {})",
sg,
PGEscape(room_id),
PGEscape(t),
PGEscape(s),
PGEscape(e)
));
writeln!(sql, " ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
}
sql.push_str(";\n");
// Replace the last comma with a semicolon
sql.replace_range((sql.len() - 2).., ";\n");
}
sql
@@ -507,17 +504,18 @@ fn output_sql(
info!("Writing changes...");
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
} else {
pb = ProgressBar::new(old_map.len() as u64);
}
ProgressBar::new(old_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
if let Some(output) = &mut config.output_file {
for mut sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
@@ -565,7 +563,7 @@ pub fn continue_run(
let (state_group_map, max_group_found) =
database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?;
let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
let original_num_rows = state_group_map.values().map(|v| v.state_map.len()).sum();
// Now we actually call the compression algorithm.
let compressor = Compressor::compress_from_save(&state_group_map, level_info);
@@ -622,17 +620,18 @@ fn check_that_maps_match(
) {
info!("Checking that state maps match...");
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
} else {
pb = ProgressBar::new(old_map.len() as u64);
}
ProgressBar::new(old_map.len() as u64)
};
pb.set_style(
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
);
pb.set_message("state groups");
pb.enable_steady_tick(100);
pb.enable_steady_tick(Duration::from_millis(100));
// Now let's iterate through and assert that the state for each group
// matches between the two versions.
@@ -975,7 +974,6 @@ mod lib_tests {
#[test]
fn check_that_maps_match_returns_if_both_empty() {
check_that_maps_match(&BTreeMap::new(), &BTreeMap::new());
assert!(true);
}
#[test]
@@ -1008,7 +1006,6 @@ mod lib_tests {
}
check_that_maps_match(&old_map, &BTreeMap::new());
assert!(true);
}
#[test]
@@ -1044,7 +1041,6 @@ mod lib_tests {
}
check_that_maps_match(&BTreeMap::new(), &new_map);
assert!(true);
}
#[test]
@@ -1076,7 +1072,6 @@ mod lib_tests {
}
check_that_maps_match(&BTreeMap::new(), &old_map.clone());
assert!(true);
}
#[test]
@@ -1139,7 +1134,6 @@ mod lib_tests {
}
check_that_maps_match(&old_map, &new_map);
assert!(true);
}
#[test]
@@ -1221,7 +1215,6 @@ mod lib_tests {
);
check_that_maps_match(&old_map, &new_map);
assert!(true);
}
//TODO: tests for correct SQL code produced by output_sql
@@ -1311,7 +1304,7 @@ mod pyo3_tests {
.unwrap();
assert_eq!(config.db_url, db_url);
assert!(!config.output_file.is_none());
assert!(config.output_file.is_some());
assert_eq!(config.room_id, room_id);
assert_eq!(config.min_state_group, Some(3225));
assert_eq!(config.groups_to_compress, Some(970));

View File

@@ -1,11 +1,11 @@
[package]
name = "synapse_auto_compressor"
authors = ["William Ashton"]
version = "0.1.2"
version = "0.1.3"
edition = "2018"
[package.metadata.maturin]
requires-python = ">=3.6"
requires-python = ">=3.7"
project-url = {Source = "https://github.com/matrix-org/rust-synapse-compress-state"}
classifier = [
"Development Status :: 4 - Beta",
@@ -13,24 +13,34 @@ classifier = [
]
[dependencies]
clap = "2.33.0"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
tikv-jemallocator = "0.4.1"
rand = "0.8.0"
serial_test = "0.5.1"
serial_test = "0.9.0"
synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
env_logger = "0.9.0"
log = "0.4.14"
log-panics = "2.0.0"
anyhow = "1.0.42"
pyo3-log = "0.4.0"
pyo3-log = "0.7.0"
# Needed for pyo3 support
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "4.0.15"
features = ["cargo"]
[dependencies.pyo3]
version = "0.14.1"
features = ["extension-module","abi3-py36"]
version = "0.17.1"
features = ["extension-module"]
[dependencies.tikv-jemallocator]
version = "0.5.0"
optional = true
[features]
default = ["jemalloc"]
jemalloc = ["tikv-jemallocator"]

View File

@@ -26,7 +26,7 @@ pub mod state_saving;
///
/// This is needed since FromStr cannot be implemented for structs
/// that aren't defined in this scope
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct LevelInfo(pub Vec<Level>);
// Implement FromStr so that an argument of the form "100,50,25"

View File

@@ -16,10 +16,11 @@
//! the state_compressor_state table so that the compressor can seemlesly
//! continue from where it left off.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command};
use log::LevelFilter;
use std::env;
use synapse_auto_compressor::{manager, state_saving, LevelInfo};
@@ -50,13 +51,13 @@ fn main() {
log::info!("synapse_auto_compressor started");
// parse the command line arguments using the clap crate
let arguments = App::new(crate_name!())
let arguments = Command::new(crate_name!())
.version(crate_version!())
.author(crate_authors!("\n"))
.about(crate_description!())
.arg(
Arg::with_name("postgres-url")
.short("p")
Arg::new("postgres-url")
.short('p')
.value_name("POSTGRES_LOCATION")
.help("The configruation for connecting to the postgres database.")
.long_help(concat!(
@@ -66,12 +67,13 @@ fn main() {
"See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
"for the full details."
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::with_name("chunk_size")
.short("c")
Arg::new("chunk_size")
.short('c')
.value_name("COUNT")
.value_parser(clap::value_parser!(i64))
.help("The maximum number of state groups to load into memroy at once")
.long_help(concat!(
"The number of state_groups to work on at once. All of the entries",
@@ -82,12 +84,13 @@ fn main() {
" chunk as a whole (which may well happen in rooms with lots",
" of backfill in) then the entire chunk is skipped.)",
))
.takes_value(true)
.num_args(1)
.required(true),
).arg(
Arg::with_name("default_levels")
.short("l")
Arg::new("default_levels")
.short('l')
.value_name("LEVELS")
.value_parser(clap::value_parser!(LevelInfo))
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!(
"Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -100,40 +103,43 @@ fn main() {
" iterations needed to fetch a given set of state.",
))
.default_value("100,50,25")
.takes_value(true)
.num_args(1)
.required(false),
).arg(
Arg::with_name("number_of_chunks")
.short("n")
Arg::new("number_of_chunks")
.short('n')
.value_name("CHUNKS_TO_COMPRESS")
.value_parser(clap::value_parser!(i64))
.help("The number of chunks to compress")
.long_help(concat!(
"This many chunks of the database will be compressed. The higher this number is set to, ",
"the longer the compressor will run for."
))
.takes_value(true)
.num_args(1)
.required(true),
).get_matches();
// The URL of the database
let db_url = arguments
.value_of("postgres-url")
.get_one::<String>("postgres-url")
.expect("A database url is required");
// The number of state groups to work on at once
let chunk_size = arguments
.value_of("chunk_size")
.map(|s| s.parse().expect("chunk_size must be an integer"))
.get_one("chunk_size")
.copied()
.expect("A chunk size is required");
// The default structure to use when compressing
let default_levels = value_t!(arguments, "default_levels", LevelInfo)
.unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
let default_levels = arguments
.get_one::<LevelInfo>("default_levels")
.cloned()
.unwrap();
// The number of rooms to compress with this tool
let number_of_chunks = arguments
.value_of("number_of_chunks")
.map(|s| s.parse().expect("number_of_chunks must be an integer"))
.get_one("number_of_chunks")
.copied()
.expect("number_of_chunks is required");
// Connect to the database and create the 2 tables this tool needs