Conversation
… add tests and example server script
…ked HTTP messages; improve chunked gzip decompression tests
@copilot help me fix ci
Pull request overview
This PR adds automatic decompression of gzip/deflate HTTP response bodies to the collector’s analyzer pipeline, primarily to make captured SSL HTTP traffic easier to inspect (e.g., OpenAI API responses). It also introduces an SSL event merger intended to reconstruct full HTTP messages spanning multiple SSL read calls.
Changes:
- Added `HTTPDecompressor` analyzer to detect `Content-Encoding: gzip/deflate` on parsed HTTP responses and replace the body with decompressed content.
- Added `SSLMerger` analyzer to merge consecutive SSL `READ/RECV` events into a single event before parsing as HTTP.
- Added `flate2` dependency and new repro/tests/scripts related to gzip/chunked handling.
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 12 comments.
Summary per file:
| File | Description |
|---|---|
| script/test-python/test_openai_simple.py | Script to generate OpenAI API traffic for capture testing. |
| script/test-python/test_gzip_http.py | Script to generate local gzip-encoded HTTP traffic for capture testing. |
| collector/src/main.rs | Wires SSLMerger/HTTPDecompressor into analyzer pipelines. |
| collector/src/framework/analyzers/ssl_merger.rs | New analyzer to buffer/merge SSL reads into “complete” HTTP messages. |
| collector/src/framework/analyzers/http_decompressor.rs | New analyzer implementing gzip/deflate decompression (incl. chunked extraction). |
| collector/src/framework/analyzers/http_decompressor_test.rs | Added test file for decompressor logic (currently not wired into module tree). |
| collector/src/framework/analyzers/mod.rs | Exposes ssl_merger and http_decompressor modules/types. |
| collector/tests/test_chunked_gzip_bug.rs | Integration test reproducing chunked+gzip decompression scenario. |
| collector/test_chunked_gzip.rs | Standalone repro executable placed at crate root. |
| collector/Cargo.toml | Adds flate2 dependency. |
| collector/Cargo.lock | Locks flate2 (and transitive deps). |
```rust
// Add analyzers based on flags - when HTTP parser is enabled, always enable SSE merge first
if enable_http_parser {
    ssl_runner = ssl_runner.add_analyzer(Box::new(SSEProcessor::new_with_timeout(30000)));

    // Create HTTP parser with appropriate configuration
    let http_parser = if include_raw_data {
        HTTPParser::new()
    } else {
        HTTPParser::new().disable_raw_data()
    };
    ssl_runner = ssl_runner.add_analyzer(Box::new(http_parser));

    // Add HTTP decompressor to decompress gzip/deflate responses
    ssl_runner = ssl_runner.add_analyzer(Box::new(HTTPDecompressor::new()));
```
HTTPDecompressor is added when enable_http_parser is true, but there’s no corresponding SSL read merging in this code path. Since HTTPParser operates on single SSL events, chunked/compressed responses spanning multiple READs may never be fully available to decompress, leading to persistent decompression_failed events. Consider adding SSLMerger (or equivalent buffering) in this pipeline when HTTP parsing/decompression is enabled, or updating HTTPParser to accumulate per connection/thread.
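A minimal sketch of one option, reusing the names from the excerpt above (the raw-data configuration of `HTTPParser` is elided; where the merger sits relative to `SSEProcessor` is discussed in a later comment):

```rust
// Sketch: buffer and merge SSL reads in this path too, so compressed
// responses spanning multiple READs reach HTTPDecompressor as one message.
if enable_http_parser {
    ssl_runner = ssl_runner.add_analyzer(Box::new(SSEProcessor::new_with_timeout(30000)));
    ssl_runner = ssl_runner.add_analyzer(Box::new(SSLMerger::new()));
    ssl_runner = ssl_runner.add_analyzer(Box::new(HTTPParser::new()));
    ssl_runner = ssl_runner.add_analyzer(Box::new(HTTPDecompressor::new()));
}
```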
```rust
// Check if it's chunked encoding
let is_chunked = headers.to_lowercase().contains("transfer-encoding: chunked");

if is_chunked {
    // For chunked encoding, check if we have the terminating chunk (0\r\n\r\n)
    return body.ends_with("0\r\n\r\n") || body.contains("\r\n0\r\n\r\n");
} else {
    // For non-chunked, check Content-Length
    if let Some(cl_start) = headers.to_lowercase().find("content-length:") {
        let cl_line = &headers[cl_start..];
        if let Some(cl_end) = cl_line.find("\r\n") {
            let cl_value = &cl_line[15..cl_end].trim();
            if let Ok(content_length) = cl_value.parse::<usize>() {
                return body.len() >= content_length;
            }
        }
    }

    // If no Content-Length and not chunked, consider it complete
    // (some responses like 204 No Content have no body)
    true
```
Chunked completion detection only treats 0\r\n\r\n as the terminator. Chunked responses with trailer headers end with 0\r\n<trailers>\r\n, so this will never mark them complete and will rely on timeout flushing. Consider parsing the chunked framing more robustly (or at least detecting \r\n0\r\n followed by a final \r\n).
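A minimal heuristic sketch of such a check (`chunked_body_complete` is a hypothetical helper; a robust fix would walk the chunk framing itself):

```rust
// Heuristic: the message is complete if the last-chunk marker "0\r\n" is
// present and the body ends with an empty line ("\r\n\r\n"), which covers
// both the bare terminator "0\r\n\r\n" and a last chunk followed by trailers.
fn chunked_body_complete(body: &str) -> bool {
    let has_last_chunk = body.starts_with("0\r\n") || body.contains("\r\n0\r\n");
    has_last_chunk && body.ends_with("\r\n\r\n")
}
```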
```rust
// Extract chunk size string (should be ASCII hex digits)
let chunk_size_bytes = &body_bytes[pos..newline_pos];
let chunk_size_str = match std::str::from_utf8(chunk_size_bytes) {
    Ok(s) => s,
    Err(_) => return None, // Invalid UTF-8 in chunk size
};

// Parse chunk size as hex
let chunk_size = match usize::from_str_radix(chunk_size_str.trim(), 16) {
    Ok(size) => size,
    Err(_) => return None, // Invalid chunk size
};
```
extract_from_chunked parses the chunk size line with from_str_radix on the entire line. This will fail for valid chunk extensions (e.g. 1a;foo=bar) and cause chunked bodies to fall back to direct decoding (and likely decompression failure). Consider parsing only up to the first ; before hex decoding, and validating/consuming the required \r\n after each chunk.
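A sketch of extension-aware size parsing (`parse_chunk_size` is a hypothetical helper; `line` is the chunk-size line without its trailing CRLF):

```rust
// Per RFC 9112, the chunk-size line may carry extensions: "SIZE[;name[=value]]".
// Hex-decode only the part before the first ';'.
fn parse_chunk_size(line: &str) -> Option<usize> {
    // split(';') always yields at least the text before the first ';'.
    let size_part = line.split(';').next()?.trim();
    usize::from_str_radix(size_part, 16).ok()
}
```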
```rust
use super::*;
use crate::framework::core::Event;
use crate::framework::analyzers::event::HTTPEvent;
use std::collections::HashMap;
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;

#[cfg(test)]
mod http_decompressor_tests {
    use super::*;
```
This file won’t be compiled or executed by Rust’s test runner unless it’s included from the module tree (e.g. #[cfg(test)] mod http_decompressor_test; in analyzers/mod.rs or by moving it under collector/tests/ as an integration test). As-is, the PR’s “dedicated test suite” here won’t run in CI.
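For reference, the wiring the comment suggests is a one-line declaration:

```rust
// In collector/src/framework/analyzers/mod.rs:
#[cfg(test)]
mod http_decompressor_test;
```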
```rust
fn main() {
    // This is the actual chunked gzip data from OpenAI API response
    // Captured from: {"function":"READ/RECV",...,"data":"..."}

    // First chunk with gzip header (size: f = 15 bytes)
    let chunk1 = "f\r\n\u{1f}\u{8b}\u{08}\u{00}\u{00}\u{00}\u{00}\u{00}\u{00}\u{03}\u{00}\u{00}\u{00}\u{ff}\u{ff}\r\n";

    // Second chunk with compressed data (size: 199 = 409 bytes)
```
This appears to be a standalone debugging executable (has fn main() and lots of println!) placed at the crate root. Cargo won’t build/run it by default (binaries are usually under src/bin/ or declared in Cargo.toml), so it may be dead weight/confusing in the repository. Consider moving it to collector/examples/ or collector/src/bin/ (or removing it if it was only for local repro).
```rust
Err(_err) => {
    // Decompression failed, pass through original event
    // Optionally add error metadata
    if let Some(data) = event.data.as_object_mut() {
        data.insert("decompression_failed".to_string(), serde_json::json!(true));
```
When decompression fails, the error is discarded (Err(_err)), making it hard to debug why decompression failed in the field. Consider storing a short error string in the event (e.g. decompression_error) in addition to decompression_failed, potentially behind a debug flag if size/noise is a concern.
Suggested change:

```rust
Err(err) => {
    // Decompression failed, pass through original event
    // Optionally add error metadata
    if let Some(data) = event.data.as_object_mut() {
        data.insert("decompression_failed".to_string(), serde_json::json!(true));
        // Store a short error string to aid debugging. Truncate to limit size.
        let err_str = err.to_string();
        let max_len = 256;
        let truncated = if err_str.len() > max_len {
            format!("{}...", &err_str[..max_len.saturating_sub(3)])
        } else {
            err_str
        };
        data.insert("decompression_error".to_string(), serde_json::json!(truncated));
```
```rust
// Convert to the format that comes from eBPF (simulating JSON escaped string)
compressed.iter().map(|&b| {
    if b < 128 {
        (b as char).to_string()
    } else {
        format!("\\u{:04x}", b)
    }
}).collect()
```
create_gzip_compressed_data constructs strings containing literal \u00XX sequences, but the decompressor’s decode_json_escaped_string expects the parsed JSON string to already contain the corresponding Unicode codepoints (0..=255) for those bytes. If this test file is wired into the module tree, this encoding mismatch will make the decompression test unreliable. Consider generating the body by mapping each byte to char::from(byte) (0..=255), or by building a JSON string with \u00XX escapes and parsing it to obtain the decoded String.
Suggested change:

```rust
// Represent each byte as a single char with the corresponding codepoint (0..=255).
// This matches the expectation of decode_json_escaped_string, which operates on
// a parsed JSON string whose contents are already decoded bytes, not literal
// "\uXXXX" escape sequences.
compressed.iter().map(|&b| char::from(b)).collect()
```
```rust
if ssl_http {
    // Add SSL merger FIRST to combine consecutive READ events into complete HTTP messages
    ssl_runner = ssl_runner.add_analyzer(Box::new(SSLMerger::new()));

    ssl_runner = ssl_runner.add_analyzer(Box::new(SSEProcessor::new_with_timeout(30000)));
```
SSLMerger is inserted before SSEProcessor, but SSLMerger buffers SSL READ/RECV events until it sees a complete HTTP message (or times out). For streaming/SSE responses there is typically no terminating chunk, so this will delay or prevent SSEProcessor from seeing incremental chunks and can break SSE aggregation/latency. Consider moving SSLMerger after SSEProcessor (so SSE traffic is consumed first) and before HTTPParser, or make SSLMerger opt-in/conditional so it doesn't buffer SSE streams indefinitely.
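A sketch of the suggested reordering, using the same calls as the excerpt:

```rust
// Let SSEProcessor consume streaming (SSE) traffic first, then merge the
// remaining SSL reads into complete messages before HTTP parsing.
ssl_runner = ssl_runner.add_analyzer(Box::new(SSEProcessor::new_with_timeout(30000)));
ssl_runner = ssl_runner.add_analyzer(Box::new(SSLMerger::new()));
```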
…gth parsing (#25)

* Initial plan
* Fix CI build issues: initialize submodules, build eBPF binaries and frontend, fix ssl_merger test
* Suppress unused method warning for keep_compressed API method
* Fix Content-Length header parsing to prevent false matches and use constant instead of magic number

Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: yunwei37 <[email protected]>
Co-authored-by: Copilot <[email protected]>
Signed-off-by: 云微 <[email protected]>
This pull request introduces a new HTTP decompression analyzer to the collector framework, enabling automatic decompression of gzip- and deflate-encoded HTTP response bodies. The analyzer is integrated into the framework and includes comprehensive unit tests to verify its functionality. Key changes are grouped below:

New Feature: HTTP Decompression Analyzer
- Added the `HTTPDecompressor` analyzer, which detects and decompresses gzip- and deflate-encoded HTTP response bodies, updates event data with the decompressed content, and optionally preserves the original compressed data. (collector/src/framework/analyzers/http_decompressor.rs)

Framework Integration
- Registered the new analyzer in `mod.rs`, making `HTTPDecompressor` available for use in the analyzer pipeline. (collector/src/framework/analyzers/mod.rs) [1] [2]

Dependency Management
- Added the `flate2` crate to `Cargo.toml` to support gzip and deflate decompression. (collector/Cargo.toml)

Testing
- Added a dedicated test file for the decompressor logic. (collector/src/framework/analyzers/http_decompressor_test.rs)
- Added tests alongside the analyzer implementation. (collector/src/framework/analyzers/http_decompressor.rs)
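A hypothetical usage sketch: the commit log above mentions a `keep_compressed` API method, matching the "optionally preserves the original compressed data" bullet, so enabling it might look roughly like this (the builder-style signature is an assumption, not confirmed by the PR):

```rust
// Assumption: keep_compressed() is a builder-style toggle on HTTPDecompressor
// that retains the original compressed body alongside the decompressed one.
let decompressor = HTTPDecompressor::new().keep_compressed();
ssl_runner = ssl_runner.add_analyzer(Box::new(decompressor));
```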