Files
rust-synapse-compress-state/synapse_auto_compressor/src/lib.rs
2021-09-28 16:57:13 +01:00

129 lines
4.5 KiB
Rust

//! This is a tool that uses the synapse_compress_state library to
//! reduce the size of the synapse state_groups_state table in a postgres
//! database.
//!
//! It adds the tables state_compressor_state and state_compressor_progress
//! to the database and uses these to enable it to incrementally work
//! on space reductions
use anyhow::Result;
use log::{error, LevelFilter};
use pyo3::{
exceptions::PyRuntimeError, prelude::pymodule, types::PyModule, PyErr, PyResult, Python,
};
use std::str::FromStr;
use synapse_compress_state::Level;
pub mod manager;
pub mod state_saving;
/// Helper struct for parsing the `default_levels` argument.
///
/// The compressor keeps track of a number of Levels, each of which have a maximum length,
/// current length, and an optional current head (None if level is empty, Some if a head
/// exists).
///
/// This is needed since FromStr cannot be implemented for structs
/// that aren't defined in this scope
#[derive(PartialEq, Debug)]
pub struct LevelInfo(pub Vec<Level>);
// Implement FromStr so that an argument of the form "100,50,25"
// can be used to create a vector of levels with max sizes 100, 50 and 25
// For more info see the LevelState documentation in lib.rs
impl FromStr for LevelInfo {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Stores the max sizes of each level
let mut level_info: Vec<Level> = Vec::new();
// Split the string up at each comma
for size_str in s.split(',') {
// try and convert each section into a number
// panic if that fails
let size: usize = size_str
.parse()
.map_err(|_| "Not a comma separated list of numbers")?;
// add this parsed number to the sizes struct
level_info.push(Level::new(size));
}
// Return the built up vector inside a LevelInfo struct
Ok(LevelInfo(level_info))
}
}
// PyO3 INTERFACE STARTS HERE
#[pymodule]
fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
let _ = pyo3_log::Logger::default()
// don't send out anything lower than a warning from other crates
.filter(LevelFilter::Warn)
// don't log warnings from synapse_compress_state, the
// synapse_auto_compressor handles these situations and provides better
// log messages
.filter_target("synapse_compress_state".to_owned(), LevelFilter::Error)
// log info and above for the synapse_auto_compressor
.filter_target("synapse_auto_compressor".to_owned(), LevelFilter::Debug)
.install();
// ensure any panics produce error messages in the log
log_panics::init();
#[pyfn(m, compress_largest_rooms)]
fn compress_state_events_table(
py: Python,
db_url: String,
chunk_size: i64,
default_levels: String,
number_of_chunks: i64,
) -> PyResult<()> {
// Stops the compressor from holding the GIL while running
py.allow_threads(|| {
_compress_state_events_table_body(db_url, chunk_size, default_levels, number_of_chunks)
})
}
// Not accessbile through Py03. It is a "private" function.
fn _compress_state_events_table_body(
db_url: String,
chunk_size: i64,
default_levels: String,
number_of_chunks: i64,
) -> PyResult<()> {
// Announce the start of the program to the logs
log::info!("synapse_auto_compressor started");
// Parse the default_level string into a LevelInfo struct
let default_levels: LevelInfo = match default_levels.parse() {
Ok(l_sizes) => l_sizes,
Err(e) => {
return Err(PyErr::new::<PyRuntimeError, _>(format!(
"Unable to parse level_sizes: {}",
e
)))
}
};
// call compress_largest_rooms with the arguments supplied
let run_result = manager::compress_chunks_of_database(
&db_url,
chunk_size,
&default_levels.0,
number_of_chunks,
);
// (Note, need to do `{:?}` formatting to show error context)
// Don't log the context of errors but do use it in the PyRuntimeError
if let Err(e) = run_result {
error!("{}", e);
return Err(PyErr::new::<PyRuntimeError, _>(format!("{:?}", e)));
}
log::info!("synapse_auto_compressor finished");
Ok(())
}
Ok(())
}