Skip to content

Commit 68f9028

Browse files
authored
Merge pull request #309 from haardikk21/brotli-compression-on-proxy
ws proxy: brotli compress each message from upstream before sending downstream
2 parents 4f2f07f + efaaeb6 commit 68f9028

File tree

7 files changed

+38
-7
lines changed

7 files changed

+38
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/websocket-proxy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ redis = "0.30.0"
3232
redis-test = { version = "0.10.0", optional = true }
3333
uuid = { version = "1.16.0", features = ["v4"] }
3434
tokio-util = "0.7.12"
35+
brotli = "8.0.1"
3536

3637
[dependencies.ring]
3738
version = "0.17.12"

crates/websocket-proxy/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,8 @@ When Redis is enabled, the following features are available:
6262

6363
If the Redis connection fails, the proxy will automatically fall back to in-memory rate limiting.
6464

65+
### Brotli Compression
66+
67+
The proxy supports compressing messages to downstream clients using Brotli.
68+
69+
To enable this, pass the parameter `--enable-compression`

crates/websocket-proxy/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ impl ClientConnection {
1818
}
1919
}
2020

21-
pub async fn send(&mut self, data: String) -> Result<(), Error> {
22-
self.websocket.send(data.into_bytes().into()).await
21+
pub async fn send(&mut self, data: Vec<u8>) -> Result<(), Error> {
22+
self.websocket.send(data.into()).await
2323
}
2424

2525
pub fn id(&self) -> String {

crates/websocket-proxy/src/integration.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ mod test {
2727
server: Server,
2828
server_addr: SocketAddr,
2929
client_id_to_handle: HashMap<usize, JoinHandle<()>>,
30-
sender: Sender<String>,
30+
sender: Sender<Vec<u8>>,
3131
}
3232

3333
impl TestHarness {
@@ -153,7 +153,10 @@ mod test {
153153
}
154154

155155
fn send_messages(&mut self, messages: Vec<&str>) {
156-
let messages: Vec<String> = messages.into_iter().map(String::from).collect();
156+
let messages: Vec<Vec<u8>> = messages
157+
.into_iter()
158+
.map(|m| m.as_bytes().to_vec())
159+
.collect();
157160

158161
for message in messages.iter() {
159162
match self.sender.send(message.clone()) {

crates/websocket-proxy/src/main.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use clap::Parser;
1818
use dotenvy::dotenv;
1919
use metrics_exporter_prometheus::PrometheusBuilder;
2020
use rate_limit::RedisRateLimit;
21+
use std::io::Write;
2122
use std::net::SocketAddr;
2223
use std::sync::Arc;
2324
use tokio::signal::unix::{signal, SignalKind};
@@ -68,6 +69,13 @@ struct Args {
6869
help = "Maximum number of concurrently connected clients"
6970
)]
7071
per_ip_connections_limit: usize,
72+
#[arg(
73+
long,
74+
env,
75+
default_value = "false",
76+
help = "Enable brotli compression on messages to downstream clients"
77+
)]
78+
enable_compression: bool,
7179

7280
#[arg(
7381
long,
@@ -208,7 +216,20 @@ async fn main() {
208216
.active_connections
209217
.set((send.receiver_count() - 1) as f64);
210218

211-
match send.send(data) {
219+
let message_data = if args.enable_compression {
220+
let data_bytes = data.as_bytes();
221+
let mut compressed_data_bytes = Vec::new();
222+
{
223+
let mut compressor =
224+
brotli::CompressorWriter::new(&mut compressed_data_bytes, 4096, 5, 22);
225+
compressor.write_all(data_bytes).unwrap();
226+
}
227+
compressed_data_bytes
228+
} else {
229+
data.into_bytes()
230+
};
231+
232+
match send.send(message_data) {
212233
Ok(_) => (),
213234
Err(e) => error!(message = "failed to send data", error = e.to_string()),
214235
}

crates/websocket-proxy/src/registry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ use tracing::{info, trace, warn};
77

88
#[derive(Clone)]
99
pub struct Registry {
10-
sender: Sender<String>,
10+
sender: Sender<Vec<u8>>,
1111
metrics: Arc<Metrics>,
1212
}
1313

1414
impl Registry {
15-
pub fn new(sender: Sender<String>, metrics: Arc<Metrics>) -> Self {
15+
pub fn new(sender: Sender<Vec<u8>>, metrics: Arc<Metrics>) -> Self {
1616
Self { sender, metrics }
1717
}
1818

0 commit comments

Comments
 (0)