1184
Cargo.lock
generated
1184
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
16
Cargo.toml
16
Cargo.toml
@@ -3,16 +3,16 @@ authors = ["Erik Johnston"]
|
||||
description = "A tool to compress some state in a Synapse instance's database"
|
||||
name = "synapse-compress-state"
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
clap = "2.32.0"
|
||||
fallible-iterator = "0.1.5"
|
||||
indicatif = "0.11.0"
|
||||
jemallocator = "0.3.0"
|
||||
postgres = "0.15.2"
|
||||
rand = "0.7.0"
|
||||
rayon = "1.0.2"
|
||||
string_cache = "0.7.3"
|
||||
clap = "2.33.0"
|
||||
indicatif = "0.14.0"
|
||||
jemallocator = "0.3.2"
|
||||
postgres = "0.17.0"
|
||||
rand = "0.7.2"
|
||||
rayon = "1.3.0"
|
||||
string_cache = "0.8.0"
|
||||
|
||||
[dependencies.state-map]
|
||||
git = "https://github.com/matrix-org/rust-matrix-state-map"
|
||||
|
||||
@@ -30,11 +30,10 @@
|
||||
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use state_map::StateMap;
|
||||
use std::collections::BTreeMap;
|
||||
use string_cache::DefaultAtom as Atom;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use {collapse_state_maps, StateGroupEntry};
|
||||
use super::{collapse_state_maps, StateGroupEntry};
|
||||
|
||||
/// Holds information about a particular level.
|
||||
struct Level {
|
||||
|
||||
@@ -12,17 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use postgres::{Connection, TlsMode};
|
||||
use rand::distributions::Alphanumeric;
|
||||
use rand::{thread_rng, Rng};
|
||||
use postgres::{fallible_iterator::FallibleIterator, Client};
|
||||
use rand::{distributions::Alphanumeric, thread_rng, Rng};
|
||||
use std::{borrow::Cow, collections::BTreeMap, fmt, iter};
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
|
||||
use StateGroupEntry;
|
||||
use super::StateGroupEntry;
|
||||
|
||||
/// Fetch the entries in state_groups_state (and their prev groups) for the
|
||||
/// given `room_id` by connecting to the postgres database at `db_url`.
|
||||
@@ -31,9 +26,9 @@ pub fn get_data_from_db(
|
||||
room_id: &str,
|
||||
max_state_group: Option<i64>,
|
||||
) -> BTreeMap<i64, StateGroupEntry> {
|
||||
let conn = Connection::connect(db_url, TlsMode::None).unwrap();
|
||||
let mut client = Client::connect(db_url, postgres::NoTls).unwrap();
|
||||
|
||||
let mut state_group_map = get_initial_data_from_db(&conn, room_id, max_state_group);
|
||||
let mut state_group_map = get_initial_data_from_db(&mut client, room_id, max_state_group);
|
||||
|
||||
println!("Got initial state from database. Checking for any missing state groups...");
|
||||
|
||||
@@ -68,7 +63,7 @@ pub fn get_data_from_db(
|
||||
|
||||
println!("Missing {} state groups", missing_sgs.len());
|
||||
|
||||
let map = get_missing_from_db(&conn, &missing_sgs);
|
||||
let map = get_missing_from_db(&mut client, &missing_sgs);
|
||||
state_group_map.extend(map.into_iter());
|
||||
}
|
||||
|
||||
@@ -78,33 +73,25 @@ pub fn get_data_from_db(
|
||||
/// Fetch the entries in state_groups_state (and their prev groups) for the
|
||||
/// given `room_id` by fetching all state with the given `room_id`.
|
||||
fn get_initial_data_from_db(
|
||||
conn: &Connection,
|
||||
client: &mut Client,
|
||||
room_id: &str,
|
||||
max_state_group: Option<i64>,
|
||||
) -> BTreeMap<i64, StateGroupEntry> {
|
||||
let sql = format!(
|
||||
r#"
|
||||
let sql = r#"
|
||||
SELECT m.id, prev_state_group, type, state_key, s.event_id
|
||||
FROM state_groups AS m
|
||||
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
|
||||
LEFT JOIN state_group_edges AS e ON (m.id = e.state_group)
|
||||
WHERE m.room_id = $1 {}
|
||||
"#,
|
||||
if max_state_group.is_some() {
|
||||
"AND m.id <= $2"
|
||||
} else {
|
||||
""
|
||||
}
|
||||
);
|
||||
|
||||
let stmt = conn.prepare(&sql).unwrap();
|
||||
|
||||
let trans = conn.transaction().unwrap();
|
||||
WHERE m.room_id = $1
|
||||
"#;
|
||||
|
||||
let mut rows = if let Some(s) = max_state_group {
|
||||
stmt.lazy_query(&trans, &[&room_id, &s], 1000)
|
||||
client.query_raw(
|
||||
format!(r"{} AND m.id <= $2", sql).as_str(),
|
||||
vec![&room_id as _, &s as _],
|
||||
)
|
||||
} else {
|
||||
stmt.lazy_query(&trans, &[&room_id], 1000)
|
||||
client.query_raw(sql, iter::once(&room_id as _))
|
||||
}
|
||||
.unwrap();
|
||||
|
||||
@@ -116,16 +103,12 @@ fn get_initial_data_from_db(
|
||||
);
|
||||
pb.enable_steady_tick(100);
|
||||
|
||||
let mut num_rows = 0;
|
||||
while let Some(row) = rows.next().unwrap() {
|
||||
let state_group = row.get(0);
|
||||
|
||||
let entry = state_group_map.entry(state_group).or_default();
|
||||
let entry = state_group_map.entry(row.get(0)).or_default();
|
||||
|
||||
entry.prev_state_group = row.get(1);
|
||||
let etype: Option<String> = row.get(2);
|
||||
|
||||
if let Some(etype) = etype {
|
||||
if let Some(etype) = row.get::<_, Option<String>>(2) {
|
||||
entry.state_map.insert(
|
||||
&etype,
|
||||
&row.get::<_, String>(3),
|
||||
@@ -134,34 +117,31 @@ fn get_initial_data_from_db(
|
||||
}
|
||||
|
||||
pb.inc(1);
|
||||
num_rows += 1;
|
||||
}
|
||||
|
||||
pb.set_length(num_rows);
|
||||
pb.set_length(pb.position());
|
||||
pb.finish();
|
||||
|
||||
state_group_map
|
||||
}
|
||||
|
||||
/// Get any missing state groups from the database
|
||||
fn get_missing_from_db(conn: &Connection, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
|
||||
let stmt = conn
|
||||
.prepare(
|
||||
fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
|
||||
let mut rows = client
|
||||
.query_raw(
|
||||
r#"
|
||||
SELECT state_group, prev_state_group
|
||||
FROM state_group_edges
|
||||
WHERE state_group = ANY($1)
|
||||
"#,
|
||||
iter::once(&missing_sgs as _),
|
||||
)
|
||||
.unwrap();
|
||||
let trans = conn.transaction().unwrap();
|
||||
|
||||
let mut rows = stmt.lazy_query(&trans, &[&missing_sgs], 100).unwrap();
|
||||
|
||||
// initialise the map with empty entries (the missing group may not
|
||||
// have a prev_state_group either)
|
||||
let mut state_group_map: BTreeMap<i64, StateGroupEntry> =
|
||||
missing_sgs.iter()
|
||||
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = missing_sgs
|
||||
.iter()
|
||||
.map(|sg| (*sg, StateGroupEntry::default()))
|
||||
.collect();
|
||||
|
||||
@@ -175,10 +155,10 @@ fn get_missing_from_db(conn: &Connection, missing_sgs: &[i64]) -> BTreeMap<i64,
|
||||
}
|
||||
|
||||
/// Helper function that escapes the wrapped text when writing SQL
|
||||
pub struct PGEscapse<'a>(pub &'a str);
|
||||
pub struct PGEscape<'a>(pub &'a str);
|
||||
|
||||
impl<'a> fmt::Display for PGEscapse<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
impl<'a> fmt::Display for PGEscape<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let mut delim = Cow::from("$$");
|
||||
while self.0.contains(&delim as &str) {
|
||||
let s: String = thread_rng().sample_iter(&Alphanumeric).take(10).collect();
|
||||
@@ -192,12 +172,12 @@ impl<'a> fmt::Display for PGEscapse<'a> {
|
||||
|
||||
#[test]
|
||||
fn test_pg_escape() {
|
||||
let s = format!("{}", PGEscapse("test"));
|
||||
let s = format!("{}", PGEscape("test"));
|
||||
assert_eq!(s, "$$test$$");
|
||||
|
||||
let dodgy_string = "test$$ing";
|
||||
|
||||
let s = format!("{}", PGEscapse(dodgy_string));
|
||||
let s = format!("{}", PGEscape(dodgy_string));
|
||||
|
||||
// prefix and suffixes should match
|
||||
let start_pos = s.find(dodgy_string).expect("expected to find dodgy string");
|
||||
|
||||
59
src/main.rs
59
src/main.rs
@@ -16,17 +16,6 @@
|
||||
//! Synapse instance's database. Specifically, it aims to reduce the number of
|
||||
//! rows that a given room takes up in the `state_groups_state` table.
|
||||
|
||||
#[macro_use]
|
||||
extern crate clap;
|
||||
extern crate fallible_iterator;
|
||||
extern crate indicatif;
|
||||
extern crate jemallocator;
|
||||
extern crate postgres;
|
||||
extern crate rand;
|
||||
extern crate rayon;
|
||||
extern crate state_map;
|
||||
extern crate string_cache;
|
||||
|
||||
mod compressor;
|
||||
mod database;
|
||||
|
||||
@@ -34,19 +23,17 @@ mod database;
|
||||
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
||||
|
||||
use compressor::Compressor;
|
||||
use database::PGEscapse;
|
||||
use database::PGEscape;
|
||||
|
||||
use clap::{App, Arg};
|
||||
use clap::{
|
||||
crate_authors, crate_description, crate_name, crate_version, value_t_or_exit, App, Arg,
|
||||
};
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use rayon::prelude::*;
|
||||
use state_map::StateMap;
|
||||
use std::{collections::BTreeMap, fs::File, io::Write, str::FromStr};
|
||||
use string_cache::DefaultAtom as Atom;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::str::FromStr;
|
||||
|
||||
/// An entry for a state group. Consists of an (optional) previous group and the
|
||||
/// delta from that previous group (or the full state if no previous group)
|
||||
#[derive(Default, Debug, Clone, PartialEq, Eq)]
|
||||
@@ -94,7 +81,7 @@ impl FromStr for LevelSizes {
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let mut sizes = Vec::new();
|
||||
|
||||
for size_str in s.split(",") {
|
||||
for size_str in s.split(',') {
|
||||
let size: usize = size_str
|
||||
.parse()
|
||||
.map_err(|_| "Not a comma separated list of numbers")?;
|
||||
@@ -106,6 +93,7 @@ impl FromStr for LevelSizes {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
#[allow(deprecated)]
|
||||
let matches = App::new(crate_name!())
|
||||
.version(crate_version!())
|
||||
.author(crate_authors!("\n"))
|
||||
@@ -127,9 +115,17 @@ fn main() {
|
||||
).arg(
|
||||
Arg::with_name("max_state_group")
|
||||
.short("s")
|
||||
.value_name("MAX_STATE_GROUP")
|
||||
.help("The maximum state group to process up to")
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
).arg(
|
||||
Arg::with_name("min_saved_rows")
|
||||
.short("m")
|
||||
.value_name("COUNT")
|
||||
.help("Suppress output if fewer than COUNT rows would be saved")
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
).arg(
|
||||
Arg::with_name("output_file")
|
||||
.short("o")
|
||||
@@ -176,6 +172,10 @@ fn main() {
|
||||
.value_of("max_state_group")
|
||||
.map(|s| s.parse().expect("max_state_group must be an integer"));
|
||||
|
||||
let min_saved_rows = matches
|
||||
.value_of("min_saved_rows")
|
||||
.map(|v| v.parse().expect("COUNT must be an integer"));
|
||||
|
||||
let transactions = matches.is_present("transactions");
|
||||
|
||||
let level_sizes = value_t_or_exit!(matches, "level_sizes", LevelSizes);
|
||||
@@ -228,6 +228,17 @@ fn main() {
|
||||
compressor.stats.state_groups_changed
|
||||
);
|
||||
|
||||
if let Some(min) = min_saved_rows {
|
||||
let saving = (original_summed_size - compressed_summed_size) as i32;
|
||||
if saving < min {
|
||||
println!(
|
||||
"Only {} rows would be saved by this compression. Skipping output.",
|
||||
saving
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// If we are given an output file, we output the changes as SQL. If the
|
||||
// `transactions` argument is set we wrap each change to a state group in a
|
||||
// transaction.
|
||||
@@ -267,7 +278,7 @@ fn main() {
|
||||
sg
|
||||
)
|
||||
.unwrap();
|
||||
if new_entry.state_map.len() > 0 {
|
||||
if !new_entry.state_map.is_empty() {
|
||||
writeln!(output, "INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES").unwrap();
|
||||
let mut first = true;
|
||||
for ((t, s), e) in new_entry.state_map.iter() {
|
||||
@@ -281,10 +292,10 @@ fn main() {
|
||||
output,
|
||||
"({}, {}, {}, {}, {})",
|
||||
sg,
|
||||
PGEscapse(room_id),
|
||||
PGEscapse(t),
|
||||
PGEscapse(s),
|
||||
PGEscapse(e)
|
||||
PGEscape(room_id),
|
||||
PGEscape(t),
|
||||
PGEscape(s),
|
||||
PGEscape(e)
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user