Skip to content

Commit

Permalink
Reduced response size
Browse files Browse the repository at this point in the history
  • Loading branch information
Razz4780 committed Jan 13, 2025
1 parent 3bff969 commit 22ce712
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 26 deletions.
2 changes: 1 addition & 1 deletion tests/python-e2e/app_chunked.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def do_GET(self):

# Send the response in chunks
chunks = [
"This is the first chunk.\n"*8000,
"This is the first chunk.\n"*800,
"This is the second chunk.\n"
]
for chunk in chunks:
Expand Down
77 changes: 52 additions & 25 deletions tests/src/traffic/steal.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
#![allow(dead_code, unused)]
#[cfg(test)]
mod steal_tests {
use std::{
io::{BufRead, BufReader, Read, Write},
net::{SocketAddr, TcpStream},
path::Path,
time::Duration,
};
use std::{net::SocketAddr, path::Path, time::Duration};

use futures_util::{SinkExt, StreamExt};
use hyper::{body, client::conn, Request, StatusCode};
use hyper_util::rt::TokioIo;
use k8s_openapi::api::core::v1::Pod;
use kube::{Api, Client};
use reqwest::{header::HeaderMap, Url};
use rstest::*;
use tokio::time::sleep;
use tempfile::tempdir;
use tokio::{
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
net::TcpStream,
time::sleep,
};
use tokio_tungstenite::{
connect_async,
tungstenite::{client::IntoClientRequest, Message},
Expand Down Expand Up @@ -280,7 +277,7 @@ mod steal_tests {
.wait_for_line(Duration::from_secs(40), "daemon subscribed")
.await;

let mut tcp_stream = TcpStream::connect((addr, port as u16)).unwrap();
let mut tcp_stream = TcpStream::connect((addr, port as u16)).await.unwrap();

// Wait for the test app to close the socket and tell us about it.
process
Expand All @@ -289,10 +286,10 @@ mod steal_tests {

const DATA: &[u8; 16] = b"upper me please\n";

tcp_stream.write_all(DATA).unwrap();
tcp_stream.write_all(DATA).await.unwrap();

let mut response = [0u8; DATA.len()];
tcp_stream.read_exact(&mut response).unwrap();
tcp_stream.read_exact(&mut response).await.unwrap();

process
.write_to_stdin(b"Hey test app, please stop running and just exit successfuly.\n")
Expand Down Expand Up @@ -625,11 +622,11 @@ mod steal_tests {
.await;

let addr = SocketAddr::new(host.trim().parse().unwrap(), port as u16);
let mut stream = TcpStream::connect(addr).unwrap();
stream.write_all(tcp_data.as_bytes()).unwrap();
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write_all(tcp_data.as_bytes()).await.unwrap();
let mut reader = BufReader::new(stream);
let mut buf = String::new();
reader.read_line(&mut buf).unwrap();
reader.read_line(&mut buf).await.unwrap();
println!("Got response: {buf}");
// replace "remote: " with empty string, since the response can be split into frames
// and we just need assert the final response
Expand Down Expand Up @@ -825,6 +822,29 @@ mod steal_tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[timeout(Duration::from_secs(120))]
async fn issue_3013(#[future] tcp_echo_service: KubeService, #[future] kube_client: Client) {
let config_file = tempdir().unwrap();
let config = serde_json::json!(
{
"feature": {
"network": {
"outgoing": false,
"dns": false,
"incoming": {
"mode": "steal",
"http_filter": {
"header_filter": "x-filter: yes",
},
}
},
"fs": "local",
},
}
);
let config_path = config_file.path().join("config.json");
tokio::fs::write(&config_path, serde_json::to_string_pretty(&config).unwrap())
.await
.unwrap();

let service = tcp_echo_service.await;
let kube_client = kube_client.await;
let (host, port) = get_service_host_and_port(kube_client.clone(), &service).await;
Expand All @@ -833,25 +853,24 @@ mod steal_tests {
.run(
&service.pod_container_target(),
Some(&service.namespace),
Some(vec!["--steal"]),
Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]),
Some(vec!["-f", &config_path.display().to_string()]),
None,
)
.await;
mirrorded_process
.wait_for_line(Duration::from_secs(40), "daemon subscribed")
.await;
println!("Application subscribed port");

let url = get_service_url(kube_client.clone(), &service).await;
let client = reqwest::Client::new();
let request = client
.get(&url)
.get(format!("http://{}:{port}", host.trim()))
.version(reqwest::Version::HTTP_11)
.header("x-filter", "yes")
.build()
.unwrap();
let expected_response = std::iter::repeat("This is the first chunk.\n".as_bytes())
.take(8000)
.take(800)
.chain(std::iter::once("This is the second chunk.\n".as_bytes()))
.flatten()
.copied()
Expand All @@ -867,8 +886,12 @@ mod steal_tests {
println!("First response received, reading body");
let body = response.bytes().await.unwrap();
println!("Finished reading first response's body");
assert_eq!(body.len(), expected_response.len(), "unexpected response length");
assert!(body == expected_response, "unexpected response"); // don't use `assert_eq`, we don't want to log 200kb strings
assert_eq!(
body.len(),
expected_response.len(),
"unexpected response length"
);
assert!(body == expected_response, "unexpected response"); // don't use `assert_eq`, we don't want to log 20kb strings

println!("Sending second request");
let response = client
Expand All @@ -880,8 +903,12 @@ mod steal_tests {
println!("Second response received, reading body");
let body = response.bytes().await.unwrap();
println!("Finished reading second response's body");
assert_eq!(body.len(), expected_response.len(), "unexpected response length");
assert!(body == expected_response, "unexpected response"); // don't use `assert_eq`, we don't want to log 200kb strings
assert_eq!(
body.len(),
expected_response.len(),
"unexpected response length"
);
assert!(body == expected_response, "unexpected response"); // don't use `assert_eq`, we don't want to log 20kb strings

mirrorded_process.child.start_kill().unwrap();
mirrorded_process.wait_assert_fail().await;
Expand Down

0 comments on commit 22ce712

Please sign in to comment.