Compare commits
30 Commits
erikj/cont ... main
| SHA1 |
|---|
| 01bce55ade |
| bf92c82b7f |
| 4b9f2e2d64 |
| 71f24cf2b9 |
| 575d0fd878 |
| 2697e261da |
| d3aad1a23f |
| 0890891bb0 |
| 8dc70fec8d |
| 982ee5ead8 |
| 8fca8adb04 |
| 9ee99cd547 |
| 6a065de6fc |
| f4d96c73a8 |
| 923ca65f67 |
| 13882d7654 |
| c0dac572c1 |
| 856b799c53 |
| aab4d37123 |
| fce2a7eee8 |
| 74bd719262 |
| e3075d1451 |
| d22acc6906 |
| 88d97ea413 |
| 152808baca |
| 2596f25eea |
| 4d3049d3ed |
| 9ff021f32e |
| 019b100521 |
| da6271a331 |
.gitea/workflows/docker.yaml (new file, 46 lines)
@@ -0,0 +1,46 @@
name: Build and push docker images

on:
  push:
    tags: ["v*"]
    branches: [ main ]
  workflow_dispatch:

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Log in to Gitea Container Registry
        uses: docker/login-action@v2
        with:
          registry: git.yongyuancv.cn
          username: ${{ gitea.repository_owner }}
          password: ${{ secrets.GITEA_TOKEN }}

      - name: Calculate docker image tag
        id: set-tag
        uses: docker/metadata-action@master
        with:
          images: |
            git.yongyuancv.cn/${{ gitea.repository }}
            git.yongyuancv.cn/heimoshuiyu/${{ gitea.event.repository.name }}
          flavor: |
            latest=false
          tags: |
            type=raw,value=latest,enable=${{ gitea.ref == 'refs/heads/main' }}
            type=sha,prefix=,format=long
            type=semver,pattern=v{{version}}
            type=semver,pattern=v{{major}}.{{minor}}

      - name: Build and push all platforms
        uses: docker/build-push-action@v4
        with:
          push: true
          labels: "gitsha1=${{ gitea.sha }}"
          tags: "${{ steps.set-tag.outputs.tags }}"
          platforms: linux/amd64,linux/arm64
          cache-from: type=registry,ref=git.yongyuancv.cn/${{ gitea.repository }}:buildcache
          cache-to: type=registry,ref=git.yongyuancv.cn/${{ gitea.repository }}:buildcache,mode=max
.github/workflows/docker.yaml (vendored, new file, 58 lines)
@@ -0,0 +1,58 @@
# GitHub actions workflow which builds and publishes the docker images.

name: Build and push docker images

on:
  push:
    tags: ["v*"]
    branches: [ main ]
  workflow_dispatch:

permissions:
  contents: read
  packages: write

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Log in to DockerHub
        uses: docker/login-action@v2
        with:
          username: ${{ secrets.DOCKER_HUB_USERNAME }}
          password: ${{ secrets.DOCKER_HUB_TOKEN }}

      - name: Log in to GHCR
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.repository_owner }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Calculate docker image tag
        id: set-tag
        uses: docker/metadata-action@master
        with:
          images: |
            ghcr.io/${{ github.repository }}
            docker.io/${{ secrets.DOCKER_HUB_USERNAME }}/${{ github.event.repository.name }}
          flavor: |
            latest=false
          tags: |
            type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}
            type=sha,prefix=,format=long
            type=semver,pattern=v{{version}}
            type=semver,pattern=v{{major}}.{{minor}}

      - name: Build and push all platforms
        uses: docker/build-push-action@v4
        with:
          push: true
          labels: "gitsha1=${{ github.sha }}"
          tags: "${{ steps.set-tag.outputs.tags }}"
          platforms: linux/amd64,linux/arm64
          cache-from: type=registry,ref=ghcr.io/${{ github.repository }}:buildcache
          cache-to: type=registry,ref=ghcr.io/${{ github.repository }}:buildcache,mode=max
.github/workflows/triage_incoming.yml (vendored, new file, 28 lines)
@@ -0,0 +1,28 @@
name: Move new issues into the issue triage board

on:
  issues:
    types: [ opened ]

jobs:
  add_new_issues:
    name: Add new issues to the triage board
    runs-on: ubuntu-latest
    steps:
      - uses: octokit/graphql-action@v2.x
        id: add_to_project
        with:
          headers: '{"GraphQL-Features": "projects_next_graphql"}'
          query: |
            mutation add_to_project($projectid:ID!,$contentid:ID!) {
              addProjectV2ItemById(input: {projectId: $projectid contentId: $contentid}) {
                item {
                  id
                }
              }
            }
          projectid: ${{ env.PROJECT_ID }}
          contentid: ${{ github.event.issue.node_id }}
        env:
          PROJECT_ID: "PVT_kwDOAIB0Bs4AFDdZ"
          GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}
.github/workflows/triage_labelled.yml (vendored, new file, 44 lines)
@@ -0,0 +1,44 @@
name: Move labelled issues to correct projects

on:
  issues:
    types: [ labeled ]

jobs:
  move_needs_info:
    name: Move X-Needs-Info on the triage board
    runs-on: ubuntu-latest
    if: >
      contains(github.event.issue.labels.*.name, 'X-Needs-Info')
    steps:
      - uses: actions/add-to-project@main
        id: add_project
        with:
          project-url: "https://github.com/orgs/matrix-org/projects/67"
          github-token: ${{ secrets.ELEMENT_BOT_TOKEN }}
      - name: Set status
        env:
          GITHUB_TOKEN: ${{ secrets.ELEMENT_BOT_TOKEN }}
        run: |
          gh api graphql -f query='
            mutation(
              $project: ID!
              $item: ID!
              $fieldid: ID!
              $columnid: String!
            ) {
              updateProjectV2ItemFieldValue(
                input: {
                  projectId: $project
                  itemId: $item
                  fieldId: $fieldid
                  value: {
                    singleSelectOptionId: $columnid
                  }
                }
              ) {
                projectV2Item {
                  id
                }
              }
            }' -f project="PVT_kwDOAIB0Bs4AFDdZ" -f item=${{ steps.add_project.outputs.itemId }} -f fieldid="PVTSSF_lADOAIB0Bs4AFDdZzgC6ZA4" -f columnid=ba22e43c --silent
CONTRIBUTING.md (new file, 80 lines)
@@ -0,0 +1,80 @@
# Contributing

## Sign off

In order to have a concrete record that your contribution is intentional
and you agree to license it under the same terms as the project's license, we've adopted the
same lightweight approach that the Linux Kernel
[submitting patches process](
https://www.kernel.org/doc/html/latest/process/submitting-patches.html#sign-your-work-the-developer-s-certificate-of-origin),
[Docker](https://github.com/docker/docker/blob/master/CONTRIBUTING.md), and many other
projects use: the DCO ([Developer Certificate of Origin](http://developercertificate.org/)).
This is a simple declaration that you wrote
the contribution or otherwise have the right to contribute it to Matrix:

```
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
660 York Street, Suite 102,
San Francisco, CA 94110 USA

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.

Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.
```

If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment:

```
Signed-off-by: Your Name <your@email.example.org>
```

We accept contributions under a legally identifiable name, such as
your name on government documentation or common-law names (names
claimed by legitimate usage or repute). Unfortunately, we cannot
accept anonymous contributions at this time.

Git allows you to add this signoff automatically when using the `-s`
flag to `git commit`, which uses the name and email set in your
`user.name` and `user.email` git configs.

### Private Sign off

If you would like to provide your legal name privately to the Matrix.org
Foundation (instead of in a public commit or comment), you can do so
by emailing your legal name and a link to the pull request to
[dco@matrix.org](mailto:dco@matrix.org?subject=Private%20sign%20off).
It helps to include "sign off" or similar in the subject line. You will then
be instructed further.

Once private sign off is complete, doing so for future contributions will not
be required.
Cargo.lock (generated, 1093 lines changed)
File diff suppressed because it is too large.
Cargo.toml (38 lines changed)
@@ -8,18 +8,21 @@ name = "synapse_compress_state"
 version = "0.1.0"
 edition = "2018"
 
+[[bin]]
+name = "synapse_compress_state"
+required-features = ["clap"]
+
 [dependencies]
-indicatif = "0.16.0"
-openssl = "0.10.32"
-postgres = "0.19.0"
+indicatif = "0.17.6"
+openssl = "0.10.60"
+postgres = "0.19.7"
 postgres-openssl = "0.5.0"
-rand = "0.8.0"
-rayon = "1.3.0"
-string_cache = "0.8.0"
-env_logger = "0.9.0"
-log = "0.4.14"
-pyo3-log = "0.6.0"
-log-panics = "2.0.0"
+rand = "0.8.5"
+rayon = "1.7.0"
+string_cache = "0.8.7"
+env_logger = "0.10.0"
+log = "0.4.20"
+log-panics = "2.1.0"
 
 [dependencies.state-map]
 git = "https://github.com/matrix-org/rust-matrix-state-map"
@@ -29,18 +32,25 @@ git = "https://github.com/matrix-org/rust-matrix-state-map"
 crate-type = ["cdylib", "rlib"]
 
 [dependencies.clap]
-version = "3.1.14"
+version = "4.4.2"
 features = ["cargo"]
 optional = true
 
 [dependencies.pyo3]
-version = "0.16.4"
+version = "0.19.2"
 features = ["extension-module"]
 optional = true
 
+[dependencies.pyo3-log]
+version = "0.8.3"
+optional = true
+
 [dependencies.tikv-jemallocator]
-version = "0.5.0"
+version = "0.5.4"
 optional = true
 
 [features]
-default = ["jemalloc"]
+default = ["clap", "jemalloc"]
 jemalloc = ["tikv-jemallocator"]
 no-progress-bars = []
+pyo3 = ["dep:pyo3", "dep:pyo3-log"]
Dockerfile (56 lines changed)
@@ -1,22 +1,54 @@
-FROM rust:alpine AS builder
+# This uses the multi-stage build feature of Docker to build the binaries for multiple architectures without QEMU.
+# The first stage is responsible for building binaries for all the supported architectures (amd64 and arm64), and the
+# second stage only copies the binaries for the target architecture.
+# We leverage Zig and cargo-zigbuild for providing a cross-compilation-capable C compiler and linker.
 
-RUN apk add python3 musl-dev pkgconfig openssl-dev make
+ARG RUSTC_VERSION=1.72.0
+ARG ZIG_VERSION=0.11.0
+ARG CARGO_ZIGBUILD_VERSION=0.17.1
 
-ENV RUSTFLAGS="-C target-feature=-crt-static"
+FROM --platform=${BUILDPLATFORM} docker.io/rust:${RUSTC_VERSION} AS builder
+
+# Install cargo-zigbuild for cross-compilation
+ARG CARGO_ZIGBUILD_VERSION
+RUN cargo install --locked cargo-zigbuild@=${CARGO_ZIGBUILD_VERSION}
+
+# Download zig compiler for cross-compilation
+ARG ZIG_VERSION
+RUN curl -L "https://ziglang.org/download/${ZIG_VERSION}/zig-linux-$(uname -m)-${ZIG_VERSION}.tar.xz" | tar -J -x -C /usr/local && \
+    ln -s "/usr/local/zig-linux-$(uname -m)-${ZIG_VERSION}/zig" /usr/local/bin/zig
+
+# Install all cross-compilation targets
+ARG RUSTC_VERSION
+RUN rustup target add \
+    --toolchain "${RUSTC_VERSION}" \
+    x86_64-unknown-linux-musl \
+    aarch64-unknown-linux-musl
 
 WORKDIR /opt/synapse-compressor/
 
 COPY . .
 
-RUN cargo build
+# Build for all targets
+RUN cargo zigbuild \
+    --release \
+    --workspace \
+    --bins \
+    --features "openssl/vendored" \
+    --target aarch64-unknown-linux-musl \
+    --target x86_64-unknown-linux-musl
 
-WORKDIR /opt/synapse-compressor/synapse_auto_compressor/
+# Move the binaries in a separate folder per architecture, so we can copy them using the TARGETARCH build arg
+RUN mkdir -p /opt/binaries/amd64 /opt/binaries/arm64
+RUN mv target/x86_64-unknown-linux-musl/release/synapse_compress_state \
+    target/x86_64-unknown-linux-musl/release/synapse_auto_compressor \
+    /opt/binaries/amd64
+RUN mv target/aarch64-unknown-linux-musl/release/synapse_compress_state \
+    target/aarch64-unknown-linux-musl/release/synapse_auto_compressor \
+    /opt/binaries/arm64
 
-RUN cargo build
+FROM --platform=${TARGETPLATFORM} docker.io/alpine
 
-FROM alpine
+ARG TARGETARCH
 
 RUN apk add --no-cache libgcc
 
-COPY --from=builder /opt/synapse-compressor/target/debug/synapse_compress_state /usr/local/bin/synapse_compress_state
-COPY --from=builder /opt/synapse-compressor/target/debug/synapse_auto_compressor /usr/local/bin/synapse_auto_compressor
+COPY --from=builder /opt/binaries/${TARGETARCH}/synapse_compress_state /usr/local/bin/synapse_compress_state
+COPY --from=builder /opt/binaries/${TARGETARCH}/synapse_auto_compressor /usr/local/bin/synapse_auto_compressor
README.md (13 lines changed)
@@ -26,6 +26,9 @@ periodically.
 This tool requires `cargo` to be installed. See https://www.rust-lang.org/tools/install
 for instructions on how to do this.
 
+This project follows the deprecation policy of [Synapse](https://matrix-org.github.io/synapse/latest/deprecation_policy.html)
+on Rust and will assume a recent stable version of Rust and the ability to fetch a more recent one if necessary.
+
 To build `synapse_auto_compressor`, clone this repository and navigate to the
 `synapse_auto_compressor/` subdirectory. Then execute `cargo build`.
 
@@ -33,9 +36,17 @@ This will create an executable and store it in
 `synapse_auto_compressor/target/debug/synapse_auto_compressor`.
 
 ## Example usage
+
+Compress 100 chunks of size 500 in a remote PostgreSQL database:
 ```
 $ synapse_auto_compressor -p postgresql://user:pass@localhost/synapse -c 500 -n 100
 ```
+
+Compress 100 chunks of size 500 using local PostgreSQL socket:
+```
+$ sudo -u postgres synapse_auto_compressor -p "user=postgres dbname=matrix-synapse host=/var/run/postgresql" -c 500 -n 100
+```
 
 ## Running Options
 
 - -p [POSTGRES_LOCATION] **Required**
@@ -56,7 +67,7 @@ in) then the entire chunk is skipped.
 *CHUNKS_TO_COMPRESS* chunks of size *CHUNK_SIZE* will be compressed. The higher this
 number is set to, the longer the compressor will run for.
 
-- -d [LEVELS]
+- -l [LEVELS]
 Sizes of each new level in the compression algorithm, as a comma-separated list.
 The first entry in the list is for the lowest, most granular level, with each
 subsequent entry being for the next highest level. The number of entries in the
@@ -6,16 +6,16 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-string_cache = "0.8.0"
-serial_test = "0.5.1"
-openssl = "0.10.32"
-postgres = "0.19.0"
+string_cache = "0.8.7"
+serial_test = "2.0.0"
+openssl = "0.10.60"
+postgres = "0.19.7"
 postgres-openssl = "0.5.0"
-rand = "0.8.0"
+rand = "0.8.5"
 synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
 synapse_auto_compressor = { path = "../synapse_auto_compressor/" }
-env_logger = "0.9.0"
-log = "0.4.14"
+env_logger = "0.10.0"
+log = "0.4.20"
 
 [dependencies.state-map]
 git = "https://github.com/matrix-org/rust-matrix-state-map"
@@ -4,7 +4,12 @@ use postgres::{fallible_iterator::FallibleIterator, Client};
 use postgres_openssl::MakeTlsConnector;
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use state_map::StateMap;
-use std::{borrow::Cow, collections::BTreeMap, env, fmt};
+use std::{
+    borrow::Cow,
+    collections::BTreeMap,
+    env,
+    fmt::{self, Write as _},
+};
 use string_cache::DefaultAtom as Atom;
 
 use synapse_compress_state::StateGroupEntry;
@@ -23,47 +28,48 @@ pub fn add_contents_to_database(room_id: &str, state_group_map: &BTreeMap<i64, S
     let mut client = Client::connect(DB_URL, connector).unwrap();
 
     // build up the query
-    let mut sql = "".to_string();
+    let mut sql = String::new();
+
+    let room_id = PGEscape(room_id);
+    let event_id = PGEscape("left_blank");
 
     for (sg, entry) in state_group_map {
         // create the entry for state_groups
-        sql.push_str(&format!(
-            "INSERT INTO state_groups (id, room_id, event_id) VALUES ({},{},{});\n",
-            sg,
-            PGEscape(room_id),
-            PGEscape("left_blank")
-        ));
+        writeln!(
+            sql,
+            "INSERT INTO state_groups (id, room_id, event_id) \
+             VALUES ({sg}, {room_id}, {event_id});",
+        )
+        .expect("Writing to a String cannot fail");
 
         // create the entry in state_group_edges IF exists
         if let Some(prev_sg) = entry.prev_state_group {
-            sql.push_str(&format!(
-                "INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n",
-                sg, prev_sg
-            ));
+            writeln!(
+                sql,
+                "INSERT INTO state_group_edges (state_group, prev_state_group) \
+                 VALUES ({sg}, {prev_sg});",
+            )
+            .unwrap();
        }

         // write entry for each row in delta
         if !entry.state_map.is_empty() {
-            sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES");
+            sql.push_str(
+                "INSERT INTO state_groups_state \
+                 (state_group, room_id, type, state_key, event_id) \
+                 VALUES\n",
+            );
 
-            let mut first = true;
             for ((t, s), e) in entry.state_map.iter() {
-                if first {
-                    sql.push_str(" ");
-                    first = false;
-                } else {
-                    sql.push_str(" ,");
-                }
-                sql.push_str(&format!(
-                    "({}, {}, {}, {}, {})",
-                    sg,
-                    PGEscape(room_id),
-                    PGEscape(t),
-                    PGEscape(s),
-                    PGEscape(e)
-                ));
+                let t = PGEscape(t);
+                let s = PGEscape(s);
+                let e = PGEscape(e);
+
+                writeln!(sql, "    ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
             }
-            sql.push_str(";\n");
+
+            // Replace the last comma with a semicolon
+            sql.replace_range((sql.len() - 2).., ";\n");
         }
     }
@@ -189,7 +195,7 @@ fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
 
     while let Some(sg) = next_group {
         // get predecessor from state_group_edges
-        let mut pred = client.query_raw(query_pred, &[sg]).unwrap();
+        let mut pred = client.query_raw(query_pred, [sg]).unwrap();
 
         // set next_group to predecessor
         next_group = match pred.next().unwrap() {
@@ -203,7 +209,7 @@ fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
         }
         drop(pred);
 
-        let mut rows = client.query_raw(query_deltas, &[sg]).unwrap();
+        let mut rows = client.query_raw(query_deltas, [sg]).unwrap();
 
         while let Some(row) = rows.next().unwrap() {
             // Copy the single delta from the predecessor stored in this row
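The hunks above replace `format!` + `push_str` with `writeln!` writing directly into the SQL `String`. For readers unfamiliar with the pattern: importing `std::fmt::Write` lets `write!`/`writeln!` append formatted text to a `String`, and the only possible error is a formatter error, which is why the code can use `expect("Writing to a String cannot fail")`. A minimal standalone sketch (the statement text and values below are invented for illustration, not taken from this repository):

```rust
use std::fmt::Write as _; // brings write!/writeln! for String into scope

fn main() {
    let mut sql = String::new();
    let (sg, room_id) = (42, "'!abc:example.org'");

    // writeln! appends a formatted line to the String; failure would be a
    // std::fmt::Error, so .expect(...) documents that it cannot realistically
    // happen when the sink is a String.
    writeln!(sql, "DELETE FROM state_group_edges WHERE state_group = {sg};")
        .expect("Writing to a String cannot fail");
    writeln!(sql, "INSERT INTO state_groups (id, room_id) VALUES ({sg}, {room_id});").unwrap();

    print!("{sql}");
}
```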
@@ -1,4 +1,8 @@
 [build-system]
-requires = ["maturin>=0.11,<0.12"]
+requires = ["maturin>=1.0,<2.0"]
 build-backend = "maturin"
-cargo-extra-args = "--no-default-features"
+
+[tool.maturin]
+profile = "release"
+features = ["pyo3"]
+no-default-features = true
@@ -30,7 +30,7 @@
 
 use indicatif::{ProgressBar, ProgressStyle};
 use state_map::StateMap;
-use std::collections::BTreeMap;
+use std::{collections::BTreeMap, time::Duration};
 use string_cache::DefaultAtom as Atom;
 
 use super::{collapse_state_maps, StateGroupEntry};
@@ -156,7 +156,7 @@ impl<'a> Compressor<'a> {
     ) -> Compressor<'a> {
         let levels = level_info
             .iter()
-            .map(|l| Level::restore((*l).max_length, (*l).current_chain_length, (*l).head))
+            .map(|l| Level::restore(l.max_length, l.current_chain_length, l.head))
             .collect();
 
         let mut compressor = Compressor {
@@ -187,10 +187,12 @@ impl<'a> Compressor<'a> {
             ProgressBar::new(self.original_state_map.len() as u64)
         };
         pb.set_style(
-            ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
+            ProgressStyle::default_bar()
+                .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
+                .unwrap(),
         );
         pb.set_message("state groups");
-        pb.enable_steady_tick(100);
+        pb.enable_steady_tick(Duration::from_millis(100));
 
         for (&state_group, entry) in self.original_state_map {
             // Check whether this entry is in_range or is just present in the map due to being
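The progress-bar edits in this file (and repeated below in the database and lib.rs hunks) track two breaking changes in indicatif 0.17, the version the updated Cargo.toml pins: `ProgressStyle::template` now returns a `Result`, and `enable_steady_tick` takes a `Duration` rather than a millisecond count. A short sketch of the new calling convention, independent of this crate:

```rust
use std::time::Duration;

use indicatif::{ProgressBar, ProgressStyle};

fn main() {
    let pb = ProgressBar::new(1_000);
    pb.set_style(
        // In indicatif 0.17, .template(...) returns Result<ProgressStyle, TemplateError>,
        // hence the extra .unwrap() the diff adds at every call site.
        ProgressStyle::default_bar()
            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
            .unwrap(),
    );
    pb.set_message("state groups");
    // Plain millisecond integers are no longer accepted; pass a Duration.
    pb.enable_steady_tick(Duration::from_millis(100));

    for _ in 0..1_000 {
        pb.inc(1);
    }
    pb.finish();
}
```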
@@ -18,7 +18,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
 use postgres::{fallible_iterator::FallibleIterator, types::ToSql, Client};
 use postgres_openssl::MakeTlsConnector;
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
-use std::{borrow::Cow, collections::BTreeMap, fmt};
+use std::{borrow::Cow, collections::BTreeMap, fmt, time::Duration};
 
 use crate::{compressor::Level, generate_sql};
 
@@ -237,15 +237,9 @@ fn load_map_from_db(
     let mut missing_sgs: Vec<_> = state_group_map
         .iter()
         .filter_map(|(_sg, entry)| {
-            if let Some(prev_sg) = entry.prev_state_group {
-                if state_group_map.contains_key(&prev_sg) {
-                    None
-                } else {
-                    Some(prev_sg)
-                }
-            } else {
-                None
-            }
+            entry
+                .prev_state_group
+                .filter(|&prev_sg| !state_group_map.contains_key(&prev_sg))
         })
         .collect();
 
@@ -378,9 +372,11 @@ fn get_initial_data_from_db(
         ProgressBar::new_spinner()
     };
     pb.set_style(
-        ProgressStyle::default_spinner().template("{spinner} [{elapsed}] {pos} rows retrieved"),
+        ProgressStyle::default_spinner()
+            .template("{spinner} [{elapsed}] {pos} rows retrieved")
+            .unwrap(),
     );
-    pb.enable_steady_tick(100);
+    pb.enable_steady_tick(Duration::from_millis(100));
 
     while let Some(row) = rows.next().unwrap() {
         // The row in the map to copy the data to
@@ -542,10 +538,12 @@ pub fn send_changes_to_db(
         ProgressBar::new(old_map.len() as u64)
     };
     pb.set_style(
-        ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
+        ProgressStyle::default_bar()
+            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
+            .unwrap(),
    );
    pb.set_message("state groups");
-    pb.enable_steady_tick(100);
+    pb.enable_steady_tick(Duration::from_millis(100));
 
    for sql_transaction in generate_sql(old_map, new_map, room_id) {
        if sql_transaction.is_empty() {
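The `missing_sgs` hunk above collapses a nested `if let`/`if`-`else` into a single `Option::filter`: the predecessor is kept only when it is absent from the map, otherwise the closure yields `None`. A self-contained illustration of the same pattern (the map contents here are invented):

```rust
use std::collections::BTreeMap;

fn main() {
    // Map of state group id -> optional predecessor id.
    let state_group_map: BTreeMap<i64, Option<i64>> =
        BTreeMap::from([(10, None), (11, Some(10)), (12, Some(5))]);

    // Collect predecessors that are referenced but not present in the map,
    // using Option::filter just like the refactored load_map_from_db.
    let missing: Vec<i64> = state_group_map
        .values()
        .filter_map(|&prev| prev.filter(|p| !state_group_map.contains_key(p)))
        .collect();

    // Only group 12's predecessor (5) is missing from the map.
    assert_eq!(missing, vec![5]);
    println!("{missing:?}");
}
```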
src/lib.rs (184 lines changed)
@@ -20,14 +20,19 @@
 // of arguments - this hopefully doesn't make the code unclear
 // #[allow(clippy::too_many_arguments)] is therefore used around some functions
 
-use log::{info, warn, LevelFilter};
+use log::{info, warn};
+#[cfg(feature = "pyo3")]
 use pyo3::{exceptions, prelude::*};
 
+#[cfg(feature = "clap")]
 use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command};
 use indicatif::{ProgressBar, ProgressStyle};
 use rayon::prelude::*;
 use state_map::StateMap;
-use std::{collections::BTreeMap, convert::TryInto, fs::File, io::Write, str::FromStr};
+use std::{
+    collections::BTreeMap, convert::TryInto, fmt::Write as _, fs::File, io::Write, str::FromStr,
+    time::Duration,
+};
 use string_cache::DefaultAtom as Atom;
 
 mod compressor;
@@ -49,7 +54,7 @@ pub struct StateGroupEntry {
 }
 
 /// Helper struct for parsing the `level_sizes` argument.
-#[derive(PartialEq, Debug)]
+#[derive(PartialEq, Debug, Clone)]
 struct LevelSizes(Vec<usize>);
 
 impl FromStr for LevelSizes {
||||
@@ -114,6 +119,7 @@ pub struct Config {
|
||||
verify: bool,
|
||||
}
|
||||
|
||||
#[cfg(feature = "clap")]
|
||||
impl Config {
|
||||
/// Build up config from command line arguments
|
||||
pub fn parse_arguments() -> Config {
|
||||
@@ -133,7 +139,7 @@ impl Config {
                 "See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
                 "for the full details."
             ))
-            .takes_value(true)
+            .num_args(1)
             .required(true),
         ).arg(
             Arg::new("room_id")
@@ -144,53 +150,58 @@ impl Config {
                 "The room to process. This is the value found in the rooms table of the database",
                 " not the common name for the room - is should look like: \"!wOlkWNmgkAZFxbTaqj:matrix.org\""
             ))
-            .takes_value(true)
+            .num_args(1)
             .required(true),
         ).arg(
             Arg::new("min_state_group")
                 .short('b')
                 .value_name("MIN_STATE_GROUP")
+                .value_parser(clap::value_parser!(i64))
                 .help("The state group to start processing from (non inclusive)")
-                .takes_value(true)
+                .num_args(1)
                 .required(false),
         ).arg(
             Arg::new("min_saved_rows")
                 .short('m')
                 .value_name("COUNT")
+                .value_parser(clap::value_parser!(i32))
                 .help("Abort if fewer than COUNT rows would be saved")
                 .long_help("If the compressor cannot save this many rows from the database then it will stop early")
-                .takes_value(true)
+                .num_args(1)
                 .required(false),
         ).arg(
             Arg::new("groups_to_compress")
                 .short('n')
                 .value_name("GROUPS_TO_COMPRESS")
+                .value_parser(clap::value_parser!(i64))
                 .help("How many groups to load into memory to compress")
                 .long_help(concat!(
                     "How many groups to load into memory to compress (starting from",
                     " the 1st group in the room or the group specified by -s)"))
-                .takes_value(true)
+                .num_args(1)
                 .required(false),
         ).arg(
             Arg::new("output_file")
                 .short('o')
                 .value_name("FILE")
                 .help("File to output the changes to in SQL")
-                .takes_value(true),
+                .num_args(1),
         ).arg(
             Arg::new("max_state_group")
                 .short('s')
                 .value_name("MAX_STATE_GROUP")
+                .value_parser(clap::value_parser!(i64))
                 .help("The maximum state group to process up to")
                 .long_help(concat!(
                     "If a max_state_group is specified then only state groups with id's lower",
                     " than this number are able to be compressed."))
-                .takes_value(true)
+                .num_args(1)
                 .required(false),
         ).arg(
             Arg::new("level_sizes")
                 .short('l')
                 .value_name("LEVELS")
+                .value_parser(clap::value_parser!(LevelSizes))
                 .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
                 .long_help(concat!(
                     "Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -203,10 +214,11 @@ impl Config {
                     " iterations needed to fetch a given set of state.",
                 ))
                 .default_value("100,50,25")
-                .takes_value(true),
+                .num_args(1),
         ).arg(
             Arg::new("transactions")
                 .short('t')
+                .action(clap::ArgAction::SetTrue)
                 .help("Whether to wrap each state group change in a transaction")
                 .long_help(concat!("If this flag is set then then each change to a particular",
                     " state group is wrapped in a transaction. This should be done if you wish to",
@@ -215,6 +227,7 @@ impl Config {
         ).arg(
             Arg::new("graphs")
                 .short('g')
+                .action(clap::ArgAction::SetTrue)
                 .help("Output before and after graphs")
                 .long_help(concat!("If this flag is set then output the node and edge information for",
                     " the state_group directed graph built up from the predecessor state_group links.",
@@ -222,6 +235,7 @@ impl Config {
         ).arg(
             Arg::new("commit_changes")
                 .short('c')
+                .action(clap::ArgAction::SetTrue)
                 .help("Commit changes to the database")
                 .long_help(concat!("If this flag is set then the changes the compressor makes will",
                     " be committed to the database. This should be safe to use while synapse is running",
@@ -229,6 +243,7 @@ impl Config {
         ).arg(
             Arg::new("no_verify")
                 .short('N')
+                .action(clap::ArgAction::SetTrue)
                 .help("Do not double-check that the compression was performed correctly")
                 .long_help(concat!("If this flag is set then the verification of the compressed",
                     " state groups, which compares them to the original groups, is skipped. This",
@@ -236,44 +251,27 @@ impl Config {
         ).get_matches();
 
         let db_url = matches
-            .value_of("postgres-url")
+            .get_one::<String>("postgres-url")
             .expect("db url should be required");
 
-        let output_file = matches.value_of("output_file").map(|path| {
+        let output_file = matches.get_one::<String>("output_file").map(|path| {
             File::create(path).unwrap_or_else(|e| panic!("Unable to create output file: {}", e))
         });
 
         let room_id = matches
-            .value_of("room_id")
+            .get_one::<String>("room_id")
             .expect("room_id should be required since no file");
 
-        let min_state_group = matches
-            .value_of("min_state_group")
-            .map(|s| s.parse().expect("min_state_group must be an integer"));
+        let min_state_group = matches.get_one("min_state_group").copied();
+        let groups_to_compress = matches.get_one("groups_to_compress").copied();
+        let min_saved_rows = matches.get_one("min_saved_rows").copied();
+        let max_state_group = matches.get_one("max_state_group").copied();
+        let level_sizes = matches.get_one("level_sizes").cloned().unwrap();
 
-        let groups_to_compress = matches
-            .value_of("groups_to_compress")
-            .map(|s| s.parse().expect("groups_to_compress must be an integer"));
-
-        let min_saved_rows = matches
-            .value_of("min_saved_rows")
-            .map(|v| v.parse().expect("COUNT must be an integer"));
-
-        let max_state_group = matches
-            .value_of("max_state_group")
-            .map(|s| s.parse().expect("max_state_group must be an integer"));
-
-        let level_sizes = matches
-            .value_of_t::<LevelSizes>("level_sizes")
-            .unwrap_or_else(|e| panic!("Unable to parse level_sizes: {}", e));
-
-        let transactions = matches.is_present("transactions");
-
-        let graphs = matches.is_present("graphs");
-
-        let commit_changes = matches.is_present("commit_changes");
-
-        let verify = !matches.is_present("no_verify");
+        let transactions = matches.get_flag("transactions");
+        let graphs = matches.get_flag("graphs");
+        let commit_changes = matches.get_flag("commit_changes");
+        let verify = !matches.get_flag("no_verify");
 
         Config {
             db_url: String::from(db_url),
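This hunk is the core of the clap 3 to clap 4 migration: string lookups via `value_of`/`value_of_t` become typed `get_one::<T>()` calls backed by `value_parser`, boolean flags move from `is_present` to `ArgAction::SetTrue` plus `get_flag`, and `takes_value(true)` becomes `num_args(1)`. A minimal sketch of the same pattern against clap 4, using an invented `--count`/`--verbose` interface rather than this tool's real arguments:

```rust
use clap::{Arg, ArgAction, Command};

fn main() {
    let matches = Command::new("demo")
        .arg(
            Arg::new("count")
                .short('c')
                .value_parser(clap::value_parser!(i64))
                .num_args(1) // clap 3: .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::new("verbose")
                .short('v')
                .action(ArgAction::SetTrue), // needed for get_flag()
        )
        .get_matches();

    // clap 3: matches.value_of("count").map(|s| s.parse().expect(...))
    let count: i64 = *matches.get_one::<i64>("count").expect("count is required");
    // clap 3: matches.is_present("verbose")
    let verbose = matches.get_flag("verbose");

    println!("count = {count}, verbose = {verbose}");
}
```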
@@ -424,8 +422,7 @@ fn generate_sql<'a>(
     new_map: &'a BTreeMap<i64, StateGroupEntry>,
     room_id: &'a str,
 ) -> impl Iterator<Item = String> + 'a {
-    old_map.iter().map(move |(sg,old_entry)| {
-
+    old_map.iter().map(move |(sg, old_entry)| {
         let new_entry = &new_map[sg];
 
         // Check if the new map has a different entry for this state group
@@ -435,48 +432,50 @@ fn generate_sql<'a>(
         let mut sql = String::new();
 
         // remove the current edge
-        sql.push_str(&format!(
-            "DELETE FROM state_group_edges WHERE state_group = {};\n",
-            sg
-        ));
+        writeln!(
+            sql,
+            "DELETE FROM state_group_edges WHERE state_group = {sg};",
+        )
+        .expect("Writing to a String cannot fail");
 
         // if the new entry has a predecessor then put that into state_group_edges
         if let Some(prev_sg) = new_entry.prev_state_group {
-            sql.push_str(&format!("INSERT INTO state_group_edges (state_group, prev_state_group) VALUES ({}, {});\n", sg, prev_sg));
+            writeln!(
+                sql,
+                "INSERT INTO state_group_edges (state_group, prev_state_group) \
+                 VALUES ({sg}, {prev_sg});",
+            )
+            .unwrap();
         }
 
         // remove the current deltas for this state group
-        sql.push_str(&format!(
-            "DELETE FROM state_groups_state WHERE state_group = {};\n",
-            sg
-        ));
+        writeln!(
+            sql,
+            "DELETE FROM state_groups_state WHERE state_group = {sg};",
+        )
+        .unwrap();
 
         if !new_entry.state_map.is_empty() {
             // place all the deltas for the state group in the new map into state_groups_state
-            sql.push_str("INSERT INTO state_groups_state (state_group, room_id, type, state_key, event_id) VALUES\n");
+            sql.push_str(
+                "INSERT INTO state_groups_state \
+                 (state_group, room_id, type, state_key, event_id) \
+                 VALUES\n",
+            );
 
-            let mut first = true;
+            let room_id = PGEscape(room_id);
             for ((t, s), e) in new_entry.state_map.iter() {
-                // Add a comma at the start if not the first row to be inserted
-                if first {
-                    sql.push_str(" ");
-                    first = false;
-                } else {
-                    sql.push_str(" ,");
-                }
+                let t = PGEscape(t);
+                let s = PGEscape(s);
+                let e = PGEscape(e);
 
-                // write the row to be insterted of the form:
+                // write the row to be inserted of the form:
                 // (state_group, room_id, type, state_key, event_id)
-                sql.push_str(&format!(
-                    "({}, {}, {}, {}, {})",
-                    sg,
-                    PGEscape(room_id),
-                    PGEscape(t),
-                    PGEscape(s),
-                    PGEscape(e)
-                ));
+                writeln!(sql, "    ({sg}, {room_id}, {t}, {s}, {e}),").unwrap();
             }
-            sql.push_str(";\n");
+
+            // Replace the last comma with a semicolon
+            sql.replace_range((sql.len() - 2).., ";\n");
         }
 
         sql
@@ -514,10 +513,12 @@ fn output_sql(
         ProgressBar::new(old_map.len() as u64)
     };
     pb.set_style(
-        ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
+        ProgressStyle::default_bar()
+            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
+            .unwrap(),
     );
     pb.set_message("state groups");
-    pb.enable_steady_tick(100);
+    pb.enable_steady_tick(Duration::from_millis(100));
 
     if let Some(output) = &mut config.output_file {
         for mut sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
@@ -565,7 +566,7 @@ pub fn continue_run(
     let (state_group_map, max_group_found) =
         database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?;
 
-    let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
+    let original_num_rows = state_group_map.values().map(|v| v.state_map.len()).sum();
 
     // Now we actually call the compression algorithm.
     let compressor = Compressor::compress_from_save(&state_group_map, level_info);
@@ -628,10 +629,12 @@ fn check_that_maps_match(
         ProgressBar::new(old_map.len() as u64)
     };
     pb.set_style(
-        ProgressStyle::default_bar().template("[{elapsed_precise}] {bar} {pos}/{len} {msg}"),
+        ProgressStyle::default_bar()
+            .template("[{elapsed_precise}] {bar} {pos}/{len} {msg}")
+            .unwrap(),
     );
     pb.set_message("state groups");
-    pb.enable_steady_tick(100);
+    pb.enable_steady_tick(Duration::from_millis(100));
 
     // Now let's iterate through and assert that the state for each group
     // matches between the two versions.
@@ -747,23 +750,31 @@
 /// Default arguments are equivalent to using the command line tool
 /// No default's are provided for db_url or room_id since these arguments
 /// are compulsory (so that new() act's like parse_arguments())
+#[cfg(feature = "pyo3")]
 #[allow(clippy::too_many_arguments)]
-#[pyfunction(
+#[pyfunction]
+#[pyo3(signature = (
     // db_url has no default
-    // room_id has no default
-    output_file = "None",
-    min_state_group = "None",
-    groups_to_compress = "None",
-    min_saved_rows = "None",
-    max_state_group = "None",
-    level_sizes = "String::from(\"100,50,25\")",
+    db_url,
+
+    // room_id has no default
+    room_id,
+
+    output_file = None,
+    min_state_group = None,
+    groups_to_compress = None,
+    min_saved_rows = None,
+    max_state_group = None,
+    level_sizes = String::from("100,50,25"),
 
     // have this default to true as is much worse to not have it if you need it
     // than to have it and not need it
     transactions = true,
 
     graphs = false,
     commit_changes = false,
     verify = true,
-)]
+))]
 fn run_compression(
     db_url: String,
     room_id: String,
@@ -802,14 +813,15 @@ fn run_compression(
 }
 
 /// Python module - "import synapse_compress_state" to use
+#[cfg(feature = "pyo3")]
 #[pymodule]
 fn synapse_compress_state(_py: Python, m: &PyModule) -> PyResult<()> {
     let _ = pyo3_log::Logger::default()
         // don't send out anything lower than a warning from other crates
-        .filter(LevelFilter::Warn)
+        .filter(log::LevelFilter::Warn)
         // don't log warnings from synapse_compress_state, the synapse_auto_compressor handles these
         // situations and provides better log messages
-        .filter_target("synapse_compress_state".to_owned(), LevelFilter::Debug)
+        .filter_target("synapse_compress_state".to_owned(), log::LevelFilter::Debug)
         .install();
     // ensure any panics produce error messages in the log
     log_panics::init();
@@ -4,6 +4,10 @@ authors = ["William Ashton"]
 version = "0.1.3"
 edition = "2018"
 
+[[bin]]
+name = "synapse_auto_compressor"
+required-features = ["clap"]
+
 [package.metadata.maturin]
 requires-python = ">=3.7"
 project-url = {Source = "https://github.com/matrix-org/rust-synapse-compress-state"}
@@ -13,27 +17,40 @@ classifier = [
 ]
 
 [dependencies]
-openssl = "0.10.32"
-postgres = "0.19.0"
+openssl = { version = "0.10.60", features = ["vendored"] }
+postgres = "0.19.7"
 postgres-openssl = "0.5.0"
-tikv-jemallocator = "0.5.0"
-rand = "0.8.0"
-serial_test = "0.5.1"
-synapse_compress_state = { path = "../", features = ["no-progress-bars"] }
-env_logger = "0.9.0"
-log = "0.4.14"
-log-panics = "2.0.0"
-anyhow = "1.0.42"
-pyo3-log = "0.6.0"
+rand = "0.8.5"
+serial_test = "2.0.0"
+synapse_compress_state = { path = "../", features = ["no-progress-bars"], default-features = false }
+env_logger = "0.10.0"
+log = "0.4.20"
+log-panics = "2.1.0"
+anyhow = "1.0.75"
 
 # Needed for pyo3 support
 [lib]
 crate-type = ["cdylib", "rlib"]
 
 [dependencies.clap]
-version = "3.1.14"
+version = "4.4.2"
 features = ["cargo"]
+optional = true
 
 [dependencies.pyo3]
-version = "0.16.4"
+version = "0.19.2"
 features = ["extension-module"]
 optional = true
 
+[dependencies.pyo3-log]
+version = "0.8.3"
+optional = true
+
+[dependencies.tikv-jemallocator]
+version = "0.5.4"
+optional = true
+
+[features]
+default = ["clap", "jemalloc"]
+jemalloc = ["tikv-jemallocator", "synapse_compress_state/jemalloc"]
+pyo3 = ["dep:pyo3", "dep:pyo3-log", "synapse_compress_state/pyo3"]
synapse_auto_compressor/pyproject.toml (new file, 8 lines)
@@ -0,0 +1,8 @@
[build-system]
requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"

[tool.maturin]
profile = "release"
features = ["pyo3"]
no-default-features = true
@@ -7,7 +7,9 @@
 //! on space reductions
 
 use anyhow::Result;
+#[cfg(feature = "pyo3")]
 use log::{error, LevelFilter};
+#[cfg(feature = "pyo3")]
 use pyo3::{
     exceptions::PyRuntimeError, prelude::pymodule, types::PyModule, PyErr, PyResult, Python,
 };
@@ -26,7 +28,7 @@ pub mod state_saving;
 ///
 /// This is needed since FromStr cannot be implemented for structs
 /// that aren't defined in this scope
-#[derive(PartialEq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub struct LevelInfo(pub Vec<Level>);
 
 // Implement FromStr so that an argument of the form "100,50,25"
@@ -56,6 +58,7 @@ impl FromStr for LevelInfo {
 }
 
 // PyO3 INTERFACE STARTS HERE
+#[cfg(feature = "pyo3")]
 #[pymodule]
 fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
     let _ = pyo3_log::Logger::default()
@@ -71,7 +74,8 @@ fn synapse_auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> {
     // ensure any panics produce error messages in the log
     log_panics::init();
 
-    #[pyfn(m, compress_largest_rooms)]
+    #[pyfn(m)]
+    #[pyo3(name = "compress_largest_rooms")]
     fn compress_state_events_table(
         py: Python,
         db_url: String,
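For context on the `LevelInfo` (and `LevelSizes`) newtypes touched above, which now derive `Clone` so clap 4's `value_parser` can hand out owned copies: they exist so that a comma-separated argument such as "100,50,25" can be parsed through `FromStr`. The real implementations live in this repository; the sketch below is only an illustrative stand-in with an invented `Sizes` type showing the general shape of such a parser:

```rust
use std::str::FromStr;

// Illustrative stand-in for the crate's LevelSizes/LevelInfo newtypes.
#[derive(PartialEq, Eq, Debug, Clone)]
struct Sizes(Vec<usize>);

impl FromStr for Sizes {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Split "100,50,25" on commas and parse each piece as a usize.
        let sizes = s
            .split(',')
            .map(|part| part.trim().parse::<usize>().map_err(|e| e.to_string()))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Sizes(sizes))
    }
}

fn main() {
    let levels: Sizes = "100,50,25".parse().expect("valid level sizes");
    assert_eq!(levels, Sizes(vec![100, 50, 25]));
    println!("{levels:?}");
}
```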
@@ -16,6 +16,7 @@
 //! the state_compressor_state table so that the compressor can seemlesly
 //! continue from where it left off.
 
+#[cfg(feature = "jemalloc")]
 #[global_allocator]
 static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
@@ -66,12 +67,13 @@ fn main() {
                 "See https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html ",
                 "for the full details."
             ))
-            .takes_value(true)
+            .num_args(1)
             .required(true),
         ).arg(
             Arg::new("chunk_size")
                 .short('c')
                 .value_name("COUNT")
+                .value_parser(clap::value_parser!(i64))
                 .help("The maximum number of state groups to load into memroy at once")
                 .long_help(concat!(
                     "The number of state_groups to work on at once. All of the entries",
@@ -82,12 +84,13 @@ fn main() {
                     " chunk as a whole (which may well happen in rooms with lots",
                     " of backfill in) then the entire chunk is skipped.)",
                 ))
-                .takes_value(true)
+                .num_args(1)
                 .required(true),
         ).arg(
             Arg::new("default_levels")
                 .short('l')
                 .value_name("LEVELS")
+                .value_parser(clap::value_parser!(LevelInfo))
                 .help("Sizes of each new level in the compression algorithm, as a comma separated list.")
                 .long_help(concat!(
                     "Sizes of each new level in the compression algorithm, as a comma separated list.",
@@ -100,41 +103,43 @@ fn main() {
                     " iterations needed to fetch a given set of state.",
                 ))
                 .default_value("100,50,25")
-                .takes_value(true)
+                .num_args(1)
                 .required(false),
         ).arg(
             Arg::new("number_of_chunks")
                 .short('n')
                 .value_name("CHUNKS_TO_COMPRESS")
+                .value_parser(clap::value_parser!(i64))
                 .help("The number of chunks to compress")
                 .long_help(concat!(
                     "This many chunks of the database will be compressed. The higher this number is set to, ",
                     "the longer the compressor will run for."
                 ))
-                .takes_value(true)
+                .num_args(1)
                 .required(true),
         ).get_matches();
 
     // The URL of the database
     let db_url = arguments
-        .value_of("postgres-url")
+        .get_one::<String>("postgres-url")
         .expect("A database url is required");
 
     // The number of state groups to work on at once
     let chunk_size = arguments
-        .value_of("chunk_size")
-        .map(|s| s.parse().expect("chunk_size must be an integer"))
+        .get_one("chunk_size")
+        .copied()
         .expect("A chunk size is required");
 
     // The default structure to use when compressing
    let default_levels = arguments
-        .value_of_t::<LevelInfo>("default_levels")
-        .unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
+        .get_one::<LevelInfo>("default_levels")
+        .cloned()
+        .unwrap();
 
     // The number of rooms to compress with this tool
    let number_of_chunks = arguments
-        .value_of("number_of_chunks")
-        .map(|s| s.parse().expect("number_of_chunks must be an integer"))
+        .get_one("number_of_chunks")
+        .copied()
        .expect("number_of_chunks is required");
 
     // Connect to the database and create the 2 tables this tool needs