Add documentation (#41)
Added documentation to lib.rs and database.rs files
This commit is contained in:
@@ -21,8 +21,21 @@ use std::{borrow::Cow, collections::BTreeMap, fmt};
|
|||||||
|
|
||||||
use super::StateGroupEntry;
|
use super::StateGroupEntry;
|
||||||
|
|
||||||
/// Fetch the entries in state_groups_state (and their prev groups) for the
|
/// Fetch the entries in state_groups_state (and their prev groups) for a
|
||||||
/// given `room_id` by connecting to the postgres database at `db_url`.
|
/// specific room.
|
||||||
|
///
|
||||||
|
/// - Connects to the database
|
||||||
|
/// - Fetches rows with group id lower than max
|
||||||
|
/// - Recursively searches for missing predecessors and adds those
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `room_id` - The ID of the room in the database
|
||||||
|
/// * `db_url` - The URL of a Postgres database. This should be of the
|
||||||
|
/// form: "postgresql://user:pass@domain:port/database"
|
||||||
|
/// * `max_state_group` - If specified, then only fetch the entries for state
|
||||||
|
/// groups lower than or equal to this number. (N.B. all
|
||||||
|
/// predecessors are also fetched)
|
||||||
pub fn get_data_from_db(
|
pub fn get_data_from_db(
|
||||||
db_url: &str,
|
db_url: &str,
|
||||||
room_id: &str,
|
room_id: &str,
|
||||||
@@ -43,6 +56,9 @@ pub fn get_data_from_db(
|
|||||||
// in our DB queries, so we have to fetch any missing groups explicitly.
|
// in our DB queries, so we have to fetch any missing groups explicitly.
|
||||||
// Since the returned groups may themselves reference groups we don't have,
|
// Since the returned groups may themselves reference groups we don't have,
|
||||||
// we need to do this recursively until we don't find any more missing.
|
// we need to do this recursively until we don't find any more missing.
|
||||||
|
//
|
||||||
|
// N.B. This does NOT currently fetch the deltas for the missing groups!
|
||||||
|
// By carefully chosen max_state_group this might cause issues...?
|
||||||
loop {
|
loop {
|
||||||
let mut missing_sgs: Vec<_> = state_group_map
|
let mut missing_sgs: Vec<_> = state_group_map
|
||||||
.iter()
|
.iter()
|
||||||
@@ -76,13 +92,25 @@ pub fn get_data_from_db(
|
|||||||
state_group_map
|
state_group_map
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch the entries in state_groups_state (and their prev groups) for the
|
/// Fetch the entries in state_groups_state and immediate predecessors for
|
||||||
/// given `room_id` by fetching all state with the given `room_id`.
|
/// a specific room.
|
||||||
|
///
|
||||||
|
/// - Fetches rows with group id lower than max
|
||||||
|
/// - Stores the group id, predecessor id and deltas into a map
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `client` - A Postgres client to make requests with
|
||||||
|
/// * `room_id` - The ID of the room in the database
|
||||||
|
/// * `max_state_group` - If specified, then only fetch the entries for state
|
||||||
|
/// groups lower than or equal to this number. (N.B. doesn't
|
||||||
|
/// fetch IMMEDIATE predecessors if ID is above this number)
|
||||||
fn get_initial_data_from_db(
|
fn get_initial_data_from_db(
|
||||||
client: &mut Client,
|
client: &mut Client,
|
||||||
room_id: &str,
|
room_id: &str,
|
||||||
max_state_group: Option<i64>,
|
max_state_group: Option<i64>,
|
||||||
) -> BTreeMap<i64, StateGroupEntry> {
|
) -> BTreeMap<i64, StateGroupEntry> {
|
||||||
|
// Query to get id, predecessor and delta for each state group
|
||||||
let sql = r#"
|
let sql = r#"
|
||||||
SELECT m.id, prev_state_group, type, state_key, s.event_id
|
SELECT m.id, prev_state_group, type, state_key, s.event_id
|
||||||
FROM state_groups AS m
|
FROM state_groups AS m
|
||||||
@@ -91,6 +119,8 @@ fn get_initial_data_from_db(
|
|||||||
WHERE m.room_id = $1
|
WHERE m.room_id = $1
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
|
// Adds additional constraint if a max_state_group has been specified
|
||||||
|
// Then sends query to the datatbase
|
||||||
let mut rows = if let Some(s) = max_state_group {
|
let mut rows = if let Some(s) = max_state_group {
|
||||||
let params: Vec<&dyn ToSql> = vec![&room_id, &s];
|
let params: Vec<&dyn ToSql> = vec![&room_id, &s];
|
||||||
client.query_raw(format!(r"{} AND m.id <= $2", sql).as_str(), params)
|
client.query_raw(format!(r"{} AND m.id <= $2", sql).as_str(), params)
|
||||||
@@ -99,6 +129,8 @@ fn get_initial_data_from_db(
|
|||||||
}
|
}
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
// Copy the data from the database into a map
|
||||||
|
|
||||||
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
|
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
|
||||||
|
|
||||||
let pb = ProgressBar::new_spinner();
|
let pb = ProgressBar::new_spinner();
|
||||||
@@ -108,10 +140,13 @@ fn get_initial_data_from_db(
|
|||||||
pb.enable_steady_tick(100);
|
pb.enable_steady_tick(100);
|
||||||
|
|
||||||
while let Some(row) = rows.next().unwrap() {
|
while let Some(row) = rows.next().unwrap() {
|
||||||
|
// The row in the map to copy the data to
|
||||||
let entry = state_group_map.entry(row.get(0)).or_default();
|
let entry = state_group_map.entry(row.get(0)).or_default();
|
||||||
|
|
||||||
|
// Save the predecessor (this may already be there)
|
||||||
entry.prev_state_group = row.get(1);
|
entry.prev_state_group = row.get(1);
|
||||||
|
|
||||||
|
// Copy the single delta from the predecessor stored in this row
|
||||||
if let Some(etype) = row.get::<_, Option<String>>(2) {
|
if let Some(etype) = row.get::<_, Option<String>>(2) {
|
||||||
entry.state_map.insert(
|
entry.state_map.insert(
|
||||||
&etype,
|
&etype,
|
||||||
@@ -129,7 +164,14 @@ fn get_initial_data_from_db(
|
|||||||
state_group_map
|
state_group_map
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get any missing state groups from the database
|
/// Finds the predecessors of missing state groups
|
||||||
|
///
|
||||||
|
/// N.B. this does NOT find their deltas
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `client` - A Postgres client to make requests with
|
||||||
|
/// * `missing_sgs` - An array of missing state_group ids
|
||||||
fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
|
fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
|
||||||
let mut rows = client
|
let mut rows = client
|
||||||
.query_raw(
|
.query_raw(
|
||||||
|
|||||||
110
src/lib.rs
110
src/lib.rs
@@ -16,15 +16,9 @@
|
|||||||
//! Synapse instance's database. Specifically, it aims to reduce the number of
|
//! Synapse instance's database. Specifically, it aims to reduce the number of
|
||||||
//! rows that a given room takes up in the `state_groups_state` table.
|
//! rows that a given room takes up in the `state_groups_state` table.
|
||||||
|
|
||||||
mod compressor;
|
|
||||||
mod database;
|
|
||||||
|
|
||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
||||||
|
|
||||||
use compressor::Compressor;
|
|
||||||
use database::PGEscape;
|
|
||||||
|
|
||||||
use clap::{
|
use clap::{
|
||||||
crate_authors, crate_description, crate_name, crate_version, value_t_or_exit, App, Arg,
|
crate_authors, crate_description, crate_name, crate_version, value_t_or_exit, App, Arg,
|
||||||
};
|
};
|
||||||
@@ -34,6 +28,12 @@ use state_map::StateMap;
|
|||||||
use std::{collections::BTreeMap, fs::File, io::Write, str::FromStr};
|
use std::{collections::BTreeMap, fs::File, io::Write, str::FromStr};
|
||||||
use string_cache::DefaultAtom as Atom;
|
use string_cache::DefaultAtom as Atom;
|
||||||
|
|
||||||
|
mod compressor;
|
||||||
|
mod database;
|
||||||
|
|
||||||
|
use compressor::Compressor;
|
||||||
|
use database::PGEscape;
|
||||||
|
|
||||||
/// An entry for a state group. Consists of an (optional) previous group and the
|
/// An entry for a state group. Consists of an (optional) previous group and the
|
||||||
/// delta from that previous group (or the full state if no previous group)
|
/// delta from that previous group (or the full state if no previous group)
|
||||||
#[derive(Default, Debug, Clone, PartialEq, Eq)]
|
#[derive(Default, Debug, Clone, PartialEq, Eq)]
|
||||||
@@ -42,33 +42,6 @@ pub struct StateGroupEntry {
|
|||||||
state_map: StateMap<Atom>,
|
state_map: StateMap<Atom>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets the full state for a given group from the map (of deltas)
|
|
||||||
fn collapse_state_maps(map: &BTreeMap<i64, StateGroupEntry>, state_group: i64) -> StateMap<Atom> {
|
|
||||||
let mut entry = &map[&state_group];
|
|
||||||
let mut state_map = StateMap::new();
|
|
||||||
|
|
||||||
let mut stack = vec![state_group];
|
|
||||||
|
|
||||||
while let Some(prev_state_group) = entry.prev_state_group {
|
|
||||||
stack.push(prev_state_group);
|
|
||||||
if !map.contains_key(&prev_state_group) {
|
|
||||||
panic!("Missing {}", prev_state_group);
|
|
||||||
}
|
|
||||||
entry = &map[&prev_state_group];
|
|
||||||
}
|
|
||||||
|
|
||||||
for sg in stack.iter().rev() {
|
|
||||||
state_map.extend(
|
|
||||||
map[&sg]
|
|
||||||
.state_map
|
|
||||||
.iter()
|
|
||||||
.map(|((t, s), e)| ((t, s), e.clone())),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
state_map
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Helper struct for parsing the `level_sizes` argument.
|
/// Helper struct for parsing the `level_sizes` argument.
|
||||||
struct LevelSizes(Vec<usize>);
|
struct LevelSizes(Vec<usize>);
|
||||||
|
|
||||||
@@ -89,6 +62,7 @@ impl FromStr for LevelSizes {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Contains configuration information for this run of the compressor
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
db_url: String,
|
db_url: String,
|
||||||
output_file: Option<File>,
|
output_file: Option<File>,
|
||||||
@@ -100,6 +74,7 @@ pub struct Config {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
/// Build up config from command line arguments
|
||||||
pub fn parse_arguments() -> Config {
|
pub fn parse_arguments() -> Config {
|
||||||
let matches = App::new(crate_name!())
|
let matches = App::new(crate_name!())
|
||||||
.version(crate_version!())
|
.version(crate_version!())
|
||||||
@@ -199,9 +174,22 @@ impl Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(mut config: Config) {
|
/// Runs through the steps of the compression:
|
||||||
// let mut config = Config::parse_arguments();
|
///
|
||||||
|
/// - Fetches current state groups for a room and their predecessors
|
||||||
|
/// - Outputs #state groups and #lines in table they occupy
|
||||||
|
/// - Runs the compressor to produce a new predecessor mapping
|
||||||
|
/// - Outputs #lines in table that the new mapping would occupy
|
||||||
|
/// - Outputs info about how the compressor got on
|
||||||
|
/// - Checks that number of lines saved is greater than threshold
|
||||||
|
/// - Ensures new mapping doesn't affect actual state contents
|
||||||
|
/// - Produces SQL code to carry out changes and saves it to file
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `config: Config` - A Config struct that controlls the run
|
||||||
|
|
||||||
|
pub fn run(mut config: Config) {
|
||||||
// First we need to get the current state groups
|
// First we need to get the current state groups
|
||||||
println!("Fetching state from DB for room '{}'...", config.room_id);
|
println!("Fetching state from DB for room '{}'...", config.room_id);
|
||||||
|
|
||||||
@@ -272,6 +260,17 @@ pub fn run(mut config: Config) {
|
|||||||
output_sql(&mut config, &state_group_map, &new_state_group_map);
|
output_sql(&mut config, &state_group_map, &new_state_group_map);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Produces SQL code to carry out changes and saves it to file
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `config` - A Config struct that contains information
|
||||||
|
/// about the run. It's mutable because it contains
|
||||||
|
/// the pointer to the output file (which needs to
|
||||||
|
/// be mutable for the file to be written to)
|
||||||
|
/// * `old_map` - The state group data originally in the database
|
||||||
|
/// * `new_map` - The state group data generated by the compressor to
|
||||||
|
/// replace replace the old contents
|
||||||
fn output_sql(
|
fn output_sql(
|
||||||
config: &mut Config,
|
config: &mut Config,
|
||||||
old_map: &BTreeMap<i64, StateGroupEntry>,
|
old_map: &BTreeMap<i64, StateGroupEntry>,
|
||||||
@@ -353,6 +352,20 @@ fn output_sql(
|
|||||||
pb.finish();
|
pb.finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compares two sets of state groups
|
||||||
|
///
|
||||||
|
/// A state group entry contains a predecessor state group and a delta.
|
||||||
|
/// The complete contents of a certain state group can be calculated by
|
||||||
|
/// following this chain of predecessors back to some empty state and
|
||||||
|
/// combining all the deltas together. This is called "collapsing".
|
||||||
|
///
|
||||||
|
/// This function confirms that two state groups mappings lead to the
|
||||||
|
/// exact same entries for each state group after collapsing them down.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `old_map` - The state group data currently in the database
|
||||||
|
/// * `new_map` - The state group data that the old_map is being compared
|
||||||
|
/// to
|
||||||
fn check_that_maps_match(
|
fn check_that_maps_match(
|
||||||
old_map: &BTreeMap<i64, StateGroupEntry>,
|
old_map: &BTreeMap<i64, StateGroupEntry>,
|
||||||
new_map: &BTreeMap<i64, StateGroupEntry>,
|
new_map: &BTreeMap<i64, StateGroupEntry>,
|
||||||
@@ -391,3 +404,30 @@ fn check_that_maps_match(
|
|||||||
|
|
||||||
println!("New state map matches old one");
|
println!("New state map matches old one");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gets the full state for a given group from the map (of deltas)
|
||||||
|
fn collapse_state_maps(map: &BTreeMap<i64, StateGroupEntry>, state_group: i64) -> StateMap<Atom> {
|
||||||
|
let mut entry = &map[&state_group];
|
||||||
|
let mut state_map = StateMap::new();
|
||||||
|
|
||||||
|
let mut stack = vec![state_group];
|
||||||
|
|
||||||
|
while let Some(prev_state_group) = entry.prev_state_group {
|
||||||
|
stack.push(prev_state_group);
|
||||||
|
if !map.contains_key(&prev_state_group) {
|
||||||
|
panic!("Missing {}", prev_state_group);
|
||||||
|
}
|
||||||
|
entry = &map[&prev_state_group];
|
||||||
|
}
|
||||||
|
|
||||||
|
for sg in stack.iter().rev() {
|
||||||
|
state_map.extend(
|
||||||
|
map[&sg]
|
||||||
|
.state_map
|
||||||
|
.iter()
|
||||||
|
.map(|((t, s), e)| ((t, s), e.clone())),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
state_map
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user