Improve algorithm and documentation

Erik Johnston
2018-09-18 15:40:58 +01:00
parent 3ba940c743
commit 096467853d
6 changed files with 821 additions and 290 deletions

255
src/compressor.rs Normal file

@@ -0,0 +1,255 @@
//! This is the actual compression algorithm.
//!
//! The algorithm attempts to make a tree of deltas for the state group maps.
//! This is done by having multiple "levels", where each level has a maximum
//! size. The state groups are iterated over, with deltas being calculated
//! against the smallest level that isn't yet full. When a state group is
//! inserted into a level, all lower levels are reset to have their current
//! "head" at the new state group.
//!
//! This produces graphs that look roughly like, for two levels:
//!
//! ```
//! L2 <-------------------- L2 <---------- ...
//! ^--- L1 <--- L1 <--- L1 ^--- L1 <--- L1 <--- L1
//! ```
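
To make the level bookkeeping concrete, here is a minimal standalone sketch (not part of the commit) that replays the rules above for two levels of size 3. It reproduces the parent edges asserted in `test_new_map` at the bottom of this file:

```rust
fn main() {
    // One (max_length, chain_length, head) triple per level, smallest first.
    let mut levels: Vec<(usize, usize, Option<i64>)> = vec![(3, 0, None), (3, 0, None)];

    for sg in 0i64..=13 {
        let mut prev = None;
        for level in levels.iter_mut() {
            if level.1 < level.0 {
                // This level has space: delta against its head and extend it.
                prev = level.2;
                level.1 += 1;
                level.2 = Some(sg);
                break;
            }
            // Level is full: reset it to a new chain headed here and try the
            // next level up. If every level is full, prev stays None and the
            // group is stored as a full snapshot.
            level.1 = 1;
            level.2 = Some(sg);
        }
        println!("{} -> {:?}", sg, prev); // e.g. 3 -> None, 6 -> Some(3)
    }
}
```
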
use rust_matrix_lib::state_map::StateMap;
use std::collections::BTreeMap;
use {collapse_state_maps, StateGroupEntry};
/// Holds information about a particular level.
struct Level {
/// The maximum size this level is allowed to be
max_length: usize,
/// The (approximate) length of this level's current chain, i.e. the number
/// of groups reached by recursively following `current`.
current_chain_length: usize,
/// The head of this level
current: Option<i64>,
}
impl Level {
/// Creates a new Level with the given maximum length
pub fn new(max_length: usize) -> Level {
Level {
max_length,
current_chain_length: 0,
current: None,
}
}
/// Update the current head of this level. If `delta` is true then the
/// given state group will (probably) reference the previous head.
///
/// Panics if `delta` is true and the level is already full.
pub fn update(&mut self, current: i64, delta: bool) {
self.current = Some(current);
if delta {
// If we're referencing the previous head then increment our chain
// length estimate
if !self.has_space() {
panic!("Tried to add to a already full level");
}
self.current_chain_length += 1;
} else {
// Otherwise, we've started a new chain with a single entry.
self.current_chain_length = 1;
}
}
/// Get the current head of the level
pub fn get_current(&self) -> Option<i64> {
self.current
}
/// Whether there is space in the current chain at this level. If not then a
/// new chain should be started.
pub fn has_space(&self) -> bool {
self.current_chain_length < self.max_length
}
}
/// Keeps track of some statistics of a compression run.
#[derive(Default)]
pub struct Stats {
/// How many state groups we couldn't find a delta for, despite trying.
pub resets_no_suitable_prev: usize,
/// The sum of the rows of the state groups counted by
/// `resets_no_suitable_prev`.
pub resets_no_suitable_prev_size: usize,
/// How many state groups we have changed.
pub state_groups_changed: usize,
}
/// Attempts to compress a set of state deltas using the given level sizes.
pub struct Compressor<'a> {
original_state_map: &'a BTreeMap<i64, StateGroupEntry>,
pub new_state_group_map: BTreeMap<i64, StateGroupEntry>,
levels: Vec<Level>,
pub stats: Stats,
}
impl<'a> Compressor<'a> {
/// Creates a compressor and runs the compression algorithm.
pub fn compress(
original_state_map: &'a BTreeMap<i64, StateGroupEntry>,
level_sizes: &[usize],
) -> Compressor<'a> {
let mut compressor = Compressor {
original_state_map,
new_state_group_map: BTreeMap::new(),
levels: level_sizes.iter().map(|size| Level::new(*size)).collect(),
stats: Stats::default(),
};
compressor.create_new_tree();
compressor
}
/// Actually runs the compression algorithm
fn create_new_tree(&mut self) {
if !self.new_state_group_map.is_empty() {
panic!("Can only call `create_new_tree` once");
}
for (&state_group, entry) in self.original_state_map {
let mut prev_state_group = None;
for level in &mut self.levels {
if level.has_space() {
prev_state_group = level.get_current();
level.update(state_group, true);
break;
} else {
level.update(state_group, false);
}
}
let (delta, prev_state_group) = if entry.prev_state_group == prev_state_group {
(entry.state_map.clone(), prev_state_group)
} else {
self.stats.state_groups_changed += 1;
self.get_delta(prev_state_group, state_group)
};
self.new_state_group_map.insert(
state_group,
StateGroupEntry {
prev_state_group,
state_map: delta,
},
);
}
}
/// Attempts to calculate the delta between two state groups.
///
/// This is not always possible: the given candidate previous state group
/// may have state keys that are not in the new state group. In that case
/// the function iterates back up the current tree to find a state group
/// that can be used as a base for a delta.
///
/// Returns the state map and the actual base state group (if any) used.
fn get_delta(&mut self, prev_sg: Option<i64>, sg: i64) -> (StateMap<String>, Option<i64>) {
let state_map = collapse_state_maps(&self.original_state_map, sg);
let mut prev_sg = if let Some(prev_sg) = prev_sg {
prev_sg
} else {
return (state_map, None);
};
// Loop through the chain of previous state groups to find the first
// prev_sg that is a valid base for the state group.
let mut prev_state_map;
'outer: loop {
prev_state_map = collapse_state_maps(&self.original_state_map, prev_sg);
for (t, s) in prev_state_map.keys() {
if !state_map.contains_key(t, s) {
// This is not a valid base as it contains a key the new state
// group doesn't have. Attempt to walk up the tree to find a
// better base.
if let Some(psg) = self.new_state_group_map[&prev_sg].prev_state_group {
prev_sg = psg;
continue 'outer;
}
// Couldn't find a new base, so we give up and just persist
// a full state group here.
self.stats.resets_no_suitable_prev += 1;
self.stats.resets_no_suitable_prev_size += state_map.len();
return (state_map, None);
}
}
break;
}
// We've found a valid base, now we just need to calculate the delta.
let mut delta_map = StateMap::new();
for ((t, s), e) in state_map.iter() {
if prev_state_map.get(t, s) != Some(e) {
delta_map.insert(t, s, e.clone());
}
}
(delta_map, Some(prev_sg))
}
}
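
To illustrate the base-finding rule, here is a hedged standalone sketch that models `get_delta` with plain `BTreeMap`s; the `FullState` alias and `delta_against` helper are hypothetical stand-ins for `StateMap` and `collapse_state_maps`:

```rust
use std::collections::BTreeMap;

// Keyed by (type, state_key), as in StateMap.
type FullState = BTreeMap<(String, String), String>;

/// Walk up the chain from `candidate` until we find a base whose state is a
/// subset of `new_state`, then return only the changed entries; fall back to
/// the full state if no suitable base exists.
fn delta_against(
    new_state: &FullState,
    mut candidate: Option<i64>,
    parents: &BTreeMap<i64, Option<i64>>,
    full_states: &BTreeMap<i64, FullState>,
) -> (FullState, Option<i64>) {
    while let Some(sg) = candidate {
        let base = &full_states[&sg];
        // A base is only valid if it has no keys the new state lacks: the
        // schema cannot express "this key was deleted" in a delta.
        if base.keys().all(|k| new_state.contains_key(k)) {
            let delta = new_state
                .iter()
                .filter(|&(k, v)| base.get(k) != Some(v))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();
            return (delta, Some(sg));
        }
        candidate = parents[&sg];
    }
    (new_state.clone(), None)
}

fn main() {
    // Hypothetical chain: group 1 adds a room name on top of group 0.
    let create = (("m.room.create".to_string(), String::new()), "$create".to_string());
    let name = (("m.room.name".to_string(), String::new()), "$name".to_string());

    let g0: FullState = BTreeMap::from([create.clone()]);
    let g1: FullState = BTreeMap::from([create, name]);
    let full_states = BTreeMap::from([(0, g0), (1, g1.clone())]);
    let parents = BTreeMap::from([(0, None), (1, Some(0))]);

    let (delta, base) = delta_against(&g1, Some(0), &parents, &full_states);
    assert_eq!(base, Some(0));
    assert_eq!(delta.len(), 1); // only the added m.room.name row
}
```
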
#[test]
fn test_new_map() {
let mut initial: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let mut prev = None;
for i in 0i64..=13i64 {
initial.insert(
i,
StateGroupEntry {
prev_state_group: prev,
state_map: StateMap::new(),
},
);
prev = Some(i)
}
let compressor = Compressor::compress(&initial, &[3, 3]);
let new_state = compressor.new_state_group_map;
let expected_edges: BTreeMap<i64, i64> = vec![
(1, 0),
(2, 1),
(4, 3),
(5, 4),
(6, 3),
(7, 6),
(8, 7),
(9, 6),
(10, 9),
(11, 10),
(13, 12),
].into_iter()
.collect();
for sg in 0i64..=13i64 {
assert_eq!(
expected_edges.get(&sg).cloned(),
new_state[&sg].prev_state_group,
"state group {} did not match expected",
sg,
);
}
}

125
src/database.rs Normal file

@@ -0,0 +1,125 @@
use fallible_iterator::FallibleIterator;
use indicatif::{ProgressBar, ProgressStyle};
use postgres::{Connection, TlsMode};
use std::collections::BTreeMap;
use StateGroupEntry;
/// Fetch the entries in state_groups_state (and their prev groups) for the
/// given `room_id` by connecting to the postgres database at `db_url`.
pub fn get_data_from_db(db_url: &str, room_id: &str) -> BTreeMap<i64, StateGroupEntry> {
let conn = Connection::connect(db_url, TlsMode::None).unwrap();
let mut state_group_map = get_initial_data_from_db(&conn, room_id);
println!("Got initial state from database. Checking for any missing state groups...");
// For reasons that are unclear, some of the state groups appear in the edges table, but
// not in the state_groups_state table. This means they don't get included
// in our DB queries, so we have to fetch any missing groups explicitly.
// Since the returned groups may themselves reference groups we don't have,
// we need to do this recursively until we don't find any more missing.
loop {
let missing_sgs: Vec<_> = state_group_map
.iter()
.filter_map(|(_sg, entry)| {
if let Some(prev_sg) = entry.prev_state_group {
if state_group_map.contains_key(&prev_sg) {
None
} else {
Some(prev_sg)
}
} else {
None
}
}).collect();
if missing_sgs.is_empty() {
break;
}
println!("Missing {} state groups", missing_sgs.len());
let map = get_missing_from_db(&conn, &missing_sgs);
state_group_map.extend(map.into_iter());
}
state_group_map
}
/// Fetch the entries in state_groups_state (and their prev groups) for the
/// given `room_id` by fetching all state with the given `room_id`.
fn get_initial_data_from_db(conn: &Connection, room_id: &str) -> BTreeMap<i64, StateGroupEntry> {
let stmt = conn
.prepare(
r#"
SELECT m.id, prev_state_group, type, state_key, s.event_id
FROM state_groups AS m
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
LEFT JOIN state_group_edges AS e ON (m.id = e.state_group)
WHERE m.room_id = $1
"#,
).unwrap();
let trans = conn.transaction().unwrap();
let mut rows = stmt.lazy_query(&trans, &[&room_id], 1000).unwrap();
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let pb = ProgressBar::new_spinner();
pb.set_style(ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos}"));
pb.enable_steady_tick(100);
let mut num_rows = 0;
while let Some(row) = rows.next().unwrap() {
let state_group = row.get(0);
let entry = state_group_map.entry(state_group).or_default();
entry.prev_state_group = row.get(1);
let etype: Option<String> = row.get(2);
if let Some(etype) = etype {
entry
.state_map
.insert(&etype, &row.get::<_, String>(3), row.get(4));
}
pb.inc(1);
num_rows += 1;
}
pb.set_length(num_rows);
pb.finish();
state_group_map
}
/// Get any missing state groups from the database
fn get_missing_from_db(conn: &Connection, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
let stmt = conn
.prepare(
r#"
SELECT state_group, prev_state_group
FROM state_group_edges
WHERE state_group = ANY($1)
"#,
).unwrap();
let trans = conn.transaction().unwrap();
let mut rows = stmt.lazy_query(&trans, &[&missing_sgs], 100).unwrap();
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
while let Some(row) = rows.next().unwrap() {
let state_group = row.get(0);
let entry = state_group_map.entry(state_group).or_default();
entry.prev_state_group = row.get(1);
}
state_group_map
}

src/main.rs

@@ -1,149 +1,42 @@
//! This is a tool that attempts to further compress state maps within a
//! Synapse instance's database. Specifically, it aims to reduce the number of
//! rows that a given room takes up in the `state_groups_state` table.
#[macro_use]
extern crate clap;
extern crate fallible_iterator;
extern crate indicatif;
extern crate postgres;
extern crate rayon;
extern crate rust_matrix_lib;
use clap::{App, Arg, ArgGroup};
use fallible_iterator::FallibleIterator;
use postgres::{Connection, TlsMode};
mod compressor;
mod database;
use compressor::Compressor;
use clap::{App, Arg};
use rayon::prelude::*;
use rust_matrix_lib::state_map::StateMap;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::io::Write;
use std::str::FromStr;
/// An entry for a state group. Consists of an (optional) previous group and the
/// delta from that previous group (or the full state if no previous group)
#[derive(Default, Debug, Clone)]
struct StateGroupEntry {
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct StateGroupEntry {
prev_state_group: Option<i64>,
state_map: StateMap<String>,
}
/// Fetch the entries in state_groups_state (and their prev groups) for the
/// given `room_id` by connecting to the postgres database at `db_url`.
fn get_data_from_db(db_url: &str, room_id: &str) -> BTreeMap<i64, StateGroupEntry> {
let conn = Connection::connect(db_url, TlsMode::None).unwrap();
let stmt = conn
.prepare(
r#"
SELECT state_group, prev_state_group, type, state_key, event_id
FROM state_groups_state
LEFT JOIN state_group_edges USING (state_group)
WHERE room_id = $1
"#,
).unwrap();
let trans = conn.transaction().unwrap();
let mut rows = stmt.lazy_query(&trans, &[&room_id], 100).unwrap();
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let mut started = false;
while let Some(row) = rows.next().unwrap() {
if !started {
started = true;
println!("Started streaming from DB!");
}
let state_group = row.get(0);
let entry = state_group_map.entry(state_group).or_default();
entry.prev_state_group = row.get(1);
entry.state_map.insert(
&row.get::<_, String>(2),
&row.get::<_, String>(3),
row.get(4),
);
}
state_group_map
}
/// Get any missing state groups from the database
fn get_missing_from_db(db_url: &str, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
let conn = Connection::connect(db_url, TlsMode::None).unwrap();
let stmt = conn
.prepare(
r#"
SELECT state_group, prev_state_group
FROM state_group_edges
WHERE state_group = ANY($1)
"#,
).unwrap();
let trans = conn.transaction().unwrap();
let mut rows = stmt.lazy_query(&trans, &[&missing_sgs], 100).unwrap();
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
while let Some(row) = rows.next().unwrap() {
let state_group = row.get(0);
let entry = state_group_map.entry(state_group).or_default();
entry.prev_state_group = row.get(1);
}
state_group_map
}
/// Get state group entries from the file at `path`.
///
/// This should be formatted as `|` separated values, with the empty string
/// representing null. (Yes, this is a bit dodgy, but it means it's trivial
/// to get from postgres. We should use a better format.)
///
/// The following invocation produces the correct output:
///
/// ```bash
/// psql -At synapse > test.data <<EOF
/// SELECT state_group, prev_state_group, type, state_key, event_id
/// FROM state_groups_state
/// LEFT JOIN state_group_edges USING (state_group)
/// WHERE room_id = '!some_room:example.com';
/// EOF
/// ```
fn get_data_from_file(path: &str) -> BTreeMap<i64, StateGroupEntry> {
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let f = File::open(path).unwrap();
let f = BufReader::new(f);
for line in f.lines() {
let line = line.unwrap();
let mut iter = line.split('|');
let state_group = iter.next().unwrap().parse().unwrap();
let entry = state_group_map.entry(state_group).or_default();
let prev_state_group_str = iter.next().unwrap();
entry.prev_state_group = if prev_state_group_str.is_empty() {
None
} else {
Some(prev_state_group_str.parse().unwrap())
};
entry.state_map.insert(
iter.next().unwrap(),
iter.next().unwrap(),
iter.next().unwrap().to_string(),
);
}
state_group_map
}
/// Gets the full state for a given group from the map (of deltas)
fn collapse_state_maps(map: &BTreeMap<i64, StateGroupEntry>, state_group: i64) -> StateMap<String> {
pub fn collapse_state_maps(
map: &BTreeMap<i64, StateGroupEntry>,
state_group: i64,
) -> StateMap<String> {
let mut entry = &map[&state_group];
let mut state_map = StateMap::new();
@@ -169,6 +62,26 @@ fn collapse_state_maps(map: &BTreeMap<i64, StateGroupEntry>, state_group: i64) -
state_map
}
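
The hunk above elides most of the function body. As a rough standalone model of what collapsing a chain of deltas involves (the `Entry` type and `collapse` helper here are hypothetical, not the commit's code):

```rust
use std::collections::BTreeMap;

#[derive(Default, Clone)]
struct Entry {
    prev_state_group: Option<i64>,
    // The delta (or, at a chain root, the full state), keyed by
    // (type, state_key).
    state_map: BTreeMap<(String, String), String>,
}

/// Resolve the full state of `state_group` by walking the prev pointers back
/// to the root of the chain, then replaying the deltas oldest-first so newer
/// entries overwrite older ones.
fn collapse(map: &BTreeMap<i64, Entry>, state_group: i64) -> BTreeMap<(String, String), String> {
    let mut chain = vec![state_group];
    let mut current = state_group;
    while let Some(prev) = map[&current].prev_state_group {
        chain.push(prev);
        current = prev;
    }

    let mut state = BTreeMap::new();
    for sg in chain.into_iter().rev() {
        for (k, v) in &map[&sg].state_map {
            state.insert(k.clone(), v.clone());
        }
    }
    state
}

fn main() {
    let mut map = BTreeMap::new();
    map.insert(0, Entry {
        prev_state_group: None,
        state_map: BTreeMap::from([(("m.room.create".into(), "".into()), "$create".into())]),
    });
    map.insert(1, Entry {
        prev_state_group: Some(0),
        state_map: BTreeMap::from([(("m.room.name".into(), "".into()), "$name".into())]),
    });
    assert_eq!(collapse(&map, 1).len(), 2); // create + name
}
```
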
/// Helper struct for parsing the `level_sizes` argument.
struct LevelSizes(Vec<usize>);
impl FromStr for LevelSizes {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut sizes = Vec::new();
for size_str in s.split(",") {
let size: usize = size_str
.parse()
.map_err(|_| "Not a comma separated list of numbers")?;
sizes.push(size);
}
Ok(LevelSizes(sizes))
}
}
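
A brief usage sketch of the parser; the `demo` function is hypothetical and assumes `LevelSizes` is in scope. Implementing `FromStr` is what lets the flag be parsed via `str::parse`, which clap's `value_t_or_exit!` macro below relies on:

```rust
// Assumes the LevelSizes type defined above is in scope.
fn demo() {
    let sizes: LevelSizes = "100,50,25".parse().expect("comma separated numbers");
    assert_eq!(sizes.0, vec![100, 50, 25]);

    // Malformed input surfaces the error from from_str.
    assert!("100,fifty".parse::<LevelSizes>().is_err());
}
```
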
fn main() {
let matches = App::new(crate_name!())
.version(crate_version!())
@@ -186,23 +99,26 @@ fn main() {
.short("r")
.value_name("ROOM_ID")
.help("The room to process")
.takes_value(true),
.takes_value(true)
.required(true),
).arg(
Arg::with_name("input")
.short("f")
.value_name("FILE")
.help("File containing dumped state groups")
.takes_value(true),
).arg(
Arg::with_name("output_diff")
Arg::with_name("output_file")
.short("o")
.value_name("FILE")
.help("File to output the changes to")
.help("File to output the changes to in SQL")
.takes_value(true),
).arg(
Arg::with_name("individual_transactions")
.short("t")
.help("Whether to wrap each state group change in a transaction, when writing to file")
.requires("output_file"),
).arg(
Arg::with_name("level_sizes")
.short("l")
.value_name("LEVELS")
.help("Sizes of each new level in the compression algorithm, as a comma separate list")
.default_value("100,50,25")
.takes_value(true),
).group(
ArgGroup::with_name("target")
.args(&["input", "room_id"])
.required(true),
).get_matches();
let db_url = matches
@@ -210,157 +126,113 @@ fn main() {
.expect("db url should be required");
let mut output_file = matches
.value_of("output_diff")
.value_of("output_file")
.map(|path| File::create(path).unwrap());
let room_id = matches
.value_of("room_id")
.expect("room_id should be required since no file");
let individual_transactions = matches.is_present("individual_transactions");
let level_sizes = value_t_or_exit!(matches, "level_sizes", LevelSizes);
// First we need to get the current state groups
let mut state_group_map = if let Some(path) = matches.value_of("input") {
get_data_from_file(path)
} else {
let room_id = matches
.value_of("room_id")
.expect("room_id should be required since no file");
get_data_from_db(db_url, room_id)
};
println!("Fetching state from DB for room '{}'...", room_id);
let state_group_map = database::get_data_from_db(db_url, room_id);
// For reasons that escape me, some of the state groups appear in the edges
// table, but not in the state_groups_state table. This means they don't
// get included in our DB queries, so we have to fetch any missing groups
// explicitly. Since the returned groups may themselves reference groups
// we don't have, we need to do this recursively until we don't find any
// more.
loop {
let missing_sgs: Vec<_> = state_group_map
.iter()
.filter_map(|(_sg, entry)| {
if let Some(prev_sg) = entry.prev_state_group {
if state_group_map.contains_key(&prev_sg) {
None
} else {
Some(prev_sg)
}
} else {
None
}
}).collect();
println!("Number of state groups: {}", state_group_map.len());
if missing_sgs.is_empty() {
break;
}
println!("Missing {} state groups", missing_sgs.len());
let map = get_missing_from_db(db_url, &missing_sgs);
state_group_map.extend(map.into_iter());
}
println!("Number of entries: {}", state_group_map.len());
let summed_size = state_group_map
let original_summed_size = state_group_map
.iter()
.fold(0, |acc, (_, v)| acc + v.state_map.len());
println!("Number of rows: {}", summed_size);
println!("Number of rows in current table: {}", original_summed_size);
let mut new_state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
// Now we actually call the compression algorithm.
// Now we loop through and create our new state maps from the existing
// ones.
// The existing table is made up of chains of groups at most 100 nodes
// long. At the start of each chain there is a copy of the full state at
// that point. This algorithm adds edges between such "checkpoint" nodes,
// so that there are chains between them. We cap such checkpoint chains to
// a length of 50.
//
// The idea here is that between checkpoint nodes only small subsets of
// state will have actually changed.
//
// (This approach can be generalised by adding more and more layers)
let compressor = Compressor::compress(&state_group_map, &level_sizes.0);
let mut last_checkpoint_opt = None;
let mut checkpoint_length = 0;
let new_state_group_map = compressor.new_state_group_map;
for (state_group, entry) in &state_group_map {
if entry.prev_state_group.is_none() {
// We're at a checkpoint node. If this is our first checkpoint
// node then there isn't much to do other than mark it.
let mut added_to_chain = false;
if let Some(ref last_checkpoint) = last_checkpoint_opt {
let checkpoint_entry = &state_group_map[last_checkpoint];
// Done! Now to print a bunch of stats.
// We need to ensure that there aren't any entries in the
// previous checkpoint node that aren't in the state at this
// point, since the table schema doesn't support the idea of
// "deleting" state in the deltas.
//
// Note: The entry.state_map will be the full state here, rather
// than just the delta since prev_state_group is None.
if checkpoint_entry
.state_map
.keys()
.all(|(t, s)| entry.state_map.contains_key(t, s))
{
// We create the new map by filtering out entries that match
// those in the previous checkpoint state.
let new_map: StateMap<String> = entry
.state_map
.iter()
.filter(|((t, s), e)| checkpoint_entry.state_map.get(t, s) != Some(e))
.map(|((t, s), e)| ((t, s), e.clone()))
.collect();
let compressed_summed_size = new_state_group_map
.iter()
.fold(0, |acc, (_, v)| acc + v.state_map.len());
// If we have an output file write the changes we've made
if let Some(ref mut fs) = output_file {
writeln!(fs, "edge_addition {} {}", state_group, *last_checkpoint).unwrap();
for ((t, s), e) in new_map.iter() {
writeln!(fs, "state_replace {} {} {} {}", state_group, t, s, e)
.unwrap();
let ratio = (compressed_summed_size as f64) / (original_summed_size as f64);
println!(
"Number of rows after compression: {} ({:.2}%)",
compressed_summed_size,
ratio * 100.
);
println!("Compression Statistics:");
println!(
" Number of forced resets due to lacking prev: {}",
compressor.stats.resets_no_suitable_prev
);
println!(
" Number of compressed rows caused by the above: {}",
compressor.stats.resets_no_suitable_prev_size
);
println!(
" Number of state groups changed: {}",
compressor.stats.state_groups_changed
);
// If we are given an output file, we output the changes as SQL. If the
// `individual_transactions` argument is set, we wrap each change to a
// state group in a transaction.
if let Some(output) = &mut output_file {
for (sg, old_entry) in &state_group_map {
let new_entry = &new_state_group_map[sg];
if old_entry != new_entry {
if individual_transactions {
writeln!(output, "BEGIN;");
}
writeln!(
output,
"DELETE FROM state_group_edges WHERE state_group = {};",
sg
);
if let Some(prev_sg) = new_entry.prev_state_group {
writeln!(output, "INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});", sg, prev_sg);
}
writeln!(
output,
"DELETE FROM state_groups_state WHERE state_group = {};",
sg
);
if new_entry.state_map.len() > 0 {
writeln!(output, "INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES");
let mut first = true;
for ((t, s), e) in new_entry.state_map.iter() {
if first {
write!(output, " ");
first = false;
} else {
write!(output, " ,");
}
writeln!(output, "({}, '{}', '{}', '{}', '{}')", sg, room_id, t, s, e);
}
new_state_group_map.insert(
*state_group,
StateGroupEntry {
prev_state_group: Some(*last_checkpoint),
state_map: new_map,
},
);
added_to_chain = true;
} else {
new_state_group_map.insert(*state_group, entry.clone());
writeln!(output, ";");
}
} else {
new_state_group_map.insert(*state_group, entry.clone());
}
last_checkpoint_opt = Some(*state_group);
// If we've added to the checkpoint chain we increment the length,
// otherwise it gets reset to zero.
if added_to_chain {
checkpoint_length += 1;
} else {
checkpoint_length = 0;
if individual_transactions {
writeln!(output, "COMMIT;");
}
writeln!(output);
}
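
For a concrete picture of the emitted SQL, here is a hedged sketch that prints to stdout (with hypothetical group numbers, room, and event values) the statements the loop above would write for one changed state group with `-t` set:

```rust
use std::io::{self, Write};

fn main() -> io::Result<()> {
    let out = &mut io::stdout();
    // Hypothetical changed group: 123 now deltas against 120 with one row.
    let (sg, prev_sg, room_id) = (123, 120, "!room:example.com");

    writeln!(out, "BEGIN;")?;
    writeln!(out, "DELETE FROM state_group_edges WHERE state_group = {};", sg)?;
    writeln!(
        out,
        "INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});",
        sg, prev_sg
    )?;
    writeln!(out, "DELETE FROM state_groups_state WHERE state_group = {};", sg)?;
    writeln!(
        out,
        "INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES"
    )?;
    writeln!(
        out,
        "     ({}, '{}', 'm.room.member', '@user:example.com', '$some_event');",
        sg, room_id
    )?;
    writeln!(out, "COMMIT;")?;
    Ok(())
}
```
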
// If the chain is longer than 50 then lets reset to create a new
// chain.
if checkpoint_length >= 50 {
checkpoint_length = 0;
last_checkpoint_opt = None;
}
} else {
new_state_group_map.insert(*state_group, entry.clone());
}
}
let summed_size = new_state_group_map
.iter()
.fold(0, |acc, (_, v)| acc + v.state_map.len());
println!("Number of rows compressed: {}", summed_size);
// Now let's iterate through and assert that the state for each group
// matches between the two versions.
state_group_map