Skip to content

Commit

Permalink
progress on new types
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruben2424 committed Jan 25, 2025
1 parent 9beb31c commit 00d5e8e
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 54 deletions.
6 changes: 4 additions & 2 deletions h3/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
connection::{ConnectionInner, SharedStateRef},

Check warning on line 14 in h3/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / Lint

unused import: `SharedStateRef`
error::Error,
quic::{self},
shared_state::SharedState2,
};

use super::connection::{Connection, SendRequest};
Expand Down Expand Up @@ -107,14 +108,15 @@ impl Builder {
B: Buf,
{
let open = quic.opener();
let conn_state = SharedStateRef::default();
let conn_state = Arc::new(SharedState2::default());

let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await);

let inner = ConnectionInner::new(quic, conn_state.clone(), self.config).await?;
let send_request = SendRequest {
open,
conn_state,
conn_state: todo!(),
conn_state2: conn_state,
conn_waker,
max_field_section_size: self.config.settings.max_field_section_size,
sender_count: Arc::new(AtomicUsize::new(1)),
Expand Down
3 changes: 3 additions & 0 deletions h3/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
proto::{frame::Frame, headers::Header, push::PushId},
qpack,
quic::{self, StreamId},
shared_state::SharedState2,
stream::{self, BufRecvStream},
};

Expand Down Expand Up @@ -110,6 +111,7 @@ where
{
pub(super) open: T,
pub(super) conn_state: SharedStateRef,
pub(super) conn_state2: Arc<SharedState2>,
pub(super) max_field_section_size: u64, // maximum size for a header we receive
// counts instances of SendRequest to close the connection when the last is dropped.
pub(super) sender_count: Arc<AtomicUsize>,
Expand Down Expand Up @@ -219,6 +221,7 @@ where
.fetch_add(1, std::sync::atomic::Ordering::Release);

Self {
conn_state2: self.conn_state2.clone(),
open: self.open.clone(),
conn_state: self.conn_state.clone(),
max_field_section_size: self.max_field_section_size,
Expand Down
17 changes: 15 additions & 2 deletions h3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
},
qpack,
quic::{self, SendStream},
shared_state::{ConnectionState2, SharedState2},
stream::{self, AcceptRecvStream, AcceptedRecvStream, BufRecvStream, UniStreamHeader},
webtransport::SessionId,
};
Expand Down Expand Up @@ -106,6 +107,7 @@ where
C: quic::Connection<B>,
B: Buf,
{
pub(super) shared2: Arc<SharedState2>,
pub(super) shared: SharedStateRef,
/// TODO: breaking encapsulation just to see if we can get this to work, will fix before merging
pub conn: C,
Expand Down Expand Up @@ -149,6 +151,16 @@ where
pub(crate) error_sender: UnboundedSender<(Code, &'static str)>,
}

impl<B, C> ConnectionState2 for ConnectionInner<C, B>
where
C: quic::Connection<B>,
B: Buf,
{
fn shared_state(&self) -> &SharedState2 {
&self.shared2
}
}

enum GreaseStatus<S, B>
where
S: SendStream<B>,
Expand Down Expand Up @@ -240,7 +252,7 @@ where

/// Initiates the connection and opens a control stream
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub async fn new(mut conn: C, shared: SharedStateRef, config: Config) -> Result<Self, Error> {
pub async fn new(mut conn: C, shared2: Arc<SharedState2>, config: Config) -> Result<Self, Error> {
//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2
//# Endpoints SHOULD create the HTTP control stream as well as the
//# unidirectional streams required by mandatory extensions (such as the
Expand All @@ -261,7 +273,8 @@ where
//# sender MUST NOT close the control stream, and the receiver MUST NOT
//# request that the sender close the control stream.
let mut conn_inner = Self {
shared,
shared2,
shared: todo!(),
conn,
control_send: control_send.map_err(Error::transport_err)?,
control_recv: None,
Expand Down
6 changes: 6 additions & 0 deletions h3/src/error2/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,9 @@ impl From<NewCode> for u64 {
code.code
}
}

impl From<u64> for NewCode {
fn from(code: u64) -> NewCode {
NewCode { code }
}
}
119 changes: 96 additions & 23 deletions h3/src/error2/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
use std::sync::Arc;

use crate::quic;
use crate::quic::{self, ConnectionErrorIncoming, StreamErrorIncoming};

use super::codes::NewCode;
use super::{codes::NewCode, internal_error::{InternalConnectionError, InternalRequestStreamError}};

/// This enum represents wether the error occurred on the local or remote side of the connection
#[derive(Debug, Clone)]
Expand All @@ -19,7 +19,7 @@ pub enum ConnectionError {
/// Error returned by the quic layer
/// I might be an quic error or the remote h3 connection closed the connection with an error
#[non_exhaustive]
Remote(Arc<dyn quic::Error>),
Remote(ConnectionErrorIncoming),
/// Timeout occurred
#[non_exhaustive]
Timeout,
Expand All @@ -42,6 +42,24 @@ pub enum LocalError {
Closing,
}

impl From<&InternalConnectionError> for LocalError {
fn from(err: &InternalConnectionError) -> Self {
LocalError::Application {
code: err.code,
reason: err.message,
}
}
}

impl From<&InternalRequestStreamError> for LocalError {
fn from(err: &InternalRequestStreamError) -> Self {
LocalError::Application {
code: err.code,
reason: err.message,
}
}
}

/// This enum represents a stream error
#[derive(Debug, Clone)]
#[non_exhaustive]
Expand All @@ -54,26 +72,44 @@ pub enum StreamError {
/// The error reason
reason: &'static str,
},
/// Stream was Reset by the peer
RemoteReset {
/// Reset code received from the peer
code: NewCode,
},
/// The error occurred on the connection
#[non_exhaustive]
ConnectionError(ConnectionError),
}

impl From<StreamErrorIncoming> for StreamError {
fn from(err: StreamErrorIncoming) -> Self {
match err {
StreamErrorIncoming::ConnectionErrorIncoming { connection_error } => {
StreamError::ConnectionError(connection_error.into())
}
StreamErrorIncoming::StreamReset { error_code } => StreamError::RemoteReset {
code: error_code.into(),
},
}
}
}

impl From<ConnectionErrorIncoming> for ConnectionError {
fn from(value: ConnectionErrorIncoming) -> Self {
return match value {
ConnectionErrorIncoming::Timeout => ConnectionError::Timeout,
error => ConnectionError::Remote(error),
};
}
}

/// This enum represents a stream error
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ServerStreamError {
/// The error occurred on the stream
#[non_exhaustive]
StreamError {
/// The error code
code: NewCode,
/// The error reason
reason: &'static str,
},
/// The error occurred on the connection
#[non_exhaustive]
ConnectionError(ConnectionError),
/// A Stream error occurred
General(StreamError),
#[non_exhaustive]
/// The received header block is too big
/// The Request has been answered with a 431 Request Header Fields Too Large
Expand All @@ -85,13 +121,48 @@ pub enum ServerStreamError {
},
}

#[derive(Debug, Clone)]
#[non_exhaustive]
/// This enum represents a stream error
///
/// This type will be returned by the client in case of an error on the stream methods
pub enum ClientStreamError {
/// A Stream error occurred
General(StreamError),
#[non_exhaustive]
/// The request cannot be sent because the header block larger then permitted by the server
HeaderTooBig {
/// The actual size of the header block
actual_size: u64,
/// The maximum size of the header block
max_size: u64,
},
}

impl std::fmt::Display for ClientStreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ClientStreamError::General(stream_error) => {
write!(f, "Stream error: {}", stream_error)
}
ClientStreamError::HeaderTooBig {
actual_size,
max_size,
} => write!(
f,
"Request cannot be sent because the header is lager than permitted by the server: permitted size: {}, max size: {}",
actual_size, max_size
),
}
}
}

impl std::fmt::Display for ServerStreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ServerStreamError::StreamError { code, reason } => {
write!(f, "Stream error: {:?} - {}", code, reason)
ServerStreamError::General(stream_error) => {
write!(f, "Stream error: {}", stream_error)
}
ServerStreamError::ConnectionError(err) => write!(f, "Connection error: {}", err),
ServerStreamError::HeaderTooBig {
actual_size,
max_size,
Expand All @@ -108,12 +179,13 @@ impl std::error::Error for ServerStreamError {}

impl From<StreamError> for ServerStreamError {
fn from(err: StreamError) -> Self {
match err {
StreamError::StreamError { code, reason } => {
ServerStreamError::StreamError { code, reason }
}
StreamError::ConnectionError(err) => ServerStreamError::ConnectionError(err),
}
ServerStreamError::General(err)
}
}

impl From<StreamError> for ClientStreamError {
fn from(err: StreamError) -> Self {
ClientStreamError::General(err)
}
}

Expand All @@ -136,6 +208,7 @@ impl std::fmt::Display for StreamError {
write!(f, "Stream error: {:?} - {}", code, reason)
}
StreamError::ConnectionError(err) => write!(f, "Connection error: {}", err),
StreamError::RemoteReset { code } => write!(f, "Remote reset: {:?}", code),
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions h3/src/error2/internal_error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::error::Code;

use super::codes::NewCode;

/// This enum determines if the error affects the connection or the stream
///
/// The end goal is to make all stream errors from the spec potential connection errors if users of h3 decide to treat them as such
Expand All @@ -20,11 +22,11 @@ pub enum ErrorScope {
#[derive(Debug, Clone, Hash)]
pub struct InternalRequestStreamError {
/// The error scope
scope: ErrorScope,
pub(crate) scope: ErrorScope,
/// The error code
code: Code,
pub(crate) code: NewCode,
/// The error message
message: &'static str,
pub(crate) message: &'static str,
}

/// This error type represents an internal error type, which is used
Expand All @@ -36,7 +38,7 @@ pub struct InternalRequestStreamError {
#[derive(Debug, Clone, Hash)]
pub struct InternalConnectionError {
/// The error code
code: Code,
pub(super) code: NewCode,
/// The error message
message: &'static str,
pub(super) message: &'static str,
}
1 change: 1 addition & 0 deletions h3/src/error2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ pub(crate) mod internal_error;
mod error;

pub use error::{ConnectionError, LocalError, ServerStreamError, StreamError};
pub use codes::NewCode;
Loading

0 comments on commit 00d5e8e

Please sign in to comment.