From a069d8765a81e1b5565b7444f35cfabb6921af64 Mon Sep 17 00:00:00 2001
From: Azrenbeth <77782548+Azrenbeth@users.noreply.github.com>
Date: Mon, 27 Sep 2021 09:49:34 +0100
Subject: [PATCH] Add method that compresses the chunks with lowest
 uncompressed state_group ids (#72)

---
 auto_compressor/Cargo.toml                  |   2 +-
 auto_compressor/src/manager.rs              |  88 ++++++++-
 auto_compressor/src/state_saving.rs         |  70 +++++++
 compressor_integration_tests/src/lib.rs     |   1 +
 .../tests/auto_compressor_manager_tests.rs  | 171 +++++++++++++++++-
 src/database.rs                             |   8 +-
 6 files changed, 331 insertions(+), 9 deletions(-)

diff --git a/auto_compressor/Cargo.toml b/auto_compressor/Cargo.toml
index 0f35a5a..13e3874 100644
--- a/auto_compressor/Cargo.toml
+++ b/auto_compressor/Cargo.toml
@@ -27,4 +27,4 @@ crate-type = ["cdylib", "rlib"]
 
 [dependencies.pyo3]
 version = "0.14.1"
-features = ["extension-module","abi3-py36"]
\ No newline at end of file
+features = ["extension-module","abi3-py36"]

diff --git a/auto_compressor/src/manager.rs b/auto_compressor/src/manager.rs
index e1796ac..e72de5e 100644
--- a/auto_compressor/src/manager.rs
+++ b/auto_compressor/src/manager.rs
@@ -2,10 +2,11 @@
 // of compression on the database.
 
 use crate::state_saving::{
-    connect_to_database, read_room_compressor_state, write_room_compressor_state,
+    connect_to_database, create_tables_if_needed, get_next_room_to_compress,
+    read_room_compressor_state, write_room_compressor_state,
 };
-use anyhow::{Context, Result};
-use log::{debug, warn};
+use anyhow::{bail, Context, Result};
+use log::{debug, info, warn};
 use synapse_compress_state::{continue_run, ChunkStats, Level};
 
 /// Runs the compressor on a chunk of the room
@@ -110,3 +111,84 @@ pub fn run_compressor_on_room_chunk(
 
     Ok(Some(chunk_stats))
 }
+
+/// Runs the compressor in chunks on the rooms with the lowest uncompressed state group ids
+///
+/// # Arguments
+///
+/// * `db_url`           - The URL of the postgres database that synapse is using.
+///                        e.g. "postgresql://user:password@domain.com/synapse"
+///
+/// * `chunk_size`       - The number of state_groups to work on. All of the entries
+///                        from state_groups_state are requested from the database
+///                        for the state groups that are worked on, so small chunk
+///                        sizes may be needed on machines with low memory.
+///                        (Note: if the compressor fails to find space savings on a
+///                        chunk as a whole, which may well happen in rooms with a lot
+///                        of backfill, then the entire chunk is skipped.)
+///
+/// * `default_levels`   - If the compressor has never been run on a room before,
+///                        then we need to provide it with some information on what
+///                        sort of compression structure we want. The defaults that
+///                        the library suggests are empty levels with max sizes of
+///                        100, 50 and 25.
+///
+/// * `number_of_chunks` - The number of chunks to compress. The larger this number
+///                        is, the longer the compressor will run for.
+pub fn compress_chunks_of_database(
+    db_url: &str,
+    chunk_size: i64,
+    default_levels: &[Level],
+    number_of_chunks: i64,
+) -> Result<()> {
+    // connect to the database
+    let mut client = connect_to_database(db_url)
+        .with_context(|| format!("Failed to connect to database at {}", db_url))?;
+
+    create_tables_if_needed(&mut client).context("Failed to create state compressor tables")?;
+
+    let mut skipped_chunks = 0;
+    let mut rows_saved = 0;
+    let mut chunks_processed = 0;
+
+    while chunks_processed < number_of_chunks {
+        let room_to_compress = get_next_room_to_compress(&mut client)
+            .context("Failed to work out what room to compress next")?;
+
+        if room_to_compress.is_none() {
+            break;
+        }
+
+        let room_to_compress =
+            room_to_compress.expect("Have checked that room_to_compress is not None");
+
+        info!(
+            "Running compressor on room {} with chunk size {}",
+            room_to_compress, chunk_size
+        );
+
+        let work_done =
+            run_compressor_on_room_chunk(db_url, &room_to_compress, chunk_size, default_levels)?;
+
+        if let Some(ref chunk_stats) = work_done {
+            if chunk_stats.commited {
+                let savings = chunk_stats.original_num_rows - chunk_stats.new_num_rows;
+                rows_saved += savings;
+                debug!("Saved {} rows for room {}", savings, room_to_compress);
+            } else {
+                skipped_chunks += 1;
+                debug!(
+                    "Unable to make savings for room {}, skipping chunk",
+                    room_to_compress
+                );
+            }
+            chunks_processed += 1;
+        } else {
+            bail!("Ran the compressor on a room that had no more work to do!")
+        }
+    }
+    info!(
+        "Finished running compressor. Saved {} rows. Skipped {}/{} chunks",
+        rows_saved, skipped_chunks, chunks_processed
+    );
+    Ok(())
+}
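
For orientation, here is a minimal sketch of how a caller might drive the new
compress_chunks_of_database entry point. The database URL, chunk size, and
chunk count are illustrative assumptions, not part of this patch; the level
sizes follow the defaults mentioned in the doc comment above.

    use anyhow::Result;
    use auto_compressor::manager::compress_chunks_of_database;
    use synapse_compress_state::Level;

    fn main() -> Result<()> {
        // The default level sizes suggested by the library (see doc comment).
        let default_levels = vec![Level::new(100), Level::new(50), Level::new(25)];

        // Illustrative values: work on 500 state groups per chunk, for at
        // most 100 chunks. Progress is persisted in the state compressor
        // tables, so a later run resumes where this one stopped.
        compress_chunks_of_database(
            "postgresql://user:password@localhost/synapse", // assumed URL
            500,
            &default_levels,
            100,
        )
    }
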
diff --git a/auto_compressor/src/state_saving.rs b/auto_compressor/src/state_saving.rs
index 988b111..c92e748 100644
--- a/auto_compressor/src/state_saving.rs
+++ b/auto_compressor/src/state_saving.rs
@@ -1,6 +1,7 @@
 // This module contains functions to communicate with the database
 
 use anyhow::{bail, Result};
+use log::trace;
 use synapse_compress_state::Level;
 
 use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
@@ -55,6 +56,20 @@ pub fn create_tables_if_needed(client: &mut Client) -> Result<()> {
 
     client.execute(create_progress_table, &[])?;
 
+    let create_compressor_global_progress_table = r#"
+        CREATE TABLE IF NOT EXISTS state_compressor_total_progress(
+            lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,
+            lowest_uncompressed_group BIGINT NOT NULL,
+            CHECK (lock='X')
+        );
+        INSERT INTO state_compressor_total_progress
+            (lowest_uncompressed_group)
+        VALUES (0)
+        ON CONFLICT (lock) DO NOTHING;
+    "#;
+
+    client.batch_execute(create_compressor_global_progress_table)?;
+
     Ok(())
 }
 
@@ -249,3 +264,58 @@ pub fn write_room_compressor_state(
 
     Ok(())
 }
+
+/// Returns the room with the lowest uncompressed state group id
+///
+/// A group is detected as uncompressed if it is greater than the `last_compressed`
+/// entry in `state_compressor_progress` for that room.
+///
+/// The `lowest_uncompressed_group` value stored in `state_compressor_total_progress`
+/// records where this method last finished, to prevent repeating work.
+///
+/// # Arguments
+///
+/// * `client` - A postgres client used to send the requests to the database
+pub fn get_next_room_to_compress(client: &mut Client) -> Result<Option<String>> {
+    // Walk the state_groups table until we find the next uncompressed group
+    let get_next_room = r#"
+        SELECT room_id, id
+        FROM state_groups
+        LEFT JOIN state_compressor_progress USING (room_id)
+        WHERE
+            id >= (SELECT lowest_uncompressed_group FROM state_compressor_total_progress)
+            AND (
+                id > last_compressed
+                OR last_compressed IS NULL
+            )
+        ORDER BY id ASC
+        LIMIT 1
+    "#;
+
+    let row_opt = client.query_opt(get_next_room, &[])?;
+
+    let next_room_row = if let Some(row) = row_opt {
+        row
+    } else {
+        return Ok(None);
+    };
+
+    let next_room: String = next_room_row.get("room_id");
+    let lowest_uncompressed_group: i64 = next_room_row.get("id");
+
+    // This method has determined where the lowest uncompressed group is, so save
+    // that information to avoid redoing this work in the future.
+    let update_total_progress = r#"
+        UPDATE state_compressor_total_progress SET lowest_uncompressed_group = $1;
+    "#;
+
+    client.execute(update_total_progress, &[&lowest_uncompressed_group])?;
+
+    trace!(
+        "next_room: {}, lowest_uncompressed: {}",
+        next_room,
+        lowest_uncompressed_group
+    );
+
+    Ok(Some(next_room))
+}
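
state_compressor_total_progress uses the single-row "lock column" pattern:
the lock column is forced to 'X' by the CHECK constraint and declared UNIQUE,
so the table can never hold more than one row, and the seeding INSERT becomes
a no-op on every run after the first. A sketch of reading the watermark back
(the helper name is ours, not part of the patch):

    use anyhow::Result;
    use postgres::Client;

    /// Hypothetical helper: reads the global watermark that
    /// get_next_room_to_compress() maintains.
    fn read_compressor_watermark(client: &mut Client) -> Result<i64> {
        // The lock-row pattern guarantees exactly one row exists once
        // create_tables_if_needed() has run, so query_one() is safe here.
        let row = client.query_one(
            "SELECT lowest_uncompressed_group FROM state_compressor_total_progress",
            &[],
        )?;
        Ok(row.get("lowest_uncompressed_group"))
    }
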
diff --git a/compressor_integration_tests/src/lib.rs b/compressor_integration_tests/src/lib.rs
index 181ffef..9f72f68 100644
--- a/compressor_integration_tests/src/lib.rs
+++ b/compressor_integration_tests/src/lib.rs
@@ -314,6 +314,7 @@ pub fn clear_compressor_state() {
     let sql = r"
         TRUNCATE state_compressor_state;
         TRUNCATE state_compressor_progress;
+        UPDATE state_compressor_total_progress SET lowest_uncompressed_group = 0;
     ";
 
     client.batch_execute(sql).unwrap();
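
This reset matters because lowest_uncompressed_group is global rather than
per-room: a high watermark left behind by one test would make
get_next_room_to_compress skip every group below it in the next test. A
sketch of the setup ordering the tests below rely on (the helper name is
ours, not part of the patch):

    use auto_compressor::state_saving::{connect_to_database, create_tables_if_needed};
    use compressor_integration_tests::{clear_compressor_state, DB_URL};

    /// Hypothetical helper showing the reset order used in the tests below.
    fn reset_compressor_between_tests() {
        let mut client = connect_to_database(DB_URL).unwrap();
        // The tables must exist before clear_compressor_state() touches them.
        create_tables_if_needed(&mut client).unwrap();
        // Resets per-room progress AND the global watermark added above.
        clear_compressor_state();
    }
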
diff --git a/compressor_integration_tests/tests/auto_compressor_manager_tests.rs b/compressor_integration_tests/tests/auto_compressor_manager_tests.rs
index eb59b5c..2b9f8bc 100644
--- a/compressor_integration_tests/tests/auto_compressor_manager_tests.rs
+++ b/compressor_integration_tests/tests/auto_compressor_manager_tests.rs
@@ -1,11 +1,16 @@
+use std::collections::BTreeMap;
+
 use auto_compressor::{
-    manager::run_compressor_on_room_chunk,
+    manager::{compress_chunks_of_database, run_compressor_on_room_chunk},
     state_saving::{connect_to_database, create_tables_if_needed},
 };
 use compressor_integration_tests::{
     add_contents_to_database, clear_compressor_state, database_collapsed_states_match_map,
     database_structure_matches_map, empty_database,
-    map_builder::{compressed_3_3_from_0_to_13_with_state, line_segments_with_state},
+    map_builder::{
+        compressed_3_3_from_0_to_13_with_state, line_segments_with_state,
+        structure_from_edges_with_state,
+    },
     setup_logger, DB_URL,
 };
 use serial_test::serial;
@@ -31,7 +36,7 @@ fn run_compressor_on_room_chunk_works() {
     clear_compressor_state();
 
     // compress in 3,3 level sizes by default
-    let default_levels = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];
+    let default_levels = vec![Level::new(3), Level::new(3)];
 
     // compress the first 7 groups in the room
     // structure should be the following afterwards
@@ -63,3 +68,163 @@ fn run_compressor_on_room_chunk_works() {
     // Check that the structure of the database matches the expected structure
     assert!(database_structure_matches_map(&expected));
 }
+
+#[test]
+#[serial(db)]
+fn compress_chunks_of_database_compresses_multiple_rooms() {
+    setup_logger();
+    // This creates 2 rooms with the following structure
+    //
+    // 0-1-2 3-4-5 6-7-8 9-10-11 12-13
+    // (with room2's numbers shifted up 14)
+    //
+    // Each group i has state:
+    //     ('node','is',      i)
+    //     ('group',  j, 'seen') - for all j less than i in that room
+    let initial1 = line_segments_with_state(0, 13);
+    let initial2 = line_segments_with_state(14, 27);
+
+    empty_database();
+    add_contents_to_database("room1", &initial1);
+    add_contents_to_database("room2", &initial2);
+
+    let mut client = connect_to_database(DB_URL).unwrap();
+    create_tables_if_needed(&mut client).unwrap();
+    clear_compressor_state();
+
+    // compress in 3,3 level sizes by default
+    let default_levels = vec![Level::new(3), Level::new(3)];
+
+    // Compress 4 chunks of size 8.
+    // The first two should compress room1 and the second two should compress room2
+    compress_chunks_of_database(DB_URL, 8, &default_levels, 4).unwrap();
+
+    // We are aiming for the following structure in the database for room1
+    // i.e. groups 6 and 9 should have changed from the initial map
+    // N.B. this saves 11 rows
+    //
+    // 0  3\      12
+    // 1  4 6\    13
+    // 2  5 7 9
+    //      8 10
+    //        11
+    //
+    // Where each group i has state:
+    //     ('node','is',      i)
+    //     ('group',  j, 'seen') - for all j less than i
+    let expected1 = compressed_3_3_from_0_to_13_with_state();
+
+    // Check that the database still gives correct states for each group in room1
+    assert!(database_collapsed_states_match_map(&initial1));
+
+    // Check that the structure of the database matches the expected structure for room1
+    assert!(database_structure_matches_map(&expected1));
+
+    // room2 should have the same structure but with all numbers shifted up by 14
+    let expected_edges: BTreeMap<i64, i64> = vec![
+        (15, 14),
+        (16, 15),
+        (18, 17),
+        (19, 18),
+        (20, 17),
+        (21, 20),
+        (22, 21),
+        (23, 20),
+        (24, 23),
+        (25, 24),
+        (27, 26),
+    ]
+    .into_iter()
+    .collect();
+
+    let expected2 = structure_from_edges_with_state(expected_edges, 14, 27);
+
+    // Check that the database still gives correct states for each group in room2
+    assert!(database_collapsed_states_match_map(&initial2));
+
+    // Check that the structure of the database matches the expected structure for room2
+    assert!(database_structure_matches_map(&expected2));
+}
+
+#[test]
+#[serial(db)]
+fn compress_chunks_of_database_continues_where_it_left_off() {
+    setup_logger();
+    // This creates 2 rooms with the following structure
+    //
+    // 0-1-2 3-4-5 6-7-8 9-10-11 12-13
+    // (with room2's numbers shifted up 14)
+    //
+    // Each group i has state:
+    //     ('node','is',      i)
+    //     ('group',  j, 'seen') - for all j less than i in that room
+    let initial1 = line_segments_with_state(0, 13);
+    let initial2 = line_segments_with_state(14, 27);
+
+    empty_database();
+    add_contents_to_database("room1", &initial1);
+    add_contents_to_database("room2", &initial2);
+
+    let mut client = connect_to_database(DB_URL).unwrap();
+    create_tables_if_needed(&mut client).unwrap();
+    clear_compressor_state();
+
+    // compress in 3,3 level sizes by default
+    let default_levels = vec![Level::new(3), Level::new(3)];
+
+    // Compress chunks of various sizes:
+    //
+    // These two should compress room1
+    compress_chunks_of_database(DB_URL, 8, &default_levels, 1).unwrap();
+    compress_chunks_of_database(DB_URL, 100, &default_levels, 1).unwrap();
+    // These three should compress room2
+    compress_chunks_of_database(DB_URL, 1, &default_levels, 2).unwrap();
+    compress_chunks_of_database(DB_URL, 5, &default_levels, 1).unwrap();
+    compress_chunks_of_database(DB_URL, 5, &default_levels, 1).unwrap();
+
+    // We are aiming for the following structure in the database for room1
+    // i.e. groups 6 and 9 should have changed from the initial map
+    // N.B. this saves 11 rows
+    //
+    // 0  3\      12
+    // 1  4 6\    13
+    // 2  5 7 9
+    //      8 10
+    //        11
+    //
+    // Where each group i has state:
+    //     ('node','is',      i)
+    //     ('group',  j, 'seen') - for all j less than i
+    let expected1 = compressed_3_3_from_0_to_13_with_state();

+    // Check that the database still gives correct states for each group in room1
+    assert!(database_collapsed_states_match_map(&initial1));
+
+    // Check that the structure of the database matches the expected structure for room1
+    assert!(database_structure_matches_map(&expected1));
+
+    // room2 should have the same structure but with all numbers shifted up by 14
+    let expected_edges: BTreeMap<i64, i64> = vec![
+        (15, 14),
+        (16, 15),
+        (18, 17),
+        (19, 18),
+        (20, 17),
+        (21, 20),
+        (22, 21),
+        (23, 20),
+        (24, 23),
+        (25, 24),
+        (27, 26),
+    ]
+    .into_iter()
+    .collect();
+
+    let expected2 = structure_from_edges_with_state(expected_edges, 14, 27);
+
+    // Check that the database still gives correct states for each group in room2
+    assert!(database_collapsed_states_match_map(&initial2));
+
+    // Check that the structure of the database matches the expected structure for room2
+    assert!(database_structure_matches_map(&expected2));
+}
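
One small change above is worth flagging: the first test now builds its
default levels with Level::new(3) instead of Level::restore(3, 0, None). As
far as this diff shows, the two are interchangeable for a level that has not
compressed anything yet; a sketch under that assumption:

    use synapse_compress_state::Level;

    fn main() {
        // A brand new level with a maximum size of 3...
        let _fresh = Level::new(3);
        // ...which, per the test change above, should be equivalent to
        // restoring a level with current length 0 and no head group.
        let _restored = Level::restore(3, 0, None);
    }
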
diff --git a/src/database.rs b/src/database.rs
index ba13f4f..7da15d8 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -158,6 +158,7 @@ fn load_level_heads(client: &mut Client, level_info: &[Level]) -> BTreeMap<i64, StateGroupEntry>
@@ ... @@
     if min_state_group.is_some() && groups_to_compress.is_some() {
         params = vec![&room_id, &min_state_group, &groups_to_compress];
-        query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids);
+        query_chunk_of_ids = format!(
+            r"{} AND id > $2 ORDER BY id ASC LIMIT $3",
+            query_chunk_of_ids
+        );
     } else if groups_to_compress.is_some() {
         params = vec![&room_id, &groups_to_compress];
-        query_chunk_of_ids = format!(r"{} LIMIT $2", query_chunk_of_ids);
+        query_chunk_of_ids = format!(r"{} ORDER BY id ASC LIMIT $2", query_chunk_of_ids);
     } else {
         params = vec![&room_id];
     }
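
The ORDER BY id ASC additions are the substantive fix here: LIMIT without an
explicit order lets the query planner return any matching rows, so "the first
chunk_size state groups" would not be well defined and the compressor's saved
position could silently skip groups. Illustrative queries, simplified from
the ones this patch amends:

    fn main() {
        // Without ORDER BY, Postgres is free to return ANY matching rows for
        // the LIMIT, so two identical queries can disagree about which state
        // groups made up "the chunk".
        let _unordered = "SELECT id FROM state_groups WHERE room_id = $1 LIMIT $2";

        // With ORDER BY id ASC the chunk is exactly the lowest remaining ids,
        // so the largest id in the chunk is a safe position to record and
        // continue from on the next run.
        let _ordered =
            "SELECT id FROM state_groups WHERE room_id = $1 ORDER BY id ASC LIMIT $2";
    }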