Add method that compresses the chunks with lowest uncompressed state_group ids (#72)

Author: Azrenbeth
Date: 2021-09-27 09:49:34 +01:00
Committed by: GitHub
Parent: 3271221311
Commit: a069d8765a
6 changed files with 331 additions and 9 deletions


@@ -2,10 +2,11 @@
// of compression on the database.
use crate::state_saving::{
    connect_to_database, create_tables_if_needed, get_next_room_to_compress,
    read_room_compressor_state, write_room_compressor_state,
};
use anyhow::{bail, Context, Result};
use log::{debug, info, warn};
use synapse_compress_state::{continue_run, ChunkStats, Level};

/// Runs the compressor on a chunk of the room
@@ -110,3 +111,84 @@ pub fn run_compressor_on_room_chunk(
    Ok(Some(chunk_stats))
}

/// Runs the compressor in chunks on rooms with the lowest uncompressed state group ids
///
/// # Arguments
///
/// * `db_url` - The URL of the postgres database that synapse is using.
/// e.g. "postgresql://user:password@domain.com/synapse"
///
/// * `chunk_size` - The number of state groups to work on at once. All of the
///                  entries from state_groups_state for the state groups being
///                  worked on are loaded from the database, so small chunk
///                  sizes may be needed on machines with low memory.
///                  (Note: if the compressor fails to find space savings on the
///                  chunk as a whole (which may well happen in rooms with a lot
///                  of backfill) then the entire chunk is skipped.)
///
/// * `default_levels` - If the compressor has never been run on this room before,
///                      then we need to provide it with some information on what
///                      sort of compression structure we want. The default the
///                      library suggests is empty levels with max sizes of 100,
///                      50 and 25.
///
/// * `number_of_chunks` - The number of chunks to compress. The larger this
///                        number is, the longer the compressor will run for.
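///
/// # Example
///
/// A minimal sketch of a caller (the database URL and level sizes are
/// illustrative, and `Level::new` is assumed to take the desired maximum size):
///
/// ```ignore
/// use synapse_compress_state::Level;
///
/// // Empty levels with max sizes of 100, 50 and 25, as suggested above
/// let default_levels = vec![Level::new(100), Level::new(50), Level::new(25)];
///
/// // Work through up to 100 chunks of 500 state groups each
/// compress_chunks_of_database(
///     "postgresql://user:password@localhost/synapse",
///     500,
///     &default_levels,
///     100,
/// )?;
/// ```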
pub fn compress_chunks_of_database(
    db_url: &str,
    chunk_size: i64,
    default_levels: &[Level],
    number_of_chunks: i64,
) -> Result<()> {
    // connect to the database
    let mut client = connect_to_database(db_url)
        .with_context(|| format!("Failed to connect to database at {}", db_url))?;

    create_tables_if_needed(&mut client).context("Failed to create state compressor tables")?;

    let mut skipped_chunks = 0;
    let mut rows_saved = 0;
    let mut chunks_processed = 0;

    while chunks_processed < number_of_chunks {
        let room_to_compress = get_next_room_to_compress(&mut client)
            .context("Failed to work out what room to compress next")?;

        if room_to_compress.is_none() {
            break;
        }

        let room_to_compress =
            room_to_compress.expect("Have checked that room_to_compress is not None");

        info!(
            "Running compressor on room {} with chunk size {}",
            room_to_compress, chunk_size
        );

        let work_done =
            run_compressor_on_room_chunk(db_url, &room_to_compress, chunk_size, default_levels)?;

        if let Some(ref chunk_stats) = work_done {
            if chunk_stats.commited {
                let savings = chunk_stats.original_num_rows - chunk_stats.new_num_rows;
                rows_saved += savings;
                debug!("Saved {} rows for room {}", savings, room_to_compress);
            } else {
                skipped_chunks += 1;
                debug!(
                    "Unable to make savings for room {}, skipping chunk",
                    room_to_compress
                );
            }
            chunks_processed += 1;
        } else {
            bail!("Ran the compressor on a room that had no more work to do!")
        }
    }

    info!(
        "Finished running compressor. Saved {} rows. Skipped {}/{} chunks",
        rows_saved, skipped_chunks, chunks_processed
    );

    Ok(())
}


@@ -1,6 +1,7 @@
// This module contains functions to communicate with the database

use anyhow::{bail, Result};
use log::trace;
use synapse_compress_state::Level;

use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
@@ -55,6 +56,20 @@ pub fn create_tables_if_needed(client: &mut Client) -> Result<()> {
    client.execute(create_progress_table, &[])?;

    let create_compressor_global_progress_table = r#"
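        -- Single-row table: 'lock' is UNIQUE and pinned to 'X' by the CHECK
        -- constraint, so the INSERT seeds exactly one row and any rerun of the
        -- INSERT hits ON CONFLICT DO NOTHING.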
        CREATE TABLE IF NOT EXISTS state_compressor_total_progress(
            lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,
            lowest_uncompressed_group BIGINT NOT NULL,
            CHECK (lock = 'X')
        );
        INSERT INTO state_compressor_total_progress
            (lowest_uncompressed_group)
            VALUES (0)
            ON CONFLICT (lock) DO NOTHING;
    "#;

    client.batch_execute(create_compressor_global_progress_table)?;

    Ok(())
}
@@ -249,3 +264,58 @@ pub fn write_room_compressor_state(
Ok(())
}
/// Returns the room with the lowest uncompressed state group id
///
/// A state group is treated as uncompressed if its id is greater than the
/// `last_compressed` entry in `state_compressor_progress` for that room.
///
/// The `lowest_uncompressed_group` value stored in `state_compressor_total_progress`
/// records where this method last finished, so that work is not repeated.
///
/// # Arguments
///
/// * `client` - A postgres client used to send the requests to the database
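///
/// # Example
///
/// A minimal sketch (the connection URL is illustrative):
///
/// ```ignore
/// let mut client = connect_to_database("postgresql://user:password@localhost/synapse")?;
/// match get_next_room_to_compress(&mut client)? {
///     Some(room_id) => println!("Next room to compress: {}", room_id),
///     None => println!("No uncompressed state groups left"),
/// }
/// ```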
pub fn get_next_room_to_compress(client: &mut Client) -> Result<Option<String>> {
    // Walk the state_groups table until we find the next uncompressed state group
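    //
    // The LEFT JOIN keeps rooms that have no state_compressor_progress row yet:
    // for those rooms last_compressed is NULL, so all their groups count as uncompressed.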
    let get_next_room = r#"
        SELECT room_id, id
        FROM state_groups
        LEFT JOIN state_compressor_progress USING (room_id)
        WHERE
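            -- Skip state groups that previous calls have already walked past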
            id >= (SELECT lowest_uncompressed_group FROM state_compressor_total_progress)
            AND (
                id > last_compressed
                OR last_compressed IS NULL
            )
        ORDER BY id ASC
        LIMIT 1
    "#;

    let row_opt = client.query_opt(get_next_room, &[])?;

    let next_room_row = if let Some(row) = row_opt {
        row
    } else {
        return Ok(None);
    };

    let next_room: String = next_room_row.get("room_id");
    let lowest_uncompressed_group: i64 = next_room_row.get("id");

    // This method has determined where the lowest uncompressed group is, so save
    // that information to avoid redoing this work in the future.
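    // state_compressor_total_progress only ever holds a single row (its CHECK
    // constraint pins lock to 'X'), so no WHERE clause is needed here.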
    let update_total_progress = r#"
        UPDATE state_compressor_total_progress SET lowest_uncompressed_group = $1;
    "#;
    client.execute(update_total_progress, &[&lowest_uncompressed_group])?;

    trace!(
        "next_room: {}, lowest_uncompressed: {}",
        next_room,
        lowest_uncompressed_group
    );

    Ok(Some(next_room))
}