Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions collector/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ log = "0.4"
env_logger = "0.10"
chunked_transfer = "1.5"
num_cpus = "1.16"
flate2 = "1.0"

# Web server dependencies
hyper = { version = "1.0", features = ["full"] }
Expand Down
335 changes: 335 additions & 0 deletions collector/src/framework/analyzers/http_decompressor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
use super::{Analyzer, AnalyzerError};
use super::event::HTTPEvent;
use crate::framework::runners::EventStream;
use crate::framework::core::Event;
use async_trait::async_trait;
use futures::stream::StreamExt;
use flate2::read::GzDecoder;
use std::io::Read;

/// HTTP Decompressor Analyzer that decompresses gzip/deflate encoded HTTP response bodies.
///
/// Acts as a stream transformer: events whose `source` is `"http_parser"` and whose
/// payload is a compressed HTTP response get their `body` replaced with decompressed
/// text plus decompression metadata; all other events pass through unchanged.
pub struct HTTPDecompressor {
/// Flag to keep the raw compressed data alongside decompressed data
/// (when set, the original body is preserved in a `compressed_body` field).
keep_compressed: bool,
}

impl HTTPDecompressor {
/// Create a new HTTPDecompressor with default settings
pub fn new() -> Self {
HTTPDecompressor {
keep_compressed: false,
}
}

/// Keep the compressed data in the event (adds a compressed_body field)
#[allow(dead_code)]
pub fn keep_compressed(mut self) -> Self {
self.keep_compressed = true;
self
}

/// Check if the HTTP event declares gzip content encoding.
/// Matching is case-insensitive and tolerates multi-token header values.
fn has_gzip_encoding(http_event: &HTTPEvent) -> bool {
    match http_event.headers.get("content-encoding") {
        Some(value) => value.to_lowercase().contains("gzip"),
        None => false,
    }
}

/// Check if the HTTP event declares deflate content encoding.
/// Matching is case-insensitive and tolerates multi-token header values.
fn has_deflate_encoding(http_event: &HTTPEvent) -> bool {
    http_event
        .headers
        .get("content-encoding")
        .map_or(false, |value| value.to_lowercase().contains("deflate"))
}

/// Decode JSON-escaped string to raw bytes (handles the eBPF output format).
///
/// Code points below 256 are mapped one-to-one back to single bytes
/// (inverting a latin-1-style decode); anything higher is assumed to have
/// come from valid multi-byte UTF-8 in the binary payload and is re-emitted
/// as its UTF-8 encoding.
fn decode_json_escaped_string(s: &str) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(s.len());
    for ch in s.chars() {
        let code_point = u32::from(ch);
        if code_point <= 0xFF {
            bytes.push(code_point as u8);
        } else {
            // Re-encode the char; identical bytes to `to_string().into_bytes()`
            // without the intermediate String allocation.
            let mut buf = [0u8; 4];
            bytes.extend_from_slice(ch.encode_utf8(&mut buf).as_bytes());
        }
    }
    bytes
}

/// Try to decompress gzip data.
///
/// Returns the decompressed body as UTF-8 text, or a human-readable error
/// string when either the gzip stream or the UTF-8 conversion fails.
fn decompress_gzip(data: &[u8]) -> Result<String, String> {
    let mut buf = Vec::new();
    GzDecoder::new(data)
        .read_to_end(&mut buf)
        .map_err(|e| format!("Gzip decompression failed: {}", e))?;

    String::from_utf8(buf).map_err(|e| format!("UTF-8 conversion failed: {}", e))
}

/// Try to decompress deflate data
fn decompress_deflate(data: &[u8]) -> Result<String, String> {
use flate2::read::DeflateDecoder;
let mut decoder = DeflateDecoder::new(data);
let mut decompressed = Vec::new();

decoder.read_to_end(&mut decompressed)
.map_err(|e| format!("Deflate decompression failed: {}", e))?;

Comment on lines +75 to +81
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Content-Encoding: deflate in HTTP is commonly a zlib-wrapped stream (RFC 2616 ambiguity), but this implementation only tries DeflateDecoder (raw DEFLATE). This will cause decompression failures for many real servers. Consider attempting flate2::read::ZlibDecoder first (or as a fallback when raw deflate fails).

Suggested change
use flate2::read::DeflateDecoder;
let mut decoder = DeflateDecoder::new(data);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)
.map_err(|e| format!("Deflate decompression failed: {}", e))?;
use flate2::read::{DeflateDecoder, ZlibDecoder};
let mut decompressed = Vec::new();
// First, try as a zlib-wrapped deflate stream (common HTTP behavior)
let zlib_result = {
let mut zlib_decoder = ZlibDecoder::new(data);
zlib_decoder.read_to_end(&mut decompressed)
};
if zlib_result.is_err() {
// Clear any partial data and fall back to raw DEFLATE
decompressed.clear();
let mut deflate_decoder = DeflateDecoder::new(data);
deflate_decoder
.read_to_end(&mut decompressed)
.map_err(|e| format!("Deflate decompression failed: {}", e))?;
}

Copilot uses AI. Check for mistakes.
String::from_utf8(decompressed)
.map_err(|e| format!("UTF-8 conversion failed: {}", e))
}

/// Handle chunked transfer encoding - extract the actual data from chunks
/// Works with bytes instead of strings to handle binary gzip data
fn extract_from_chunked(body: &str) -> Option<Vec<u8>> {
// Parse chunked transfer encoding format:
// <chunk-size-hex>\r\n<chunk-data>\r\n...
let mut result = Vec::new();

// Convert string to bytes for binary-safe processing
let body_bytes = Self::decode_json_escaped_string(body);
let mut pos = 0;

loop {
// Find the first \r\n which separates chunk size from data
let newline_pos = body_bytes[pos..].windows(2)
.position(|w| w == b"\r\n")
.map(|p| pos + p);

if let Some(newline_pos) = newline_pos {
// Extract chunk size string (should be ASCII hex digits)
let chunk_size_bytes = &body_bytes[pos..newline_pos];
let chunk_size_str = match std::str::from_utf8(chunk_size_bytes) {
Ok(s) => s,
Err(_) => return None, // Invalid UTF-8 in chunk size
};

// Parse chunk size as hex
let chunk_size = match usize::from_str_radix(chunk_size_str.trim(), 16) {
Ok(size) => size,
Err(_) => return None, // Invalid chunk size
};

Comment on lines +104 to +116
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract_from_chunked parses the chunk size line with from_str_radix on the entire line. This will fail for valid chunk extensions (e.g. 1a;foo=bar) and cause chunked bodies to fall back to direct decoding (and likely decompression failure). Consider parsing only up to the first ; before hex decoding, and validating/consuming the required \r\n after each chunk.

Copilot uses AI. Check for mistakes.
// If chunk size is 0, we've reached the end
if chunk_size == 0 {
break;
}

// Extract chunk data (binary safe)
let data_start = newline_pos + 2; // Skip \r\n
if data_start + chunk_size > body_bytes.len() {
return None; // Incomplete chunk
}

let chunk_data = &body_bytes[data_start..data_start + chunk_size];
result.extend_from_slice(chunk_data);

// Move to next chunk (skip chunk data and trailing \r\n)
pos = data_start + chunk_size + 2;

if pos >= body_bytes.len() {
break;
}
} else {
break;
}
}

if result.is_empty() {
None
} else {
Some(result)
}
}

/// Process an HTTP event and decompress if needed
fn process_http_event(
mut event: Event,
keep_compressed: bool,
) -> Event {
// Try to deserialize as HTTPEvent
let http_event: HTTPEvent = match serde_json::from_value(event.data.clone()) {
Ok(h) => h,
Err(_) => return event, // Not an HTTP event, pass through
};

// Only process responses with body
if http_event.message_type != "response" || http_event.body.is_none() {
return event;
}

let body = http_event.body.as_ref().unwrap();
let is_gzip = Self::has_gzip_encoding(&http_event);
let is_deflate = Self::has_deflate_encoding(&http_event);

if !is_gzip && !is_deflate {
return event; // No compression
}

// Extract bytes from the body
let body_bytes = if http_event.is_chunked {
// Handle chunked transfer encoding
match Self::extract_from_chunked(body) {
Some(bytes) => bytes,
None => {
// Failed to parse chunks, try direct decoding
Self::decode_json_escaped_string(body)
}
}
} else {
Self::decode_json_escaped_string(body)
};

// Try to decompress
let decompressed = if is_gzip {
Self::decompress_gzip(&body_bytes)
} else {
Self::decompress_deflate(&body_bytes)
};

match decompressed {
Ok(decompressed_text) => {
// Update the event data with decompressed body
if let Some(data) = event.data.as_object_mut() {
// Update body with decompressed content
data.insert("body".to_string(), serde_json::json!(decompressed_text));

// Optionally keep the compressed data
if keep_compressed {
data.insert("compressed_body".to_string(), serde_json::json!(body));
}

// Add decompression metadata
data.insert("decompressed".to_string(), serde_json::json!(true));
data.insert("original_encoding".to_string(),
serde_json::json!(if is_gzip { "gzip" } else { "deflate" }));
data.insert("decompressed_size".to_string(),
serde_json::json!(decompressed_text.len()));
data.insert("compressed_size".to_string(),
serde_json::json!(body_bytes.len()));
}
event
}
Err(_err) => {
// Decompression failed, pass through original event
// Optionally add error metadata
if let Some(data) = event.data.as_object_mut() {
data.insert("decompression_failed".to_string(), serde_json::json!(true));
Comment on lines +217 to +221
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When decompression fails, the error is discarded (Err(_err)), making it hard to debug why decompression failed in the field. Consider storing a short error string in the event (e.g. decompression_error) in addition to decompression_failed, potentially behind a debug flag if size/noise is a concern.

Suggested change
Err(_err) => {
// Decompression failed, pass through original event
// Optionally add error metadata
if let Some(data) = event.data.as_object_mut() {
data.insert("decompression_failed".to_string(), serde_json::json!(true));
Err(err) => {
// Decompression failed, pass through original event
// Optionally add error metadata
if let Some(data) = event.data.as_object_mut() {
data.insert("decompression_failed".to_string(), serde_json::json!(true));
// Store a short error string to aid debugging. Truncate to limit size.
let err_str = err.to_string();
let max_len = 256;
let truncated = if err_str.len() > max_len {
format!("{}...", &err_str[..max_len.saturating_sub(3)])
} else {
err_str
};
data.insert("decompression_error".to_string(), serde_json::json!(truncated));

Copilot uses AI. Check for mistakes.
}
event
}
}
}
}

#[async_trait]
impl Analyzer for HTTPDecompressor {
    /// Wrap the incoming event stream, decompressing HTTP-parser events lazily.
    async fn process(&mut self, stream: EventStream) -> Result<EventStream, AnalyzerError> {
        // Copy the flag so the `move` closure does not capture `self`.
        let keep_compressed = self.keep_compressed;

        // Rewrite events as they flow through; only output of the HTTP parser
        // is inspected, everything else is forwarded untouched.
        let transformed = stream.map(move |incoming| {
            if incoming.source == "http_parser" {
                Self::process_http_event(incoming, keep_compressed)
            } else {
                incoming
            }
        });

        Ok(Box::pin(transformed))
    }

    fn name(&self) -> &str {
        "HTTPDecompressor"
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build a minimal HTTP response event carrying the given
    /// `content-encoding` header value.
    fn response_with_encoding(encoding: &str) -> HTTPEvent {
        let mut headers = HashMap::new();
        headers.insert("content-encoding".to_string(), encoding.to_string());

        HTTPEvent::new(
            1,
            "response".to_string(),
            "HTTP/1.1 200 OK".to_string(),
            None,
            None,
            Some("HTTP/1.1".to_string()),
            Some(200),
            Some("OK".to_string()),
            headers,
            Some("test".to_string()),
            100,
            true,
            false,
            None,
            "ssl".to_string(),
        )
    }

    #[test]
    fn test_extract_from_chunked() {
        // One 0xe-byte chunk followed by the zero-length terminator chunk.
        let chunked_data = "e\r\ntest data here\r\n0\r\n\r\n";
        let extracted = HTTPDecompressor::extract_from_chunked(chunked_data)
            .expect("well-formed chunked body should parse");
        assert_eq!(extracted, b"test data here");
    }

    #[test]
    fn test_has_gzip_encoding() {
        let event = response_with_encoding("gzip");
        assert!(HTTPDecompressor::has_gzip_encoding(&event));
    }

    #[test]
    fn test_has_deflate_encoding() {
        let event = response_with_encoding("deflate");
        assert!(HTTPDecompressor::has_deflate_encoding(&event));
    }

    #[test]
    fn test_decompress_gzip_real_data() {
        use flate2::write::GzEncoder;
        use flate2::Compression;
        use std::io::Write as IoWrite;

        // Compress a known plaintext, then verify the round trip.
        let original_text = "Hello, this is test data for gzip decompression!";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(original_text.as_bytes()).unwrap();
        let compressed = encoder.finish().unwrap();

        let round_trip = HTTPDecompressor::decompress_gzip(&compressed)
            .expect("valid gzip data should decompress");
        assert_eq!(round_trip, original_text);
    }
}
Loading