Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 137 additions & 16 deletions tools/src/bin/tquic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,35 @@ pub struct ClientOpt {
#[clap(value_delimiter = ' ')]
pub urls: Vec<Url>,

/// HTTP request method.
#[clap(
long,
default_value = "GET",
value_name = "METHOD",
help_heading = "Request"
)]
pub method: String,

/// HTTP request body file.
#[clap(long, value_name = "FILE", help_heading = "Request")]
pub body: Option<String>,

/// Client certificate file for mutual authentication.
#[clap(long, value_name = "FILE", help_heading = "TLS")]
pub client_cert: Option<String>,

/// Client private key file for mutual authentication.
#[clap(long, value_name = "FILE", help_heading = "TLS")]
pub client_key: Option<String>,

/// CA certificate file for server verification.
#[clap(long, value_name = "FILE", help_heading = "TLS")]
pub ca_cert: Option<String>,

/// Enable 0RTT data sending.
#[clap(long, help_heading = "Protocol")]
pub enable_0rtt: bool,

/// Number of threads.
#[clap(
short,
Expand Down Expand Up @@ -559,9 +588,22 @@ impl Worker {
config.enable_encryption(!option.disable_encryption);
let mut tls_config = TlsConfig::new_client_config(
ApplicationProto::convert_to_vec(&option.alpn),
option.enable_early_data,
option.enable_early_data || option.enable_0rtt,
)?;

// Configure mutual authentication if client certificate and key are provided
if let (Some(client_cert), Some(client_key)) = (&option.client_cert, &option.client_key) {
tls_config.set_certificate_file(client_cert)?;
tls_config.set_private_key_file(client_key)?;
info!("Enabled client certificate authentication");
}

// Configure CA certificate for server verification if provided
if let Some(ca_cert) = &option.ca_cert {
tls_config.set_ca_certs(ca_cert)?;
info!("Enabled CA certificate verification");
}

// Configure certificate compression if specified
if !option.certificate_compression.is_empty() {
let compression_algorithms: Vec<CertCompressionAlgorithm> = option
Expand Down Expand Up @@ -872,8 +914,9 @@ impl WorkerContext {

struct Request {
url: Url,
line: String, // Used in http/0.9.
headers: Vec<Header>, // Used in h3.
line: String, // Used in http/0.9.
headers: Vec<Header>, // Used in h3.
body: Option<Vec<u8>>, // Request body data
response_writer: Option<std::io::BufWriter<std::fs::File>>,
start_time: Option<Instant>,
}
Expand Down Expand Up @@ -911,7 +954,21 @@ impl Request {
}
}

// TODO: support custom headers.
// Load request body from file
fn load_body(body_file: &Option<String>) -> Option<Vec<u8>> {
if let Some(file_path) = body_file {
match std::fs::read(file_path) {
Ok(data) => Some(data),
Err(e) => {
error!("failed to read body file {}: {:?}", file_path, e);
None
}
}
} else {
None
}
}

fn new(
method: &str,
url: &Url,
Expand Down Expand Up @@ -945,8 +1002,9 @@ impl Request {
}
Self {
url: url.clone(),
line: format!("GET {}\r\n", url.path()),
line: format!("{} {}\r\n", method, url.path()),
headers,
body: body.clone(),
response_writer: Self::make_response_writer(url, dump_dir),
start_time: None,
}
Expand Down Expand Up @@ -1060,11 +1118,22 @@ impl RequestSender {

fn send_request(&mut self, conn: &mut Connection) -> Result<()> {
let url = &self.option.urls[self.current_url_idx];
let mut request =
Request::new("GET", url, &None, &self.option.dump_dir, &self.option.range);

// Load request body if specified
let body = Request::load_body(&self.option.body);

let mut request = Request::new(
&self.option.method,
url,
&body,
&self.option.dump_dir,
&self.option.range,
);

debug!(
"{} send request {} current index {}",
"{} send {} request {} current index {}",
conn.trace_id(),
self.option.method,
url,
self.current_url_idx
);
Expand Down Expand Up @@ -1092,10 +1161,12 @@ impl RequestSender {

fn send_http09_request(&mut self, conn: &mut Connection, request: &Request) -> Result<u64> {
let s = self.next_stream_id;

// Send request line
match conn.stream_write(
self.next_stream_id,
Bytes::copy_from_slice(request.line.as_bytes()),
true,
request.body.is_none(), // Only finish if no body
) {
Ok(v) => v,
Err(tquic::error::Error::StreamLimitError) => {
Expand All @@ -1107,6 +1178,28 @@ impl RequestSender {
);
}
};

// Send request body if present
if let Some(ref body) = request.body {
match conn.stream_write(
self.next_stream_id,
Bytes::copy_from_slice(body),
true, // Finish stream after body
) {
Ok(v) => v,
Err(tquic::error::Error::StreamLimitError) => {
return Err("stream limit reached".to_string().into());
}
Err(e) => {
return Err(format!(
"failed to send request body {:?}, error: {:?}",
request.url, e
)
.into());
}
};
}

self.next_stream_id += 4;
Ok(s)
}
Expand All @@ -1124,12 +1217,13 @@ impl RequestSender {
}
};

match self
.h3_conn
.as_mut()
.unwrap()
.send_headers(conn, s, &request.headers, true)
{
// Send headers
match self.h3_conn.as_mut().unwrap().send_headers(
conn,
s,
&request.headers,
request.body.is_none(),
) {
Ok(v) => v,
Err(tquic::h3::Http3Error::StreamBlocked) => {
return Err("stream is blocked".to_string().into());
Expand All @@ -1141,6 +1235,28 @@ impl RequestSender {
}
};

// Send body if present
if let Some(ref body) = request.body {
match self.h3_conn.as_mut().unwrap().send_body(
conn,
s,
Bytes::copy_from_slice(body),
true,
) {
Ok(v) => v,
Err(tquic::h3::Http3Error::StreamBlocked) => {
return Err("stream is blocked".to_string().into());
}
Err(e) => {
return Err(format!(
"failed to send request body {:?}, error: {:?}",
request.url, e
)
.into());
}
};
}

Ok(s)
}

Expand Down Expand Up @@ -1426,7 +1542,12 @@ impl TransportHandler for WorkerHandler {
}
}

if conn.is_in_early_data() {
// If 0RTT is enabled and connection is in early data state, send requests immediately
if self.option.enable_0rtt && conn.is_in_early_data() {
debug!(
"{} 0RTT enabled, sending requests in early data",
conn.trace_id()
);
self.try_new_request_sender(conn);
}
}
Expand Down
40 changes: 38 additions & 2 deletions tools/src/bin/tquic_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use tquic_tools::Result;
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[clap(name = "server", version=env!("CARGO_PKG_VERSION"))]
pub struct ServerOpt {
/// Address to listen.
Expand All @@ -75,6 +75,18 @@ pub struct ServerOpt {
#[clap(short, long = "key", default_value = "./cert.key", value_name = "FILE")]
pub key_file: String,

/// CA certificate file for client verification (mutual authentication).
#[clap(long, value_name = "FILE", help_heading = "TLS")]
pub ca_cert: Option<String>,

/// Enable client certificate verification.
#[clap(long, help_heading = "TLS")]
pub require_client_cert: bool,

/// Enable 0RTT data acceptance.
#[clap(long, help_heading = "Protocol")]
pub enable_0rtt: bool,

/// Document root directory.
#[clap(short, long, default_value = "./", value_name = "DIR")]
pub root: String,
Expand Down Expand Up @@ -311,12 +323,23 @@ impl Server {
&option.cert_file,
&option.key_file,
application_protos,
true,
option.enable_0rtt,
)?;
let mut ticket_key = option.ticket_key.clone().into_bytes();
ticket_key.resize(48, 0);
tls_config.set_ticket_key(&ticket_key)?;

// Configure mutual authentication if CA certificate is provided
if let Some(ca_cert) = &option.ca_cert {
tls_config.set_ca_certs(ca_cert)?;
if option.require_client_cert {
tls_config.set_verify(true);
info!("Enabled client certificate verification (mutual authentication)");
} else {
info!("Enabled CA certificate verification (optional client cert)");
}
}

// Configure certificate compression if specified
if !option.certificate_compression.is_empty() {
let compression_algorithms: Vec<CertCompressionAlgorithm> = option
Expand Down Expand Up @@ -945,6 +968,9 @@ struct ServerHandler {
/// File root directory.
root: String,

/// Server options
option: ServerOpt,

/// HTTP connections
conns: FxHashMap<u64, ConnectionHandler>,

Expand Down Expand Up @@ -972,6 +998,7 @@ impl ServerHandler {

Ok(Self {
root: option.root.clone(),
option: option.clone(),
buf: vec![0; MAX_BUF_SIZE],
conns: FxHashMap::default(),
keylog,
Expand Down Expand Up @@ -1035,6 +1062,15 @@ impl TransportHandler for ServerHandler {
error!("{} set qlog {:?} failed", conn.trace_id(), qlog_file);
}
}

// If 0RTT is enabled and connection is in early data state, process requests immediately
if self.option.enable_0rtt && conn.is_in_early_data() {
debug!(
"{} 0RTT enabled, processing requests in early data",
conn.trace_id()
);
self.try_new_conn_handler(conn);
}
}

fn on_conn_established(&mut self, conn: &mut Connection) {
Expand Down
Loading