Compare commits
15 commits: azren/log_ ... v0.1.3
| Author | SHA1 | Date |
|---|---|---|
| | dd62afb3d5 | |
| | 65ffce2362 | |
| | b4f3d8adbd | |
| | 04ac0529e1 | |
| | d6df1ac5a4 | |
| | 32e43da3e8 | |
| | 4c7316311b | |
| | 83e8dedfa9 | |
| | 38d800a775 | |
| | 5272acedd2 | |
| | 9d642cfb67 | |
| | 0111079153 | |
| | bf57e81f54 | |
| | 2fc2db2848 | |
| | 4f823fba78 | |
3  .dockerignore  (new file)

@@ -0,0 +1,3 @@
+.git
+.github
+/target
2  .github/CODEOWNERS  (vendored, new file)

@@ -0,0 +1,2 @@
+# Automatically request reviews from the synapse-core team when a pull request comes in.
+* @matrix-org/synapse-core
764  Cargo.lock  (generated)

File diff suppressed because it is too large.
20  Cargo.toml

@@ -1,5 +1,5 @@
 [workspace]
-members = ["auto_compressor", "compressor_integration_tests"]
+members = ["synapse_auto_compressor", "compressor_integration_tests"]

 [package]
 authors = ["Erik Johnston"]
@@ -9,9 +9,7 @@ version = "0.1.0"
 edition = "2018"

 [dependencies]
-clap = "2.33.0"
 indicatif = "0.16.0"
-jemallocator = "0.3.2"
 openssl = "0.10.32"
 postgres = "0.19.0"
 postgres-openssl = "0.5.0"
@@ -20,7 +18,7 @@ rayon = "1.3.0"
 string_cache = "0.8.0"
 env_logger = "0.9.0"
 log = "0.4.14"
-pyo3-log = "0.4.0"
+pyo3-log = "0.6.0"
 log-panics = "2.0.0"

 [dependencies.state-map]
@@ -30,11 +28,19 @@ git = "https://github.com/matrix-org/rust-matrix-state-map"
 [lib]
 crate-type = ["cdylib", "rlib"]

+[dependencies.clap]
+version = "3.1.14"
+features = ["cargo"]
+
 [dependencies.pyo3]
-version = "0.14.1"
-features = ["extension-module","abi3-py36"]
+version = "0.16.4"
+features = ["extension-module"]
+
+[dependencies.tikv-jemallocator]
+version = "0.5.0"
+optional = true

 [features]
 default = ["jemalloc"]
-jemalloc = []
+jemalloc = ["tikv-jemallocator"]
 no-progress-bars = []
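The manifest changes above make the allocator a real feature: `jemalloc` now pulls in the optional `tikv-jemallocator` crate instead of the unmaintained `jemallocator`. For readers unfamiliar with optional-dependency features, here is a minimal sketch of how such a feature is typically consumed. The wiring shown is an assumption for illustration; this repo's own allocator setup is not part of this diff.

```rust
// Sketch only: consuming an optional jemalloc feature like the one declared
// above. Assumes tikv-jemallocator is an optional dependency enabled by a
// "jemalloc" cargo feature, as in the Cargo.toml diff.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

fn main() {
    // Built with `--features jemalloc` (the default here), heap allocations
    // are served by jemalloc; without it, the system allocator is used.
    let v: Vec<u8> = Vec::with_capacity(1024);
    println!("capacity: {}", v.capacity());
}
```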
22  Dockerfile  (new file)

@@ -0,0 +1,22 @@
+FROM rust:alpine AS builder
+
+RUN apk add python3 musl-dev pkgconfig openssl-dev make
+
+ENV RUSTFLAGS="-C target-feature=-crt-static"
+
+WORKDIR /opt/synapse-compressor/
+
+COPY . .
+
+RUN cargo build
+
+WORKDIR /opt/synapse-compressor/synapse_auto_compressor/
+
+RUN cargo build
+
+FROM alpine
+
+RUN apk add --no-cache libgcc
+
+COPY --from=builder /opt/synapse-compressor/target/debug/synapse_compress_state /usr/local/bin/synapse_compress_state
+COPY --from=builder /opt/synapse-compressor/target/debug/synapse_auto_compressor /usr/local/bin/synapse_auto_compressor
81  README.md

@@ -3,7 +3,7 @@
 This workspace contains experimental tools that attempt to reduce the number of
 rows in the `state_groups_state` table inside of a Synapse Postgresql database.

-# Automated tool: auto_compressor
+# Automated tool: synapse_auto_compressor

 ## Introduction:

@@ -11,7 +11,7 @@ This tool is significantly more simple to use than the manual tool (described be
 It scans through all of the rows in the `state_groups` database table from the start. When
 it finds a group that hasn't been compressed, it runs the compressor for a while on that
 group's room, saving where it got up to. After compressing a number of these chunks it stops,
-saving where it got up to for the next run of the `auto_compressor`.
+saving where it got up to for the next run of the `synapse_auto_compressor`.

 It creates three extra tables in the database: `state_compressor_state` which stores the
 information needed to stop and start the compressor for each room, `state_compressor_progress`
@@ -21,41 +21,42 @@ which stores how far through the `state_groups` table the compressor has scanned
 The tool can be run manually when you are running out of space, or be scheduled to run
 periodically.

 ## Building

 This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
 for instructions on how to do this.

-To build `auto_compressor`, clone this repository and navigate to the `autocompressor/`
-subdirectory. Then execute `cargo build`.
+To build `synapse_auto_compressor`, clone this repository and navigate to the
+`synapse_auto_compressor/` subdirectory. Then execute `cargo build`.

-This will create an executable and store it in `auto_compressor/target/debug/auto_compressor`.
+This will create an executable and store it in
+`synapse_auto_compressor/target/debug/synapse_auto_compressor`.

 ## Example usage
 ```
-$ auto_compressor -p postgresql://user:pass@localhost/synapse -c 500 -n 100
+$ synapse_auto_compressor -p postgresql://user:pass@localhost/synapse -c 500 -n 100
 ```
 ## Running Options

 - -p [POSTGRES_LOCATION] **Required**
 The configuration for connecting to the Postgres database. This should be of the form
 `"postgresql://username:password@mydomain.com/database"` or a key-value pair
 string: `"user=username password=password dbname=database host=mydomain.com"`
 See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html
 for the full details.

 - -c [CHUNK_SIZE] **Required**
 The number of state groups to work on at once. All of the entries from state_groups_state are
 requested from the database for state groups that are worked on. Therefore small chunk
 sizes may be needed on machines with low memory. Note: if the compressor fails to find
 space savings on the chunk as a whole (which may well happen in rooms with lots of backfill
 in) then the entire chunk is skipped.

 - -n [CHUNKS_TO_COMPRESS] **Required**
 *CHUNKS_TO_COMPRESS* chunks of size *CHUNK_SIZE* will be compressed. The higher this
 number is set to, the longer the compressor will run for.

 - -d [LEVELS]
 Sizes of each new level in the compression algorithm, as a comma-separated list.
 The first entry in the list is for the lowest, most granular level, with each
 subsequent entry being for the next highest level. The number of entries in the
@@ -67,14 +68,14 @@ given set of state. [defaults to "100,50,25"]
 ## Scheduling the compressor
 The automatic tool may put some strain on the database, so it might be best to schedule
 it to run at a quiet time for the server. This could be done by creating an executable
 script and scheduling it with something like
 [cron](https://www.man7.org/linux/man-pages/man1/crontab.1.html).

 # Manual tool: synapse_compress_state

 ## Introduction

 A manual tool that reads in the rows from `state_groups_state` and `state_group_edges`
 tables for a specified room and calculates the changes that could be made that
 (hopefully) will significantly reduce the number of rows.
@@ -85,7 +86,7 @@ that if `-t` is given then each change to a particular state group is wrapped
 in a transaction). If you do wish to send the changes to the database automatically
 then the `-c` flag can be set.

 The SQL generated is safe to apply against the database with Synapse running.
 This is because the `state_groups` and `state_groups_state` tables are append-only:
 once written to the database, they are never modified. There is therefore no danger
 of a modification racing against a running Synapse. Further, this script makes its
@@ -95,7 +96,7 @@ from any of the queries that Synapse performs.
 The tool will also ensure that the generated state deltas do give the same state
 as the existing state deltas before generating any SQL.

 ## Building

 This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
 for instructions on how to do this.
@@ -125,54 +126,54 @@ $ psql synapse < out.data
 ## Running Options

 - -p [POSTGRES_LOCATION] **Required**
 The configuration for connecting to the Postgres database. This should be of the form
 `"postgresql://username:password@mydomain.com/database"` or a key-value pair
 string: `"user=username password=password dbname=database host=mydomain.com"`
 See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html
 for the full details.

 - -r [ROOM_ID] **Required**
 The room to process (this is the value found in the `rooms` table of the database
 not the common name for the room - it should look like: "!wOlkWNmgkAZFxbTaqj:matrix.org".

 - -b [MIN_STATE_GROUP]
 The state group to start processing from (non-inclusive).

 - -n [GROUPS_TO_COMPRESS]
 How many groups to load into memory to compress (starting
 from the 1st group in the room or the group specified by -b).

 - -l [LEVELS]
 Sizes of each new level in the compression algorithm, as a comma-separated list.
 The first entry in the list is for the lowest, most granular level, with each
 subsequent entry being for the next highest level. The number of entries in the
 list determines the number of levels that will be used. The sum of the sizes of
 the levels affects the performance of fetching the state from the database, as the
 sum of the sizes is the upper bound on the number of iterations needed to fetch a
 given set of state. [defaults to "100,50,25"]

 - -m [COUNT]
 If the compressor cannot save this many rows from the database then it will stop early.

 - -s [MAX_STATE_GROUP]
 If a max_state_group is specified then only state groups with id's lower than this
 number can be compressed.

 - -o [FILE]
 File to output the SQL transactions to (for later running on the database).

 - -t
 If this flag is set then each change to a particular state group is wrapped in a
 transaction. This should be done if you wish to apply the changes while synapse is
 still running.

 - -c
 If this flag is set then the changes the compressor makes will be committed to the
 database. This should be safe to use while synapse is running as it wraps the changes
 to every state group in it's own transaction (as if the transaction flag was set).

 - -g
 If this flag is set then output the node and edge information for the state_group
 directed graph built up from the predecessor state_group links. These can be looked
 at in something like Gephi (https://gephi.org).
@@ -196,10 +197,10 @@ $ docker-compose down
 # Using the synapse_compress_state library

 If you want to use the compressor in another project, it is recomended that you
-use jemalloc `https://github.com/gnzlbg/jemallocator`.
+use jemalloc `https://github.com/tikv/jemallocator`.

 To prevent the progress bars from being shown, use the `no-progress-bars` feature.
-(See `auto_compressor/Cargo.toml` for an example)
+(See `synapse_auto_compressor/Cargo.toml` for an example)

 # Troubleshooting

@@ -216,29 +217,29 @@ from the machine where Postgres is running, the url will be the following:
 ### From remote machine

 If you wish to connect from a different machine, you'll need to edit your Postgres settings to allow
 remote connections. This requires updating the
 [`pg_hba.conf`](https://www.postgresql.org/docs/current/auth-pg-hba-conf.html) and the `listen_addresses`
 setting in [`postgresql.conf`](https://www.postgresql.org/docs/current/runtime-config-connection.html)

 ## Printing debugging logs

 The amount of output the tools produce can be altered by setting the RUST_LOG
 environment variable to something.

-To get more logs when running the auto_compressor tool try the following:
+To get more logs when running the synapse_auto_compressor tool try the following:

 ```
-$ RUST_LOG=debug auto_compressor -p postgresql://user:pass@localhost/synapse -c 50 -n 100
+$ RUST_LOG=debug synapse_auto_compressor -p postgresql://user:pass@localhost/synapse -c 50 -n 100
 ```

 If you want to suppress all the debugging info you are getting from the
 Postgres client then try:

 ```
-RUST_LOG=auto_compressor=debug,synapse_compress_state=debug auto_compressor [etc.]
+RUST_LOG=synapse_auto_compressor=debug,synapse_compress_state=debug synapse_auto_compressor [etc.]
 ```

 This will only print the debugging information from those two packages. For more info see
 https://docs.rs/env_logger/0.9.0/env_logger/.

 ## Building difficulties

@@ -248,7 +249,7 @@ and building on Linux will also require `pkg-config`
 This can be done on Ubuntu with: `$ apt-get install libssl-dev pkg-config`

 Note that building requires quite a lot of memory and out-of-memory errors might not be
 obvious. It's recomended you only build these tools on machines with at least 2GB of RAM.

 ## Auto Compressor skips chunks when running on already compressed room

@@ -265,8 +266,8 @@ be a large problem.

 ## Compressor is trying to increase the number of rows

-Backfilling can lead to issues with compression. The auto_compressor will
+Backfilling can lead to issues with compression. The synapse_auto_compressor will
 skip chunks it can't reduce the size of and so this should help jump over the backfilled
 state_groups. Lots of state resolution might also impact the ability to use the compressor.

 To examine the state_group hierarchy run the manual tool on a room with the `-g` option
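The level-size options above (`-d` for the automatic tool, `-l` for the manual one) are easier to picture in code. Below is a deliberately simplified sketch of a fixed-capacity level, loosely consistent with the `Level::new` / `update` / `has_space` calls visible in the test diffs further down. The field names and update semantics are assumptions for illustration; this is not the repo's real `Level` API.

```rust
// Simplified, illustrative sketch of a fixed-capacity compression level.
// Field names and update semantics are assumptions; the repo's real Level
// type differs in detail.
struct Level {
    max_length: usize,           // capacity, e.g. 100, 50 or 25
    current_chain_length: usize, // state groups currently chained here
    head: Option<i64>,           // most recently added state group
}

impl Level {
    fn new(max_length: usize) -> Level {
        Level { max_length, current_chain_length: 0, head: None }
    }

    fn has_space(&self) -> bool {
        self.current_chain_length < self.max_length
    }

    fn update(&mut self, group: i64, extends_chain: bool) {
        self.head = Some(group);
        if extends_chain {
            self.current_chain_length += 1;
        } else {
            self.current_chain_length = 1; // a fresh chain starts here
        }
    }
}

fn main() {
    // With "100,50,25", 100 + 50 + 25 = 175 bounds the iterations needed to
    // fetch one set of state, which is the trade-off the README describes.
    let mut l = Level::new(3);
    for group in [1, 2, 3] {
        l.update(group, true);
    }
    println!("head = {:?}", l.head); // Some(3)
    assert!(!l.has_space()); // full: further groups go to a higher level
}
```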
auto_compressor/Cargo.toml  (deleted)

@@ -1,30 +0,0 @@
-[package]
-name = "auto_compressor"
-authors = ["William Ashton"]
-version = "0.1.0"
-edition = "2018"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-clap = "2.33.0"
-openssl = "0.10.32"
-postgres = "0.19.0"
-postgres-openssl = "0.5.0"
-jemallocator = "0.3.2"
-rand = "0.8.0"
-serial_test = "0.5.1"
-synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
-env_logger = "0.9.0"
-log = "0.4.14"
-log-panics = "2.0.0"
-anyhow = "1.0.42"
-pyo3-log = "0.4.0"
-
-# Needed for pyo3 support
-[lib]
-crate-type = ["cdylib", "rlib"]
-
-[dependencies.pyo3]
-version = "0.14.1"
-features = ["extension-module","abi3-py36"]
compressor_integration_tests/Cargo.toml

@@ -13,9 +13,9 @@ postgres = "0.19.0"
 postgres-openssl = "0.5.0"
 rand = "0.8.0"
 synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
-auto_compressor = { path = "../auto_compressor/" }
+synapse_auto_compressor = { path = "../synapse_auto_compressor/" }
 env_logger = "0.9.0"
 log = "0.4.14"

 [dependencies.state-map]
 git = "https://github.com/matrix-org/rust-matrix-state-map"
compressor_integration_tests/src/lib.rs

@@ -179,7 +179,7 @@ fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
     // the predecessor (so have split this into a different query)
     let query_pred = r#"
         SELECT prev_state_group
         FROM state_group_edges
         WHERE state_group = $1
     "#;

@@ -243,7 +243,7 @@ pub fn database_structure_matches_map(state_group_map: &BTreeMap<i64, StateGroup
     // the predecessor (so have split this into a different query)
     let query_pred = r#"
         SELECT prev_state_group
         FROM state_group_edges
         WHERE state_group = $1
     "#;

@@ -356,7 +356,7 @@ fn functions_are_self_consistent() {
 }

 pub fn setup_logger() {
-    // setup the logger for the auto_compressor
+    // setup the logger for the synapse_auto_compressor
     // The default can be overwritten with RUST_LOG
     // see the README for more information
     if env::var("RUST_LOG").is_err() {
@@ -366,7 +366,7 @@ pub fn setup_logger() {
     // default to printing the debug information for both packages being tested
     // (Note that just setting the global level to debug will log every sql transaction)
     log_builder.filter_module("synapse_compress_state", LevelFilter::Debug);
-    log_builder.filter_module("auto_compressor", LevelFilter::Debug);
+    log_builder.filter_module("synapse_auto_compressor", LevelFilter::Debug);
     // use try_init() incase the logger has been setup by some previous test
     let _ = log_builder.try_init();
 } else {
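The `setup_logger` hunks above only rename the filtered module. For context, here is a self-contained sketch of the same env_logger pattern: debug output for the two crates under test, with `RUST_LOG` taking precedence. The lines around the `filter_module` calls are reconstructed assumptions, since the diff shows only the changed lines; it assumes env_logger 0.9 and log 0.4, as pinned in the Cargo.toml diff.

```rust
use log::LevelFilter;
use std::env;

// Sketch of the logging setup the hunk above edits. Only the filter_module
// calls appear in the diff; the surrounding code is reconstructed.
pub fn setup_logger() {
    if env::var("RUST_LOG").is_err() {
        let mut log_builder = env_logger::Builder::new();
        log_builder.filter_module("synapse_compress_state", LevelFilter::Debug);
        log_builder.filter_module("synapse_auto_compressor", LevelFilter::Debug);
        // try_init() in case a previous test already installed a logger
        let _ = log_builder.try_init();
    } else {
        // honour whatever filters the user set via RUST_LOG
        let _ = env_logger::Builder::from_default_env().try_init();
    }
}
```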
compressor_integration_tests/tests/...

@@ -1,9 +1,5 @@
 use std::collections::BTreeMap;

-use auto_compressor::{
-    manager::{compress_chunks_of_database, run_compressor_on_room_chunk},
-    state_saving::{connect_to_database, create_tables_if_needed},
-};
 use compressor_integration_tests::{
     add_contents_to_database, clear_compressor_state, database_collapsed_states_match_map,
     database_structure_matches_map, empty_database,
@@ -14,6 +10,10 @@ use compressor_integration_tests::{
     setup_logger, DB_URL,
 };
 use serial_test::serial;
+use synapse_auto_compressor::{
+    manager::{compress_chunks_of_database, run_compressor_on_room_chunk},
+    state_saving::{connect_to_database, create_tables_if_needed},
+};
 use synapse_compress_state::Level;

 #[test]
compressor_integration_tests/tests/...

@@ -1,9 +1,9 @@
-use auto_compressor::state_saving::{
+use compressor_integration_tests::{clear_compressor_state, setup_logger, DB_URL};
+use serial_test::serial;
+use synapse_auto_compressor::state_saving::{
     connect_to_database, create_tables_if_needed, read_room_compressor_state,
     write_room_compressor_state,
 };
-use compressor_integration_tests::{clear_compressor_state, setup_logger, DB_URL};
-use serial_test::serial;
 use synapse_compress_state::Level;

 #[test]
...  (integration tests: Config::new / run)

@@ -46,10 +46,11 @@ fn run_succeeds_without_crashing() {
     let transactions = true;
     let graphs = false;
     let commit_changes = false;
+    let verify = true;

     let config = Config::new(
-        db_url.clone(),
-        room_id.clone(),
+        db_url,
+        room_id,
         output_file,
         min_state_group,
         groups_to_compress,
@@ -59,6 +60,7 @@ fn run_succeeds_without_crashing() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

@@ -94,6 +96,7 @@ fn changes_commited_if_no_min_saved_rows() {
     let transactions = true;
     let graphs = false;
     let commit_changes = true;
+    let verify = true;

     let config = Config::new(
         db_url,
@@ -107,6 +110,7 @@ fn changes_commited_if_no_min_saved_rows() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

@@ -160,6 +164,7 @@ fn changes_commited_if_min_saved_rows_exceeded() {
     let transactions = true;
     let graphs = false;
     let commit_changes = true;
+    let verify = true;

     let config = Config::new(
         db_url,
@@ -173,6 +178,7 @@ fn changes_commited_if_min_saved_rows_exceeded() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

@@ -227,6 +233,7 @@ fn changes_not_commited_if_fewer_than_min_saved_rows() {
     let transactions = true;
     let graphs = false;
     let commit_changes = true;
+    let verify = true;

     let config = Config::new(
         db_url,
@@ -240,6 +247,7 @@ fn changes_not_commited_if_fewer_than_min_saved_rows() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

@@ -280,6 +288,7 @@ fn run_panics_if_invalid_db_url() {
     let transactions = true;
     let graphs = false;
     let commit_changes = true;
+    let verify = true;

     let config = Config::new(
         db_url,
@@ -293,6 +302,7 @@ fn run_panics_if_invalid_db_url() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

@@ -336,6 +346,7 @@ fn run_only_affects_given_room_id() {
     let transactions = true;
     let graphs = false;
     let commit_changes = true;
+    let verify = true;

     let config = Config::new(
         db_url,
@@ -349,6 +360,7 @@ fn run_only_affects_given_room_id() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

@@ -406,6 +418,7 @@ fn run_respects_groups_to_compress() {
     let transactions = true;
     let graphs = false;
     let commit_changes = true;
+    let verify = true;

     let config = Config::new(
         db_url,
@@ -419,6 +432,7 @@ fn run_respects_groups_to_compress() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

@@ -492,6 +506,7 @@ fn run_is_idempotent_when_run_on_whole_room() {
     let transactions = true;
     let graphs = false;
     let commit_changes = true;
+    let verify = true;

     let config1 = Config::new(
         db_url.clone(),
@@ -505,21 +520,23 @@ fn run_is_idempotent_when_run_on_whole_room() {
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();

     let config2 = Config::new(
-        db_url.clone(),
-        room_id.clone(),
+        db_url,
+        room_id,
         output_file2,
         min_state_group,
         groups_to_compress,
         min_saved_rows,
         max_state_group,
-        level_sizes.clone(),
+        level_sizes,
         transactions,
         graphs,
         commit_changes,
+        verify,
     )
     .unwrap();
...  (integration tests: continue_run)

@@ -56,7 +56,7 @@ fn continue_run_called_twice_same_as_run() {

     let start = Some(6);
     let chunk_size = 7;
-    let level_info = chunk_stats_1.new_level_info.clone();
+    let level_info = chunk_stats_1.new_level_info;

     // Run the compressor with those settings
     let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();
...  (docs: running the compressor from a Synapse virtual environment)

@@ -18,7 +18,7 @@ the compressor is run.

 3. Navigate to the correct location
 For the automatic tool:
-`$ cd /home/synapse/rust-synapse-compress-state/auto_compressor`
+`$ cd /home/synapse/rust-synapse-compress-state/synpase_auto_compressor`
 For the manual tool:
 `$ cd /home/synapse/rust-synapse-compress-state`

@@ -30,9 +30,9 @@ This will install the relevant compressor tool into the activated virtual enviro
 ## Automatic tool example:

 ```python
-import auto_compressor
+import synapse_auto_compressor

-auto_compressor.compress_state_events_table(
+synapse_auto_compressor.compress_state_events_table(
     db_url="postgresql://localhost/synapse",
     chunk_size=500,
     default_levels="100,50,25",
@@ -51,4 +51,4 @@ synapse_compress_state.run_compression(
     output_file="out.sql",
     transactions=True
 )
 ```
...  (compressor module)

@@ -181,12 +181,11 @@ impl<'a> Compressor<'a> {
             panic!("Can only call `create_new_tree` once");
         }

-        let pb: ProgressBar;
-        if cfg!(feature = "no-progress-bars") {
-            pb = ProgressBar::hidden();
+        let pb = if cfg!(feature = "no-progress-bars") {
+            ProgressBar::hidden()
         } else {
-            pb = ProgressBar::new(self.original_state_map.len() as u64);
-        }
+            ProgressBar::new(self.original_state_map.len() as u64)
+        };
         pb.set_style(
             ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
         );
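The hunk above (repeated in the database module and in src/lib.rs below) is a pure refactor: `if` is an expression in Rust, so both branches can produce the `ProgressBar` directly and the deferred `let pb;` binding disappears. A runnable distillation of the pattern, using indicatif's real `hidden`/`new` constructors:

```rust
use indicatif::ProgressBar;

// Both branches evaluate to a ProgressBar, so the binding is initialised in
// one expression instead of via a deferred `let pb;` plus assignments.
fn make_bar(len: u64) -> ProgressBar {
    if cfg!(feature = "no-progress-bars") {
        ProgressBar::hidden()
    } else {
        ProgressBar::new(len)
    }
}

fn main() {
    let pb = make_bar(100);
    pb.inc(10); // hidden bars accept the same calls, they just draw nothing
    pb.finish();
}
```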
...  (compressor tests)

@@ -96,11 +96,7 @@ fn create_new_tree_does_nothing_if_already_compressed() {
         let pred_group = initial_edges.get(&i);

         // Need Option<i64> not Option<&i64>
-        let prev;
-        match pred_group {
-            Some(i) => prev = Some(*i),
-            None => prev = None,
-        }
+        let prev = pred_group.copied();

         // insert that edge into the initial map
         initial.insert(
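The same five-line `match` shows up again in the stats tests below, and in both places it collapses to `Option::copied`, which converts an `Option<&i64>` into an `Option<i64>` in one call:

```rust
use std::collections::BTreeMap;

fn main() {
    let initial_edges = BTreeMap::from([(2_i64, 1_i64)]);

    // The map lookup yields Option<&i64>...
    let pred_group: Option<&i64> = initial_edges.get(&2);

    // ...and copied() yields Option<i64>, replacing the old match statement.
    let prev: Option<i64> = pred_group.copied();
    assert_eq!(prev, Some(1));
}
```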
...  (levels tests)

@@ -54,7 +54,7 @@ fn get_head_returns_head() {
 #[test]
 fn has_space_returns_true_if_empty() {
     let l = Level::new(15);
-    assert_eq!(l.has_space(), true);
+    assert!(l.has_space());
 }

 #[test]
@@ -65,7 +65,7 @@ fn has_space_returns_true_if_part_full() {
     l.update(1, true);
     l.update(143, true);
     l.update(15, true);
-    assert_eq!(l.has_space(), true);
+    assert!(l.has_space());
 }

 #[test]
@@ -76,5 +76,5 @@ fn has_space_returns_false_if_full() {
     l.update(3, true);
     l.update(4, true);
     l.update(5, true);
-    assert_eq!(l.has_space(), false);
+    assert!(!l.has_space());
 }
...  (stats tests)

@@ -145,11 +145,7 @@ fn stats_correct_if_no_changes() {
         let pred_group = initial_edges.get(&i);

         // Need Option<i64> not Option<&i64>
-        let prev;
-        match pred_group {
-            Some(i) => prev = Some(*i),
-            None => prev = None,
-        }
+        let prev = pred_group.copied();

         // insert that edge into the initial map
         initial.insert(
...  (database module)

@@ -372,12 +372,11 @@ fn get_initial_data_from_db(
     // Copy the data from the database into a map
     let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();

-    let pb: ProgressBar;
-    if cfg!(feature = "no-progress-bars") {
-        pb = ProgressBar::hidden();
+    let pb = if cfg!(feature = "no-progress-bars") {
+        ProgressBar::hidden()
     } else {
-        pb = ProgressBar::new_spinner();
-    }
+        ProgressBar::new_spinner()
+    };
     pb.set_style(
         ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"),
     );
@@ -537,12 +536,11 @@ pub fn send_changes_to_db(
     debug!("Writing changes...");

     // setup the progress bar
-    let pb: ProgressBar;
-    if cfg!(feature = "no-progress-bars") {
-        pb = ProgressBar::hidden();
+    let pb = if cfg!(feature = "no-progress-bars") {
+        ProgressBar::hidden()
     } else {
-        pb = ProgressBar::new(old_map.len() as u64);
-    }
+        ProgressBar::new(old_map.len() as u64)
+    };
     pb.set_style(
         ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
     );
113  src/lib.rs

@@ -23,11 +23,11 @@
 use log::{info, warn, LevelFilter};
 use pyo3::{exceptions, prelude::*};

-use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
+use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command};
 use indicatif::{ProgressBar, ProgressStyle};
 use rayon::prelude::*;
 use state_map::StateMap;
-use std::{collections::BTreeMap, fs::File, io::Write, str::FromStr};
+use std::{collections::BTreeMap, convert::TryInto, fs::File, io::Write, str::FromStr};
 use string_cache::DefaultAtom as Atom;

 mod compressor;
@@ -109,18 +109,21 @@ pub struct Config {
     // Whether or not to commit changes to the database automatically
     // N.B. currently assumes transactions is true (to be on the safe side)
     commit_changes: bool,
+    // Whether to verify the correctness of the compressed state groups by
+    // comparing them to the original groups
+    verify: bool,
 }

 impl Config {
     /// Build up config from command line arguments
     pub fn parse_arguments() -> Config {
-        let matches = App::new(crate_name!())
+        let matches = Command::new(crate_name!())
             .version(crate_version!())
             .author(crate_authors!("\n"))
             .about(crate_description!())
             .arg(
-                Arg::with_name("postgres-url")
-                    .short("p")
+                Arg::new("postgres-url")
+                    .short('p')
                     .value_name("POSTGRES_LOCATION")
                     .help("The configruation for connecting to the postgres database.")
                     .long_help(concat!(
@@ -133,8 +136,8 @@ impl Config {
                     .takes_value(true)
                     .required(true),
             ).arg(
-                Arg::with_name("room_id")
-                    .short("r")
+                Arg::new("room_id")
+                    .short('r')
                     .value_name("ROOM_ID")
                     .help("The room to process")
                     .long_help(concat!(
@@ -144,39 +147,39 @@ impl Config {
                     .takes_value(true)
                     .required(true),
             ).arg(
-                Arg::with_name("min_state_group")
-                    .short("b")
+                Arg::new("min_state_group")
+                    .short('b')
                     .value_name("MIN_STATE_GROUP")
                     .help("The state group to start processing from (non inclusive)")
                     .takes_value(true)
                     .required(false),
             ).arg(
-                Arg::with_name("min_saved_rows")
-                    .short("m")
+                Arg::new("min_saved_rows")
+                    .short('m')
                     .value_name("COUNT")
                     .help("Abort if fewer than COUNT rows would be saved")
                     .long_help("If the compressor cannot save this many rows from the database then it will stop early")
                     .takes_value(true)
                     .required(false),
             ).arg(
-                Arg::with_name("groups_to_compress")
-                    .short("n")
+                Arg::new("groups_to_compress")
+                    .short('n')
                     .value_name("GROUPS_TO_COMPRESS")
                     .help("How many groups to load into memory to compress")
                     .long_help(concat!(
                         "How many groups to load into memory to compress (starting from",
                         " the 1st group in the room or the group specified by -s)"))
                     .takes_value(true)
                     .required(false),
             ).arg(
-                Arg::with_name("output_file")
-                    .short("o")
+                Arg::new("output_file")
+                    .short('o')
                     .value_name("FILE")
                     .help("File to output the changes to in SQL")
                     .takes_value(true),
             ).arg(
-                Arg::with_name("max_state_group")
-                    .short("s")
+                Arg::new("max_state_group")
+                    .short('s')
                     .value_name("MAX_STATE_GROUP")
                     .help("The maximum state group to process up to")
                     .long_help(concat!(
@@ -185,8 +188,8 @@ impl Config {
                     .takes_value(true)
                     .required(false),
             ).arg(
-                Arg::with_name("level_sizes")
-                    .short("l")
+                Arg::new("level_sizes")
+                    .short('l')
                     .value_name("LEVELS")
                     .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
                     .long_help(concat!(
@@ -202,27 +205,34 @@ impl Config {
                     .default_value("100,50,25")
                     .takes_value(true),
             ).arg(
-                Arg::with_name("transactions")
-                    .short("t")
+                Arg::new("transactions")
+                    .short('t')
                     .help("Whether to wrap each state group change in a transaction")
                     .long_help(concat!("If this flag is set then then each change to a particular",
                         " state group is wrapped in a transaction. This should be done if you wish to",
                         " apply the changes while synapse is still running."))
                     .requires("output_file"),
             ).arg(
-                Arg::with_name("graphs")
-                    .short("g")
+                Arg::new("graphs")
+                    .short('g')
                     .help("Output before and after graphs")
                     .long_help(concat!("If this flag is set then output the node and edge information for",
                         " the state_group directed graph built up from the predecessor state_group links.",
                         " These can be looked at in something like Gephi (https://gephi.org)")),
             ).arg(
-                Arg::with_name("commit_changes")
-                    .short("c")
+                Arg::new("commit_changes")
+                    .short('c')
                     .help("Commit changes to the database")
                     .long_help(concat!("If this flag is set then the changes the compressor makes will",
                         " be committed to the database. This should be safe to use while synapse is running",
                         " as it assumes by default that the transactions flag is set")),
+            ).arg(
+                Arg::new("no_verify")
+                    .short('N')
+                    .help("Do not double-check that the compression was performed correctly")
+                    .long_help(concat!("If this flag is set then the verification of the compressed",
+                        " state groups, which compares them to the original groups, is skipped. This",
+                        " saves time at the cost of potentially generating mismatched state.")),
             ).get_matches();

         let db_url = matches
@@ -253,7 +263,8 @@ impl Config {
             .value_of("max_state_group")
             .map(|s| s.parse().expect("max_state_group must be an integer"));

-        let level_sizes = value_t!(matches, "level_sizes", LevelSizes)
+        let level_sizes = matches
+            .value_of_t::<LevelSizes>("level_sizes")
             .unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));

         let transactions = matches.is_present("transactions");
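The hunks above are the clap 2 to 3 migration: `App` becomes `Command`, `Arg::with_name` becomes `Arg::new`, `short` now takes a `char`, and the `value_t!` macro gives way to `value_of_t`. A minimal standalone sketch of the post-migration shape, against clap 3.1 as pinned in the Cargo.toml diff (trimmed to a single argument for brevity):

```rust
use clap::{Arg, Command};

fn main() {
    // clap 3 style: Command::new + Arg::new + short('p').
    let matches = Command::new("synapse_compress_state")
        .arg(
            Arg::new("postgres-url")
                .short('p')
                .value_name("POSTGRES_LOCATION")
                .takes_value(true)
                .required(true),
        )
        .get_matches();

    let db_url = matches.value_of("postgres-url").expect("required arg");
    println!("would connect to {}", db_url);
}
```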
@@ -262,6 +273,8 @@ impl Config {

         let commit_changes = matches.is_present("commit_changes");

+        let verify = !matches.is_present("no_verify");
+
         Config {
             db_url: String::from(db_url),
             output_file,
@@ -274,6 +287,7 @@ impl Config {
             transactions,
             graphs,
             commit_changes,
+            verify,
         }
     }
 }
@@ -362,8 +376,8 @@ pub fn run(mut config: Config) {
     }

     if let Some(min) = config.min_saved_rows {
-        let saving = (original_summed_size - compressed_summed_size) as i32;
-        if saving < min {
+        let saving = original_summed_size.saturating_sub(compressed_summed_size);
+        if saving < min.try_into().unwrap_or(0) {
             warn!(
                 "Only {} rows would be saved by this compression. Skipping output.",
                 saving
@@ -372,7 +386,9 @@ pub fn run(mut config: Config) {
         }
     }

-    check_that_maps_match(&state_group_map, new_state_group_map);
+    if config.verify {
+        check_that_maps_match(&state_group_map, new_state_group_map);
+    }

     // If we are given an output file, we output the changes as SQL. If the
     // `transactions` argument is set we wrap each change to a state group in a
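The `min_saved_rows` fix above matters because the summed sizes are unsigned: if compression ever grew the row count, the old `(a - b) as i32` would underflow before the cast (panicking in debug builds, wrapping in release). `saturating_sub` clamps at zero instead, and `try_into().unwrap_or(0)` keeps an unconvertible `min` from panicking. A small demonstration of the new arithmetic:

```rust
// Needed on edition 2018, matching the `convert::TryInto` import added in
// the use-statement hunk above (redundant but harmless on edition 2021).
use std::convert::TryInto;

fn main() {
    let original_summed_size: usize = 10;
    let compressed_summed_size: usize = 25; // compression made things worse

    // The old `(original - compressed) as i32` underflows here;
    // saturating_sub clamps the result to zero instead.
    let saving = original_summed_size.saturating_sub(compressed_summed_size);
    assert_eq!(saving, 0);

    // Mirrors the new guard: min is converted to usize, falling back to 0
    // on failure, so the comparison itself can never panic.
    let min: i64 = 5;
    if saving < min.try_into().unwrap_or(0) {
        println!("only {} rows would be saved; skipping output", saving);
    }
}
```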
@@ -492,12 +508,11 @@ fn output_sql(
|
|||||||
|
|
||||||
info!("Writing changes...");
|
info!("Writing changes...");
|
||||||
|
|
||||||
let pb: ProgressBar;
|
let pb = if cfg!(feature = "no-progress-bars") {
|
||||||
if cfg!(feature = "no-progress-bars") {
|
ProgressBar::hidden()
|
||||||
pb = ProgressBar::hidden();
|
|
||||||
} else {
|
} else {
|
||||||
pb = ProgressBar::new(old_map.len() as u64);
|
ProgressBar::new(old_map.len() as u64)
|
||||||
}
|
};
|
||||||
pb.set_style(
|
pb.set_style(
|
||||||
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
|
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
|
||||||
);
|
);
|
||||||
@@ -607,12 +622,11 @@ fn check_that_maps_match(
|
|||||||
) {
|
) {
|
||||||
info!("Checking that state maps match...");
|
info!("Checking that state maps match...");
|
||||||
|
|
||||||
let pb: ProgressBar;
|
let pb = if cfg!(feature = "no-progress-bars") {
|
||||||
if cfg!(feature = "no-progress-bars") {
|
ProgressBar::hidden()
|
||||||
pb = ProgressBar::hidden();
|
|
||||||
} else {
|
} else {
|
||||||
pb = ProgressBar::new(old_map.len() as u64);
|
ProgressBar::new(old_map.len() as u64)
|
||||||
}
|
};
|
||||||
pb.set_style(
|
pb.set_style(
|
||||||
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
|
ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
|
||||||
);
|
);
|
||||||
@@ -695,6 +709,7 @@ impl Config {
|
|||||||
transactions: bool,
|
transactions: bool,
|
||||||
graphs: bool,
|
graphs: bool,
|
||||||
commit_changes: bool,
|
commit_changes: bool,
|
||||||
|
verify: bool,
|
||||||
) -> Result<Config, String> {
|
) -> Result<Config, String> {
|
||||||
let mut output: Option<File> = None;
|
let mut output: Option<File> = None;
|
||||||
if let Some(file) = output_file {
|
if let Some(file) = output_file {
|
||||||
@@ -722,6 +737,7 @@ impl Config {
|
|||||||
transactions,
|
transactions,
|
||||||
graphs,
|
graphs,
|
||||||
commit_changes,
|
commit_changes,
|
||||||
|
verify,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -746,6 +762,7 @@ impl Config {
|
|||||||
transactions = true,
|
transactions = true,
|
||||||
graphs = false,
|
graphs = false,
|
||||||
commit_changes = false,
|
commit_changes = false,
|
||||||
|
verify = true,
|
||||||
)]
|
)]
|
||||||
fn run_compression(
|
fn run_compression(
|
||||||
db_url: String,
|
db_url: String,
|
||||||
@@ -759,6 +776,7 @@ fn run_compression(
|
|||||||
transactions: bool,
|
transactions: bool,
|
||||||
graphs: bool,
|
graphs: bool,
|
||||||
commit_changes: bool,
|
commit_changes: bool,
|
||||||
|
verify: bool,
|
||||||
) -> PyResult<()> {
|
) -> PyResult<()> {
|
||||||
let config = Config::new(
|
let config = Config::new(
|
||||||
db_url,
|
db_url,
|
||||||
@@ -772,6 +790,7 @@ fn run_compression(
|
|||||||
transactions,
|
transactions,
|
||||||
graphs,
|
graphs,
|
||||||
commit_changes,
|
commit_changes,
|
||||||
|
verify,
|
||||||
);
|
);
|
||||||
match config {
|
match config {
|
||||||
Err(e) => Err(PyErr::new::<exceptions::PyException, _>(e)),
|
Err(e) => Err(PyErr::new::<exceptions::PyException, _>(e)),
|
||||||
@@ -788,7 +807,7 @@ fn synapse_compress_state(_py: Python, m: &PyModule) -> PyResult<()> {
     let _ = pyo3_log::Logger::default()
         // don't send out anything lower than a warning from other crates
         .filter(LevelFilter::Warn)
-        // don't log warnings from synapse_compress_state, the auto_compressor handles these
+        // don't log warnings from synapse_compress_state, the synapse_auto_compressor handles these
         // situations and provides better log messages
         .filter_target("synapse_compress_state".to_owned(), LevelFilter::Debug)
         .install();
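For context, this is the usual pyo3-log arrangement: Rust `log` records are forwarded to Python's `logging` module, with one coarse global filter plus a per-target override for the crate's own output. A minimal sketch under a hypothetical module name:

```rust
use log::LevelFilter;
use pyo3::prelude::*;

// Hypothetical module name, for illustration only.
#[pymodule]
fn example_module(_py: Python, _m: &PyModule) -> PyResult<()> {
    let _ = pyo3_log::Logger::default()
        // Other crates: forward warnings and above only.
        .filter(LevelFilter::Warn)
        // This crate: forward debug and above.
        .filter_target("example_module".to_owned(), LevelFilter::Debug)
        .install();

    // Turn panics into error-level log records so they surface in Python logs.
    log_panics::init();
    Ok(())
}
```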
@@ -955,7 +974,6 @@ mod lib_tests {
     #[test]
     fn check_that_maps_match_returns_if_both_empty() {
         check_that_maps_match(&BTreeMap::new(), &BTreeMap::new());
-        assert!(true);
     }
 
     #[test]
@@ -988,7 +1006,6 @@ mod lib_tests {
         }
 
         check_that_maps_match(&old_map, &BTreeMap::new());
-        assert!(true);
     }
 
     #[test]
@@ -1024,7 +1041,6 @@ mod lib_tests {
         }
 
         check_that_maps_match(&BTreeMap::new(), &new_map);
-        assert!(true);
     }
 
     #[test]
@@ -1056,7 +1072,6 @@ mod lib_tests {
         }
 
         check_that_maps_match(&BTreeMap::new(), &old_map.clone());
-        assert!(true);
     }
 
     #[test]
@@ -1119,7 +1134,6 @@ mod lib_tests {
         }
 
         check_that_maps_match(&old_map, &new_map);
-        assert!(true);
     }
 
     #[test]
@@ -1201,7 +1215,6 @@ mod lib_tests {
         );
 
         check_that_maps_match(&old_map, &new_map);
-        assert!(true);
     }
 
     //TODO: tests for correct SQL code produced by output_sql
@@ -1224,6 +1237,7 @@ mod pyo3_tests {
         let transactions = false;
         let graphs = false;
         let commit_changes = false;
+        let verify = true;
 
         let config = Config::new(
             db_url.clone(),
@@ -1237,6 +1251,7 @@ mod pyo3_tests {
             transactions,
             graphs,
             commit_changes,
+            verify,
         )
         .unwrap();
 
@@ -1270,6 +1285,7 @@ mod pyo3_tests {
         let transactions = true;
         let graphs = true;
         let commit_changes = true;
+        let verify = true;
 
         let config = Config::new(
             db_url.clone(),
@@ -1283,11 +1299,12 @@ mod pyo3_tests {
             transactions,
             graphs,
             commit_changes,
+            verify,
         )
         .unwrap();
 
         assert_eq!(config.db_url, db_url);
-        assert!(!config.output_file.is_none());
+        assert!(config.output_file.is_some());
         assert_eq!(config.room_id, room_id);
         assert_eq!(config.min_state_group, Some(3225));
         assert_eq!(config.groups_to_compress, Some(970));
@@ -18,7 +18,7 @@
 
 #[cfg(feature = "jemalloc")]
 #[global_allocator]
-static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
 use log::LevelFilter;
 use std::env;
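This swaps the allocator crate from `jemallocator` to its maintained fork, `tikv-jemallocator`; the wiring is otherwise identical. A minimal sketch of how the feature-gated global allocator composes with ordinary code:

```rust
// Compiled in only when the crate is built with the `jemalloc` feature;
// without it, Rust falls back to the system allocator.
#[cfg(feature = "jemalloc")]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

fn main() {
    // With the feature enabled, every heap allocation in the program,
    // including this Vec, goes through jemalloc.
    let buffer: Vec<u8> = Vec::with_capacity(1024);
    println!("reserved {} bytes", buffer.capacity());
}
```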
39 synapse_auto_compressor/Cargo.toml Normal file
@@ -0,0 +1,39 @@
+[package]
+name = "synapse_auto_compressor"
+authors = ["William Ashton"]
+version = "0.1.3"
+edition = "2018"
+
+[package.metadata.maturin]
+requires-python = ">=3.7"
+project-url = {Source = "https://github.com/matrix-org/rust-synapse-compress-state"}
+classifier = [
+    "Development Status :: 4 - Beta",
+    "Programming Language :: Rust",
+]
+
+[dependencies]
+openssl = "0.10.32"
+postgres = "0.19.0"
+postgres-openssl = "0.5.0"
+tikv-jemallocator = "0.5.0"
+rand = "0.8.0"
+serial_test = "0.5.1"
+synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
+env_logger = "0.9.0"
+log = "0.4.14"
+log-panics = "2.0.0"
+anyhow = "1.0.42"
+pyo3-log = "0.6.0"
+
+# Needed for pyo3 support
+[lib]
+crate-type = ["cdylib", "rlib"]
+
+[dependencies.clap]
+version = "3.1.14"
+features = ["cargo"]
+
+[dependencies.pyo3]
+version = "0.16.4"
+features = ["extension-module"]
12 synapse_auto_compressor/README.md Normal file
@@ -0,0 +1,12 @@
+# Auto Compressor
+
+See the top level readme for information.
+
+
+## Publishing to PyPI
+
+Bump the version number and run from the root directory of the repo:
+
+```
+docker run -it --rm -v $(pwd):/io -e OPENSSL_STATIC=1 konstin2/maturin publish -m synapse_auto_compressor/Cargo.toml --cargo-extra-args "\--features='openssl/vendored'"
+```
@@ -57,15 +57,16 @@ impl FromStr for LevelInfo {
 
 // PyO3 INTERFACE STARTS HERE
 #[pymodule]
-fn auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
+fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
     let _ = pyo3_log::Logger::default()
         // don't send out anything lower than a warning from other crates
         .filter(LevelFilter::Warn)
-        // don't log warnings from synapse_compress_state, the auto_compressor handles these
-        // situations and provides better log messages
+        // don't log warnings from synapse_compress_state, the
+        // synapse_auto_compressor handles these situations and provides better
+        // log messages
         .filter_target("synapse_compress_state".to_owned(), LevelFilter::Error)
-        // log info and above for the auto_compressor
-        .filter_target("auto_compressor".to_owned(), LevelFilter::Debug)
+        // log info and above for the synapse_auto_compressor
+        .filter_target("synapse_auto_compressor".to_owned(), LevelFilter::Debug)
         .install();
     // ensure any panics produce error messages in the log
     log_panics::init();
@@ -92,7 +93,7 @@ fn auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
         number_of_chunks: i64,
     ) -> PyResult<()> {
         // Announce the start of the program to the logs
-        log::info!("auto_compressor started");
+        log::info!("synapse_auto_compressor started");
 
         // Parse the default_level string into a LevelInfo struct
         let default_levels: LevelInfo = match default_levels.parse() {
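The `default_levels.parse()` call works because `LevelInfo` implements `FromStr` (this hunk sits inside that impl); the input is the comma-separated list of level sizes described by the CLI help. A small hypothetical sketch of the round trip, assuming the `FromStr` error type implements `Debug`:

```rust
use synapse_auto_compressor::LevelInfo;

fn main() {
    // Hypothetical input: three levels with maximum sizes 100, 50 and 25.
    let levels: LevelInfo = "100,50,25"
        .parse()
        .expect("could not parse level sizes");

    // LevelInfo is a tuple struct; its inner collection is what the manager
    // consumes (see `&default_levels.0` in the main.rs hunks below).
    let _ = &levels.0;
}
```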
@@ -120,7 +121,7 @@ fn auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
             return Err(PyErr::new::<PyRuntimeError, _>(format!("{:?}", e)));
         }
 
-        log::info!("auto_compressor finished");
+        log::info!("synapse_auto_compressor finished");
         Ok(())
     }
     Ok(())
@@ -17,53 +17,46 @@
 //! continue from where it left off.
 
 #[global_allocator]
-static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
-use auto_compressor::{manager, state_saving, LevelInfo};
-use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
+use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command};
 use log::LevelFilter;
-use std::{env, fs::OpenOptions};
+use std::env;
+use synapse_auto_compressor::{manager, state_saving, LevelInfo};
 
 /// Execution starts here
 fn main() {
-    // setup the logger for the auto_compressor
+    // setup the logger for the synapse_auto_compressor
     // The default can be overwritten with RUST_LOG
     // see the README for more information
-    let log_file = OpenOptions::new()
-        .append(true)
-        .create(true)
-        .open("auto_compressor.log")
-        .unwrap_or_else(|e| panic!("Error occured while opening the log file: {}", e));
-
     if env::var("RUST_LOG").is_err() {
         let mut log_builder = env_logger::builder();
         // Ensure panics still come through
         log_builder.filter_module("panic", LevelFilter::Error);
         // Only output errors from the synapse_compress state library
         log_builder.filter_module("synapse_compress_state", LevelFilter::Error);
-        // Output log levels info and above from auto_compressor
-        log_builder.filter_module("auto_compressor", LevelFilter::Info);
+        // Output log levels info and above from synapse_auto_compressor
+        log_builder.filter_module("synapse_auto_compressor", LevelFilter::Info);
         log_builder.init();
     } else {
         // If RUST_LOG was set then use that
         let mut log_builder = env_logger::Builder::from_env("RUST_LOG");
-        log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
         // Ensure panics still come through
         log_builder.filter_module("panic", LevelFilter::Error);
         log_builder.init();
     }
     log_panics::init();
     // Announce the start of the program to the logs
-    log::info!("auto_compressor started");
+    log::info!("synapse_auto_compressor started");
 
     // parse the command line arguments using the clap crate
-    let arguments = App::new(crate_name!())
+    let arguments = Command::new(crate_name!())
         .version(crate_version!())
         .author(crate_authors!("\n"))
         .about(crate_description!())
         .arg(
-            Arg::with_name("postgres-url")
-                .short("p")
+            Arg::new("postgres-url")
+                .short('p')
                 .value_name("POSTGRES_LOCATION")
                 .help("The configruation for connecting to the postgres database.")
                 .long_help(concat!(
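With the `auto_compressor.log` file handling removed, logging now follows env_logger's default of writing to stderr, configured per module. A minimal sketch of the same setup in isolation, assuming env_logger 0.9:

```rust
use log::LevelFilter;

fn init_logging() {
    if std::env::var("RUST_LOG").is_err() {
        // No RUST_LOG set: apply explicit per-module defaults.
        env_logger::builder()
            .filter_module("panic", LevelFilter::Error)
            .filter_module("synapse_compress_state", LevelFilter::Error)
            .filter_module("synapse_auto_compressor", LevelFilter::Info)
            .init();
    } else {
        // RUST_LOG is set: honour it, keeping panics visible as errors.
        env_logger::Builder::from_env("RUST_LOG")
            .filter_module("panic", LevelFilter::Error)
            .init();
    }
}

fn main() {
    init_logging();
    log::info!("logging initialised");
}
```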
@@ -76,8 +69,8 @@ fn main() {
                 .takes_value(true)
                 .required(true),
         ).arg(
-            Arg::with_name("chunk_size")
-                .short("c")
+            Arg::new("chunk_size")
+                .short('c')
                 .value_name("COUNT")
                 .help("The maximum number of state groups to load into memroy at once")
                 .long_help(concat!(
@@ -92,8 +85,8 @@ fn main() {
                 .takes_value(true)
                 .required(true),
         ).arg(
-            Arg::with_name("default_levels")
-                .short("l")
+            Arg::new("default_levels")
+                .short('l')
                 .value_name("LEVELS")
                 .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
                 .long_help(concat!(
@@ -110,10 +103,10 @@ fn main() {
                 .takes_value(true)
                 .required(false),
         ).arg(
-            Arg::with_name("number_of_chunks")
-                .short("n")
+            Arg::new("number_of_chunks")
+                .short('n')
                 .value_name("CHUNKS_TO_COMPRESS")
                 .help("The number of chunks to compress")
                 .long_help(concat!(
                     "This many chunks of the database will be compressed. The higher this number is set to, ",
                     "the longer the compressor will run for."
@@ -134,7 +127,8 @@ fn main() {
         .expect("A chunk size is required");
 
     // The default structure to use when compressing
-    let default_levels = value_t!(arguments, "default_levels", LevelInfo)
+    let default_levels = arguments
+        .value_of_t::<LevelInfo>("default_levels")
         .unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
 
     // The number of rooms to compress with this tool
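The hunks above are the mechanical clap v2 → v3 migration: `App` becomes `Command`, `Arg::with_name` becomes `Arg::new`, `short` takes a `char` instead of a one-character string, and the `value_t!` macro gives way to the typed `value_of_t` method. A self-contained sketch of the new API, assuming clap 3.1 (hypothetical program and argument names):

```rust
use clap::{Arg, Command};

fn main() {
    let matches = Command::new("demo")
        .arg(
            // clap v3: Arg::new instead of Arg::with_name, char short flag.
            Arg::new("chunk_size")
                .short('c')
                .value_name("COUNT")
                .takes_value(true)
                .required(true),
        )
        .get_matches();

    // value_of_t replaces the old value_t! macro for typed parsing.
    let chunk_size = matches
        .value_of_t::<i64>("chunk_size")
        .unwrap_or_else(|e| panic!("Unable to parse chunk size: {}", e));
    println!("chunk_size = {}", chunk_size);
}
```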
@@ -155,5 +149,5 @@ fn main() {
     manager::compress_chunks_of_database(db_url, chunk_size, &default_levels.0, number_of_chunks)
         .unwrap();
 
-    log::info!("auto_compressor finished");
+    log::info!("synapse_auto_compressor finished");
 }