2 Commits

Author     SHA1        Message                         Date
Azrenbeth  987c5ac81f  Remove reference to log file    2021-09-28 14:36:25 +01:00
Azrenbeth  fa42201e82  Log to stderr not to a file     2021-09-28 14:35:47 +01:00
34 changed files with 813 additions and 1648 deletions


@@ -1,3 +0,0 @@
.git
.github
/target


@@ -1,46 +0,0 @@
name: Build and push docker images
on:
push:
tags: ["v*"]
branches: [ main ]
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Log in to Gitea Container Registry
uses: docker/login-action@v2
with:
registry: git.yongyuancv.cn
username: ${{ gitea.repository_owner }}
password: ${{ secrets.GITEA_TOKEN }}
- name: Calculate docker image tag
id: set-tag
uses: docker/metadata-action@master
with:
images: |
git.yongyuancv.cn/${{ gitea.repository }}
git.yongyuancv.cn/heimoshuiyu/${{ gitea.event.repository.name }}
flavor: |
latest=false
tags: |
type=raw,value=latest,enable=${{ gitea.ref == 'refs/heads/main' }}
type=sha,prefix=,format=long
type=semver,pattern=v{{version}}
type=semver,pattern=v{{major}}.{{minor}}
- name: Build and push all platforms
uses: docker/build-push-action@v4
with:
push: true
labels: "gitsha1=${{ gitea.sha }}"
tags: "${{ steps.set-tag.outputs.tags }}"
platforms: linux/amd64,linux/arm64
cache-from: type=registry,ref=git.yongyuancv.cn/${{ gitea.repository }}:buildcache
cache-to: type=registry,ref=git.yongyuancv.cn/${{ gitea.repository }}:buildcache,mode=max

.github/CODEOWNERS (vendored), 2 lines changed

@@ -1,2 +0,0 @@
# Automatically request reviews from the synapse-core team when a pull request comes in.
* @matrix-org/synapse-core


@@ -1,58 +0,0 @@
# GitHub actions workflow which builds and publishes the docker images.
name: Build and push docker images
on:
push:
tags: ["v*"]
branches: [ main ]
workflow_dispatch:
permissions:
contents: read
packages: write
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Log in to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
- name: Log in to GHCR
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Calculate docker image tag
id: set-tag
uses: docker/metadata-action@master
with:
images: |
ghcr.io/${{ github.repository }}
docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/${{ github.event.repository.name }}
flavor: |
latest=false
tags: |
type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}
type=sha,prefix=,format=long
type=semver,pattern=v{{version}}
type=semver,pattern=v{{major}}.{{minor}}
- name: Build and push all platforms
uses: docker/build-push-action@v4
with:
push: true
labels: "gitsha1=${{ github.sha }}"
tags: "${{ steps.set-tag.outputs.tags }}"
platforms: linux/amd64,linux/arm64
cache-from: type=registry,ref=ghcr.io/${{ github.repository }}:buildcache
cache-to: type=registry,ref=ghcr.io/${{ github.repository }}:buildcache,mode=max


@@ -1,28 +0,0 @@
name: Move new issues into the issue triage board
on:
issues:
types: [ opened ]
jobs:
add_new_issues:
name: Add new issues to the triage board
runs-on: ubuntu-latest
steps:
- uses: octokit/graphql-action@v2.x
id: add_to_project
with:
headers: '{"GraphQL-Features": "projects_next_graphql"}'
query: |
mutation add_to_project($projectid:ID!,$contentid:ID!) {
addProjectV2ItemById(input: {projectId: $projectid contentId: $contentid}) {
item {
id
}
}
}
projectid: ${{ env.PROJECT_ID }}
contentid: ${{ github.event.issue.node_id }}
env:
PROJECT_ID: "PVT_kwDOAIB0Bs4AFDdZ"
GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}


@@ -1,44 +0,0 @@
name: Move labelled issues to correct projects
on:
issues:
types: [ labeled ]
jobs:
move_needs_info:
name: Move X-Needs-Info on the triage board
runs-on: ubuntu-latest
if: >
contains(github.event.issue.labels.*.name, 'X-Needs-Info')
steps:
- uses: actions/add-to-project@main
id: add_project
with:
project-url: "https://github.com/orgs/matrix-org/projects/67"
github-token: ${{ secrets.ELEMENT_BOT_TOKEN }}
- name: Set status
env:
GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}
run: |
gh api graphql -f query='
mutation(
$project: ID!
$item: ID!
$fieldid: ID!
$columnid: String!
) {
updateProjectV2ItemFieldValue(
input: {
projectId: $project
itemId: $item
fieldId: $fieldid
value: {
singleSelectOptionId: $columnid
}
}
) {
projectV2Item {
id
}
}
}' -f project="PVT_kwDOAIB0Bs4AFDdZ" -f item=${{ steps.add_project.outputs.itemId }} -f fieldid="PVTSSF_lADOAIB0Bs4AFDdZzgC6ZA4" -f columnid=ba22e43c --silent


@@ -1,80 +0,0 @@
# Contributing
## Sign off
In order to have a concrete record that your contribution is intentional
and you agree to license it under the same terms as the project's license, we've adopted the
same lightweight approach that the Linux Kernel
[submitting patches process](
https://www.kernel.org/doc/html/latest/process/submitting-patches.html#sign-your-work-the-developer-s-certificate-of-origin),
[Docker](https://github.com/docker/docker/blob/master/CONTRIBUTING.md), and many other
projects use: the DCO ([Developer Certificate of Origin](http://developercertificate.org/)).
This is a simple declaration that you wrote
the contribution or otherwise have the right to contribute it to Matrix:
```
Developer Certificate of Origin
Version 1.1
Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
660 York Street, Suite 102,
San Francisco, CA 94110 USA
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.
Developer's Certificate of Origin 1.1
By making a contribution to this project, I certify that:
(a) The contribution was created in whole or in part by me and I
have the right to submit it under the open source license
indicated in the file; or
(b) The contribution is based upon previous work that, to the best
of my knowledge, is covered under an appropriate open source
license and I have the right under that license to submit that
work with modifications, whether created in whole or in part
by me, under the same open source license (unless I am
permitted to submit under a different license), as indicated
in the file; or
(c) The contribution was provided directly to me by some other
person who certified (a), (b) or (c) and I have not modified
it.
(d) I understand and agree that this project and the contribution
are public and that a record of the contribution (including all
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.
```
If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment:
```
Signed-off-by: Your Name <your@email.example.org>
```
We accept contributions under a legally identifiable name, such as
your name on government documentation or common-law names (names
claimed by legitimate usage or repute). Unfortunately, we cannot
accept anonymous contributions at this time.
Git allows you to add this signoff automatically when using the `-s`
flag to `git commit`, which uses the name and email set in your
`user.name` and `user.email` git configs.
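For example (the commit message below is purely illustrative), a sign-off is added like so:
```
$ git commit -s -m "Fix typo in the compressor README"
# The resulting commit message ends with a line such as:
#   Signed-off-by: Your Name <your@email.example.org>
```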
### Private Sign off
If you would like to provide your legal name privately to the Matrix.org
Foundation (instead of in a public commit or comment), you can do so
by emailing your legal name and a link to the pull request to
[dco@matrix.org](mailto:dco@matrix.org?subject=Private%20sign%20off).
It helps to include "sign off" or similar in the subject line. You will then
be instructed further.
Once private sign off is complete, doing so for future contributions will not
be required.

Cargo.lock (generated), 1279 lines changed

File diff suppressed because it is too large.


@@ -1,5 +1,5 @@
[workspace]
members = ["synapse_auto_compressor", "compressor_integration_tests"]
members = ["auto_compressor", "compressor_integration_tests"]
[package]
authors = ["Erik Johnston"]
@@ -8,21 +8,20 @@ name = "synapse_compress_state"
version = "0.1.0"
edition = "2018"
[[bin]]
name = "synapse_compress_state"
required-features = ["clap"]
[dependencies]
indicatif = "0.17.6"
openssl = "0.10.60"
postgres = "0.19.7"
clap = "2.33.0"
indicatif = "0.16.0"
jemallocator = "0.3.2"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
rand = "0.8.5"
rayon = "1.7.0"
string_cache = "0.8.7"
env_logger = "0.10.0"
log = "0.4.20"
log-panics = "2.1.0"
rand = "0.8.0"
rayon = "1.3.0"
string_cache = "0.8.0"
env_logger = "0.9.0"
log = "0.4.14"
pyo3-log = "0.4.0"
log-panics = "2.0.0"
[dependencies.state-map]
git = "https://github.com/matrix-org/rust-matrix-state-map"
@@ -31,26 +30,11 @@ git = "https://github.com/matrix-org/rust-matrix-state-map"
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "4.4.2"
features = ["cargo"]
optional = true
[dependencies.pyo3]
version = "0.19.2"
features = ["extension-module"]
optional = true
[dependencies.pyo3-log]
version = "0.8.3"
optional = true
[dependencies.tikv-jemallocator]
version = "0.5.4"
optional = true
version = "0.14.1"
features = ["extension-module","abi3-py36"]
[features]
default = ["clap", "jemalloc"]
jemalloc = ["tikv-jemallocator"]
default = ["jemalloc"]
jemalloc = []
no-progress-bars = []
pyo3 = ["dep:pyo3", "dep:pyo3-log"]


@@ -1,54 +0,0 @@
# This uses the multi-stage build feature of Docker to build the binaries for multiple architectures without QEMU.
# The first stage is responsible for building binaries for all the supported architectures (amd64 and arm64), and the
# second stage only copies the binaries for the target architecture.
# We leverage Zig and cargo-zigbuild for providing a cross-compilation-capable C compiler and linker.
ARG RUSTC_VERSION=1.72.0
ARG ZIG_VERSION=0.11.0
ARG CARGO_ZIGBUILD_VERSION=0.17.1
FROM --platform=${BUILDPLATFORM} docker.io/rust:${RUSTC_VERSION} AS builder
# Install cargo-zigbuild for cross-compilation
ARG CARGO_ZIGBUILD_VERSION
RUN cargo install --locked cargo-zigbuild@=${CARGO_ZIGBUILD_VERSION}
# Download zig compiler for cross-compilation
ARG ZIG_VERSION
RUN curl -L "https://ziglang.org/download/${ZIG_VERSION}/zig-linux-$(uname -m)-${ZIG_VERSION}.tar.xz" | tar -J -x -C /usr/local && \
ln -s "/usr/local/zig-linux-$(uname -m)-${ZIG_VERSION}/zig" /usr/local/bin/zig
# Install all cross-compilation targets
ARG RUSTC_VERSION
RUN rustup target add \
--toolchain "${RUSTC_VERSION}" \
x86_64-unknown-linux-musl \
aarch64-unknown-linux-musl
WORKDIR /opt/synapse-compressor/
COPY . .
# Build for all targets
RUN cargo zigbuild \
--release \
--workspace \
--bins \
--features "openssl/vendored" \
--target aarch64-unknown-linux-musl \
--target x86_64-unknown-linux-musl
# Move the binaries in a separate folder per architecture, so we can copy them using the TARGETARCH build arg
RUN mkdir -p /opt/binaries/amd64 /opt/binaries/arm64
RUN mv target/x86_64-unknown-linux-musl/release/synapse_compress_state \
target/x86_64-unknown-linux-musl/release/synapse_auto_compressor \
/opt/binaries/amd64
RUN mv target/aarch64-unknown-linux-musl/release/synapse_compress_state \
target/aarch64-unknown-linux-musl/release/synapse_auto_compressor \
/opt/binaries/arm64
FROM --platform=${TARGETPLATFORM} docker.io/alpine
ARG TARGETARCH
COPY --from=builder /opt/binaries/${TARGETARCH}/synapse_compress_state /usr/local/bin/synapse_compress_state
COPY --from=builder /opt/binaries/${TARGETARCH}/synapse_auto_compressor /usr/local/bin/synapse_auto_compressor
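As a rough local-build sketch (not taken from this repository's CI; the image tag is a placeholder), the multi-arch image described above could be built with Docker Buildx, pushing the result because a multi-platform manifest list cannot normally be loaded into the local image store:
```
$ docker buildx build \
    --platform linux/amd64,linux/arm64 \
    --push \
    -t example.registry/synapse-compressor:dev \
    .
```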


@@ -3,7 +3,7 @@
This workspace contains experimental tools that attempt to reduce the number of
rows in the `state_groups_state` table inside of a Synapse Postgresql database.
# Automated tool: synapse_auto_compressor
# Automated tool: auto_compressor
## Introduction:
@@ -11,7 +11,7 @@ This tool is significantly more simple to use than the manual tool (described be
It scans through all of the rows in the `state_groups` database table from the start. When
it finds a group that hasn't been compressed, it runs the compressor for a while on that
group's room, saving where it got up to. After compressing a number of these chunks it stops,
saving where it got up to for the next run of the `synapse_auto_compressor`.
saving where it got up to for the next run of the `auto_compressor`.
It creates three extra tables in the database: `state_compressor_state` which stores the
information needed to stop and start the compressor for each room, `state_compressor_progress`
@@ -21,53 +21,41 @@ which stores how far through the `state_groups` table the compressor has scanned
The tool can be run manually when you are running out of space, or be scheduled to run
periodically.
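If you want to see how far it has scanned, the progress table can be queried directly; a minimal sketch (connection string and database name are placeholders, and the exact columns are not documented here):
```
$ psql "postgresql://user:pass@localhost/synapse" -c "SELECT * FROM state_compressor_progress;"
```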
## Building
## Building
This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
for instructions on how to do this.
This project follows the deprecation policy of [Synapse](https://matrix-org.github.io/synapse/latest/deprecation_policy.html)
on Rust and will assume a recent stable version of Rust and the ability to fetch a more recent one if necessary.
To build `auto_compressor`, clone this repository and navigate to the `auto_compressor/`
subdirectory. Then execute `cargo build`.
To build `synapse_auto_compressor`, clone this repository and navigate to the
`synapse_auto_compressor/` subdirectory. Then execute `cargo build`.
This will create an executable and store it in
`synapse_auto_compressor/target/debug/synapse_auto_compressor`.
This will create an executable and store it in `auto_compressor/target/debug/auto_compressor`.
## Example usage
Compress 100 chunks of size 500 in a remote PostgreSQL database:
```
$ synapse_auto_compressor -p postgresql://user:pass@localhost/synapse -c 500 -n 100
$ auto_compressor -p postgresql://user:pass@localhost/synapse -c 500 -n 100
```
Compress 100 chunks of size 500 using local PostgreSQL socket:
```
$ sudo -u postgres synapse_auto_compressor -p "user=postgres dbname=matrix-synapse host=/var/run/postgresql" -c 500 -n 100
```
## Running Options
- -p [POSTGRES_LOCATION] **Required**
- -p [POSTGRES_LOCATION] **Required**
The configuration for connecting to the Postgres database. This should be of the form
`"postgresql://username:password@mydomain.com/database"` or a key-value pair
string: `"user=username password=password dbname=database host=mydomain.com"`
See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html
for the full details.
- -c [CHUNK_SIZE] **Required**
- -c [CHUNK_SIZE] **Required**
The number of state groups to work on at once. All of the entries from state_groups_state are
requested from the database for state groups that are worked on. Therefore small chunk
sizes may be needed on machines with low memory. Note: if the compressor fails to find
space savings on the chunk as a whole (which may well happen in rooms with lots of backfill
in) then the entire chunk is skipped.
- -n [CHUNKS_TO_COMPRESS] **Required**
- -n [CHUNKS_TO_COMPRESS] **Required**
*CHUNKS_TO_COMPRESS* chunks of size *CHUNK_SIZE* will be compressed. The higher this
number is set to, the longer the compressor will run for.
- -l [LEVELS]
- -d [LEVELS]
Sizes of each new level in the compression algorithm, as a comma-separated list.
The first entry in the list is for the lowest, most granular level, with each
subsequent entry being for the next highest level. The number of entries in the
@@ -79,14 +67,14 @@ given set of state. [defaults to "100,50,25"]
## Scheduling the compressor
The automatic tool may put some strain on the database, so it might be best to schedule
it to run at a quiet time for the server. This could be done by creating an executable
script and scheduling it with something like
script and scheduling it with something like
[cron](https://www.man7.org/linux/man-pages/man1/crontab.1.html).
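As a purely illustrative example, a crontab entry like the following would run it nightly at 03:00 (binary path, credentials and sizes are placeholders; the binary is named `auto_compressor` here and `synapse_auto_compressor` in later releases):
```
# m h dom mon dow  command
0 3 * * * /usr/local/bin/auto_compressor -p "postgresql://user:pass@localhost/synapse" -c 500 -n 100
```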
# Manual tool: synapse_compress_state
## Introduction
A manual tool that reads in the rows from `state_groups_state` and `state_group_edges`
A manual tool that reads in the rows from `state_groups_state` and `state_group_edges`
tables for a specified room and calculates the changes that could be made that
(hopefully) will significantly reduce the number of rows.
@@ -97,7 +85,7 @@ that if `-t` is given then each change to a particular state group is wrapped
in a transaction). If you do wish to send the changes to the database automatically
then the `-c` flag can be set.
The SQL generated is safe to apply against the database with Synapse running.
The SQL generated is safe to apply against the database with Synapse running.
This is because the `state_groups` and `state_groups_state` tables are append-only:
once written to the database, they are never modified. There is therefore no danger
of a modification racing against a running Synapse. Further, this script makes its
@@ -107,7 +95,7 @@ from any of the queries that Synapse performs.
The tool will also ensure that the generated state deltas do give the same state
as the existing state deltas before generating any SQL.
## Building
## Building
This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
for instructions on how to do this.
@@ -137,54 +125,54 @@ $ psql synapse < out.data
## Running Options
- -p [POSTGRES_LOCATION] **Required**
- -p [POSTGRES_LOCATION] **Required**
The configuration for connecting to the Postgres database. This should be of the form
`"postgresql://username:password@mydomain.com/database"` or a key-value pair
string: `"user=username password=password dbname=database host=mydomain.com"`
See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html
for the full details.
- -r [ROOM_ID] **Required**
- -r [ROOM_ID] **Required**
The room to process (this is the value found in the `rooms` table of the database,
not the common name for the room). It should look like: "!wOlkWNmgkAZFxbTaqj:matrix.org".
- -b [MIN_STATE_GROUP]
- -b [MIN_STATE_GROUP]
The state group to start processing from (non-inclusive).
- -n [GROUPS_TO_COMPRESS]
- -n [GROUPS_TO_COMPRESS]
How many groups to load into memory to compress (starting
from the 1st group in the room or the group specified by -b).
- -l [LEVELS]
- -l [LEVELS]
Sizes of each new level in the compression algorithm, as a comma-separated list.
The first entry in the list is for the lowest, most granular level, with each
The first entry in the list is for the lowest, most granular level, with each
subsequent entry being for the next highest level. The number of entries in the
list determines the number of levels that will be used. The sum of the sizes of
the levels affects the performance of fetching the state from the database, as the
sum of the sizes is the upper bound on the number of iterations needed to fetch a
given set of state. [defaults to "100,50,25"]
- -m [COUNT]
- -m [COUNT]
If the compressor cannot save this many rows from the database then it will stop early.
- -s [MAX_STATE_GROUP]
- -s [MAX_STATE_GROUP]
If a max_state_group is specified then only state groups with id's lower than this
number can be compressed.
- -o [FILE]
- -o [FILE]
File to output the SQL transactions to (for later running on the database).
- -t
- -t
If this flag is set then each change to a particular state group is wrapped in a
transaction. This should be done if you wish to apply the changes while synapse is
still running.
- -c
- -c
If this flag is set then the changes the compressor makes will be committed to the
database. This should be safe to use while synapse is running as it wraps the changes
to every state group in its own transaction (as if the transaction flag was set).
- -g
- -g
If this flag is set then output the node and edge information for the state_group
directed graph built up from the predecessor state_group links. These can be looked
at in something like Gephi (https://gephi.org).
@@ -208,10 +196,10 @@ $ docker-compose down
# Using the synapse_compress_state library
If you want to use the compressor in another project, it is recommended that you
use jemalloc `https://github.com/tikv/jemallocator`.
use jemalloc `https://github.com/gnzlbg/jemallocator`.
To prevent the progress bars from being shown, use the `no-progress-bars` feature.
(See `synapse_auto_compressor/Cargo.toml` for an example)
(See `auto_compressor/Cargo.toml` for an example)
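As a sketch of how a downstream crate might pull these in (assuming a cargo recent enough to ship `cargo add`; adjust the git URL and revision to whatever you actually depend on):
```
$ cargo add synapse_compress_state --git https://github.com/matrix-org/rust-matrix-state-map --features no-progress-bars
$ cargo add jemallocator
```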
# Troubleshooting
@@ -228,29 +216,29 @@ from the machine where Postgres is running, the url will be the following:
### From remote machine
If you wish to connect from a different machine, you'll need to edit your Postgres settings to allow
remote connections. This requires updating the
remote connections. This requires updating the
[`pg_hba.conf`](https://www.postgresql.org/docs/current/auth-pg-hba-conf.html) and the `listen_addresses`
setting in [`postgresql.conf`](https://www.postgresql.org/docs/current/runtime-config-connection.html)
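Once those are updated, a quick way to confirm that the machine running the compressor can actually reach the database is a one-off `psql` check (host and credentials below are placeholders):
```
$ psql "postgresql://synapse_user:password@db.example.com:5432/synapse" -c "SELECT 1;"
```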
## Printing debugging logs
The amount of output the tools produce can be altered by setting the RUST_LOG
environment variable to something.
The amount of output the tools produce can be altered by setting the RUST_LOG
environment variable to something.
To get more logs when running the synapse_auto_compressor tool try the following:
To get more logs when running the auto_compressor tool try the following:
```
$ RUST_LOG=debug synapse_auto_compressor -p postgresql://user:pass@localhost/synapse -c 50 -n 100
$ RUST_LOG=debug auto_compressor -p postgresql://user:pass@localhost/synapse -c 50 -n 100
```
If you want to suppress all the debugging info you are getting from the
If you want to suppress all the debugging info you are getting from the
Postgres client then try:
```
RUST_LOG=synapse_auto_compressor=debug,synapse_compress_state=debug synapse_auto_compressor [etc.]
RUST_LOG=auto_compressor=debug,synapse_compress_state=debug auto_compressor [etc.]
```
This will only print the debugging information from those two packages. For more info see
This will only print the debugging information from those two packages. For more info see
https://docs.rs/env_logger/0.9.0/env_logger/.
## Building difficulties
@@ -260,7 +248,7 @@ and building on Linux will also require `pkg-config`
This can be done on Ubuntu with: `$ apt-get install libssl-dev pkg-config`
Note that building requires quite a lot of memory and out-of-memory errors might not be
Note that building requires quite a lot of memory and out-of-memory errors might not be
obvious. It's recommended you only build these tools on machines with at least 2GB of RAM.
## Auto Compressor skips chunks when running on already compressed room
@@ -277,8 +265,8 @@ be a large problem.
## Compressor is trying to increase the number of rows
Backfilling can lead to issues with compression. The synapse_auto_compressor will
skip chunks it can't reduce the size of and so this should help jump over the backfilled
Backfilling can lead to issues with compression. The auto_compressor will
skip chunks it can't reduce the size of and so this should help jump over the backfilled
state_groups. Lots of state resolution might also impact the ability to use the compressor.
To examine the state_group hierarchy run the manual tool on a room with the `-g` option
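For example, a purely illustrative invocation that writes the SQL to a file and also dumps the graph data would look like:
```
$ synapse_compress_state -p "postgresql://user:pass@localhost/synapse" -r '!wOlkWNmgkAZFxbTaqj:matrix.org' -o out.sql -t -g
```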


@@ -0,0 +1,30 @@
[package]
name = "auto_compressor"
authors = ["William Ashton"]
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = "2.33.0"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
jemallocator = "0.3.2"
rand = "0.8.0"
serial_test = "0.5.1"
synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
env_logger = "0.9.0"
log = "0.4.14"
log-panics = "2.0.0"
anyhow = "1.0.42"
pyo3-log = "0.4.0"
# Needed for pyo3 support
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies.pyo3]
version = "0.14.1"
features = ["extension-module","abi3-py36"]


@@ -7,9 +7,7 @@
//! on space reductions
use anyhow::Result;
#[cfg(feature = "pyo3")]
use log::{error, LevelFilter};
#[cfg(feature = "pyo3")]
use pyo3::{
exceptions::PyRuntimeError, prelude::pymodule, types::PyModule, PyErr, PyResult, Python,
};
@@ -28,7 +26,7 @@ pub mod state_saving;
///
/// This is needed since FromStr cannot be implemented for structs
/// that aren't defined in this scope
#[derive(PartialEq, Eq, Debug, Clone)]
#[derive(PartialEq, Debug)]
pub struct LevelInfo(pub Vec<Level>);
// Implement FromStr so that an argument of the form "100,50,25"
@@ -58,24 +56,21 @@ impl FromStr for LevelInfo {
}
// PyO3 INTERFACE STARTS HERE
#[cfg(feature = "pyo3")]
#[pymodule]
fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
fn auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
let _ = pyo3_log::Logger::default()
// don't send out anything lower than a warning from other crates
.filter(LevelFilter::Warn)
// don't log warnings from synapse_compress_state, the
// synapse_auto_compressor handles these situations and provides better
// log messages
// don't log warnings from synapse_compress_state, the auto_compressor handles these
// situations and provides better log messages
.filter_target("synapse_compress_state".to_owned(), LevelFilter::Error)
// log info and above for the synapse_auto_compressor
.filter_target("synapse_auto_compressor".to_owned(), LevelFilter::Debug)
// log info and above for the auto_compressor
.filter_target("auto_compressor".to_owned(), LevelFilter::Debug)
.install();
// ensure any panics produce error messages in the log
log_panics::init();
#[pyfn(m)]
#[pyo3(name = "compress_largest_rooms")]
#[pyfn(m, compress_largest_rooms)]
fn compress_state_events_table(
py: Python,
db_url: String,
@@ -97,7 +92,7 @@ fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
number_of_chunks: i64,
) -> PyResult<()> {
// Announce the start of the program to the logs
log::info!("synapse_auto_compressor started");
log::info!("auto_compressor started");
// Parse the default_level string into a LevelInfo struct
let default_levels: LevelInfo = match default_levels.parse() {
@@ -125,7 +120,7 @@ fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
return Err(PyErr::new::<PyRuntimeError, _>(format!("{:?}", e)));
}
log::info!("synapse_auto_compressor finished");
log::info!("auto_compressor finished");
Ok(())
}
Ok(())


@@ -16,48 +16,54 @@
//! the state_compressor_state table so that the compressor can seamlessly
//! continue from where it left off.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command};
use auto_compressor::{manager, state_saving, LevelInfo};
use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
use log::LevelFilter;
use std::env;
use synapse_auto_compressor::{manager, state_saving, LevelInfo};
use std::{env, fs::OpenOptions};
/// Execution starts here
fn main() {
// setup the logger for the synapse_auto_compressor
// setup the logger for the auto_compressor
// The default can be overwritten with RUST_LOG
// see the README for more information
let log_file = OpenOptions::new()
.append(true)
.create(true)
.open("auto_compressor.log")
.unwrap_or_else(|e| panic!("Error occured while opening the log file: {}", e));
if env::var("RUST_LOG").is_err() {
let mut log_builder = env_logger::builder();
// Ensure panics still come through
log_builder.filter_module("panic", LevelFilter::Error);
// Only output errors from the synapse_compress state library
log_builder.filter_module("synapse_compress_state", LevelFilter::Error);
// Output log levels info and above from synapse_auto_compressor
log_builder.filter_module("synapse_auto_compressor", LevelFilter::Info);
// Output log levels info and above from auto_compressor
log_builder.filter_module("auto_compressor", LevelFilter::Info);
log_builder.init();
} else {
// If RUST_LOG was set then use that
let mut log_builder = env_logger::Builder::from_env("RUST_LOG");
log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
// Ensure panics still come through
log_builder.filter_module("panic", LevelFilter::Error);
log_builder.init();
}
log_panics::init();
// Announce the start of the program to the logs
log::info!("synapse_auto_compressor started");
log::info!("auto_compressor started");
// parse the command line arguments using the clap crate
let arguments = Command::new(crate_name!())
let arguments = App::new(crate_name!())
.version(crate_version!())
.author(crate_authors!("\n"))
.about(crate_description!())
.arg(
Arg::new("postgres-url")
.short('p')
Arg::with_name("postgres-url")
.short("p")
.value_name("POSTGRES_LOCATION")
.help("The configruation for connecting to the postgres database.")
.long_help(concat!(
@@ -67,13 +73,12 @@ fn main() {
"See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
"for the full details."
))
.num_args(1)
.takes_value(true)
.required(true),
).arg(
Arg::new("chunk_size")
.short('c')
Arg::with_name("chunk_size")
.short("c")
.value_name("COUNT")
.value_parser(clap::value_parser!(i64))
.help("The maximum number of state groups to load into memroy at once")
.long_help(concat!(
"The number of state_groups to work on at once. All of the entries",
@@ -84,13 +89,12 @@ fn main() {
" chunk as a whole (which may well happen in rooms with lots",
" of backfill in) then the entire chunk is skipped.)",
))
.num_args(1)
.takes_value(true)
.required(true),
).arg(
Arg::new("default_levels")
.short('l')
Arg::with_name("default_levels")
.short("l")
.value_name("LEVELS")
.value_parser(clap::value_parser!(LevelInfo))
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!(
"Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -103,43 +107,40 @@ fn main() {
" iterations needed to fetch a given set of state.",
))
.default_value("100,50,25")
.num_args(1)
.takes_value(true)
.required(false),
).arg(
Arg::new("number_of_chunks")
.short('n')
Arg::with_name("number_of_chunks")
.short("n")
.value_name("CHUNKS_TO_COMPRESS")
.value_parser(clap::value_parser!(i64))
.help("The number of chunks to compress")
.help("The number of chunks to compress")
.long_help(concat!(
"This many chunks of the database will be compressed. The higher this number is set to, ",
"the longer the compressor will run for."
))
.num_args(1)
.takes_value(true)
.required(true),
).get_matches();
// The URL of the database
let db_url = arguments
.get_one::<String>("postgres-url")
.value_of("postgres-url")
.expect("A database url is required");
// The number of state groups to work on at once
let chunk_size = arguments
.get_one("chunk_size")
.copied()
.value_of("chunk_size")
.map(|s| s.parse().expect("chunk_size must be an integer"))
.expect("A chunk size is required");
// The default structure to use when compressing
let default_levels = arguments
.get_one::<LevelInfo>("default_levels")
.cloned()
.unwrap();
let default_levels = value_t!(arguments, "default_levels", LevelInfo)
.unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
// The number of rooms to compress with this tool
let number_of_chunks = arguments
.get_one("number_of_chunks")
.copied()
.value_of("number_of_chunks")
.map(|s| s.parse().expect("number_of_chunks must be an integer"))
.expect("number_of_chunks is required");
// Connect to the database and create the 2 tables this tool needs
@@ -154,5 +155,5 @@ fn main() {
manager::compress_chunks_of_database(db_url, chunk_size, &default_levels.0, number_of_chunks)
.unwrap();
log::info!("synapse_auto_compressor finished");
log::info!("auto_compressor finished");
}


@@ -6,16 +6,16 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
string_cache = "0.8.7"
serial_test = "2.0.0"
openssl = "0.10.60"
postgres = "0.19.7"
string_cache = "0.8.0"
serial_test = "0.5.1"
openssl = "0.10.32"
postgres = "0.19.0"
postgres-openssl = "0.5.0"
rand = "0.8.5"
rand = "0.8.0"
synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
synapse_auto_compressor = { path = "../synapse_auto_compressor/" }
env_logger = "0.10.0"
log = "0.4.20"
auto_compressor = { path = "../auto_compressor/" }
env_logger = "0.9.0"
log = "0.4.14"
[dependencies.state-map]
git = "https://github.com/matrix-org/rust-matrix-state-map"
git = "https://github.com/matrix-org/rust-matrix-state-map"


@@ -4,12 +4,7 @@ use postgres::{fallible_iterator::FallibleIterator, Client};
use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use state_map::StateMap;
use std::{
borrow::Cow,
collections::BTreeMap,
env,
fmt::{self, Write as _},
};
use std::{borrow::Cow, collections::BTreeMap, env, fmt};
use string_cache::DefaultAtom as Atom;
use synapse_compress_state::StateGroupEntry;
@@ -28,48 +23,47 @@ pub fn add_contents_to_database(room_id: &str, state_group_map: &BTreeMap<i64, S
let mut client = Client::connect(DB_URL, connector).unwrap();
// build up the query
let mut sql = String::new();
let room_id = PGEscape(room_id);
let event_id = PGEscape("left_blank");
let mut sql = "".to_string();
for (sg, entry) in state_group_map {
// create the entry for state_groups
writeln!(
sql,
"INSERT INTO state_groups (id, room_id, event_id) \
VALUES ({sg}, {room_id}, {event_id});",
)
.expect("Writing to a String cannot fail");
sql.push_str(&format!(
"INSERT INTO state_groups (id, room_id, event_id) VALUES ({},{},{});\n",
sg,
PGEscape(room_id),
PGEscape("left_blank")
));
// create the entry in state_group_edges IF exists
if let Some(prev_sg) = entry.prev_state_group {
writeln!(
sql,
"INSERT INTO state_group_edges (state_group, prev_state_group) \
VALUES ({sg}, {prev_sg});",
)
.unwrap();
sql.push_str(&format!(
"INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n",
sg, prev_sg
));
}
// write entry for each row in delta
if !entry.state_map.is_empty() {
sql.push_str(
"INSERT INTO state_groups_state \
(state_group, room_id, type, state_key, event_id) \
VALUES\n",
);
sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES");
let mut first = true;
for ((t, s), e) in entry.state_map.iter() {
let t = PGEscape(t);
let s = PGEscape(s);
let e = PGEscape(e);
writeln!(sql, " ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
if first {
sql.push_str(" ");
first = false;
} else {
sql.push_str(" ,");
}
sql.push_str(&format!(
"({}, {}, {}, {}, {})",
sg,
PGEscape(room_id),
PGEscape(t),
PGEscape(s),
PGEscape(e)
));
}
// Replace the last comma with a semicolon
sql.replace_range((sql.len() - 2).., ";\n");
sql.push_str(";\n");
}
}
@@ -185,7 +179,7 @@ fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
// the predecessor (so have split this into a different query)
let query_pred = r#"
SELECT prev_state_group
FROM state_group_edges
FROM state_group_edges
WHERE state_group = $1
"#;
@@ -195,7 +189,7 @@ fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
while let Some(sg) = next_group {
// get predecessor from state_group_edges
let mut pred = client.query_raw(query_pred, [sg]).unwrap();
let mut pred = client.query_raw(query_pred, &[sg]).unwrap();
// set next_group to predecessor
next_group = match pred.next().unwrap() {
@@ -209,7 +203,7 @@ fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
}
drop(pred);
let mut rows = client.query_raw(query_deltas, [sg]).unwrap();
let mut rows = client.query_raw(query_deltas, &[sg]).unwrap();
while let Some(row) = rows.next().unwrap() {
// Copy the single delta from the predecessor stored in this row
@@ -249,7 +243,7 @@ pub fn database_structure_matches_map(state_group_map: &BTreeMap<i64, StateGroup
// the predecessor (so have split this into a different query)
let query_pred = r#"
SELECT prev_state_group
FROM state_group_edges
FROM state_group_edges
WHERE state_group = $1
"#;
@@ -362,7 +356,7 @@ fn functions_are_self_consistent() {
}
pub fn setup_logger() {
// setup the logger for the synapse_auto_compressor
// setup the logger for the auto_compressor
// The default can be overwritten with RUST_LOG
// see the README for more information
if env::var("RUST_LOG").is_err() {
@@ -372,7 +366,7 @@ pub fn setup_logger() {
// default to printing the debug information for both packages being tested
// (Note that just setting the global level to debug will log every sql transaction)
log_builder.filter_module("synapse_compress_state", LevelFilter::Debug);
log_builder.filter_module("synapse_auto_compressor", LevelFilter::Debug);
log_builder.filter_module("auto_compressor", LevelFilter::Debug);
// use try_init() incase the logger has been setup by some previous test
let _ = log_builder.try_init();
} else {


@@ -1,5 +1,9 @@
use std::collections::BTreeMap;
use auto_compressor::{
manager::{compress_chunks_of_database, run_compressor_on_room_chunk},
state_saving::{connect_to_database, create_tables_if_needed},
};
use compressor_integration_tests::{
add_contents_to_database, clear_compressor_state, database_collapsed_states_match_map,
database_structure_matches_map, empty_database,
@@ -10,10 +14,6 @@ use compressor_integration_tests::{
setup_logger, DB_URL,
};
use serial_test::serial;
use synapse_auto_compressor::{
manager::{compress_chunks_of_database, run_compressor_on_room_chunk},
state_saving::{connect_to_database, create_tables_if_needed},
};
use synapse_compress_state::Level;
#[test]


@@ -1,9 +1,9 @@
use compressor_integration_tests::{clear_compressor_state, setup_logger, DB_URL};
use serial_test::serial;
use synapse_auto_compressor::state_saving::{
use auto_compressor::state_saving::{
connect_to_database, create_tables_if_needed, read_room_compressor_state,
write_room_compressor_state,
};
use compressor_integration_tests::{clear_compressor_state, setup_logger, DB_URL};
use serial_test::serial;
use synapse_compress_state::Level;
#[test]


@@ -46,11 +46,10 @@ fn run_succeeds_without_crashing() {
let transactions = true;
let graphs = false;
let commit_changes = false;
let verify = true;
let config = Config::new(
db_url,
room_id,
db_url.clone(),
room_id.clone(),
output_file,
min_state_group,
groups_to_compress,
@@ -60,7 +59,6 @@ fn run_succeeds_without_crashing() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -96,7 +94,6 @@ fn changes_commited_if_no_min_saved_rows() {
let transactions = true;
let graphs = false;
let commit_changes = true;
let verify = true;
let config = Config::new(
db_url,
@@ -110,7 +107,6 @@ fn changes_commited_if_no_min_saved_rows() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -164,7 +160,6 @@ fn changes_commited_if_min_saved_rows_exceeded() {
let transactions = true;
let graphs = false;
let commit_changes = true;
let verify = true;
let config = Config::new(
db_url,
@@ -178,7 +173,6 @@ fn changes_commited_if_min_saved_rows_exceeded() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -233,7 +227,6 @@ fn changes_not_commited_if_fewer_than_min_saved_rows() {
let transactions = true;
let graphs = false;
let commit_changes = true;
let verify = true;
let config = Config::new(
db_url,
@@ -247,7 +240,6 @@ fn changes_not_commited_if_fewer_than_min_saved_rows() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -288,7 +280,6 @@ fn run_panics_if_invalid_db_url() {
let transactions = true;
let graphs = false;
let commit_changes = true;
let verify = true;
let config = Config::new(
db_url,
@@ -302,7 +293,6 @@ fn run_panics_if_invalid_db_url() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -346,7 +336,6 @@ fn run_only_affects_given_room_id() {
let transactions = true;
let graphs = false;
let commit_changes = true;
let verify = true;
let config = Config::new(
db_url,
@@ -360,7 +349,6 @@ fn run_only_affects_given_room_id() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -418,7 +406,6 @@ fn run_respects_groups_to_compress() {
let transactions = true;
let graphs = false;
let commit_changes = true;
let verify = true;
let config = Config::new(
db_url,
@@ -432,7 +419,6 @@ fn run_respects_groups_to_compress() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -506,7 +492,6 @@ fn run_is_idempotent_when_run_on_whole_room() {
let transactions = true;
let graphs = false;
let commit_changes = true;
let verify = true;
let config1 = Config::new(
db_url.clone(),
@@ -520,23 +505,21 @@ fn run_is_idempotent_when_run_on_whole_room() {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
let config2 = Config::new(
db_url,
room_id,
db_url.clone(),
room_id.clone(),
output_file2,
min_state_group,
groups_to_compress,
min_saved_rows,
max_state_group,
level_sizes,
level_sizes.clone(),
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();


@@ -56,7 +56,7 @@ fn continue_run_called_twice_same_as_run() {
let start = Some(6);
let chunk_size = 7;
let level_info = chunk_stats_1.new_level_info;
let level_info = chunk_stats_1.new_level_info.clone();
// Run the compressor with those settings
let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();


@@ -18,7 +18,7 @@ the compressor is run.
3. Navigate to the correct location
For the automatic tool:
`$ cd /home/synapse/rust-synapse-compress-state/synapse_auto_compressor`
`$ cd /home/synapse/rust-synapse-compress-state/auto_compressor`
For the manual tool:
`$ cd /home/synapse/rust-synapse-compress-state`
@@ -30,9 +30,9 @@ This will install the relevant compressor tool into the activated virtual enviro
## Automatic tool example:
```python
import synapse_auto_compressor
import auto_compressor
synapse_auto_compressor.compress_state_events_table(
auto_compressor.compress_state_events_table(
db_url="postgresql://localhost/synapse",
chunk_size=500,
default_levels="100,50,25",
@@ -51,4 +51,4 @@ synapse_compress_state.run_compression(
output_file="out.sql",
transactions=True
)
```
```


@@ -1,8 +1,4 @@
[build-system]
requires = ["maturin>=1.0,<2.0"]
requires = ["maturin>=0.11,<0.12"]
build-backend = "maturin"
[tool.maturin]
profile = "release"
features = ["pyo3"]
no-default-features = true
cargo-extra-args = "--no-default-features"


@@ -30,7 +30,7 @@
use indicatif::{ProgressBar, ProgressStyle};
use state_map::StateMap;
use std::{collections::BTreeMap, time::Duration};
use std::collections::BTreeMap;
use string_cache::DefaultAtom as Atom;
use super::{collapse_state_maps, StateGroupEntry};
@@ -156,7 +156,7 @@ impl<'a> Compressor<'a> {
) -> Compressor<'a> {
let levels = level_info
.iter()
.map(|l| Level::restore(l.max_length, l.current_chain_length, l.head))
.map(|l| Level::restore((*l).max_length, (*l).current_chain_length, (*l).head))
.collect();
let mut compressor = Compressor {
@@ -181,18 +181,17 @@ impl<'a> Compressor<'a> {
panic!("Can only call `create_new_tree` once");
}
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else {
ProgressBar::new(self.original_state_map.len() as u64)
};
pb = ProgressBar::new(self.original_state_map.len() as u64);
}
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
);
pb.set_message("state groups");
pb.enable_steady_tick(Duration::from_millis(100));
pb.enable_steady_tick(100);
for (&state_group, entry) in self.original_state_map {
// Check whether this entry is in_range or is just present in the map due to being


@@ -96,7 +96,11 @@ fn create_new_tree_does_nothing_if_already_compressed() {
let pred_group = initial_edges.get(&i);
// Need Option<i64> not Option<&i64>
let prev = pred_group.copied();
let prev;
match pred_group {
Some(i) => prev = Some(*i),
None => prev = None,
}
// insert that edge into the initial map
initial.insert(


@@ -54,7 +54,7 @@ fn get_head_returns_head() {
#[test]
fn has_space_returns_true_if_empty() {
let l = Level::new(15);
assert!(l.has_space());
assert_eq!(l.has_space(), true);
}
#[test]
@@ -65,7 +65,7 @@ fn has_space_returns_true_if_part_full() {
l.update(1, true);
l.update(143, true);
l.update(15, true);
assert!(l.has_space());
assert_eq!(l.has_space(), true);
}
#[test]
@@ -76,5 +76,5 @@ fn has_space_returns_false_if_full() {
l.update(3, true);
l.update(4, true);
l.update(5, true);
assert!(!l.has_space());
assert_eq!(l.has_space(), false);
}


@@ -145,7 +145,11 @@ fn stats_correct_if_no_changes() {
let pred_group = initial_edges.get(&i);
// Need Option<i64> not Option<&i64>
let prev = pred_group.copied();
let prev;
match pred_group {
Some(i) => prev = Some(*i),
None => prev = None,
}
// insert that edge into the initial map
initial.insert(


@@ -18,7 +18,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client};
use postgres_openssl::MakeTlsConnector;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{borrow::Cow, collections::BTreeMap, fmt, time::Duration};
use std::{borrow::Cow, collections::BTreeMap, fmt};
use crate::{compressor::Level, generate_sql};
@@ -237,9 +237,15 @@ fn load_map_from_db(
let mut missing_sgs: Vec<_> = state_group_map
.iter()
.filter_map(|(_sg, entry)| {
entry
.prev_state_group
.filter(|&prev_sg| !state_group_map.contains_key(&prev_sg))
if let Some(prev_sg) = entry.prev_state_group {
if state_group_map.contains_key(&prev_sg) {
None
} else {
Some(prev_sg)
}
} else {
None
}
})
.collect();
@@ -366,17 +372,16 @@ fn get_initial_data_from_db(
// Copy the data from the database into a map
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else {
ProgressBar::new_spinner()
};
pb = ProgressBar::new_spinner();
}
pb.set_style(
ProgressStyle::default_spinner()
.template("{spinner} [{elapsed}] {pos} rows retrieved")
.unwrap(),
ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"),
);
pb.enable_steady_tick(Duration::from_millis(100));
pb.enable_steady_tick(100);
while let Some(row) = rows.next().unwrap() {
// The row in the map to copy the data to
@@ -532,18 +537,17 @@ pub fn send_changes_to_db(
debug!("Writing changes...");
// setup the progress bar
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else {
ProgressBar::new(old_map.len() as u64)
};
pb = ProgressBar::new(old_map.len() as u64);
}
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
);
pb.set_message("state groups");
pb.enable_steady_tick(Duration::from_millis(100));
pb.enable_steady_tick(100);
for sql_transaction in generate_sql(old_map, new_map, room_id) {
if sql_transaction.is_empty() {


@@ -20,19 +20,14 @@
// of arguments - this hopefully doesn't make the code unclear
// #[allow(clippy::too_many_arguments)] is therefore used around some functions
use log::{info, warn};
#[cfg(feature = "pyo3")]
use log::{info, warn, LevelFilter};
use pyo3::{exceptions, prelude::*};
#[cfg(feature = "clap")]
use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command};
use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*;
use state_map::StateMap;
use std::{
collections::BTreeMap, convert::TryInto, fmt::Write as _, fs::File, io::Write, str::FromStr,
time::Duration,
};
use std::{collections::BTreeMap, fs::File, io::Write, str::FromStr};
use string_cache::DefaultAtom as Atom;
mod compressor;
@@ -54,7 +49,7 @@ pub struct StateGroupEntry {
}
/// Helper struct for parsing the `level_sizes` argument.
#[derive(PartialEq, Debug, Clone)]
#[derive(PartialEq, Debug)]
struct LevelSizes(Vec<usize>);
impl FromStr for LevelSizes {
@@ -114,22 +109,18 @@ pub struct Config {
// Whether or not to commit changes to the database automatically
// N.B. currently assumes transactions is true (to be on the safe side)
commit_changes: bool,
// Whether to verify the correctness of the compressed state groups by
// comparing them to the original groups
verify: bool,
}
#[cfg(feature = "clap")]
impl Config {
/// Build up config from command line arguments
pub fn parse_arguments() -> Config {
let matches = Command::new(crate_name!())
let matches = App::new(crate_name!())
.version(crate_version!())
.author(crate_authors!("\n"))
.about(crate_description!())
.arg(
Arg::new("postgres-url")
.short('p')
Arg::with_name("postgres-url")
.short("p")
.value_name("POSTGRES_LOCATION")
.help("The configruation for connecting to the postgres database.")
.long_help(concat!(
@@ -139,69 +130,64 @@ impl Config {
"See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
"for the full details."
))
.num_args(1)
.takes_value(true)
.required(true),
).arg(
Arg::new("room_id")
.short('r')
Arg::with_name("room_id")
.short("r")
.value_name("ROOM_ID")
.help("The room to process")
.long_help(concat!(
"The room to process. This is the value found in the rooms table of the database",
" not the common name for the room - is should look like: \"!wOlkWNmgkAZFxbTaqj:matrix.org\""
))
.num_args(1)
.takes_value(true)
.required(true),
).arg(
Arg::new("min_state_group")
.short('b')
Arg::with_name("min_state_group")
.short("b")
.value_name("MIN_STATE_GROUP")
.value_parser(clap::value_parser!(i64))
.help("The state group to start processing from (non inclusive)")
.num_args(1)
.takes_value(true)
.required(false),
).arg(
Arg::new("min_saved_rows")
.short('m')
Arg::with_name("min_saved_rows")
.short("m")
.value_name("COUNT")
.value_parser(clap::value_parser!(i32))
.help("Abort if fewer than COUNT rows would be saved")
.long_help("If the compressor cannot save this many rows from the database then it will stop early")
.num_args(1)
.takes_value(true)
.required(false),
).arg(
Arg::new("groups_to_compress")
.short('n')
Arg::with_name("groups_to_compress")
.short("n")
.value_name("GROUPS_TO_COMPRESS")
.value_parser(clap::value_parser!(i64))
.help("How many groups to load into memory to compress")
.help("How many groups to load into memory to compress")
.long_help(concat!(
"How many groups to load into memory to compress (starting from",
" the 1st group in the room or the group specified by -s)"))
.num_args(1)
.takes_value(true)
.required(false),
).arg(
Arg::new("output_file")
.short('o')
Arg::with_name("output_file")
.short("o")
.value_name("FILE")
.help("File to output the changes to in SQL")
.num_args(1),
.takes_value(true),
).arg(
Arg::new("max_state_group")
.short('s')
Arg::with_name("max_state_group")
.short("s")
.value_name("MAX_STATE_GROUP")
.value_parser(clap::value_parser!(i64))
.help("The maximum state group to process up to")
.long_help(concat!(
"If a max_state_group is specified then only state groups with id's lower",
" than this number are able to be compressed."))
.num_args(1)
.takes_value(true)
.required(false),
).arg(
Arg::new("level_sizes")
.short('l')
Arg::with_name("level_sizes")
.short("l")
.value_name("LEVELS")
.value_parser(clap::value_parser!(LevelSizes))
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
.long_help(concat!(
"Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -214,64 +200,67 @@ impl Config {
" iterations needed to fetch a given set of state.",
))
.default_value("100,50,25")
.num_args(1),
.takes_value(true),
).arg(
Arg::new("transactions")
.short('t')
.action(clap::ArgAction::SetTrue)
Arg::with_name("transactions")
.short("t")
.help("Whether to wrap each state group change in a transaction")
.long_help(concat!("If this flag is set then then each change to a particular",
" state group is wrapped in a transaction. This should be done if you wish to",
" apply the changes while synapse is still running."))
.requires("output_file"),
).arg(
Arg::new("graphs")
.short('g')
.action(clap::ArgAction::SetTrue)
Arg::with_name("graphs")
.short("g")
.help("Output before and after graphs")
.long_help(concat!("If this flag is set then output the node and edge information for",
" the state_group directed graph built up from the predecessor state_group links.",
" These can be looked at in something like Gephi (https://gephi.org)")),
).arg(
Arg::new("commit_changes")
.short('c')
.action(clap::ArgAction::SetTrue)
Arg::with_name("commit_changes")
.short("c")
.help("Commit changes to the database")
.long_help(concat!("If this flag is set then the changes the compressor makes will",
" be committed to the database. This should be safe to use while synapse is running",
" as it assumes by default that the transactions flag is set")),
).arg(
Arg::new("no_verify")
.short('N')
.action(clap::ArgAction::SetTrue)
.help("Do not double-check that the compression was performed correctly")
.long_help(concat!("If this flag is set then the verification of the compressed",
" state groups, which compares them to the original groups, is skipped. This",
" saves time at the cost of potentially generating mismatched state.")),
).get_matches();
let db_url = matches
.get_one::<String>("postgres-url")
.value_of("postgres-url")
.expect("db url should be required");
let output_file = matches.get_one::<String>("output_file").map(|path| {
let output_file = matches.value_of("output_file").map(|path| {
File::create(path).unwrap_or_else(|e| panic!("Unable to create output file: {}", e))
});
let room_id = matches
.get_one::<String>("room_id")
.value_of("room_id")
.expect("room_id should be required since no file");
let min_state_group = matches.get_one("min_state_group").copied();
let groups_to_compress = matches.get_one("groups_to_compress").copied();
let min_saved_rows = matches.get_one("min_saved_rows").copied();
let max_state_group = matches.get_one("max_state_group").copied();
let level_sizes = matches.get_one("level_sizes").cloned().unwrap();
let min_state_group = matches
.value_of("min_state_group")
.map(|s| s.parse().expect("min_state_group must be an integer"));
let transactions = matches.get_flag("transactions");
let graphs = matches.get_flag("graphs");
let commit_changes = matches.get_flag("commit_changes");
let verify = !matches.get_flag("no_verify");
let groups_to_compress = matches
.value_of("groups_to_compress")
.map(|s| s.parse().expect("groups_to_compress must be an integer"));
let min_saved_rows = matches
.value_of("min_saved_rows")
.map(|v| v.parse().expect("COUNT must be an integer"));
let max_state_group = matches
.value_of("max_state_group")
.map(|s| s.parse().expect("max_state_group must be an integer"));
let level_sizes = value_t!(matches, "level_sizes", LevelSizes)
.unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));
let transactions = matches.is_present("transactions");
let graphs = matches.is_present("graphs");
let commit_changes = matches.is_present("commit_changes");
Config {
db_url: String::from(db_url),
@@ -285,7 +274,6 @@ impl Config {
transactions,
graphs,
commit_changes,
verify,
}
}
}
@@ -374,8 +362,8 @@ pub fn run(mut config: Config) {
}
if let Some(min) = config.min_saved_rows {
let saving = original_summed_size.saturating_sub(compressed_summed_size);
if saving < min.try_into().unwrap_or(0) {
let saving = (original_summed_size - compressed_summed_size) as i32;
if saving < min {
warn!(
"Only {} rows would be saved by this compression. Skipping output.",
saving
@@ -384,9 +372,7 @@ pub fn run(mut config: Config) {
}
}
if config.verify {
check_that_maps_match(&state_group_map, new_state_group_map);
}
check_that_maps_match(&state_group_map, new_state_group_map);
// If we are given an output file, we output the changes as SQL. If the
// `transactions` argument is set we wrap each change to a state group in a
@@ -422,7 +408,8 @@ fn generate_sql<'a>(
new_map: &'a BTreeMap<i64, StateGroupEntry>,
room_id: &'a str,
) -> impl Iterator<Item = String> + 'a {
old_map.iter().map(move |(sg, old_entry)| {
old_map.iter().map(move |(sg,old_entry)| {
let new_entry = &new_map[sg];
// Check if the new map has a different entry for this state group
@@ -432,50 +419,48 @@ fn generate_sql<'a>(
let mut sql = String::new();
// remove the current edge
writeln!(
sql,
"DELETE FROM state_group_edges WHERE state_group = {sg};",
)
.expect("Writing to a String cannot fail");
sql.push_str(&format!(
"DELETE FROM state_group_edges WHERE state_group = {};\n",
sg
));
// if the new entry has a predecessor then put that into state_group_edges
if let Some(prev_sg) = new_entry.prev_state_group {
writeln!(
sql,
"INSERT INTO state_group_edges (state_group, prev_state_group) \
VALUES ({sg}, {prev_sg});",
)
.unwrap();
sql.push_str(&format!("INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n", sg, prev_sg));
}
// remove the current deltas for this state group
writeln!(
sql,
"DELETE FROM state_groups_state WHERE state_group = {sg};",
)
.unwrap();
sql.push_str(&format!(
"DELETE FROM state_groups_state WHERE state_group = {};\n",
sg
));
if !new_entry.state_map.is_empty() {
// place all the deltas for the state group in the new map into state_groups_state
sql.push_str(
"INSERT INTO state_groups_state \
(state_group, room_id, type, state_key, event_id) \
VALUES\n",
);
sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES\n");
let room_id = PGEscape(room_id);
let mut first = true;
for ((t, s), e) in new_entry.state_map.iter() {
let t = PGEscape(t);
let s = PGEscape(s);
let e = PGEscape(e);
// Add a comma at the start if not the first row to be inserted
if first {
sql.push_str(" ");
first = false;
} else {
sql.push_str(" ,");
}
// write the row to be inserted of the form:
// write the row to be insterted of the form:
// (state_group, room_id, type, state_key, event_id)
writeln!(sql, " ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
sql.push_str(&format!(
"({}, {}, {}, {}, {})",
sg,
PGEscape(room_id),
PGEscape(t),
PGEscape(s),
PGEscape(e)
));
}
// Replace the last comma with a semicolon
sql.replace_range((sql.len() - 2).., ";\n");
sql.push_str(";\n");
}
sql
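On the formatting style toggled throughout this hunk — `writeln!` into the `String` versus `push_str(&format!(…))` — a minimal sketch with hypothetical state-group IDs. `writeln!` against a `String` needs `std::fmt::Write` in scope and cannot fail, which is why the calls above can simply `unwrap()` or `expect(…)`; inline captures like `{sg}` need Rust 1.58 or later:

```rust
use std::fmt::Write;

fn main() {
    // Hypothetical state group IDs, for illustration only.
    let (sg, prev_sg) = (1234_i64, 1233_i64);
    let mut sql = String::new();

    // Writing to a String cannot fail, so unwrap() is safe here.
    writeln!(sql, "DELETE FROM state_group_edges WHERE state_group = {sg};").unwrap();
    writeln!(
        sql,
        "INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({sg}, {prev_sg});"
    )
    .unwrap();

    print!("{sql}");
}
```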
@@ -507,18 +492,17 @@ fn output_sql(
info!("Writing changes...");
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else {
ProgressBar::new(old_map.len() as u64)
};
pb = ProgressBar::new(old_map.len() as u64);
}
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
);
pb.set_message("state groups");
pb.enable_steady_tick(Duration::from_millis(100));
pb.enable_steady_tick(100);
if let Some(output) = &mut config.output_file {
for mut sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
@@ -566,7 +550,7 @@ pub fn continue_run(
let (state_group_map, max_group_found) =
database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?;
let original_num_rows = state_group_map.values().map(|v| v.state_map.len()).sum();
let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
// Now we actually call the compression algorithm.
let compressor = Compressor::compress_from_save(&state_group_map, level_info);
@@ -623,18 +607,17 @@ fn check_that_maps_match(
) {
info!("Checking that state maps match...");
let pb = if cfg!(feature = "no-progress-bars") {
ProgressBar::hidden()
let pb: ProgressBar;
if cfg!(feature = "no-progress-bars") {
pb = ProgressBar::hidden();
} else {
ProgressBar::new(old_map.len() as u64)
};
pb = ProgressBar::new(old_map.len() as u64);
}
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
.unwrap(),
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
);
pb.set_message("state groups");
pb.enable_steady_tick(Duration::from_millis(100));
pb.enable_steady_tick(100);
// Now let's iterate through and assert that the state for each group
// matches between the two versions.
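Both progress-bar hunks above (in `output_sql` and `check_that_maps_match`) make the same pair of changes: building the bar as a single `let pb = if … { … } else { … }` expression rather than declare-then-assign, and moving to an indicatif API where `template` returns a `Result` and `enable_steady_tick` takes a `Duration`. A minimal sketch of that newer style, assuming indicatif 0.17 or similar; the total and message are placeholders:

```rust
use indicatif::{ProgressBar, ProgressStyle};
use std::time::Duration;

fn main() {
    let hidden = false; // stand-in for the `no-progress-bars` feature check
    let total = 500_u64; // hypothetical number of state groups

    // Expression-style construction instead of `let pb; if … { pb = … }`.
    let pb = if hidden {
        ProgressBar::hidden()
    } else {
        ProgressBar::new(total)
    };

    // In indicatif 0.17-era APIs, `template` returns a Result and
    // `enable_steady_tick` takes a Duration rather than a millisecond count.
    pb.set_style(
        ProgressStyle::default_bar()
            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
            .unwrap(),
    );
    pb.set_message("state groups");
    pb.enable_steady_tick(Duration::from_millis(100));

    for _ in 0..total {
        pb.inc(1);
    }
    pb.finish_with_message("done");
}
```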
@@ -712,7 +695,6 @@ impl Config {
transactions: bool,
graphs: bool,
commit_changes: bool,
verify: bool,
) -> Result<Config, String> {
let mut output: Option<File> = None;
if let Some(file) = output_file {
@@ -740,7 +722,6 @@ impl Config {
transactions,
graphs,
commit_changes,
verify,
})
}
}
@@ -750,31 +731,22 @@ impl Config {
/// Default arguments are equivalent to using the command line tool.
/// No defaults are provided for db_url or room_id, since these arguments
/// are compulsory (so that new() acts like parse_arguments()).
#[cfg(feature = "pyo3")]
#[allow(clippy::too_many_arguments)]
#[pyfunction]
#[pyo3(signature = (
#[pyfunction(
// db_url has no default
db_url,
// room_id has no default
room_id,
output_file = None,
min_state_group = None,
groups_to_compress = None,
min_saved_rows = None,
max_state_group = None,
level_sizes = String::from("100,50,25"),
// room_id has no default
output_file = "None",
min_state_group = "None",
groups_to_compress = "None",
min_saved_rows = "None",
max_state_group = "None",
level_sizes = "String::from(\"100,50,25\")",
// have this default to true, as it is much worse to not have it when you need it
// than to have it and not need it
transactions = true,
graphs = false,
commit_changes = false,
verify = true,
))]
)]
fn run_compression(
db_url: String,
room_id: String,
@@ -787,7 +759,6 @@ fn run_compression(
transactions: bool,
graphs: bool,
commit_changes: bool,
verify: bool,
) -> PyResult<()> {
let config = Config::new(
db_url,
@@ -801,7 +772,6 @@ fn run_compression(
transactions,
graphs,
commit_changes,
verify,
);
match config {
Err(e) => Err(PyErr::new::<exceptions::PyException, _>(e)),
@@ -813,15 +783,14 @@ fn run_compression(
}
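The attribute change a few hunks above swaps the old stringly `#[pyfunction(arg = "None", …)]` defaults for a separate `#[pyo3(signature = (…))]` attribute whose defaults are real Rust expressions. A minimal sketch of the newer form on a hypothetical function, assuming pyo3 0.18+; the function and module names are illustrative only:

```rust
use pyo3::prelude::*;

// Defaults are ordinary Rust expressions in the signature attribute,
// not strings as in the older #[pyfunction(...)] syntax.
#[pyfunction]
#[pyo3(signature = (db_url, room_id, min_saved_rows = None, transactions = true))]
fn toy_compress(
    db_url: String,
    room_id: String,
    min_saved_rows: Option<i64>,
    transactions: bool,
) -> PyResult<String> {
    Ok(format!("{db_url} {room_id} {min_saved_rows:?} {transactions}"))
}

#[pymodule]
fn toy_module(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(toy_compress, m)?)?;
    Ok(())
}
```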
/// Python module - "import synapse_compress_state" to use
#[cfg(feature = "pyo3")]
#[pymodule]
fn synapse_compress_state(_py: Python, m: &PyModule) -> PyResult<()> {
let _ = pyo3_log::Logger::default()
// don't send out anything lower than a warning from other crates
.filter(log::LevelFilter::Warn)
// don't log warnings from synapse_compress_state, the synapse_auto_compressor handles these
.filter(LevelFilter::Warn)
// don't log warnings from synapse_compress_state, the auto_compressor handles these
// situations and provides better log messages
.filter_target("synapse_compress_state".to_owned(), log::LevelFilter::Debug)
.filter_target("synapse_compress_state".to_owned(), LevelFilter::Debug)
.install();
// ensure any panics produce error messages in the log
log_panics::init();
@@ -986,6 +955,7 @@ mod lib_tests {
#[test]
fn check_that_maps_match_returns_if_both_empty() {
check_that_maps_match(&BTreeMap::new(), &BTreeMap::new());
assert!(true);
}
#[test]
@@ -1018,6 +988,7 @@ mod lib_tests {
}
check_that_maps_match(&old_map, &BTreeMap::new());
assert!(true);
}
#[test]
@@ -1053,6 +1024,7 @@ mod lib_tests {
}
check_that_maps_match(&BTreeMap::new(), &new_map);
assert!(true);
}
#[test]
@@ -1084,6 +1056,7 @@ mod lib_tests {
}
check_that_maps_match(&BTreeMap::new(), &old_map.clone());
assert!(true);
}
#[test]
@@ -1146,6 +1119,7 @@ mod lib_tests {
}
check_that_maps_match(&old_map, &new_map);
assert!(true);
}
#[test]
@@ -1227,6 +1201,7 @@ mod lib_tests {
);
check_that_maps_match(&old_map, &new_map);
assert!(true);
}
//TODO: tests for correct SQL code produced by output_sql
@@ -1249,7 +1224,6 @@ mod pyo3_tests {
let transactions = false;
let graphs = false;
let commit_changes = false;
let verify = true;
let config = Config::new(
db_url.clone(),
@@ -1263,7 +1237,6 @@ mod pyo3_tests {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
@@ -1297,7 +1270,6 @@ mod pyo3_tests {
let transactions = true;
let graphs = true;
let commit_changes = true;
let verify = true;
let config = Config::new(
db_url.clone(),
@@ -1311,12 +1283,11 @@ mod pyo3_tests {
transactions,
graphs,
commit_changes,
verify,
)
.unwrap();
assert_eq!(config.db_url, db_url);
assert!(config.output_file.is_some());
assert!(!config.output_file.is_none());
assert_eq!(config.room_id, room_id);
assert_eq!(config.min_state_group, Some(3225));
assert_eq!(config.groups_to_compress, Some(970));

View File

@@ -18,7 +18,7 @@
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
use log::LevelFilter;
use std::env;

View File

@@ -1,56 +0,0 @@
[package]
name = "synapse_auto_compressor"
authors = ["William Ashton"]
version = "0.1.3"
edition = "2018"
[[bin]]
name = "synapse_auto_compressor"
required-features = ["clap"]
[package.metadata.maturin]
requires-python = ">=3.7"
project-url = {Source = "https://github.com/matrix-org/rust-synapse-compress-state"}
classifier = [
"Development Status :: 4 - Beta",
"Programming Language :: Rust",
]
[dependencies]
openssl = { version = "0.10.60", features = ["vendored"] }
postgres = "0.19.7"
postgres-openssl = "0.5.0"
rand = "0.8.5"
serial_test = "2.0.0"
synapse_compress_state = { path = "../", features = ["no-progress-bars"], default-features = false }
env_logger = "0.10.0"
log = "0.4.20"
log-panics = "2.1.0"
anyhow = "1.0.75"
# Needed for pyo3 support
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies.clap]
version = "4.4.2"
features = ["cargo"]
optional = true
[dependencies.pyo3]
version = "0.19.2"
features = ["extension-module"]
optional = true
[dependencies.pyo3-log]
version = "0.8.3"
optional = true
[dependencies.tikv-jemallocator]
version = "0.5.4"
optional = true
[features]
default = ["clap", "jemalloc"]
jemalloc = ["tikv-jemallocator", "synapse_compress_state/jemalloc"]
pyo3 = ["dep:pyo3", "dep:pyo3-log", "synapse_compress_state/pyo3"]

View File

@@ -1,12 +0,0 @@
# Auto Compressor
See the top level readme for information.
## Publishing to PyPI
Bump the version number and run from the root directory of the repo:
```
docker run -it --rm -v $(pwd):/io -e OPENSSL_STATIC=1 konstin2/maturin publish -m synapse_auto_compressor/Cargo.toml --cargo-extra-args "\--features='openssl/vendored'"
```

View File

@@ -1,8 +0,0 @@
[build-system]
requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"
[tool.maturin]
profile = "release"
features = ["pyo3"]
no-default-features = true