Add method to run the compressor starting from a particular compressor-state (#55)

This commit is contained in:
Azrenbeth
2021-09-13 10:25:52 +01:00
committed by GitHub
parent d32f49303b
commit 4c3d6bd346
4 changed files with 370 additions and 18 deletions

View File

@@ -19,17 +19,13 @@ use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{borrow::Cow, collections::BTreeMap, fmt};
use crate::{generate_sql, Config};
use crate::{compressor::Level, generate_sql};
use super::StateGroupEntry;
/// Fetch the entries in state_groups_state (and their prev groups) for a
/// specific room.
///
/// - Connects to the database
/// - Fetches the first [group] rows with group id after [min]
/// - Recursively searches for missing predecessors and adds those
///
/// Returns with the state_group map and the id of the last group that was used
///
/// # Arguments
@@ -41,8 +37,7 @@ use super::StateGroupEntry;
/// groups greater than (but not equal) to this number. It
/// also requires groups_to_compress to be specified
/// * 'groups_to_compress' - The number of groups to get from the database before stopping
/// * `max_state_group` - If specified, then only fetch the entries for state
/// groups lower than or equal to this number.
pub fn get_data_from_db(
db_url: &str,
room_id: &str,
@@ -58,19 +53,165 @@ pub fn get_data_from_db(
let mut client = Client::connect(db_url, connector)
.unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
let state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
load_map_from_db(
&mut client,
room_id,
min_state_group,
groups_to_compress,
max_state_group,
state_group_map,
)
}
/// Load the state group data for a room when resuming a previous compressor run.
///
/// In addition to the entries in state_groups_state (and their prev groups) for the
/// requested range, this also loads the state groups at the head of each compressor
/// level (as they were at the end of the last run), so it should only be called when
/// resuming the compressor from where it last finished.
///
/// Returns with the state_group map and the id of the last group that was used
///
/// # Arguments
///
/// * `db_url`             - The URL of a Postgres database. This should be of the
///                          form: "postgresql://user:pass@domain:port/database"
/// * `room_id`            - The ID of the room in the database
/// * `min_state_group`    - If specified, then only fetch the entries for state
///                          groups greater than (but not equal) to this number. It
///                          also requires groups_to_compress to be specified
/// * `groups_to_compress` - The number of groups to get from the database before stopping
/// * `level_info`         - The maximum size, current length and current head for each
///                          level (as it was when the compressor last finished for this
///                          room)
///
/// # Panics
///
/// Panics if the TLS connector cannot be built or if connecting to the database fails.
pub fn reload_data_from_db(
    db_url: &str,
    room_id: &str,
    min_state_group: Option<i64>,
    groups_to_compress: Option<i64>,
    level_info: &[Level],
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
    // Set up TLS for the connection; peer verification is switched off via
    // SslVerifyMode::NONE
    let mut tls_builder = SslConnector::builder(SslMethod::tls()).unwrap();
    tls_builder.set_verify(SslVerifyMode::NONE);
    let tls = MakeTlsConnector::new(tls_builder.build());

    // Open the database connection, aborting with a descriptive message on failure
    let mut db = match Client::connect(db_url, tls) {
        Ok(db) => db,
        Err(e) => panic!("Error connecting to the database: {}", e),
    };

    // Seed the map with only the head of each level. Their predecessors are not
    // fetched here: load_map_from_db() picks up any missing predecessors at the end.
    let seeded_map: BTreeMap<i64, StateGroupEntry> = load_level_heads(&mut db, level_info);

    // The max state group is not used when saving and loading, hence `None`
    let max_state_group = None;

    load_map_from_db(
        &mut db,
        room_id,
        min_state_group,
        groups_to_compress,
        max_state_group,
        seeded_map,
    )
}
/// Loads the state groups that sit at the head of each compressor level.
///
/// NOTE: only the heads themselves are fetched - their predecessors are not
/// retrieved by this function.
///
/// # Arguments
///
/// * `client`     - A Postgres client to make requests with
/// * `level_info` - The levels whose heads are being requested
///
/// # Panics
///
/// Panics if the query fails or if a row cannot be read from the result stream.
fn load_level_heads(client: &mut Client, level_info: &[Level]) -> BTreeMap<i64, StateGroupEntry> {
    // Gather the head of every level that currently has one (levels whose head
    // is None are skipped)
    let mut head_ids: Vec<i64> = Vec::new();
    for level in level_info {
        if let Some(head) = level.get_current() {
            head_ids.push(head);
        }
    }

    // One row per (state group, delta entry): id, predecessor and the delta itself.
    // The LEFT JOINs keep groups that have no deltas and/or no predecessor.
    let sql = r#"
SELECT m.id, prev_state_group, type, state_key, s.event_id
FROM state_groups AS m
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
LEFT JOIN state_group_edges AS e ON (m.id = e.state_group)
WHERE m.id = ANY($1)
"#;

    // Run the query, passing the collected head ids as the single array parameter
    let mut rows = client.query_raw(sql, &[&head_ids]).unwrap();

    // Fold the returned rows into a map keyed by state group id
    let mut heads: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
    while let Some(row) = rows.next().unwrap() {
        // Fetch (or create) the entry for this state group.
        // NOTE: entries are created via Default; the original author notes that a
        // default StateGroupEntry has in_range as false, which is what we want since
        // a level head has already been compressed by the previous run.
        let group_id: i64 = row.get(0);
        let head = heads.entry(group_id).or_default();

        // Record the predecessor (it may already have been set by an earlier row)
        head.prev_state_group = row.get(1);

        // A NULL type means this row carries no delta; otherwise store the
        // single delta held in this row
        let event_type: Option<String> = row.get(2);
        if let Some(event_type) = event_type {
            let state_key: String = row.get(3);
            let event_id: String = row.get(4);
            head.state_map
                .insert(&event_type, &state_key, event_id.into());
        }
    }

    heads
}
/// Fetch the entries in state_groups_state (and their prev groups) for a
/// specific room within a certain range. These are appended onto the provided
/// map.
///
/// - Fetches the first [group] rows with group id after [min]
/// - Recursively searches for missing predecessors and adds those
///
/// Returns with the state_group map and the id of the last group that was used
///
/// # Arguments
///
/// * `client` - A Postgres client to make requests with
/// * `room_id` - The ID of the room in the database
/// * `min_state_group` - If specified, then only fetch the entries for state
/// groups greater than (but not equal) to this number. It
/// also requires groups_to_compress to be specified
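/// * `groups_to_compress` - The number of groups to get from the database before stopping
/// * `max_state_group` - If specified, then only fetch the entries for state
/// groups lower than or equal to this number.
/// * `state_group_map` - The map to populate with the entries from the database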
fn load_map_from_db(
client: &mut Client,
room_id: &str,
min_state_group: Option<i64>,
groups_to_compress: Option<i64>,
max_state_group: Option<i64>,
mut state_group_map: BTreeMap<i64, StateGroupEntry>,
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
// Search for the group id of the groups_to_compress'th group after min_state_group
// If this is saved, then the compressor can continue by having min_state_group being
// set to this maximum
let max_group_found = find_max_group(
&mut client,
client,
room_id,
min_state_group,
groups_to_compress,
max_state_group,
);
let mut state_group_map =
get_initial_data_from_db(&mut client, room_id, min_state_group, max_group_found);
state_group_map.append(&mut get_initial_data_from_db(
client,
room_id,
min_state_group,
max_group_found,
));
println!("Got initial state from database. Checking for any missing state groups...");
@@ -111,7 +252,7 @@ pub fn get_data_from_db(
// println!("Missing {} state groups", missing_sgs.len());
// find state groups not picked up already and add them to the map
let map = get_missing_from_db(&mut client, &missing_sgs, min_state_group, max_group_found);
let map = get_missing_from_db(client, &missing_sgs, min_state_group, max_group_found);
for (k, v) in map {
state_group_map.entry(k).or_insert(v);
}
@@ -354,7 +495,8 @@ fn test_pg_escape() {
/// * `new_map` - The state group data generated by the compressor to
/// replace the old contents
pub fn send_changes_to_db(
config: &Config,
db_url: &str,
room_id: &str,
old_map: &BTreeMap<i64, StateGroupEntry>,
new_map: &BTreeMap<i64, StateGroupEntry>,
) {
@@ -363,7 +505,7 @@ pub fn send_changes_to_db(
builder.set_verify(SslVerifyMode::NONE);
let connector = MakeTlsConnector::new(builder.build());
let mut client = Client::connect(&config.db_url, connector).unwrap();
let mut client = Client::connect(db_url, connector).unwrap();
println!("Writing changes...");
@@ -375,7 +517,7 @@ pub fn send_changes_to_db(
pb.set_message("state groups");
pb.enable_steady_tick(100);
for sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
for sql_transaction in generate_sql(old_map, new_map, room_id) {
// commit this change to the database
// N.B. this is a synchronous library so will wait until finished before continuing...
// if want to speed up compressor then this might be a good place to start!