Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in RequestStream #271

Draft
wants to merge 35 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c6f0bb6
progress
Ruben2424 Dec 29, 2024
d301569
Merge branch 'Main' into fix-test-01
Ruben2424 Dec 29, 2024
521d0e0
fix test
Ruben2424 Dec 29, 2024
c346a20
remove redundant method for client/server data handling
Ruben2424 Dec 30, 2024
872c778
Merge branch 'Main' into fix-test-01
Ruben2424 Dec 30, 2024
27394ac
fmt
Ruben2424 Dec 30, 2024
61fbf98
notify connection on error and fix tests
Ruben2424 Jan 5, 2025
9b27d58
more test fixing
Ruben2424 Jan 5, 2025
8e4f2c2
correct duvet citation in test
Ruben2424 Jan 5, 2025
9479dcc
fix a test
Ruben2424 Jan 5, 2025
3f8c41a
remove panic
Ruben2424 Jan 6, 2025
f4532c1
fix tests
Ruben2424 Jan 6, 2025
c0141c4
fix the rest of the errors
Ruben2424 Jan 6, 2025
5fd0a81
progress new error type
Ruben2424 Jan 9, 2025
216a06f
trying refactored error types
Ruben2424 Jan 9, 2025
939cc28
start to rethink shared state by removing the global Arc<RwLock<>> an…
Ruben2424 Jan 11, 2025
8832ef5
discover problem with blocked accept function if a request stream is …
Ruben2424 Jan 11, 2025
e735ab3
some progress with new error types
Ruben2424 Jan 11, 2025
6c88591
fmt
Ruben2424 Jan 11, 2025
368bf80
accept should not block
Ruben2424 Jan 15, 2025
d037851
Merge branch 'Main' into fix-test-01
Ruben2424 Jan 15, 2025
eb9302f
Merge branch 'Main' into fix-test-01
Ruben2424 Jan 19, 2025
a44b89f
ignore wt for now, as more changes are needed. fix in a later commit
Ruben2424 Jan 19, 2025
5f77f64
make it compile again
Ruben2424 Jan 19, 2025
e76e5a1
H3_REQUEST_INCOMPLETE is now a stream error
Ruben2424 Jan 22, 2025
29e8d96
check error on resolver
Ruben2424 Jan 22, 2025
b8a733d
fix tests with new api
Ruben2424 Jan 22, 2025
8b77f92
fix last test
Ruben2424 Jan 22, 2025
9beb31c
Merge branch 'Main' into fix-test-01
Ruben2424 Jan 24, 2025
00d5e8e
progress on new types
Ruben2424 Jan 25, 2025
2e22c2e
progress
Ruben2424 Jan 26, 2025
6581bcf
progress
Ruben2424 Jan 27, 2025
b79e0fe
new error types, avoid allocation and fix some violations with unidir…
Ruben2424 Feb 2, 2025
84ae2a4
progress
Ruben2424 Feb 3, 2025
50745a6
progress
Ruben2424 Feb 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ path = "client.rs"
name = "server"
path = "server.rs"

[[example]]
name = "webtransport_server"
path = "webtransport_server.rs"
#[[example]]
#name = "webtransport_server"
#path = "webtransport_server.rs"
25 changes: 15 additions & 10 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc};

use bytes::{Bytes, BytesMut};
use http::{Request, StatusCode};
use http::StatusCode;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use structopt::StructOpt;
use tokio::{fs::File, io::AsyncReadExt};
use tracing::{error, info, trace_span};

use h3::{error::ErrorLevel, quic::BidiStream, server::RequestStream};
use h3::{error::ErrorLevel, server::RequestResolver};
use h3_quinn::quinn::{self, crypto::rustls::QuicServerConfig};

#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -118,13 +118,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

loop {
match h3_conn.accept().await {
Ok(Some((req, stream))) => {
info!("new request: {:#?}", req);

Ok(Some(resolver)) => {
let root = root.clone();

tokio::spawn(async {
if let Err(e) = handle_request(req, stream, root).await {
if let Err(e) = handle_request(resolver, root).await {
error!("handling request failed: {}", e);
}
});
Expand Down Expand Up @@ -159,14 +157,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

async fn handle_request<T>(
req: Request<()>,
mut stream: RequestStream<T, Bytes>,
async fn handle_request<C>(
resolver: RequestResolver<C, Bytes>,
serve_root: Arc<Option<PathBuf>>,
) -> Result<(), Box<dyn std::error::Error>>
where
T: BidiStream<Bytes>,
C: h3::quic::Connection<Bytes>,
{
let (req, mut stream) = if let Some((req, stream)) = resolver.resolve_request().await? {
info!("received request: {:?}", req);
(req, stream)
} else {
info!("no request to resolve");
return Ok(());
};

let (status, to_serve) = match serve_root.as_deref() {
None => (StatusCode::OK, None),
Some(_) if req.uri().path().contains("..") => (StatusCode::NOT_FOUND, None),
Expand Down
4 changes: 3 additions & 1 deletion h3-webtransport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ where

/// Accepts an incoming bidirectional stream or request
pub async fn accept_bi(&self) -> Result<Option<AcceptedBi<C, B>>, Error> {
// Get the next stream
todo!("fix webtransport later")
/* // Get the next stream
// Accept the incoming stream
let stream = poll_fn(|cx| {
let mut conn = self.server_conn.lock().unwrap();
Expand Down Expand Up @@ -230,6 +231,7 @@ where
}
}
}
*/
}

/// Open a new bidirectional stream
Expand Down
28 changes: 17 additions & 11 deletions h3/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@

use crate::{
config::Config,
connection::{ConnectionInner, SharedStateRef},

Check warning on line 14 in h3/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / Lint

unused import: `SharedStateRef`
error::Error,
quic::{self},
shared_state::SharedState2,
};

use super::connection::{Connection, SendRequest};
Expand Down Expand Up @@ -107,25 +108,30 @@
B: Buf,
{
let open = quic.opener();
let conn_state = SharedStateRef::default();
let conn_state = Arc::new(SharedState2::default());

let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await);

let inner = ConnectionInner::new(quic, conn_state.clone(), self.config).await?;
let send_request = SendRequest {
open,
conn_state: todo!(),
conn_state2: conn_state,
conn_waker,
max_field_section_size: self.config.settings.max_field_section_size,
sender_count: Arc::new(AtomicUsize::new(1)),
send_grease_frame: self.config.send_grease,
_buf: PhantomData,
error_sender: inner.error_sender.clone(),

Check failure on line 125 in h3/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / Lint

mismatched types
};

Ok((
Connection {
inner: ConnectionInner::new(quic, conn_state.clone(), self.config).await?,
inner,
sent_closing: None,
recv_closing: None,
},
SendRequest {
open,
conn_state,
conn_waker,
max_field_section_size: self.config.settings.max_field_section_size,
sender_count: Arc::new(AtomicUsize::new(1)),
send_grease_frame: self.config.send_grease,
_buf: PhantomData,
},
send_request,
))
}
}
7 changes: 7 additions & 0 deletions h3/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use futures_util::future;
use http::request;

use tokio::sync::mpsc::UnboundedSender;
#[cfg(feature = "tracing")]
use tracing::{info, instrument, trace};

Expand All @@ -20,6 +21,7 @@
proto::{frame::Frame, headers::Header, push::PushId},
qpack,
quic::{self, StreamId},
shared_state::SharedState2,
stream::{self, BufRecvStream},
};

Expand Down Expand Up @@ -109,12 +111,14 @@
{
pub(super) open: T,
pub(super) conn_state: SharedStateRef,
pub(super) conn_state2: Arc<SharedState2>,
pub(super) max_field_section_size: u64, // maximum size for a header we receive
// counts instances of SendRequest to close the connection when the last is dropped.
pub(super) sender_count: Arc<AtomicUsize>,
pub(super) conn_waker: Option<Waker>,
pub(super) _buf: PhantomData<fn(B)>,
pub(super) send_grease_frame: bool,
pub(super) error_sender: UnboundedSender<(Code, &'static str)>,
}

impl<T, B> SendRequest<T, B>
Expand Down Expand Up @@ -153,7 +157,7 @@
//# client MUST send only a single request on a given stream.
let mut stream = future::poll_fn(|cx| self.open.poll_open_bidi(cx))
.await
.map_err(|e| self.maybe_conn_err(e))?;

Check failure on line 160 in h3/src/client/connection.rs

View workflow job for this annotation

GitHub Actions / Lint

the trait bound `error::Error: std::convert::From<quic::StreamErrorIncoming>` is not satisfied

//= https://www.rfc-editor.org/rfc/rfc9114#section-4.2
//= type=TODO
Expand Down Expand Up @@ -188,6 +192,7 @@
self.max_field_section_size,
self.conn_state.clone(),
self.send_grease_frame,
self.error_sender.clone(),
),
};
// send the grease frame only once
Expand Down Expand Up @@ -216,13 +221,15 @@
.fetch_add(1, std::sync::atomic::Ordering::Release);

Self {
conn_state2: self.conn_state2.clone(),
open: self.open.clone(),
conn_state: self.conn_state.clone(),
max_field_section_size: self.max_field_section_size,
sender_count: self.sender_count.clone(),
conn_waker: self.conn_waker.clone(),
_buf: PhantomData,
send_grease_frame: self.send_grease_frame,
error_sender: self.error_sender.clone(),
}
}
}
Expand Down Expand Up @@ -388,7 +395,7 @@
Ok(Frame::Settings(_)) => {
#[cfg(feature = "tracing")]
trace!("Got settings");
()

Check warning on line 398 in h3/src/client/connection.rs

View workflow job for this annotation

GitHub Actions / Lint

unneeded unit expression
}

Ok(Frame::Goaway(id)) => {
Expand Down
2 changes: 1 addition & 1 deletion h3/src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
pub async fn recv_response(&mut self) -> Result<Response<()>, Error> {
let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx))
.await
.map_err(|e| self.maybe_conn_err(e))?

Check failure on line 95 in h3/src/client/stream.rs

View workflow job for this annotation

GitHub Actions / Lint

the trait bound `error::Error: std::convert::From<frame::FrameStreamError>` is not satisfied
.ok_or_else(|| {
Code::H3_GENERAL_PROTOCOL_ERROR.with_reason(
"Did not receive response headers",
Expand Down Expand Up @@ -150,7 +150,7 @@
// TODO what if called before recv_response ?
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
self.inner.recv_data().await
future::poll_fn(|cx| self.poll_recv_data(cx)).await
}

/// Receive request body
Expand Down
Loading
Loading