Add new package with methods to save and load compressor state (#63)
This commit is contained in:
100
Cargo.lock
generated
100
Cargo.lock
generated
@@ -2,6 +2,15 @@
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
version = "0.7.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ansi_term"
|
||||
version = "0.11.0"
|
||||
@@ -11,6 +20,18 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.44"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1"
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5ab7d9e73059c86c36473f459b52adbd99c3554a4fec492caef460806006f00"
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.50"
|
||||
@@ -33,6 +54,26 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "auto_compressor"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"env_logger",
|
||||
"jemallocator",
|
||||
"log",
|
||||
"log-panics",
|
||||
"openssl",
|
||||
"postgres",
|
||||
"postgres-openssl",
|
||||
"pyo3",
|
||||
"pyo3-log",
|
||||
"rand",
|
||||
"serial_test",
|
||||
"synapse_compress_state",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.0.1"
|
||||
@@ -103,6 +144,7 @@ dependencies = [
|
||||
name = "compressor_integration_tests"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"auto_compressor",
|
||||
"openssl",
|
||||
"postgres",
|
||||
"postgres-openssl",
|
||||
@@ -210,6 +252,18 @@ version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.9.0"
|
||||
source = "git+https://github.com/TilCreator/env_logger?branch=fix_pipe#3d09e0d824d9301cf1c0d4a9f148f8cfeb216329"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"humantime",
|
||||
"log",
|
||||
"regex",
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fallible-iterator"
|
||||
version = "0.2.0"
|
||||
@@ -371,6 +425,12 @@ dependencies = [
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
||||
|
||||
[[package]]
|
||||
name = "indicatif"
|
||||
version = "0.16.2"
|
||||
@@ -466,6 +526,15 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log-panics"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae0136257df209261daa18d6c16394757c63e032e27aafd8b07788b051082bef"
|
||||
dependencies = [
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matches"
|
||||
version = "0.1.8"
|
||||
@@ -790,6 +859,17 @@ dependencies = [
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyo3-log"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9d42fe53a9ba733c9dd4289f73389dafa35b6ad0e2e07fbb480e5a111b83749"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"log",
|
||||
"pyo3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyo3-macros"
|
||||
version = "0.14.1"
|
||||
@@ -902,6 +982,8 @@ version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
@@ -1054,6 +1136,15 @@ dependencies = [
|
||||
"string_cache",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "termcolor"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
|
||||
dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "terminal_size"
|
||||
version = "0.1.17"
|
||||
@@ -1234,6 +1325,15 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[workspace]
|
||||
members = ["compressor_integration_tests"]
|
||||
members = ["auto_compressor", "compressor_integration_tests"]
|
||||
|
||||
[package]
|
||||
authors = ["Erik Johnston"]
|
||||
|
||||
30
auto_compressor/Cargo.toml
Normal file
30
auto_compressor/Cargo.toml
Normal file
@@ -0,0 +1,30 @@
|
||||
[package]
|
||||
name = "auto_compressor"
|
||||
authors = ["William Ashton"]
|
||||
version = "0.1.0"
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
clap = "2.33.0"
|
||||
openssl = "0.10.32"
|
||||
postgres = "0.19.0"
|
||||
postgres-openssl = "0.5.0"
|
||||
jemallocator = "0.3.2"
|
||||
rand = "0.8.0"
|
||||
serial_test = "0.5.1"
|
||||
synapse_compress_state = { path = "../" }
|
||||
env_logger = { version = "0.9.0", git = "https://github.com/TilCreator/env_logger", branch = "fix_pipe" }
|
||||
log = "0.4.14"
|
||||
log-panics = "2.0.0"
|
||||
anyhow = "1.0.42"
|
||||
pyo3-log = "0.4.0"
|
||||
|
||||
# Needed for pyo3 support
|
||||
[lib]
|
||||
crate-type = ["cdylib", "rlib"]
|
||||
|
||||
[dependencies.pyo3]
|
||||
version = "0.14.1"
|
||||
features = ["extension-module","abi3-py36"]
|
||||
9
auto_compressor/src/lib.rs
Normal file
9
auto_compressor/src/lib.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
//! This is a tool that uses the synapse_compress_state library to
|
||||
//! reduce the size of the synapse state_groups_state table in a postgres
|
||||
//! database.
|
||||
//!
|
||||
//! It adds the tables state_compressor_state and state_compressor_progress
|
||||
//! to the database and uses these to enable it to incrementally work
|
||||
//! on space reductions
|
||||
|
||||
pub mod state_saving;
|
||||
251
auto_compressor/src/state_saving.rs
Normal file
251
auto_compressor/src/state_saving.rs
Normal file
@@ -0,0 +1,251 @@
|
||||
// This module contains functions to communicate with the database
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use synapse_compress_state::Level;
|
||||
|
||||
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
|
||||
use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client};
|
||||
use postgres_openssl::MakeTlsConnector;
|
||||
|
||||
/// Connects to the database and returns a postgres client
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `db_url` - The URL of the postgres database that synapse is using.
|
||||
/// e.g. "postgresql://user:password@domain.com/synapse"
|
||||
pub fn connect_to_database(db_url: &str) -> Result<Client> {
|
||||
let mut builder = SslConnector::builder(SslMethod::tls())?;
|
||||
builder.set_verify(SslVerifyMode::NONE);
|
||||
let connector = MakeTlsConnector::new(builder.build());
|
||||
|
||||
let client = Client::connect(db_url, connector)?;
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
/// Creates the state_compressor_state and state_compressor progress tables
|
||||
///
|
||||
/// If these tables already exist then this function does nothing
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `client` - A postgres client used to send the requests to the database
|
||||
pub fn create_tables_if_needed(client: &mut Client) -> Result<()> {
|
||||
let create_state_table = r#"
|
||||
CREATE TABLE IF NOT EXISTS state_compressor_state (
|
||||
room_id TEXT NOT NULL,
|
||||
level_num INT NOT NULL,
|
||||
max_size INT NOT NULL,
|
||||
current_length INT NOT NULL,
|
||||
current_head BIGINT,
|
||||
UNIQUE (room_id, level_num)
|
||||
)"#;
|
||||
|
||||
client.execute(create_state_table, &[])?;
|
||||
|
||||
let create_state_table_indexes = r#"
|
||||
CREATE INDEX IF NOT EXISTS state_compressor_state_index ON state_compressor_state (room_id)"#;
|
||||
|
||||
client.execute(create_state_table_indexes, &[])?;
|
||||
|
||||
let create_progress_table = r#"
|
||||
CREATE TABLE IF NOT EXISTS state_compressor_progress (
|
||||
room_id TEXT PRIMARY KEY,
|
||||
last_compressed BIGINT NOT NULL
|
||||
)"#;
|
||||
|
||||
client.execute(create_progress_table, &[])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieve the level info so we can restart the compressor
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `client` - A postgres client used to send the requests to the database
|
||||
/// * `room_id` - The room who's saved compressor state we want to load
|
||||
pub fn read_room_compressor_state(
|
||||
client: &mut Client,
|
||||
room_id: &str,
|
||||
) -> Result<Option<(i64, Vec<Level>)>> {
|
||||
// Query to retrieve all levels from state_compressor_state
|
||||
// Ordered by ascending level_number
|
||||
let sql = r#"
|
||||
SELECT level_num, max_size, current_length, current_head, last_compressed
|
||||
FROM state_compressor_state
|
||||
LEFT JOIN state_compressor_progress USING (room_id)
|
||||
WHERE room_id = $1
|
||||
ORDER BY level_num ASC
|
||||
"#;
|
||||
|
||||
// send the query to the database
|
||||
let mut levels = client.query_raw(sql, &[room_id])?;
|
||||
|
||||
// Needed to ensure that the rows are for unique consecutive levels
|
||||
// starting from 1 (i.e of form [1,2,3] not [0,1,2] or [1,1,2,2,3])
|
||||
let mut prev_seen = 0;
|
||||
|
||||
// The vector to store the level info from the database in
|
||||
let mut level_info: Vec<Level> = Vec::new();
|
||||
|
||||
// Where the last compressor run stopped
|
||||
let mut last_compressed = None;
|
||||
// Used to only read last_compressed value once
|
||||
let mut first_row = true;
|
||||
|
||||
// Loop through all the rows retrieved by that query
|
||||
while let Some(l) = levels.next()? {
|
||||
// Read out the fields into variables
|
||||
//
|
||||
// Some of these are `usize` as they may be used to index vectors, but stored as Postgres
|
||||
// type `INT` which is the same as`i32`.
|
||||
//
|
||||
// Since usize is unlikely to be ess than 32 bits wide, this conversion should be safe
|
||||
let level_num: usize = l.get::<_, i32>("level_num") as usize;
|
||||
let max_size: usize = l.get::<_, i32>("max_size") as usize;
|
||||
let current_length: usize = l.get::<_, i32>("current_length") as usize;
|
||||
let current_head: Option<i64> = l.get("current_head");
|
||||
|
||||
// Only read the last compressed column once since is the same for each row
|
||||
if first_row {
|
||||
last_compressed = l.get("last_compressed"); // might be NULL if corrupted
|
||||
if last_compressed.is_none() {
|
||||
bail!(
|
||||
"No entry in state_compressor_progress for room {} but entries in state_compressor_state were found",
|
||||
room_id
|
||||
)
|
||||
}
|
||||
first_row = false;
|
||||
}
|
||||
|
||||
// Check that there aren't multiple entries for the same level number
|
||||
// in the database. (Should be impossible due to unique key constraint)
|
||||
if prev_seen == level_num {
|
||||
bail!(
|
||||
"The level {} occurs twice in state_compressor_state for room {}",
|
||||
level_num,
|
||||
room_id,
|
||||
);
|
||||
}
|
||||
|
||||
// Check that there is no missing level in the database
|
||||
// e.g. if the previous row retrieved was for level 1 and this
|
||||
// row is for level 3 then since the SQL query orders the results
|
||||
// in ascenting level numbers, there was no level 2 found!
|
||||
if prev_seen != level_num - 1 {
|
||||
bail!("Levels between {} and {} are missing", prev_seen, level_num,);
|
||||
}
|
||||
|
||||
// if the level is not empty, then it must have a head!
|
||||
if current_head.is_none() && current_length != 0 {
|
||||
bail!(
|
||||
"Level {} has no head but current length is {} in room {}",
|
||||
level_num,
|
||||
current_length,
|
||||
room_id,
|
||||
);
|
||||
}
|
||||
|
||||
// If the level has more groups in than the maximum then something is wrong!
|
||||
if current_length > max_size {
|
||||
bail!(
|
||||
"Level {} has length {} but max size {} in room {}",
|
||||
level_num,
|
||||
current_length,
|
||||
max_size,
|
||||
room_id,
|
||||
);
|
||||
}
|
||||
|
||||
// Add this level to the level_info vector
|
||||
level_info.push(Level::restore(max_size, current_length, current_head));
|
||||
// Mark the previous level_number seen as the current one
|
||||
prev_seen = level_num;
|
||||
}
|
||||
|
||||
// If we didn't retrieve anything from the database then there is no saved state
|
||||
// in the database!
|
||||
if level_info.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Return the compressor state we retrieved
|
||||
// last_compressed cannot be None at this point, so safe to unwrap
|
||||
Ok(Some((last_compressed.unwrap(), level_info)))
|
||||
}
|
||||
|
||||
/// Save the level info so it can be loaded by the next run of the compressor
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `client` - A postgres client used to send the requests to the database
|
||||
/// * `room_id` - The room who's saved compressor state we want to save
|
||||
/// * `level_info` - The state that can be used to restore the compressor later
|
||||
/// * `last_compressed` - The last state_group that was compressed. This is needed
|
||||
/// so that the compressor knows where to start from next
|
||||
pub fn write_room_compressor_state(
|
||||
client: &mut Client,
|
||||
room_id: &str,
|
||||
level_info: &[Level],
|
||||
last_compressed: i64,
|
||||
) -> Result<()> {
|
||||
// Wrap all the changes to the state for this room in a transaction
|
||||
// This prevents accidentally having malformed compressor start info
|
||||
let mut write_transaction = client.transaction()?;
|
||||
|
||||
// Go through every level that the compressor is using
|
||||
for (level_num, level) in level_info.iter().enumerate() {
|
||||
// the 1st level is level 1 not level 0, but enumerate starts at 0
|
||||
// so need to add 1 to get correct number
|
||||
let level_num = level_num + 1;
|
||||
|
||||
// bring the level info out of the Level struct
|
||||
let (max_size, current_len, current_head) = (
|
||||
level.get_max_length(),
|
||||
level.get_current_length(),
|
||||
level.get_head(),
|
||||
);
|
||||
|
||||
// Update the database with this compressor state information
|
||||
//
|
||||
// Some of these are `usize` as they may be used to index vectors, but stored as Postgres
|
||||
// type `INT` which is the same as`i32`.
|
||||
//
|
||||
// Since these values should always be small, this conversion should be safe.
|
||||
let (level_num, max_size, current_len) =
|
||||
(level_num as i32, max_size as i32, current_len as i32);
|
||||
let params: Vec<&(dyn ToSql + Sync)> =
|
||||
vec![&room_id, &level_num, &max_size, ¤t_len, ¤t_head];
|
||||
|
||||
write_transaction.execute(
|
||||
r#"
|
||||
INSERT INTO state_compressor_state
|
||||
(room_id, level_num, max_size, current_length, current_head)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (room_id, level_num)
|
||||
DO UPDATE SET
|
||||
max_size = excluded.max_size,
|
||||
current_length = excluded.current_length,
|
||||
current_head= excluded.current_head;
|
||||
"#,
|
||||
¶ms,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Update the database with this progress information
|
||||
let params: Vec<&(dyn ToSql + Sync)> = vec![&room_id, &last_compressed];
|
||||
write_transaction.execute(
|
||||
r#"
|
||||
INSERT INTO state_compressor_progress (room_id, last_compressed)
|
||||
VALUES ($1, $2)
|
||||
ON CONFLICT (room_id)
|
||||
DO UPDATE SET last_compressed = excluded.last_compressed;
|
||||
"#,
|
||||
¶ms,
|
||||
)?;
|
||||
|
||||
// Commit the transaction (otherwise changes never happen)
|
||||
write_transaction.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -13,6 +13,7 @@ postgres = "0.19.0"
|
||||
postgres-openssl = "0.5.0"
|
||||
rand = "0.8.0"
|
||||
synapse_compress_state = { path = "../" }
|
||||
auto_compressor = { path = "../auto_compressor/" }
|
||||
|
||||
[dependencies.state-map]
|
||||
git = "https://github.com/matrix-org/rust-matrix-state-map"
|
||||
@@ -69,7 +69,7 @@ pub fn add_contents_to_database(room_id: &str, state_group_map: &BTreeMap<i64, S
|
||||
client.batch_execute(&sql).unwrap();
|
||||
}
|
||||
|
||||
// Clears the contents of the testing database
|
||||
/// Clears the contents of the testing database
|
||||
pub fn empty_database() {
|
||||
// connect to the database
|
||||
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
@@ -80,9 +80,9 @@ pub fn empty_database() {
|
||||
|
||||
// delete all the contents from all three tables
|
||||
let sql = r"
|
||||
DELETE FROM state_groups;
|
||||
DELETE FROM state_group_edges;
|
||||
DELETE FROM state_groups_state;
|
||||
TRUNCATE state_groups;
|
||||
TRUNCATE state_group_edges;
|
||||
TRUNCATE state_groups_state;
|
||||
";
|
||||
|
||||
client.batch_execute(sql).unwrap();
|
||||
@@ -300,6 +300,24 @@ pub fn database_structure_matches_map(state_group_map: &BTreeMap<i64, StateGroup
|
||||
true
|
||||
}
|
||||
|
||||
/// Clears the compressor state from the database
|
||||
pub fn clear_compressor_state() {
|
||||
// connect to the database
|
||||
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
builder.set_verify(SslVerifyMode::NONE);
|
||||
let connector = MakeTlsConnector::new(builder.build());
|
||||
|
||||
let mut client = Client::connect(DB_URL, connector).unwrap();
|
||||
|
||||
// delete all the contents from the state compressor tables
|
||||
let sql = r"
|
||||
TRUNCATE state_compressor_state;
|
||||
TRUNCATE state_compressor_progress;
|
||||
";
|
||||
|
||||
client.batch_execute(sql).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn functions_are_self_consistent() {
|
||||
let mut initial: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
use auto_compressor::state_saving::{
|
||||
connect_to_database, create_tables_if_needed, read_room_compressor_state,
|
||||
write_room_compressor_state,
|
||||
};
|
||||
use compressor_integration_tests::{clear_compressor_state, DB_URL};
|
||||
use serial_test::serial;
|
||||
use synapse_compress_state::Level;
|
||||
|
||||
#[test]
|
||||
#[serial(db)]
|
||||
fn write_then_read_state_gives_correct_results() {
|
||||
let mut client = connect_to_database(DB_URL).unwrap();
|
||||
create_tables_if_needed(&mut client).unwrap();
|
||||
clear_compressor_state();
|
||||
|
||||
let room_id = "room1";
|
||||
let written_info: Vec<Level> =
|
||||
vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))];
|
||||
let written_num = 53;
|
||||
write_room_compressor_state(&mut client, room_id, &written_info, written_num).unwrap();
|
||||
|
||||
let (read_num, read_info) = read_room_compressor_state(&mut client, room_id)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(written_info, read_info);
|
||||
assert_eq!(written_num, read_num);
|
||||
}
|
||||
Reference in New Issue
Block a user