Compare commits: erikj/fix_...erikj/cont

1 commit

| Author | SHA1 | Date |
|---|---|---|
|  | 3dcdf01708 |  |
.github/workflows/triage_incoming.yml (vendored): 28 lines

@@ -1,28 +0,0 @@
-name: Move new issues into the issue triage board
-
-on:
-  issues:
-    types: [ opened ]
-
-jobs:
-  add_new_issues:
-    name: Add new issues to the triage board
-    runs-on: ubuntu-latest
-    steps:
-      - uses: octokit/graphql-action@v2.x
-        id: add_to_project
-        with:
-          headers: '{"GraphQL-Features": "projects_next_graphql"}'
-          query: |
-            mutation add_to_project($projectid:ID!,$contentid:ID!) {
-              addProjectV2ItemById(input: {projectId: $projectid contentId: $contentid}) {
-                item {
-                  id
-                }
-              }
-            }
-          projectid: ${{ env.PROJECT_ID }}
-          contentid: ${{ github.event.issue.node_id }}
-        env:
-          PROJECT_ID: "PVT_kwDOAIB0Bs4AFDdZ"
-          GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}
.github/workflows/triage_labelled.yml (vendored): 44 lines

@@ -1,44 +0,0 @@
-name: Move labelled issues to correct projects
-
-on:
-  issues:
-    types: [ labeled ]
-
-jobs:
-  move_needs_info:
-    name: Move X-Needs-Info on the triage board
-    runs-on: ubuntu-latest
-    if: >
-      contains(github.event.issue.labels.*.name, 'X-Needs-Info')
-    steps:
-      - uses: actions/add-to-project@main
-        id: add_project
-        with:
-          project-url: "https://github.com/orgs/matrix-org/projects/67"
-          github-token: ${{ secrets.ELEMENT_BOT_TOKEN }}
-      - name: Set status
-        env:
-          GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}
-        run: |
-          gh api graphql -f query='
-            mutation(
-              $project: ID!
-              $item: ID!
-              $fieldid: ID!
-              $columnid: String!
-            ) {
-              updateProjectV2ItemFieldValue(
-                input: {
-                  projectId: $project
-                  itemId: $item
-                  fieldId: $fieldid
-                  value: {
-                    singleSelectOptionId: $columnid
-                  }
-                }
-              ) {
-                projectV2Item {
-                  id
-                }
-              }
-            }' -f project="PVT_kwDOAIB0Bs4AFDdZ" -f item=${{ steps.add_project.outputs.itemId }} -f fieldid="PVTSSF_lADOAIB0Bs4AFDdZzgC6ZA4" -f columnid=ba22e43c --silent
Cargo.lock (generated): 528 lines
File diff suppressed because it is too large.
@@ -9,7 +9,7 @@ version = "0.1.0"
 edition = "2018"
 
 [dependencies]
-indicatif = "0.17.0"
+indicatif = "0.16.0"
 openssl = "0.10.32"
 postgres = "0.19.0"
 postgres-openssl = "0.5.0"
@@ -18,7 +18,7 @@ rayon = "1.3.0"
 string_cache = "0.8.0"
 env_logger = "0.9.0"
 log = "0.4.14"
-pyo3-log = "0.7.0"
+pyo3-log = "0.6.0"
 log-panics = "2.0.0"
 
 [dependencies.state-map]
@@ -29,11 +29,11 @@ git = "https://github.com/matrix-org/rust-matrix-state-map"
 crate-type = ["cdylib", "rlib"]
 
 [dependencies.clap]
-version = "4.0.15"
+version = "3.1.14"
 features = ["cargo"]
 
 [dependencies.pyo3]
-version = "0.17.1"
+version = "0.16.4"
 features = ["extension-module"]
 
 [dependencies.tikv-jemallocator]
@@ -1,4 +1,4 @@
-FROM docker.io/rust:alpine AS builder
+FROM rust:alpine AS builder
 
 RUN apk add python3 musl-dev pkgconfig openssl-dev make
 
@@ -14,7 +14,7 @@ WORKDIR /opt/synapse-compressor/synapse_auto_compressor/
 
 RUN cargo build
 
-FROM docker.io/alpine
+FROM alpine
 
 RUN apk add --no-cache libgcc
 
@@ -26,9 +26,6 @@ periodically.
 This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
 for instructions on how to do this.
 
-This project follows the deprecation policy of [Synapse](https://matrix-org.github.io/synapse/latest/deprecation_policy.html)
-on Rust and will assume a recent stable version of Rust and the ability to fetch a more recent one if necessary.
-
 To build `synapse_auto_compressor`, clone this repository and navigate to the
 `synapse_auto_compressor/` subdirectory. Then execute `cargo build`.
 
@@ -7,7 +7,7 @@ edition = "2018"
 
 [dependencies]
 string_cache = "0.8.0"
-serial_test = "0.9.0"
+serial_test = "0.5.1"
 openssl = "0.10.32"
 postgres = "0.19.0"
 postgres-openssl = "0.5.0"
@@ -4,12 +4,7 @@ use postgres::{fallible_iterator::FallibleIterator, Client};
 use postgres_openssl::MakeTlsConnector;
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use state_map::StateMap;
-use std::{
-    borrow::Cow,
-    collections::BTreeMap,
-    env,
-    fmt::{self, Write as _},
-};
+use std::{borrow::Cow, collections::BTreeMap, env, fmt};
 use string_cache::DefaultAtom as Atom;
 
 use synapse_compress_state::StateGroupEntry;
@@ -28,48 +23,47 @@ pub fn add_contents_to_database(room_id: &str, state_group_map: &BTreeMap<i64, S
     let mut client = Client::connect(DB_URL, connector).unwrap();
 
     // build up the query
-    let mut sql = String::new();
+    let mut sql = "".to_string();
 
-    let room_id = PGEscape(room_id);
-    let event_id = PGEscape("left_blank");
-
     for (sg, entry) in state_group_map {
         // create the entry for state_groups
-        writeln!(
-            sql,
-            "INSERT INTO state_groups (id, room_id, event_id) \
-             VALUES ({sg}, {room_id}, {event_id});",
-        )
-        .expect("Writing to a String cannot fail");
+        sql.push_str(&format!(
+            "INSERT INTO state_groups (id, room_id, event_id) VALUES ({},{},{});\n",
+            sg,
+            PGEscape(room_id),
+            PGEscape("left_blank")
+        ));
 
         // create the entry in state_group_edges IF exists
         if let Some(prev_sg) = entry.prev_state_group {
-            writeln!(
-                sql,
-                "INSERT INTO state_group_edges (state_group, prev_state_group) \
-                 VALUES ({sg}, {prev_sg});",
-            )
-            .unwrap();
+            sql.push_str(&format!(
+                "INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n",
+                sg, prev_sg
+            ));
         }
 
         // write entry for each row in delta
         if !entry.state_map.is_empty() {
-            sql.push_str(
-                "INSERT INTO state_groups_state \
-                 (state_group, room_id, type, state_key, event_id) \
-                 VALUES\n",
-            );
+            sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES");
 
+            let mut first = true;
             for ((t, s), e) in entry.state_map.iter() {
-                let t = PGEscape(t);
-                let s = PGEscape(s);
-                let e = PGEscape(e);
-
-                writeln!(sql, "    ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
+                if first {
+                    sql.push_str(" ");
+                    first = false;
+                } else {
+                    sql.push_str(" ,");
+                }
+                sql.push_str(&format!(
+                    "({}, {}, {}, {}, {})",
+                    sg,
+                    PGEscape(room_id),
+                    PGEscape(t),
+                    PGEscape(s),
+                    PGEscape(e)
+                ));
             }
-
-            // Replace the last comma with a semicolon
-            sql.replace_range((sql.len() - 2).., ";\n");
+            sql.push_str(";\n");
        }
    }
 
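The removed side of this hunk builds the SQL string with `writeln!` through the `std::fmt::Write` trait (imported as `fmt::{self, Write as _}` in the hunk above), while the added side goes back to `push_str(&format!(...))`. A minimal sketch of the write-into-String pattern, using only the standard library and made-up table and values:

```rust
use std::fmt::Write as _; // brings write!/writeln! for String into scope

fn build_insert() -> String {
    let mut sql = String::new();
    let (sg, room_id) = (42, "'!room:example.org'"); // hypothetical values
    // writeln! appends straight into the String and adds a trailing newline;
    // formatting into a String cannot fail, so expect() never fires.
    writeln!(sql, "INSERT INTO state_groups (id, room_id) VALUES ({sg}, {room_id});")
        .expect("Writing to a String cannot fail");
    sql
}
```

Compared with `push_str(&format!(...))`, this writes into the existing buffer instead of allocating an intermediate `String` per statement.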
@@ -30,7 +30,7 @@
 
 use indicatif::{ProgressBar, ProgressStyle};
 use state_map::StateMap;
-use std::{collections::BTreeMap, time::Duration};
+use std::collections::BTreeMap;
 use string_cache::DefaultAtom as Atom;
 
 use super::{collapse_state_maps, StateGroupEntry};
@@ -156,7 +156,7 @@ impl<'a> Compressor<'a> {
     ) -> Compressor<'a> {
         let levels = level_info
             .iter()
-            .map(|l| Level::restore(l.max_length, l.current_chain_length, l.head))
+            .map(|l| Level::restore((*l).max_length, (*l).current_chain_length, (*l).head))
             .collect();
 
         let mut compressor = Compressor {
@@ -187,12 +187,10 @@ impl<'a> Compressor<'a> {
             ProgressBar::new(self.original_state_map.len() as u64)
         };
         pb.set_style(
-            ProgressStyle::default_bar()
-                .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
-                .unwrap(),
+            ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
         );
         pb.set_message("state groups");
-        pb.enable_steady_tick(Duration::from_millis(100));
+        pb.enable_steady_tick(100);
 
         for (&state_group, entry) in self.original_state_map {
             // Check whether this entry is in_range or is just present in the map due to being
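On the removed side, `template()` is followed by `.unwrap()` and `enable_steady_tick` receives a `Duration`; that matches indicatif 0.17, where `ProgressStyle::template` returns a `Result` and the tick interval changed from a millisecond count to a `Duration`. The added side matches the 0.16 API pinned in the Cargo.toml hunk above. A sketch of the 0.17-style construction (bar length and template string are illustrative):

```rust
use indicatif::{ProgressBar, ProgressStyle};
use std::time::Duration;

fn make_bar(len: u64) -> ProgressBar {
    let pb = ProgressBar::new(len);
    pb.set_style(
        // In indicatif 0.17, template() returns Result<ProgressStyle, TemplateError>.
        ProgressStyle::default_bar()
            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
            .unwrap(),
    );
    // 0.17 takes a Duration; 0.16 took a u64 millisecond count.
    pb.enable_steady_tick(Duration::from_millis(100));
    pb
}
```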
@@ -18,7 +18,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
 use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client};
 use postgres_openssl::MakeTlsConnector;
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
-use std::{borrow::Cow, collections::BTreeMap, fmt, time::Duration};
+use std::{borrow::Cow, collections::BTreeMap, fmt};
 
 use crate::{compressor::Level, generate_sql};
 
@@ -237,9 +237,15 @@ fn load_map_from_db(
     let mut missing_sgs: Vec<_> = state_group_map
         .iter()
         .filter_map(|(_sg, entry)| {
-            entry
-                .prev_state_group
-                .filter(|&prev_sg| !state_group_map.contains_key(&prev_sg))
+            if let Some(prev_sg) = entry.prev_state_group {
+                if state_group_map.contains_key(&prev_sg) {
+                    None
+                } else {
+                    Some(prev_sg)
+                }
+            } else {
+                None
+            }
         })
         .collect();
 
@@ -372,11 +378,9 @@ fn get_initial_data_from_db(
         ProgressBar::new_spinner()
     };
     pb.set_style(
-        ProgressStyle::default_spinner()
-            .template("{spinner} [{elapsed}] {pos} rows retrieved")
-            .unwrap(),
+        ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"),
    );
-    pb.enable_steady_tick(Duration::from_millis(100));
+    pb.enable_steady_tick(100);
 
    while let Some(row) = rows.next().unwrap() {
        // The row in the map to copy the data to
@@ -538,12 +542,10 @@ pub fn send_changes_to_db(
         ProgressBar::new(old_map.len() as u64)
     };
     pb.set_style(
-        ProgressStyle::default_bar()
-            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
-            .unwrap(),
+        ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
    );
    pb.set_message("state groups");
-    pb.enable_steady_tick(Duration::from_millis(100));
+    pb.enable_steady_tick(100);
 
    for sql_transaction in generate_sql(old_map, new_map, room_id) {
        if sql_transaction.is_empty() {
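In the `load_map_from_db` hunk, the removed side collapses a nested `if let`/`else` into `Option::filter`, which keeps the inner value only when the predicate holds. A reduced sketch of the equivalence, with the state-group map simplified to a plain `BTreeMap`:

```rust
use std::collections::BTreeMap;

// Returns the predecessor only when it is absent from the loaded map,
// mirroring the filter_map closure in the hunk above (types simplified).
fn missing_prev(prev_state_group: Option<i64>, loaded: &BTreeMap<i64, ()>) -> Option<i64> {
    prev_state_group.filter(|prev_sg| !loaded.contains_key(prev_sg))
}
```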
src/lib.rs: 148 lines
@@ -27,10 +27,7 @@ use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Com
 use indicatif::{ProgressBar, ProgressStyle};
 use rayon::prelude::*;
 use state_map::StateMap;
-use std::{
-    collections::BTreeMap, convert::TryInto, fmt::Write as _, fs::File, io::Write, str::FromStr,
-    time::Duration,
-};
+use std::{collections::BTreeMap, convert::TryInto, fs::File, io::Write, str::FromStr};
 use string_cache::DefaultAtom as Atom;
 
 mod compressor;
@@ -52,7 +49,7 @@ pub struct StateGroupEntry {
 }
 
 /// Helper struct for parsing the `level_sizes` argument.
-#[derive(PartialEq, Debug, Clone)]
+#[derive(PartialEq, Debug)]
 struct LevelSizes(Vec<usize>);
 
 impl FromStr for LevelSizes {
@@ -136,7 +133,7 @@ impl Config {
                 "See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
                 "for the full details."
             ))
-            .num_args(1)
+            .takes_value(true)
             .required(true),
         ).arg(
             Arg::new("room_id")
@@ -147,58 +144,53 @@ impl Config {
                 "The room to process. This is the value found in the rooms table of the database",
                 " not the common name for the room - is should look like: \"!wOlkWNmgkAZFxbTaqj:matrix.org\""
             ))
-            .num_args(1)
+            .takes_value(true)
             .required(true),
         ).arg(
             Arg::new("min_state_group")
             .short('b')
             .value_name("MIN_STATE_GROUP")
-            .value_parser(clap::value_parser!(i64))
             .help("The state group to start processing from (non inclusive)")
-            .num_args(1)
+            .takes_value(true)
             .required(false),
         ).arg(
             Arg::new("min_saved_rows")
             .short('m')
             .value_name("COUNT")
-            .value_parser(clap::value_parser!(i32))
             .help("Abort if fewer than COUNT rows would be saved")
             .long_help("If the compressor cannot save this many rows from the database then it will stop early")
-            .num_args(1)
+            .takes_value(true)
             .required(false),
         ).arg(
             Arg::new("groups_to_compress")
             .short('n')
             .value_name("GROUPS_TO_COMPRESS")
-            .value_parser(clap::value_parser!(i64))
             .help("How many groups to load into memory to compress")
             .long_help(concat!(
                 "How many groups to load into memory to compress (starting from",
                 " the 1st group in the room or the group specified by -s)"))
-            .num_args(1)
+            .takes_value(true)
             .required(false),
         ).arg(
             Arg::new("output_file")
             .short('o')
             .value_name("FILE")
             .help("File to output the changes to in SQL")
-            .num_args(1),
+            .takes_value(true),
         ).arg(
             Arg::new("max_state_group")
             .short('s')
             .value_name("MAX_STATE_GROUP")
-            .value_parser(clap::value_parser!(i64))
             .help("The maximum state group to process up to")
             .long_help(concat!(
                 "If a max_state_group is specified then only state groups with id's lower",
                 " than this number are able to be compressed."))
-            .num_args(1)
+            .takes_value(true)
             .required(false),
         ).arg(
             Arg::new("level_sizes")
             .short('l')
             .value_name("LEVELS")
-            .value_parser(clap::value_parser!(LevelSizes))
             .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
             .long_help(concat!(
                 "Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -211,11 +203,10 @@ impl Config {
                 " iterations needed to fetch a given set of state.",
             ))
             .default_value("100,50,25")
-            .num_args(1),
+            .takes_value(true),
         ).arg(
             Arg::new("transactions")
             .short('t')
-            .action(clap::ArgAction::SetTrue)
             .help("Whether to wrap each state group change in a transaction")
             .long_help(concat!("If this flag is set then then each change to a particular",
             " state group is wrapped in a transaction. This should be done if you wish to",
@@ -224,7 +215,6 @@ impl Config {
         ).arg(
             Arg::new("graphs")
             .short('g')
-            .action(clap::ArgAction::SetTrue)
             .help("Output before and after graphs")
             .long_help(concat!("If this flag is set then output the node and edge information for",
             " the state_group directed graph built up from the predecessor state_group links.",
@@ -232,7 +222,6 @@ impl Config {
         ).arg(
             Arg::new("commit_changes")
             .short('c')
-            .action(clap::ArgAction::SetTrue)
             .help("Commit changes to the database")
             .long_help(concat!("If this flag is set then the changes the compressor makes will",
             " be committed to the database. This should be safe to use while synapse is running",
@@ -240,7 +229,6 @@ impl Config {
         ).arg(
             Arg::new("no_verify")
             .short('N')
-            .action(clap::ArgAction::SetTrue)
             .help("Do not double-check that the compression was performed correctly")
             .long_help(concat!("If this flag is set then the verification of the compressed",
             " state groups, which compares them to the original groups, is skipped. This",
@@ -248,27 +236,44 @@ impl Config {
         ).get_matches();
 
         let db_url = matches
-            .get_one::<String>("postgres-url")
+            .value_of("postgres-url")
             .expect("db url should be required");
 
-        let output_file = matches.get_one::<String>("output_file").map(|path| {
+        let output_file = matches.value_of("output_file").map(|path| {
             File::create(path).unwrap_or_else(|e| panic!("Unable to create output file: {}", e))
         });
 
         let room_id = matches
-            .get_one::<String>("room_id")
+            .value_of("room_id")
             .expect("room_id should be required since no file");
 
-        let min_state_group = matches.get_one("min_state_group").copied();
-        let groups_to_compress = matches.get_one("groups_to_compress").copied();
-        let min_saved_rows = matches.get_one("min_saved_rows").copied();
-        let max_state_group = matches.get_one("max_state_group").copied();
-        let level_sizes = matches.get_one("level_sizes").cloned().unwrap();
+        let min_state_group = matches
+            .value_of("min_state_group")
+            .map(|s| s.parse().expect("min_state_group must be an integer"));
 
-        let transactions = matches.get_flag("transactions");
-        let graphs = matches.get_flag("graphs");
-        let commit_changes = matches.get_flag("commit_changes");
-        let verify = !matches.get_flag("no_verify");
+        let groups_to_compress = matches
+            .value_of("groups_to_compress")
+            .map(|s| s.parse().expect("groups_to_compress must be an integer"));
+
+        let min_saved_rows = matches
+            .value_of("min_saved_rows")
+            .map(|v| v.parse().expect("COUNT must be an integer"));
+
+        let max_state_group = matches
+            .value_of("max_state_group")
+            .map(|s| s.parse().expect("max_state_group must be an integer"));
+
+        let level_sizes = matches
+            .value_of_t::<LevelSizes>("level_sizes")
+            .unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));
+
+        let transactions = matches.is_present("transactions");
+
+        let graphs = matches.is_present("graphs");
+
+        let commit_changes = matches.is_present("commit_changes");
+
+        let verify = !matches.is_present("no_verify");
 
         Config {
             db_url: String::from(db_url),
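These hunks line up the clap 4 idioms on the removed side against their clap 3 counterparts on the added side: `num_args(1)` versus `takes_value(true)`, `value_parser!` plus a typed `get_one`/`copied()` versus `value_of` plus manual `str::parse`, and `ArgAction::SetTrue` plus `get_flag` versus `is_present`. A compact sketch of the clap 4 side, with a hypothetical command name and two of the arguments from this hunk:

```rust
use clap::{Arg, ArgAction, Command};

fn demo() -> (Option<i64>, bool) {
    let matches = Command::new("demo")
        .arg(
            Arg::new("min_state_group")
                .short('b')
                // value_parser replaces the manual s.parse().expect(...) dance.
                .value_parser(clap::value_parser!(i64))
                .num_args(1),
        )
        .arg(
            // In clap 4, boolean flags need an explicit action.
            Arg::new("transactions").short('t').action(ArgAction::SetTrue),
        )
        .get_matches();

    // get_one returns Option<&i64>; copied() turns it into Option<i64>.
    let min_state_group = matches.get_one::<i64>("min_state_group").copied();
    let transactions = matches.get_flag("transactions");
    (min_state_group, transactions)
}
```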
@@ -419,7 +424,8 @@ fn generate_sql<'a>(
     new_map: &'a BTreeMap<i64, StateGroupEntry>,
     room_id: &'a str,
 ) -> impl Iterator<Item = String> + 'a {
-    old_map.iter().map(move |(sg, old_entry)| {
+    old_map.iter().map(move |(sg,old_entry)| {
+
         let new_entry = &new_map[sg];
 
         // Check if the new map has a different entry for this state group
@@ -429,50 +435,48 @@ fn generate_sql<'a>(
         let mut sql = String::new();
 
         // remove the current edge
-        writeln!(
-            sql,
-            "DELETE FROM state_group_edges WHERE state_group = {sg};",
-        )
-        .expect("Writing to a String cannot fail");
+        sql.push_str(&format!(
+            "DELETE FROM state_group_edges WHERE state_group = {};\n",
+            sg
+        ));
 
         // if the new entry has a predecessor then put that into state_group_edges
         if let Some(prev_sg) = new_entry.prev_state_group {
-            writeln!(
-                sql,
-                "INSERT INTO state_group_edges (state_group, prev_state_group) \
-                 VALUES ({sg}, {prev_sg});",
-            )
-            .unwrap();
+            sql.push_str(&format!("INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n", sg, prev_sg));
         }
 
         // remove the current deltas for this state group
-        writeln!(
-            sql,
-            "DELETE FROM state_groups_state WHERE state_group = {sg};",
-        )
-        .unwrap();
+        sql.push_str(&format!(
+            "DELETE FROM state_groups_state WHERE state_group = {};\n",
+            sg
+        ));
 
         if !new_entry.state_map.is_empty() {
             // place all the deltas for the state group in the new map into state_groups_state
-            sql.push_str(
-                "INSERT INTO state_groups_state \
-                 (state_group, room_id, type, state_key, event_id) \
-                 VALUES\n",
-            );
+            sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES\n");
 
-            let room_id = PGEscape(room_id);
+            let mut first = true;
             for ((t, s), e) in new_entry.state_map.iter() {
-                let t = PGEscape(t);
-                let s = PGEscape(s);
-                let e = PGEscape(e);
+                // Add a comma at the start if not the first row to be inserted
+                if first {
+                    sql.push_str(" ");
+                    first = false;
+                } else {
+                    sql.push_str(" ,");
+                }
 
-                // write the row to be inserted of the form:
+                // write the row to be insterted of the form:
                 //   (state_group, room_id, type, state_key, event_id)
-                writeln!(sql, "    ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
+                sql.push_str(&format!(
+                    "({}, {}, {}, {}, {})",
+                    sg,
+                    PGEscape(room_id),
+                    PGEscape(t),
+                    PGEscape(s),
+                    PGEscape(e)
+                ));
             }
-
-            // Replace the last comma with a semicolon
-            sql.replace_range((sql.len() - 2).., ";\n");
+            sql.push_str(";\n");
         }
 
         sql
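Both versions of `generate_sql` have to turn per-row output into one multi-row `VALUES` list. The added side threads a `first` flag and prepends `" ,"` before every row after the first; the removed side writes an unconditional trailing `",\n"` after each row and then swaps the final two bytes for `";\n"` with `replace_range`. A minimal sketch of the `replace_range` variant, with made-up rows:

```rust
// Joins pre-rendered rows into one INSERT statement by writing a trailing
// ",\n" after each row, then replacing the last ",\n" with ";\n".
fn join_rows(rows: &[&str]) -> String {
    // Only well-formed when there is at least one row, as in the diff,
    // which guards on !state_map.is_empty() first.
    assert!(!rows.is_empty());
    let mut sql = String::from("INSERT INTO t (x) VALUES\n");
    for row in rows {
        sql.push_str("    ");
        sql.push_str(row);
        sql.push_str(",\n");
    }
    sql.replace_range((sql.len() - 2).., ";\n");
    sql
}
```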
@@ -510,12 +514,10 @@ fn output_sql(
         ProgressBar::new(old_map.len() as u64)
     };
     pb.set_style(
-        ProgressStyle::default_bar()
-            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
-            .unwrap(),
+        ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
     );
     pb.set_message("state groups");
-    pb.enable_steady_tick(Duration::from_millis(100));
+    pb.enable_steady_tick(100);
 
     if let Some(output) = &mut config.output_file {
         for mut sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
@@ -563,7 +565,7 @@ pub fn continue_run(
     let (state_group_map, max_group_found) =
         database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?;
 
-    let original_num_rows = state_group_map.values().map(|v| v.state_map.len()).sum();
+    let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
 
     // Now we actually call the compression algorithm.
     let compressor = Compressor::compress_from_save(&state_group_map, level_info);
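The one-line change in `continue_run` trades `.values().map(...)` for `.iter().map(|(_, v)| ...)`: both iterate the map's values, but `values()` states directly that the keys are unused. A stand-in equivalence check on a simplified map:

```rust
use std::collections::BTreeMap;

fn total_rows(state_groups: &BTreeMap<i64, Vec<u8>>) -> usize {
    // The two sums are equivalent; values() avoids the ignored key binding.
    let via_values: usize = state_groups.values().map(|v| v.len()).sum();
    let via_iter: usize = state_groups.iter().map(|(_, v)| v.len()).sum();
    debug_assert_eq!(via_values, via_iter);
    via_values
}
```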
@@ -626,12 +628,10 @@ fn check_that_maps_match(
         ProgressBar::new(old_map.len() as u64)
     };
     pb.set_style(
-        ProgressStyle::default_bar()
-            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
-            .unwrap(),
+        ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
     );
     pb.set_message("state groups");
-    pb.enable_steady_tick(Duration::from_millis(100));
+    pb.enable_steady_tick(100);
 
     // Now let's iterate through and assert that the state for each group
     // matches between the two versions.
@@ -16,31 +16,24 @@ classifier = [
 openssl = "0.10.32"
 postgres = "0.19.0"
 postgres-openssl = "0.5.0"
+tikv-jemallocator = "0.5.0"
 rand = "0.8.0"
-serial_test = "0.9.0"
+serial_test = "0.5.1"
 synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
 env_logger = "0.9.0"
 log = "0.4.14"
 log-panics = "2.0.0"
 anyhow = "1.0.42"
-pyo3-log = "0.7.0"
+pyo3-log = "0.6.0"
 
 # Needed for pyo3 support
 [lib]
 crate-type = ["cdylib", "rlib"]
 
 [dependencies.clap]
-version = "4.0.15"
+version = "3.1.14"
 features = ["cargo"]
 
 [dependencies.pyo3]
-version = "0.17.1"
+version = "0.16.4"
 features = ["extension-module"]
-
-[dependencies.tikv-jemallocator]
-version = "0.5.0"
-optional = true
-
-[features]
-default = ["jemalloc"]
-jemalloc = ["tikv-jemallocator"]
@@ -26,7 +26,7 @@ pub mod state_saving;
 ///
 /// This is needed since FromStr cannot be implemented for structs
 /// that aren't defined in this scope
-#[derive(PartialEq, Eq, Debug, Clone)]
+#[derive(PartialEq, Debug)]
 pub struct LevelInfo(pub Vec<Level>);
 
 // Implement FromStr so that an argument of the form "100,50,25"
@@ -16,7 +16,6 @@
 //! the state_compressor_state table so that the compressor can seemlesly
 //! continue from where it left off.
 
-#[cfg(feature = "jemalloc")]
 #[global_allocator]
 static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
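The removed `#[cfg(feature = "jemalloc")]` attribute ties the global allocator to the optional `tikv-jemallocator` dependency declared in the Cargo.toml hunk above (`optional = true`, with `default = ["jemalloc"]`); without the attribute, the crate requires jemalloc unconditionally. A sketch of the feature-gated pattern:

```rust
// Only compiled when building with the jemalloc feature (enabled by default
// in the hunk above); otherwise the system allocator is used.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

fn main() {
    // Call sites are unchanged: Vec, Box, etc. use whichever global
    // allocator was selected at compile time.
    let _buf: Vec<u8> = Vec::with_capacity(1024);
}
```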
@@ -67,13 +66,12 @@ fn main() {
             "See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
             "for the full details."
         ))
-        .num_args(1)
+        .takes_value(true)
         .required(true),
     ).arg(
         Arg::new("chunk_size")
         .short('c')
         .value_name("COUNT")
-        .value_parser(clap::value_parser!(i64))
         .help("The maximum number of state groups to load into memroy at once")
         .long_help(concat!(
             "The number of state_groups to work on at once. All of the entries",
@@ -84,13 +82,12 @@ fn main() {
             " chunk as a whole (which may well happen in rooms with lots",
             " of backfill in) then the entire chunk is skipped.)",
         ))
-        .num_args(1)
+        .takes_value(true)
         .required(true),
     ).arg(
         Arg::new("default_levels")
         .short('l')
         .value_name("LEVELS")
-        .value_parser(clap::value_parser!(LevelInfo))
         .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
         .long_help(concat!(
             "Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -103,43 +100,41 @@ fn main() {
             " iterations needed to fetch a given set of state.",
         ))
         .default_value("100,50,25")
-        .num_args(1)
+        .takes_value(true)
         .required(false),
     ).arg(
         Arg::new("number_of_chunks")
         .short('n')
         .value_name("CHUNKS_TO_COMPRESS")
-        .value_parser(clap::value_parser!(i64))
         .help("The number of chunks to compress")
         .long_help(concat!(
             "This many chunks of the database will be compressed. The higher this number is set to, ",
             "the longer the compressor will run for."
         ))
-        .num_args(1)
+        .takes_value(true)
         .required(true),
     ).get_matches();
 
     // The URL of the database
     let db_url = arguments
-        .get_one::<String>("postgres-url")
+        .value_of("postgres-url")
         .expect("A database url is required");
 
     // The number of state groups to work on at once
     let chunk_size = arguments
-        .get_one("chunk_size")
-        .copied()
+        .value_of("chunk_size")
+        .map(|s| s.parse().expect("chunk_size must be an integer"))
        .expect("A chunk size is required");
 
     // The default structure to use when compressing
     let default_levels = arguments
-        .get_one::<LevelInfo>("default_levels")
-        .cloned()
-        .unwrap();
+        .value_of_t::<LevelInfo>("default_levels")
+        .unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
 
     // The number of rooms to compress with this tool
     let number_of_chunks = arguments
-        .get_one("number_of_chunks")
-        .copied()
+        .value_of("number_of_chunks")
+        .map(|s| s.parse().expect("number_of_chunks must be an integer"))
        .expect("number_of_chunks is required");
 
     // Connect to the database and create the 2 tables this tool needs