Files
rust-synapse-compress-state/src/compressor.rs
2018-09-18 17:51:28 +01:00

257 lines
8.0 KiB
Rust

//! This is the actual compression algorithm.
//!
//! The algorithm attempts to make a tree of deltas for the state group maps.
//! This is done by having multiple "levels", where each level has a maximum
//! size. The state groups are iterated over, with deltas being calculated
//! against the head of the lowest level that isn't yet full. When a state
//! group is inserted into a level, all lower levels are reset to have their
//! current "head" at the new state group.
//!
//! This produces graphs that look roughly like, for two levels:
//!
//! ```text
//! L2 <-------------------- L2 <---------- ...
//! ^--- L1 <--- L1 <--- L1 ^--- L1 <--- L1 <--- L1
//! ```
use rust_matrix_lib::state_map::StateMap;
use string_cache::DefaultAtom as Atom;
use std::collections::BTreeMap;
use {collapse_state_maps, StateGroupEntry};
/// Bookkeeping for one level of the delta tree.
struct Level {
    /// Upper bound on the number of state groups a chain at this level may hold.
    max_length: usize,
    /// Estimated length of the chain currently being built at this level,
    /// i.e. roughly how many hops you get by repeatedly following `current`.
    current_chain_length: usize,
    /// The state group most recently placed on this level, if any.
    current: Option<i64>,
}

impl Level {
    /// Returns an empty level whose chains may grow to at most `max_length`.
    pub fn new(max_length: usize) -> Level {
        Level {
            current: None,
            current_chain_length: 0,
            max_length,
        }
    }

    /// Makes `current` the new head of this level.
    ///
    /// When `delta` is true the new head is expected to build on the previous
    /// head, so the chain grows by one. Otherwise a fresh chain of length one
    /// is started.
    ///
    /// Panics if `delta` is true while the level has no space left.
    pub fn update(&mut self, current: i64, delta: bool) {
        self.current = Some(current);
        self.current_chain_length = if delta {
            // Extending the existing chain is only allowed below the limit.
            assert!(self.has_space(), "Tried to add to a already full level");
            self.current_chain_length + 1
        } else {
            1
        };
    }

    /// Returns the head of this level (the last state group placed here).
    pub fn get_current(&self) -> Option<i64> {
        self.current
    }

    /// Reports whether the current chain can be extended. When it returns
    /// false, the next state group should start a new chain here.
    pub fn has_space(&self) -> bool {
        self.max_length > self.current_chain_length
    }
}
/// Counters gathered during a single compression run.
///
/// Derives `Debug` so callers can print a run's statistics directly. Derives
/// `Clone`/`PartialEq`/`Eq` so snapshots can be kept and compared (e.g. in
/// tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Stats {
    /// How many state groups we couldn't find a delta for, despite trying.
    pub resets_no_suitable_prev: usize,
    /// The sum of the rows of the state groups counted by
    /// `resets_no_suitable_prev`.
    pub resets_no_suitable_prev_size: usize,
    /// How many state groups we have changed.
    pub state_groups_changed: usize,
}
/// Rebuilds the delta tree of a set of state groups using a fixed number of
/// bounded-length levels (see the module documentation).
pub struct Compressor<'a> {
    /// The input state groups, keyed by state group id.
    original_state_map: &'a BTreeMap<i64, StateGroupEntry>,
    /// The rewritten state groups produced by the compression run.
    pub new_state_group_map: BTreeMap<i64, StateGroupEntry>,
    /// One entry per configured level, lowest level first.
    levels: Vec<Level>,
    /// Counters collected while compressing.
    pub stats: Stats,
}

impl<'a> Compressor<'a> {
    /// Builds a compressor over `original_state_map` with one level per entry
    /// of `level_sizes` (lowest level first) and immediately runs the
    /// compression. The result is available in `new_state_group_map`.
    pub fn compress(
        original_state_map: &'a BTreeMap<i64, StateGroupEntry>,
        level_sizes: &[usize],
    ) -> Compressor<'a> {
        let levels: Vec<Level> = level_sizes
            .iter()
            .map(|&max_length| Level::new(max_length))
            .collect();

        let mut compressor = Compressor {
            levels,
            original_state_map,
            stats: Stats::default(),
            new_state_group_map: BTreeMap::new(),
        };
        compressor.create_new_tree();
        compressor
    }

    /// Runs the compression, filling `new_state_group_map` with one entry per
    /// input state group.
    ///
    /// Panics if `new_state_group_map` is already non-empty.
    fn create_new_tree(&mut self) {
        assert!(
            self.new_state_group_map.is_empty(),
            "Can only call `create_new_tree` once"
        );

        for (&state_group, entry) in self.original_state_map {
            let chosen_base = self.advance_levels(state_group);

            let (state_map, prev_state_group) = if entry.prev_state_group != chosen_base {
                // The new tree wants a different base than the input did, so
                // the delta has to be recomputed against it.
                self.stats.state_groups_changed += 1;
                self.get_delta(chosen_base, state_group)
            } else {
                // Same base as in the input: its stored delta is still valid.
                (entry.state_map.clone(), chosen_base)
            };

            self.new_state_group_map.insert(
                state_group,
                StateGroupEntry {
                    prev_state_group,
                    state_map,
                },
            );
        }
    }

    /// Places `state_group` into the level hierarchy and returns the state
    /// group it should be based on.
    ///
    /// Walking from the lowest level upwards, every full level restarts its
    /// chain at `state_group`. The first level with space appends
    /// `state_group` and its previous head is returned. If every level is full,
    /// `None` is returned.
    fn advance_levels(&mut self, state_group: i64) -> Option<i64> {
        for level in self.levels.iter_mut() {
            if !level.has_space() {
                level.update(state_group, false);
                continue;
            }
            let base = level.get_current();
            level.update(state_group, true);
            return base;
        }
        None
    }

    /// Computes the delta for state group `sg` against candidate base `prev_sg`.
    ///
    /// A candidate is only usable if every key in its full state also appears
    /// in the full state of `sg`. Otherwise the search moves to the
    /// candidate's parent in the tree built so far. If no usable ancestor
    /// exists, the full state of `sg` is returned with no base and the reset
    /// counters in `stats` are bumped.
    ///
    /// Returns the state map to store and the base state group actually used
    /// (if any).
    fn get_delta(&mut self, prev_sg: Option<i64>, sg: i64) -> (StateMap<Atom>, Option<i64>) {
        let state_map = collapse_state_maps(&self.original_state_map, sg);

        let mut candidate = match prev_sg {
            Some(candidate) => candidate,
            None => return (state_map, None),
        };

        // Climb the already-built tree until a candidate's full state contains
        // no keys that `sg` lacks.
        let base_state_map = loop {
            let candidate_map = collapse_state_maps(&self.original_state_map, candidate);
            let usable = candidate_map
                .keys()
                .all(|(t, s)| state_map.contains_key(t, s));
            if usable {
                break candidate_map;
            }

            let parent = self.new_state_group_map[&candidate].prev_state_group;
            match parent {
                Some(parent) => candidate = parent,
                None => {
                    // Ran out of ancestors: persist the full state instead.
                    self.stats.resets_no_suitable_prev += 1;
                    self.stats.resets_no_suitable_prev_size += state_map.len();
                    return (state_map, None);
                }
            }
        };

        // Keep only entries that are new or differ from the chosen base.
        let mut delta = StateMap::new();
        for ((typ, key), event) in state_map.iter() {
            let unchanged = base_state_map.get(typ, key) == Some(event);
            if !unchanged {
                delta.insert(typ, key, event.clone());
            }
        }
        (delta, Some(candidate))
    }
}
#[test]
fn test_new_map() {
    // A linear chain 0 <- 1 <- ... <- 13 where every group has empty state.
    let initial: BTreeMap<i64, StateGroupEntry> = (0i64..=13i64)
        .map(|sg| {
            let prev_state_group = if sg == 0 { None } else { Some(sg - 1) };
            (
                sg,
                StateGroupEntry {
                    prev_state_group,
                    state_map: StateMap::new(),
                },
            )
        })
        .collect();

    let new_state = Compressor::compress(&initial, &[3, 3]).new_state_group_map;

    // (state group, expected base) for two levels of size 3. Groups 0, 3 and
    // 12 are absent because they are expected to have no base.
    let expected_edges: BTreeMap<i64, i64> = [
        (1, 0),
        (2, 1),
        (4, 3),
        (5, 4),
        (6, 3),
        (7, 6),
        (8, 7),
        (9, 6),
        (10, 9),
        (11, 10),
        (13, 12),
    ]
        .iter()
        .cloned()
        .collect();

    for sg in 0i64..=13i64 {
        let expected = expected_edges.get(&sg).cloned();
        let actual = new_state[&sg].prev_state_group;
        assert_eq!(expected, actual, "state group {} did not match expected", sg);
    }
}