Make the auto compressor uploadable to pypi (#75)

This commit is contained in:
Erik Johnston
2021-09-28 16:57:13 +01:00
committed by GitHub
parent bf57e81f54
commit 0111079153
15 changed files with 118 additions and 98 deletions

View File

@@ -0,0 +1,36 @@
[package]
name = "synapse_auto_compressor"
authors = ["William Ashton"]
version = "0.1.1"
edition = "2018"
[package.metadata.maturin]
requires-python = ">=3.6"
project-url = {Source = "https://github.com/matrix-org/rust-synapse-compress-state"}
classifier = [
"Development Status :: 4 - Beta",
"Programming Language :: Rust",
]
[dependencies]
clap = "2.33.0"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
jemallocator = "0.3.2"
rand = "0.8.0"
serial_test = "0.5.1"
synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
env_logger = "0.9.0"
log = "0.4.14"
log-panics = "2.0.0"
anyhow = "1.0.42"
pyo3-log = "0.4.0"
# Needed for pyo3 support
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies.pyo3]
version = "0.14.1"
features = ["extension-module","abi3-py36"]

View File

@@ -0,0 +1,12 @@
# Auto Compressor
See the top level readme for information.
## Publishing to PyPI
Bump the version number and run from the root directory of the repo:
```
docker run -it --rm -v $(pwd):/io -e OPENSSL_STATIC=1 konstin2/maturin publish -m synapse_auto_compressor/Cargo.toml --cargo-extra-args="--features='openssl/vendored'"
```
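## Using the published module

Once published and installed (e.g. `pip install synapse_auto_compressor`), the
compressor can be driven from Python. A minimal sketch, with an illustrative
connection string and illustrative sizes:

```python
import synapse_auto_compressor

# Compress 100 chunks of 500 state groups each, using the default
# level sizes of 100, 50 and 25
synapse_auto_compressor.compress_largest_rooms(
    db_url="postgresql://user:password@localhost/synapse",
    chunk_size=500,
    default_levels="100,50,25",
    number_of_chunks=100,
)
```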

View File

@@ -0,0 +1,128 @@
//! This is a tool that uses the synapse_compress_state library to
//! reduce the size of the synapse state_groups_state table in a postgres
//! database.
//!
//! It adds the tables state_compressor_state, state_compressor_progress and
//! state_compressor_total_progress to the database and uses these to enable it
//! to incrementally work on space reductions.
use anyhow::Result;
use log::{error, LevelFilter};
use pyo3::{
exceptions::PyRuntimeError, prelude::pymodule, types::PyModule, PyErr, PyResult, Python,
};
use std::str::FromStr;
use synapse_compress_state::Level;
pub mod manager;
pub mod state_saving;
/// Helper struct for parsing the `default_levels` argument.
///
/// The compressor keeps track of a number of Levels, each of which has a maximum length,
/// a current length, and an optional current head (None if the level is empty, Some if a
/// head exists).
///
/// This newtype wrapper is needed because FromStr cannot be implemented for
/// Vec<Level>, since neither the trait nor the type is defined in this crate
#[derive(PartialEq, Debug)]
pub struct LevelInfo(pub Vec<Level>);
// Implement FromStr so that an argument of the form "100,50,25"
// can be used to create a vector of levels with max sizes 100, 50 and 25
// For more info see the Level documentation in the synapse_compress_state crate
impl FromStr for LevelInfo {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Stores the max sizes of each level
let mut level_info: Vec<Level> = Vec::new();
// Split the string up at each comma
for size_str in s.split(',') {
// Try to convert each section into a number,
// returning an error if that fails
let size: usize = size_str
.parse()
.map_err(|_| "Not a comma separated list of numbers")?;
// add this parsed number to the sizes struct
level_info.push(Level::new(size));
}
// Return the built up vector inside a LevelInfo struct
Ok(LevelInfo(level_info))
}
}
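// A minimal illustrative test of the parsing above: "100,50,25" should
// produce three levels with max sizes 100, 50 and 25, and anything that
// is not a comma separated list of numbers should be rejected.
#[cfg(test)]
mod level_info_tests {
    use super::LevelInfo;
    use synapse_compress_state::Level;

    #[test]
    fn parses_comma_separated_sizes() {
        let parsed: LevelInfo = "100,50,25".parse().unwrap();
        assert_eq!(
            parsed,
            LevelInfo(vec![Level::new(100), Level::new(50), Level::new(25)])
        );
        assert!("100,abc".parse::<LevelInfo>().is_err());
    }
}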
// PyO3 INTERFACE STARTS HERE
#[pymodule]
fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
let _ = pyo3_log::Logger::default()
// don't send out anything lower than a warning from other crates
.filter(LevelFilter::Warn)
// don't log warnings from synapse_compress_state, the
// synapse_auto_compressor handles these situations and provides better
// log messages
.filter_target("synapse_compress_state".to_owned(), LevelFilter::Error)
// log info and above for the synapse_auto_compressor
.filter_target("synapse_auto_compressor".to_owned(), LevelFilter::Debug)
.install();
// ensure any panics produce error messages in the log
log_panics::init();
#[pyfn(m, compress_largest_rooms)]
fn compress_state_events_table(
py: Python,
db_url: String,
chunk_size: i64,
default_levels: String,
number_of_chunks: i64,
) -> PyResult<()> {
// Stops the compressor from holding the GIL while running
py.allow_threads(|| {
_compress_state_events_table_body(db_url, chunk_size, default_levels, number_of_chunks)
})
}
// Not accessible through PyO3. It is a "private" helper function.
fn _compress_state_events_table_body(
db_url: String,
chunk_size: i64,
default_levels: String,
number_of_chunks: i64,
) -> PyResult<()> {
// Announce the start of the program to the logs
log::info!("synapse_auto_compressor started");
// Parse the default_level string into a LevelInfo struct
let default_levels: LevelInfo = match default_levels.parse() {
Ok(l_sizes) => l_sizes,
Err(e) => {
return Err(PyErr::new::<PyRuntimeError, _>(format!(
"Unable to parse level_sizes: {}",
e
)))
}
};
// Call compress_chunks_of_database with the arguments supplied
let run_result = manager::compress_chunks_of_database(
&db_url,
chunk_size,
&default_levels.0,
number_of_chunks,
);
// Log the error without its context, but include the full context in the
// PyRuntimeError (note: `{:?}` formatting is needed to show the error context)
if let Err(e) = run_result {
error!("{}", e);
return Err(PyErr::new::<PyRuntimeError, _>(format!("{:?}", e)));
}
log::info!("synapse_auto_compressor finished");
Ok(())
}
Ok(())
}

View File

@@ -0,0 +1,159 @@
//! This is a tool that uses the synapse_compress_state library to
//! reduce the size of the synapse state_groups_state table in a postgres
//! database.
//!
//! It adds the tables state_compressor_state, state_compressor_progress and
//! state_compressor_total_progress to the database and uses these to enable it
//! to incrementally work on space reductions.
//!
//! This binary calls manager::compress_chunks_of_database() with the arguments
//! provided. That is, it compresses the database in chunks, repeatedly picking
//! the room with the lowest uncompressed state group id and compressing the
//! next chunk of state groups in that room.
//!
//! After each batch, the rows processed are marked as "compressed" (using
//! the state_compressor_progress table), and the program state is saved into
//! the state_compressor_state table so that the compressor can seamlessly
//! continue from where it left off.
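//!
//! An example invocation of the resulting binary (the connection string and
//! numbers are illustrative):
//!
//! ```text
//! synapse_auto_compressor -p postgresql://user:pass@localhost/synapse -c 500 -n 100
//! ```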
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
use log::LevelFilter;
use std::{env, fs::OpenOptions};
use synapse_auto_compressor::{manager, state_saving, LevelInfo};
/// Execution starts here
fn main() {
// Set up the logger for the synapse_auto_compressor
// The default can be overwritten with RUST_LOG
// see the README for more information
let log_file = OpenOptions::new()
.append(true)
.create(true)
.open("synapse_auto_compressor.log")
.unwrap_or_else(|e| panic!("Error occured while opening the log file: {}", e));
if env::var("RUST_LOG").is_err() {
let mut log_builder = env_logger::builder();
// Ensure panics still come through
log_builder.filter_module("panic", LevelFilter::Error);
// Only output errors from the synapse_compress_state library
log_builder.filter_module("synapse_compress_state", LevelFilter::Error);
// Output log levels info and above from synapse_auto_compressor
log_builder.filter_module("synapse_auto_compressor", LevelFilter::Info);
log_builder.init();
} else {
// If RUST_LOG was set then use that
let mut log_builder = env_logger::Builder::from_env("RUST_LOG");
log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
// Ensure panics still come through
log_builder.filter_module("panic", LevelFilter::Error);
log_builder.init();
}
log_panics::init();
// Announce the start of the program to the logs
log::info!("synapse_auto_compressor started");
// parse the command line arguments using the clap crate
let arguments = App::new(crate_name!())
.version(crate_version!())
.author(crate_authors!("\n"))
.about(crate_description!())
.arg(
Arg::with_name("postgres-url")
.short("p")
.value_name("POSTGRES_LOCATION")
.help("The configruation for connecting to the postgres database.")
.long_help(concat!(
"The configuration for connecting to the Postgres database. This should be of the form ",
r#""postgresql://username:password@mydomain.com/database" or a key-value pair "#,
r#"string: "user=username password=password dbname=database host=mydomain.com" "#,
"See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
"for the full details."
))
.takes_value(true)
.required(true),
).arg(
Arg::with_name("chunk_size")
.short("c")
.value_name("COUNT")
.help("The maximum number of state groups to load into memroy at once")
.long_help(concat!(
"The number of state_groups to work on at once. All of the entries",
" from state_groups_state are requested from the database",
" for state groups that are worked on. Therefore small",
" chunk sizes may be needed on machines with low memory.",
" (Note: if the compressor fails to find space savings on the",
" chunk as a whole (which may well happen in rooms with lots",
" of backfill in) then the entire chunk is skipped.)",
))
.takes_value(true)
.required(true),
).arg(
Arg::with_name("default_levels")
.short("l")
.value_name("LEVELS")
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!(
"Sizes of each new level in the compression algorithm, as a comma separated list.",
" The first entry in the list is for the lowest, most granular level,",
" with each subsequent entry being for the next highest level.",
" The number of entries in the list determines the number of levels",
" that will be used.",
"\nThe sum of the sizes of the levels effect the performance of fetching the state",
" from the database, as the sum of the sizes is the upper bound on number of",
" iterations needed to fetch a given set of state.",
))
.default_value("100,50,25")
.takes_value(true)
.required(false),
).arg(
Arg::with_name("number_of_chunks")
.short("n")
.value_name("CHUNKS_TO_COMPRESS")
.help("The number of chunks to compress")
.long_help(concat!(
"This many chunks of the database will be compressed. The higher this number is set to, ",
"the longer the compressor will run for."
))
.takes_value(true)
.required(true),
).get_matches();
// The URL of the database
let db_url = arguments
.value_of("postgres-url")
.expect("A database url is required");
// The number of state groups to work on at once
let chunk_size = arguments
.value_of("chunk_size")
.map(|s| s.parse().expect("chunk_size must be an integer"))
.expect("A chunk size is required");
// The default structure to use when compressing
let default_levels = value_t!(arguments, "default_levels", LevelInfo)
.unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
// The number of chunks to compress with this tool
let number_of_chunks = arguments
.value_of("number_of_chunks")
.map(|s| s.parse().expect("number_of_chunks must be an integer"))
.expect("number_of_chunks is required");
// Connect to the database and create the tables this tool needs
// (Note: if they already exist then this does nothing)
let mut client = state_saving::connect_to_database(db_url)
.unwrap_or_else(|e| panic!("Error occured while connecting to {}: {}", db_url, e));
state_saving::create_tables_if_needed(&mut client)
.unwrap_or_else(|e| panic!("Error occured while creating tables in database: {}", e));
// Call compress_chunks_of_database with the arguments supplied
// panic if an error is produced
manager::compress_chunks_of_database(db_url, chunk_size, &default_levels.0, number_of_chunks)
.unwrap();
log::info!("synapse_auto_compressor finished");
}

View File

@@ -0,0 +1,194 @@
// This module contains functions that carry out different types
// of compression on the database.
use crate::state_saving::{
connect_to_database, create_tables_if_needed, get_next_room_to_compress,
read_room_compressor_state, write_room_compressor_state,
};
use anyhow::{bail, Context, Result};
use log::{debug, info, warn};
use synapse_compress_state::{continue_run, ChunkStats, Level};
/// Runs the compressor on a chunk of the room
///
/// Returns `Some(chunk_stats)` if the compressor has progressed
/// and `None` if it had already reached the end of the room
///
/// # Arguments
///
/// * `db_url` - The URL of the postgres database that synapse is using.
/// e.g. "postgresql://user:password@domain.com/synapse"
///
/// * `room_id` - The id of the room to run the compressor on. Note this
/// is the id as stored in the database and will look like
/// "!aasdfasdfafdsdsa:matrix.org" instead of the common
/// name
///
/// * `chunk_size` - The number of state_groups to work on. All of the entries
/// from state_groups_state are requested from the database
/// for state groups that are worked on. Therefore small
/// chunk sizes may be needed on machines with low memory.
/// (Note: if the compressor fails to find space savings on the
/// chunk as a whole (which may well happen in rooms with lots
/// of backfill) then the entire chunk is skipped.)
///
/// * `default_levels` - If the compressor has never been run on this room before
/// then we need to provide the compressor with some information
/// on what sort of compression structure we want. The default that
/// the library suggests is `vec![Level::new(100), Level::new(50), Level::new(25)]`
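///
/// # Example
///
/// A minimal sketch of running one chunk (the connection string and room id
/// are illustrative):
///
/// ```no_run
/// use synapse_auto_compressor::manager::run_compressor_on_room_chunk;
/// use synapse_compress_state::Level;
///
/// let default_levels = vec![Level::new(100), Level::new(50), Level::new(25)];
/// let chunk_stats = run_compressor_on_room_chunk(
///     "postgresql://user:pass@localhost/synapse",
///     "!hypotheticalroom:example.com",
///     500,
///     &default_levels,
/// ).unwrap();
/// match chunk_stats {
///     Some(stats) => println!("Compressed up to group {}", stats.last_compressed_group),
///     None => println!("Room was already fully compressed"),
/// }
/// ```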
pub fn run_compressor_on_room_chunk(
db_url: &str,
room_id: &str,
chunk_size: i64,
default_levels: &[Level],
) -> Result<Option<ChunkStats>> {
// connect to the database
let mut client =
connect_to_database(db_url).with_context(|| format!("Failed to connect to {}", db_url))?;
// Access the database to find out where the compressor last got up to
let retrieved_state = read_room_compressor_state(&mut client, room_id)
.with_context(|| format!("Failed to read compressor state for room {}", room_id,))?;
// If the database didn't contain any information, then use the default state
let (start, level_info) = match retrieved_state {
Some((s, l)) => (Some(s), l),
None => (None, default_levels.to_vec()),
};
// run the compressor on this chunk
let option_chunk_stats = continue_run(start, chunk_size, db_url, room_id, &level_info);
if option_chunk_stats.is_none() {
debug!("No work to do on this room...");
return Ok(None);
}
// Ok to unwrap because we have checked that it's not None
let chunk_stats = option_chunk_stats.unwrap();
debug!("{:?}", chunk_stats);
// Check to see whether the compressor sent its changes to the database
if !chunk_stats.commited {
if chunk_stats.new_num_rows != chunk_stats.original_num_rows {
warn!(
"The compressor tried to increase the number of rows in {} between {:?} and {}. Skipping...",
room_id, start, chunk_stats.last_compressed_group,
);
}
// Skip over the failed chunk and set the level info to the default (empty) state
write_room_compressor_state(
&mut client,
room_id,
default_levels,
chunk_stats.last_compressed_group,
)
.with_context(|| {
format!(
"Failed to skip chunk in room {} between {:?} and {}",
room_id, start, chunk_stats.last_compressed_group
)
})?;
return Ok(Some(chunk_stats));
}
// Save where we got up to after this successful commit
write_room_compressor_state(
&mut client,
room_id,
&chunk_stats.new_level_info,
chunk_stats.last_compressed_group,
)
.with_context(|| {
format!(
"Failed to save state after compressing chunk in room {} between {:?} and {}",
room_id, start, chunk_stats.last_compressed_group
)
})?;
Ok(Some(chunk_stats))
}
/// Runs the compressor in chunks on rooms with the lowest uncompressed state group ids
///
/// # Arguments
///
/// * `db_url` - The URL of the postgres database that synapse is using.
/// e.g. "postgresql://user:password@domain.com/synapse"
///
/// * `chunk_size` - The number of state_groups to work on. All of the entries
/// from state_groups_state are requested from the database
/// for state groups that are worked on. Therefore small
/// chunk sizes may be needed on machines with low memory.
/// (Note: if the compressor fails to find space savings on the
/// chunk as a whole (which may well happen in rooms with lots
/// of backfill) then the entire chunk is skipped.)
///
/// * `default_levels` - If the compressor has never been run on this room before
/// then we need to provide the compressor with some information
/// on what sort of compression structure we want. The default that
/// the library suggests is empty levels with max sizes of 100, 50 and 25
///
/// * `number_of_chunks`- The number of chunks to compress. The larger this number is, the longer
/// the compressor will run for.
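///
/// # Example
///
/// A minimal sketch (values are illustrative):
///
/// ```no_run
/// use synapse_auto_compressor::manager::compress_chunks_of_database;
/// use synapse_compress_state::Level;
///
/// // Compress 100 chunks of 500 state groups each, starting from
/// // wherever the previous run left off
/// compress_chunks_of_database(
///     "postgresql://user:pass@localhost/synapse",
///     500,
///     &[Level::new(100), Level::new(50), Level::new(25)],
///     100,
/// ).unwrap();
/// ```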
pub fn compress_chunks_of_database(
db_url: &str,
chunk_size: i64,
default_levels: &[Level],
number_of_chunks: i64,
) -> Result<()> {
// connect to the database
let mut client = connect_to_database(db_url)
.with_context(|| format!("Failed to connect to database at {}", db_url))?;
create_tables_if_needed(&mut client).context("Failed to create state compressor tables")?;
let mut skipped_chunks = 0;
let mut rows_saved = 0;
let mut chunks_processed = 0;
while chunks_processed < number_of_chunks {
let room_to_compress = get_next_room_to_compress(&mut client)
.context("Failed to work out what room to compress next")?;
if room_to_compress.is_none() {
break;
}
let room_to_compress =
room_to_compress.expect("Have checked that rooms_to_compress is not None");
info!(
"Running compressor on room {} with chunk size {}",
room_to_compress, chunk_size
);
let work_done =
run_compressor_on_room_chunk(db_url, &room_to_compress, chunk_size, default_levels)?;
if let Some(ref chunk_stats) = work_done {
if chunk_stats.commited {
let savings = chunk_stats.original_num_rows - chunk_stats.new_num_rows;
rows_saved += savings;
debug!("Saved {} rows for room {}", savings, room_to_compress);
} else {
skipped_chunks += 1;
debug!(
"Unable to make savings for room {}, skipping chunk",
room_to_compress
);
}
chunks_processed += 1;
} else {
bail!("Ran the compressor on a room that had no more work to do!")
}
}
info!(
"Finished running compressor. Saved {} rows. Skipped {}/{} chunks",
rows_saved, skipped_chunks, chunks_processed
);
Ok(())
}

View File

@@ -0,0 +1,321 @@
// This module contains functions to communicate with the database
use anyhow::{bail, Result};
use log::trace;
use synapse_compress_state::Level;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client};
use postgres_openssl::MakeTlsConnector;
/// Connects to the database and returns a postgres client
///
/// # Arguments
///
/// * `db_url` - The URL of the postgres database that synapse is using.
/// e.g. "postgresql://user:password@domain.com/synapse"
pub fn connect_to_database(db_url: &str) -> Result<Client> {
let mut builder = SslConnector::builder(SslMethod::tls())?;
builder.set_verify(SslVerifyMode::NONE);
let connector = MakeTlsConnector::new(builder.build());
let client = Client::connect(db_url, connector)?;
Ok(client)
}
/// Creates the state_compressor_state, state_compressor_progress and state_compressor_total_progress tables
///
/// If these tables already exist then this function does nothing
///
/// # Arguments
///
/// * `client` - A postgres client used to send the requests to the database
pub fn create_tables_if_needed(client: &mut Client) -> Result<()> {
let create_state_table = r#"
CREATE TABLE IF NOT EXISTS state_compressor_state (
room_id TEXT NOT NULL,
level_num INT NOT NULL,
max_size INT NOT NULL,
current_length INT NOT NULL,
current_head BIGINT,
UNIQUE (room_id, level_num)
)"#;
client.execute(create_state_table, &[])?;
let create_state_table_indexes = r#"
CREATE INDEX IF NOT EXISTS state_compressor_state_index ON state_compressor_state (room_id)"#;
client.execute(create_state_table_indexes, &[])?;
let create_progress_table = r#"
CREATE TABLE IF NOT EXISTS state_compressor_progress (
room_id TEXT PRIMARY KEY,
last_compressed BIGINT NOT NULL
)"#;
client.execute(create_progress_table, &[])?;
let create_compressor_global_progress_table = r#"
CREATE TABLE IF NOT EXISTS state_compressor_total_progress(
lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,
lowest_uncompressed_group BIGINT NOT NULL,
CHECK (Lock='X')
);
INSERT INTO state_compressor_total_progress
(lowest_uncompressed_group)
VALUES (0)
ON CONFLICT (lock) DO NOTHING;
"#;
client.batch_execute(create_compressor_global_progress_table)?;
Ok(())
}
/// Retrieve the level info so we can restart the compressor
///
/// # Arguments
///
/// * `client` - A postgres client used to send the requests to the database
/// * `room_id` - The room whose saved compressor state we want to load
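///
/// For illustration, the rows returned by the query below might look like
/// this for a room with three levels (all values invented):
///
/// ```text
/// level_num | max_size | current_length | current_head | last_compressed
/// ----------+----------+----------------+--------------+-----------------
///         1 |      100 |             15 |       123456 |          123456
///         2 |       50 |              3 |       123000 |          123456
///         3 |       25 |              0 |         NULL |          123456
/// ```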
pub fn read_room_compressor_state(
client: &mut Client,
room_id: &str,
) -> Result<Option<(i64, Vec<Level>)>> {
// Query to retrieve all levels from state_compressor_state
// Ordered by ascending level_number
let sql = r#"
SELECT level_num, max_size, current_length, current_head, last_compressed
FROM state_compressor_state
LEFT JOIN state_compressor_progress USING (room_id)
WHERE room_id = $1
ORDER BY level_num ASC
"#;
// send the query to the database
let mut levels = client.query_raw(sql, &[room_id])?;
// Needed to ensure that the rows are for unique consecutive levels
// starting from 1 (i.e. of the form [1,2,3], not [0,1,2] or [1,1,2,2,3])
let mut prev_seen = 0;
// The vector to store the level info from the database in
let mut level_info: Vec<Level> = Vec::new();
// Where the last compressor run stopped
let mut last_compressed = None;
// Used to only read last_compressed value once
let mut first_row = true;
// Loop through all the rows retrieved by that query
while let Some(l) = levels.next()? {
// Read out the fields into variables
//
// Some of these are `usize` as they may be used to index vectors, but stored as Postgres
// type `INT`, which is the same as `i32`.
//
// Since usize is unlikely to be less than 32 bits wide, this conversion should be safe.
let level_num: usize = l.get::<_, i32>("level_num") as usize;
let max_size: usize = l.get::<_, i32>("max_size") as usize;
let current_length: usize = l.get::<_, i32>("current_length") as usize;
let current_head: Option<i64> = l.get("current_head");
// Only read the last_compressed column once, since it is the same for each row
if first_row {
last_compressed = l.get("last_compressed"); // might be NULL if corrupted
if last_compressed.is_none() {
bail!(
"No entry in state_compressor_progress for room {} but entries in state_compressor_state were found",
room_id
)
}
first_row = false;
}
// Check that there aren't multiple entries for the same level number
// in the database. (Should be impossible due to unique key constraint)
if prev_seen == level_num {
bail!(
"The level {} occurs twice in state_compressor_state for room {}",
level_num,
room_id,
);
}
// Check that there is no missing level in the database
// e.g. if the previous row retrieved was for level 1 and this
// row is for level 3 then since the SQL query orders the results
// in ascending level numbers, there was no level 2 found!
if prev_seen != level_num - 1 {
bail!("Levels between {} and {} are missing", prev_seen, level_num,);
}
// if the level is not empty, then it must have a head!
if current_head.is_none() && current_length != 0 {
bail!(
"Level {} has no head but current length is {} in room {}",
level_num,
current_length,
room_id,
);
}
// If the level has more groups in than the maximum then something is wrong!
if current_length > max_size {
bail!(
"Level {} has length {} but max size {} in room {}",
level_num,
current_length,
max_size,
room_id,
);
}
// Add this level to the level_info vector
level_info.push(Level::restore(max_size, current_length, current_head));
// Mark the previous level_number seen as the current one
prev_seen = level_num;
}
// If we didn't retrieve anything from the database then there is no saved state
// in the database!
if level_info.is_empty() {
return Ok(None);
}
// Return the compressor state we retrieved
// last_compressed cannot be None at this point, so safe to unwrap
Ok(Some((last_compressed.unwrap(), level_info)))
}
/// Save the level info so it can be loaded by the next run of the compressor
///
/// # Arguments
///
/// * `client` - A postgres client used to send the requests to the database
/// * `room_id` - The room whose saved compressor state we want to save
/// * `level_info` - The state that can be used to restore the compressor later
/// * `last_compressed` - The last state_group that was compressed. This is needed
/// so that the compressor knows where to start from next
pub fn write_room_compressor_state(
client: &mut Client,
room_id: &str,
level_info: &[Level],
last_compressed: i64,
) -> Result<()> {
// Wrap all the changes to the state for this room in a transaction
// This prevents accidentally having malformed compressor start info
let mut write_transaction = client.transaction()?;
// Go through every level that the compressor is using
for (level_num, level) in level_info.iter().enumerate() {
// the 1st level is level 1 not level 0, but enumerate starts at 0
// so need to add 1 to get correct number
let level_num = level_num + 1;
// bring the level info out of the Level struct
let (max_size, current_len, current_head) = (
level.get_max_length(),
level.get_current_length(),
level.get_head(),
);
// Update the database with this compressor state information
//
// Some of these are `usize` as they may be used to index vectors, but stored as Postgres
// type `INT`, which is the same as `i32`.
//
// Since these values should always be small, this conversion should be safe.
let (level_num, max_size, current_len) =
(level_num as i32, max_size as i32, current_len as i32);
let params: Vec<&(dyn ToSql + Sync)> =
vec![&room_id, &level_num, &max_size, &current_len, &current_head];
write_transaction.execute(
r#"
INSERT INTO state_compressor_state
(room_id, level_num, max_size, current_length, current_head)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (room_id, level_num)
DO UPDATE SET
max_size = excluded.max_size,
current_length = excluded.current_length,
current_head = excluded.current_head;
"#,
&params,
)?;
}
// Update the database with this progress information
let params: Vec<&(dyn ToSql + Sync)> = vec![&room_id, &last_compressed];
write_transaction.execute(
r#"
INSERT INTO state_compressor_progress (room_id, last_compressed)
VALUES ($1, $2)
ON CONFLICT (room_id)
DO UPDATE SET last_compressed = excluded.last_compressed;
"#,
&params,
)?;
// Commit the transaction (otherwise changes never happen)
write_transaction.commit()?;
Ok(())
}
/// Returns the room with the lowest uncompressed state group id
///
/// A group is detected as uncompressed if it is greater than the `last_compressed`
/// entry in `state_compressor_progress` for that room.
///
/// The `lowest_uncompressed_group` value stored in `state_compressor_total_progress`
/// stores where this method last finished, to prevent repeating work
///
/// # Arguments
///
/// * `client` - A postgres client used to send the requests to the database
pub fn get_next_room_to_compress(client: &mut Client) -> Result<Option<String>> {
// Walk the state_groups table until we find the next uncompressed group
let get_next_room = r#"
SELECT room_id, id
FROM state_groups
LEFT JOIN state_compressor_progress USING (room_id)
WHERE
id >= (SELECT lowest_uncompressed_group FROM state_compressor_total_progress)
AND (
id > last_compressed
OR last_compressed IS NULL
)
ORDER BY id ASC
LIMIT 1
"#;
let row_opt = client.query_opt(get_next_room, &[])?;
let next_room_row = if let Some(row) = row_opt {
row
} else {
return Ok(None);
};
let next_room: String = next_room_row.get("room_id");
let lowest_uncompressed_group: i64 = next_room_row.get("id");
// This method has determined where the lowest uncompressed group is, save that
// information so we don't have to redo this work in the future.
let update_total_progress = r#"
UPDATE state_compressor_total_progress SET lowest_uncompressed_group = $1;
"#;
client.execute(update_total_progress, &[&lowest_uncompressed_group])?;
trace!(
"next_room: {}, lowest_uncompressed: {}",
next_room,
lowest_uncompressed_group
);
Ok(Some(next_room))
}