Added option to only run the compressor on a range of state groups in a room (#44)
This commit is contained in:
203
src/database.rs
203
src/database.rs
@@ -25,40 +25,62 @@ use super::StateGroupEntry;
|
||||
/// specific room.
|
||||
///
|
||||
/// - Connects to the database
|
||||
/// - Fetches rows with group id lower than max
|
||||
/// - Fetches the first [group] rows with group id after [min]
|
||||
/// - Recursively searches for missing predecessors and adds those
|
||||
///
|
||||
/// Returns with the state_group map and the id of the last group that was used
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `room_id` - The ID of the room in the database
|
||||
/// * `db_url` - The URL of a Postgres database. This should be of the
|
||||
/// form: "postgresql://user:pass@domain:port/database"
|
||||
/// * `max_state_group` - If specified, then only fetch the entries for state
|
||||
/// groups lower than or equal to this number. (N.B. all
|
||||
/// predecessors are also fetched)
|
||||
/// * `room_id` - The ID of the room in the database
|
||||
/// * `db_url` - The URL of a Postgres database. This should be of the
|
||||
/// form: "postgresql://user:pass@domain:port/database"
|
||||
/// * `min_state_group` - If specified, then only fetch the entries for state
|
||||
/// groups greater than (but not equal) to this number. It
|
||||
/// also requires groups_to_compress to be specified
|
||||
/// * 'groups_to_compress' - The number of groups to get from the database before stopping
|
||||
/// * `max_state_group` - If specified, then only fetch the entries for state
|
||||
/// groups lower than or equal to this number.
|
||||
pub fn get_data_from_db(
|
||||
db_url: &str,
|
||||
room_id: &str,
|
||||
min_state_group: Option<i64>,
|
||||
groups_to_compress: Option<i64>,
|
||||
max_state_group: Option<i64>,
|
||||
) -> BTreeMap<i64, StateGroupEntry> {
|
||||
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
|
||||
// connect to the database
|
||||
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
builder.set_verify(SslVerifyMode::NONE);
|
||||
let connector = MakeTlsConnector::new(builder.build());
|
||||
|
||||
let mut client = Client::connect(db_url, connector).unwrap();
|
||||
|
||||
let mut state_group_map = get_initial_data_from_db(&mut client, room_id, max_state_group);
|
||||
// Search for the group id of the groups_to_compress'th group after min_state_group
|
||||
// If this is saved, then the compressor can continue by having min_state_group being
|
||||
// set to this maximum
|
||||
let max_group_found = find_max_group(
|
||||
&mut client,
|
||||
room_id,
|
||||
min_state_group,
|
||||
groups_to_compress,
|
||||
max_state_group,
|
||||
);
|
||||
|
||||
let mut state_group_map =
|
||||
get_initial_data_from_db(&mut client, room_id, min_state_group, max_group_found);
|
||||
|
||||
println!("Got initial state from database. Checking for any missing state groups...");
|
||||
|
||||
// Due to reasons some of the state groups appear in the edges table, but
|
||||
// not in the state_groups_state table. This means they don't get included
|
||||
// in our DB queries, so we have to fetch any missing groups explicitly.
|
||||
// not in the state_groups_state table.
|
||||
//
|
||||
// Also it is likely that the predecessor of a node will not be within the
|
||||
// chunk that was specified by min_state_group and groups_to_compress.
|
||||
// This means they don't get included in our DB queries, so we have to fetch
|
||||
// any missing groups explicitly.
|
||||
//
|
||||
// Since the returned groups may themselves reference groups we don't have,
|
||||
// we need to do this recursively until we don't find any more missing.
|
||||
//
|
||||
// N.B. This does NOT currently fetch the deltas for the missing groups!
|
||||
// By carefully chosen max_state_group this might cause issues...?
|
||||
loop {
|
||||
let mut missing_sgs: Vec<_> = state_group_map
|
||||
.iter()
|
||||
@@ -76,41 +98,92 @@ pub fn get_data_from_db(
|
||||
.collect();
|
||||
|
||||
if missing_sgs.is_empty() {
|
||||
println!("No missing state groups");
|
||||
// println!("No missing state groups");
|
||||
break;
|
||||
}
|
||||
|
||||
missing_sgs.sort_unstable();
|
||||
missing_sgs.dedup();
|
||||
|
||||
println!("Missing {} state groups", missing_sgs.len());
|
||||
// println!("Missing {} state groups", missing_sgs.len());
|
||||
|
||||
let map = get_missing_from_db(&mut client, &missing_sgs);
|
||||
state_group_map.extend(map.into_iter());
|
||||
// find state groups not picked up already and add them to the map
|
||||
let map = get_missing_from_db(&mut client, &missing_sgs, min_state_group, max_group_found);
|
||||
for (k, v) in map {
|
||||
state_group_map.entry(k).or_insert(v);
|
||||
}
|
||||
}
|
||||
|
||||
state_group_map
|
||||
(state_group_map, max_group_found)
|
||||
}
|
||||
|
||||
/// Returns the group ID of the last group to be compressed
|
||||
///
|
||||
/// This can be saved so that future runs of the compressor only
|
||||
/// continue from after this point
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `client` - A Postgres client to make requests with
|
||||
/// * `room_id` - The ID of the room in the database
|
||||
/// * `min_state_group` - The lower limit (non inclusive) of group id's to compress
|
||||
/// * 'groups_to_compress' - How many groups to compress
|
||||
/// * `max_state_group` - The upper bound on what this method can return
|
||||
fn find_max_group(
|
||||
client: &mut Client,
|
||||
room_id: &str,
|
||||
min_state_group: Option<i64>,
|
||||
groups_to_compress: Option<i64>,
|
||||
max_state_group: Option<i64>,
|
||||
) -> i64 {
|
||||
// Get list of state_id's in a certain room
|
||||
let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string();
|
||||
let params: Vec<&(dyn ToSql + Sync)>;
|
||||
|
||||
if let Some(max) = max_state_group {
|
||||
query_chunk_of_ids = format!("{} AND id <= {}", query_chunk_of_ids, max)
|
||||
}
|
||||
|
||||
// Adds additional constraint if a groups_to_compress has been specified
|
||||
if min_state_group.is_some() && groups_to_compress.is_some() {
|
||||
params = vec![&room_id, &min_state_group, &groups_to_compress];
|
||||
query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids);
|
||||
} else {
|
||||
params = vec![&room_id];
|
||||
query_chunk_of_ids = format!(r"{} ORDER BY id DESC LIMIT 1", query_chunk_of_ids);
|
||||
}
|
||||
|
||||
let sql_query = format!(
|
||||
"SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1",
|
||||
query_chunk_of_ids
|
||||
);
|
||||
let final_row = client.query(sql_query.as_str(), ¶ms).unwrap();
|
||||
|
||||
final_row.last().unwrap().get(0)
|
||||
}
|
||||
|
||||
/// Fetch the entries in state_groups_state and immediate predecessors for
|
||||
/// a specific room.
|
||||
///
|
||||
/// - Fetches rows with group id lower than max
|
||||
/// - Fetches first [groups_to_compress] rows with group id higher than min
|
||||
/// - Stores the group id, predecessor id and deltas into a map
|
||||
/// - returns map and maximum row that was considered
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `client` - A Postgres client to make requests with
|
||||
/// * `room_id` - The ID of the room in the database
|
||||
/// * `max_state_group` - If specified, then only fetch the entries for state
|
||||
/// groups lower than or equal to this number. (N.B. doesn't
|
||||
/// fetch IMMEDIATE predecessors if ID is above this number)
|
||||
/// * `min_state_group` - If specified, then only fetch the entries for state
|
||||
/// groups greater than (but not equal) to this number. It
|
||||
/// also requires groups_to_compress to be specified
|
||||
/// * 'max_group_found' - The upper limit on state_groups ids to get from the database
|
||||
fn get_initial_data_from_db(
|
||||
client: &mut Client,
|
||||
room_id: &str,
|
||||
max_state_group: Option<i64>,
|
||||
min_state_group: Option<i64>,
|
||||
max_group_found: i64,
|
||||
) -> BTreeMap<i64, StateGroupEntry> {
|
||||
// Query to get id, predecessor and delta for each state group
|
||||
// Query to get id, predecessor and deltas for each state group
|
||||
let sql = r#"
|
||||
SELECT m.id, prev_state_group, type, state_key, s.event_id
|
||||
FROM state_groups AS m
|
||||
@@ -119,18 +192,21 @@ fn get_initial_data_from_db(
|
||||
WHERE m.room_id = $1
|
||||
"#;
|
||||
|
||||
// Adds additional constraint if a max_state_group has been specified
|
||||
// Then sends query to the datatbase
|
||||
let mut rows = if let Some(s) = max_state_group {
|
||||
let params: Vec<&dyn ToSql> = vec![&room_id, &s];
|
||||
client.query_raw(format!(r"{} AND m.id <= $2", sql).as_str(), params)
|
||||
// Adds additional constraint if minimum state_group has been specified.
|
||||
// note that the maximum group only affects queries if there is also a minimum
|
||||
// otherwise it is assumed that ALL groups should be fetched
|
||||
let mut rows = if let Some(min) = min_state_group {
|
||||
let params: Vec<&dyn ToSql> = vec![&room_id, &min, &max_group_found];
|
||||
client.query_raw(
|
||||
format!(r"{} AND m.id > $2 AND m.id <= $3", sql).as_str(),
|
||||
params,
|
||||
)
|
||||
} else {
|
||||
client.query_raw(sql, &[room_id])
|
||||
}
|
||||
.unwrap();
|
||||
|
||||
// Copy the data from the database into a map
|
||||
|
||||
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
|
||||
|
||||
let pb = ProgressBar::new_spinner();
|
||||
@@ -143,8 +219,10 @@ fn get_initial_data_from_db(
|
||||
// The row in the map to copy the data to
|
||||
let entry = state_group_map.entry(row.get(0)).or_default();
|
||||
|
||||
// Save the predecessor (this may already be there)
|
||||
// Save the predecessor and mark for compression (this may already be there)
|
||||
// TODO: slightly fewer redundant rewrites
|
||||
entry.prev_state_group = row.get(1);
|
||||
entry.in_range = true;
|
||||
|
||||
// Copy the single delta from the predecessor stored in this row
|
||||
if let Some(etype) = row.get::<_, Option<String>>(2) {
|
||||
@@ -172,34 +250,57 @@ fn get_initial_data_from_db(
|
||||
///
|
||||
/// * `client` - A Postgres client to make requests with
|
||||
/// * `missing_sgs` - An array of missing state_group ids
|
||||
fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
|
||||
let mut rows = client
|
||||
.query_raw(
|
||||
r#"
|
||||
SELECT state_group, prev_state_group
|
||||
FROM state_group_edges
|
||||
WHERE state_group = ANY($1)
|
||||
"#,
|
||||
&[missing_sgs],
|
||||
)
|
||||
.unwrap();
|
||||
/// * 'min_state_group' - Minimum state_group id to mark as in range
|
||||
/// * 'max_group_found' - Maximum state_group id to mark as in range
|
||||
fn get_missing_from_db(
|
||||
client: &mut Client,
|
||||
missing_sgs: &[i64],
|
||||
min_state_group: Option<i64>,
|
||||
max_group_found: i64,
|
||||
) -> BTreeMap<i64, StateGroupEntry> {
|
||||
// "Due to reasons" it is possible that some states only appear in edges table and not in state_groups table
|
||||
// so since we know the IDs we're looking for as they are the missing predecessors, we can find them by
|
||||
// left joining onto the edges table (instead of the state_group table!)
|
||||
let sql = r#"
|
||||
SELECT target.prev_state_group, source.prev_state_group, state.type, state.state_key, state.event_id
|
||||
FROM state_group_edges AS target
|
||||
LEFT JOIN state_group_edges AS source ON (target.prev_state_group = source.state_group)
|
||||
LEFT JOIN state_groups_state AS state ON (target.prev_state_group = state.state_group)
|
||||
WHERE target.prev_state_group = ANY($1)
|
||||
"#;
|
||||
|
||||
// initialise the map with empty entries (the missing group may not
|
||||
// have a prev_state_group either)
|
||||
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = missing_sgs
|
||||
.iter()
|
||||
.map(|sg| (*sg, StateGroupEntry::default()))
|
||||
.collect();
|
||||
let mut rows = client.query_raw(sql, &[missing_sgs]).unwrap();
|
||||
|
||||
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
|
||||
|
||||
while let Some(row) = rows.next().unwrap() {
|
||||
let state_group = row.get(0);
|
||||
let entry = state_group_map.get_mut(&state_group).unwrap();
|
||||
let id = row.get(0);
|
||||
// The row in the map to copy the data to
|
||||
let entry = state_group_map.entry(id).or_default();
|
||||
|
||||
// Save the predecessor and mark for compression (this may already be there)
|
||||
// Also may well not exist!
|
||||
entry.prev_state_group = row.get(1);
|
||||
if let Some(min) = min_state_group {
|
||||
if min < id && id <= max_group_found {
|
||||
entry.in_range = true
|
||||
}
|
||||
}
|
||||
|
||||
// Copy the single delta from the predecessor stored in this row
|
||||
if let Some(etype) = row.get::<_, Option<String>>(2) {
|
||||
entry.state_map.insert(
|
||||
&etype,
|
||||
&row.get::<_, String>(3),
|
||||
row.get::<_, String>(4).into(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
state_group_map
|
||||
}
|
||||
|
||||
// TODO: find a library that has an existing safe postgres escape function
|
||||
/// Helper function that escapes the wrapped text when writing SQL
|
||||
pub struct PGEscape<'a>(pub &'a str);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user