diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index f4931e7..dc9d399 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -131,7 +131,9 @@ impl ClientEntity { if let Some(attributes) = events.global_private_attributes { processor_builder.private_attributes(attributes); } - processor_builder.https_connector(connector.clone()); + let transport = + launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone()); + processor_builder.transport(transport); processor_builder.omit_anonymous_contexts(events.omit_anonymous_contexts); config_builder.event_processor(&processor_builder) diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index a157c65..f3ece21 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -828,7 +828,6 @@ mod tests { use crossbeam_channel::Receiver; use eval::{ContextBuilder, MultiContextBuilder}; use futures::FutureExt; - use hyper_util::client::legacy::connect::HttpConnector; use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment}; use maplit::hashmap; use std::collections::HashMap; @@ -2600,7 +2599,8 @@ mod tests { .daemon_mode(daemon_mode) .data_source(MockDataSourceBuilder::new().data_source(updates)) .event_processor( - EventProcessorBuilder::::new().event_sender(Arc::new(event_sender)), + EventProcessorBuilder::::new() + .event_sender(Arc::new(event_sender)), ) .build() .expect("config should build"); diff --git a/launchdarkly-server-sdk/src/config.rs b/launchdarkly-server-sdk/src/config.rs index bb75eff..21f4411 100644 --- a/launchdarkly-server-sdk/src/config.rs +++ b/launchdarkly-server-sdk/src/config.rs @@ -302,12 +302,15 @@ impl ConfigBuilder { } Some(builder) => Ok(builder), #[cfg(feature = "hyper-rustls")] - None => Ok(Box::new( - StreamingDataSourceBuilder::::new(), - )), + None => { + let transport = es::HyperTransport::new_https(); + let mut builder = StreamingDataSourceBuilder::new(); + builder.transport(transport); + Ok(Box::new(builder)) + } #[cfg(not(feature = "hyper-rustls"))] None => Err(BuildError::InvalidConfig( - "data source builder required when rustls is disabled".into(), + "data source builder required when hyper-rustls feature is disabled".into(), )), }; let data_source_builder = data_source_builder_result?; @@ -321,14 +324,15 @@ impl ConfigBuilder { } Some(builder) => Ok(builder), #[cfg(feature = "hyper-rustls")] - None => Ok(Box::new(EventProcessorBuilder::< - hyper_rustls::HttpsConnector< - hyper_util::client::legacy::connect::HttpConnector, - >, - >::new())), + None => { + let transport = crate::HyperTransport::new_https(); + let mut builder = EventProcessorBuilder::new(); + builder.transport(transport); + Ok(Box::new(builder)) + } #[cfg(not(feature = "hyper-rustls"))] None => Err(BuildError::InvalidConfig( - "event processor factory required when rustls is disabled".into(), + "event processor factory required when hyper-rustls feature is disabled".into(), )), }; let event_processor_builder = event_processor_builder_result?; diff --git a/launchdarkly-server-sdk/src/events/processor_builders.rs b/launchdarkly-server-sdk/src/events/processor_builders.rs index 4f3e49e..5aa1cf4 100644 --- a/launchdarkly-server-sdk/src/events/processor_builders.rs +++ b/launchdarkly-server-sdk/src/events/processor_builders.rs @@ -5,12 +5,11 @@ use std::sync::Arc; use std::time::Duration; use http::Uri; -#[cfg(feature = "hyper-rustls")] -use hyper_rustls::HttpsConnectorBuilder; use launchdarkly_server_sdk_evaluation::Reference; use thiserror::Error; -use crate::events::sender::HyperEventSender; +use crate::events::sender::HttpEventSender; +use crate::transport::HttpTransport; use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER}; use super::processor::{ @@ -61,17 +60,15 @@ pub trait EventProcessorFactory { /// /// Adjust the flush interval /// ``` -/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder}; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper_util::client::legacy::connect::HttpConnector; +/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder, HyperTransport}; /// # use std::time::Duration; /// # fn main() { -/// ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::>::new() +/// ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::::new() /// .flush_interval(Duration::from_secs(10))); /// # } /// ``` #[derive(Clone)] -pub struct EventProcessorBuilder { +pub struct EventProcessorBuilder { capacity: usize, flush_interval: Duration, context_keys_capacity: NonZeroUsize, @@ -79,23 +76,13 @@ pub struct EventProcessorBuilder { event_sender: Option>, all_attributes_private: bool, private_attributes: HashSet, - connector: Option, + transport: Option, omit_anonymous_contexts: bool, compress_events: bool, // diagnostic_recording_interval: Duration } -impl EventProcessorFactory for EventProcessorBuilder -where - C: tower::Service + Clone + Send + Sync + 'static, - C::Response: hyper_util::client::legacy::connect::Connection - + hyper::rt::Read - + hyper::rt::Write - + Send - + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl EventProcessorFactory for EventProcessorBuilder { fn build( &self, endpoints: &service_endpoints::ServiceEndpoints, @@ -114,10 +101,10 @@ where // NOTE: This would only be possible under unit testing conditions. if let Some(event_sender) = &self.event_sender { Ok(event_sender.clone()) - } else if let Some(connector) = &self.connector { - Ok(Arc::new(HyperEventSender::new( - connector.clone(), - hyper::Uri::from_str(url_string.as_str()).unwrap(), + } else if let Some(transport) = &self.transport { + Ok(Arc::new(HttpEventSender::new( + transport.clone(), + Uri::from_str(url_string.as_str()).unwrap(), sdk_key, default_headers, self.compress_events, @@ -125,19 +112,9 @@ where } else { #[cfg(feature = "hyper-rustls")] { - let connector = HttpsConnectorBuilder::new() - .with_native_roots() - .unwrap_or_else(|_| { - log::debug!("Falling back to webpki roots for event HTTPS connector"); - HttpsConnectorBuilder::new().with_webpki_roots() - }) - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - Ok(Arc::new(HyperEventSender::new( - connector, + let transport = crate::HyperTransport::new_https(); + Ok(Arc::new(HttpEventSender::new( + transport, Uri::from_str(url_string.as_str()).unwrap(), sdk_key, default_headers, @@ -146,7 +123,7 @@ where } #[cfg(not(feature = "hyper-rustls"))] Err(BuildError::InvalidConfig( - "https connector is required when rustls is disabled".into(), + "transport is required when hyper-rustls feature is disabled".into(), )) }; let event_sender = event_sender_result?; @@ -173,7 +150,7 @@ where } } -impl EventProcessorBuilder { +impl EventProcessorBuilder { /// Create a new [EventProcessorBuilder] with all default values. pub fn new() -> Self { Self { @@ -186,7 +163,7 @@ impl EventProcessorBuilder { all_attributes_private: false, private_attributes: HashSet::new(), omit_anonymous_contexts: false, - connector: None, + transport: None, compress_events: false, } } @@ -250,12 +227,12 @@ impl EventProcessorBuilder { self } - /// Sets the connector for the event sender to use. This allows for re-use of a connector + /// Sets the transport for the event sender to use. This allows for re-use of a transport /// between multiple client instances. This is especially useful for the `sdk-test-harness` /// where many client instances are created throughout the test and reading the native /// certificates is a substantial portion of the runtime. - pub fn https_connector(&mut self, connector: C) -> &mut Self { - self.connector = Some(connector); + pub fn transport(&mut self, transport: T) -> &mut Self { + self.transport = Some(transport); self } @@ -288,7 +265,7 @@ impl EventProcessorBuilder { } } -impl Default for EventProcessorBuilder { +impl Default for EventProcessorBuilder { fn default() -> Self { Self::new() } @@ -329,7 +306,6 @@ impl Default for NullEventProcessorBuilder { #[cfg(test)] mod tests { - use hyper_util::client::legacy::connect::HttpConnector; use launchdarkly_server_sdk_evaluation::ContextBuilder; use maplit::hashset; use mockito::Matcher; @@ -341,28 +317,28 @@ mod tests { #[test] fn default_builder_has_correct_defaults() { - let builder = EventProcessorBuilder::::new(); + let builder = EventProcessorBuilder::::new(); assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY); assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL); } #[test] fn capacity_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = EventProcessorBuilder::::new(); builder.capacity(1234); assert_eq!(builder.capacity, 1234); } #[test] fn flush_interval_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = EventProcessorBuilder::::new(); builder.flush_interval(Duration::from_secs(1234)); assert_eq!(builder.flush_interval, Duration::from_secs(1234)); } #[test] fn context_keys_capacity_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = EventProcessorBuilder::::new(); let cap = NonZeroUsize::new(1234).expect("1234 > 0"); builder.context_keys_capacity(cap); assert_eq!(builder.context_keys_capacity, cap); @@ -370,7 +346,7 @@ mod tests { #[test] fn context_keys_flush_interval_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = EventProcessorBuilder::::new(); builder.context_keys_flush_interval(Duration::from_secs(1000)); assert_eq!( builder.context_keys_flush_interval, @@ -380,7 +356,7 @@ mod tests { #[test] fn all_attribute_private_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = EventProcessorBuilder::::new(); assert!(!builder.all_attributes_private); builder.all_attributes_private(true); @@ -389,7 +365,7 @@ mod tests { #[test] fn attribte_names_can_be_adjusted() { - let mut builder = EventProcessorBuilder::::new(); + let mut builder = EventProcessorBuilder::::new(); assert!(builder.private_attributes.is_empty()); builder.private_attributes(hashset!["name"]); @@ -414,7 +390,7 @@ mod tests { .build() .expect("Service endpoints failed to be created"); - let builder = EventProcessorBuilder::::new(); + let builder = EventProcessorBuilder::::new(); let processor = builder .build(&service_endpoints, "sdk-key", tag) .expect("Processor failed to build"); diff --git a/launchdarkly-server-sdk/src/events/sender.rs b/launchdarkly-server-sdk/src/events/sender.rs index 4a4fbd8..8594c03 100644 --- a/launchdarkly-server-sdk/src/events/sender.rs +++ b/launchdarkly-server-sdk/src/events/sender.rs @@ -1,5 +1,5 @@ use crate::{ - reqwest::is_http_error_recoverable, LAUNCHDARKLY_EVENT_SCHEMA_HEADER, + reqwest::is_http_error_recoverable, transport::HttpTransport, LAUNCHDARKLY_EVENT_SCHEMA_HEADER, LAUNCHDARKLY_PAYLOAD_ID_HEADER, }; use chrono::DateTime; @@ -15,8 +15,6 @@ use std::io::Write; use bytes::Bytes; use futures::future::BoxFuture; -use http_body_util::{BodyExt, Full}; -use hyper_util::{client::legacy::Client as HyperClient, rt::TokioExecutor}; use tokio::time::{sleep, Duration}; use uuid::Uuid; @@ -37,13 +35,10 @@ pub trait EventSender: Send + Sync { } #[derive(Clone)] -pub struct HyperEventSender { +pub struct HttpEventSender { url: http::Uri, sdk_key: String, - http: HyperClient< - C, - http_body_util::combinators::BoxBody>, - >, + transport: T, default_headers: HashMap<&'static str, String>, // used with event-compression feature @@ -51,19 +46,9 @@ pub struct HyperEventSender { compress_events: bool, } -impl HyperEventSender -where - C: tower::Service + Clone + Send + Sync + 'static, - C::Response: hyper_util::client::legacy::connect::Connection - + hyper::rt::Read - + hyper::rt::Write - + Send - + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl HttpEventSender { pub fn new( - connector: C, + transport: T, url: http::Uri, sdk_key: &str, default_headers: HashMap<&'static str, String>, @@ -72,7 +57,7 @@ where Self { url, sdk_key: sdk_key.to_owned(), - http: HyperClient::builder(TokioExecutor::new()).build(connector), + transport, default_headers, compress_events, } @@ -94,17 +79,7 @@ where } } -impl EventSender for HyperEventSender -where - C: tower::Service + Clone + Send + Sync + 'static, - C::Response: hyper_util::client::legacy::connect::Connection - + hyper::rt::Read - + hyper::rt::Write - + Send - + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl EventSender for HttpEventSender { fn send_event_data( &self, events: Vec, @@ -149,7 +124,7 @@ where sleep(Duration::from_secs(1)).await; } - let mut request_builder = hyper::Request::builder() + let mut request_builder = http::Request::builder() .uri(self.url.clone()) .method("POST") .header("Content-Type", "application/json") @@ -166,17 +141,11 @@ where request_builder.header(*default_header.0, default_header.1.as_str()); } - // Convert Vec to BoxBody for hyper 1.0 + // Create request with Bytes body for transport let body_bytes = Bytes::from(payload.clone()); - let boxed_body: http_body_util::combinators::BoxBody< - Bytes, - Box, - > = Full::new(body_bytes) - .map_err(|e| Box::new(e) as Box) - .boxed(); - let request = request_builder.body(boxed_body); + let request = request_builder.body(body_bytes).unwrap(); - let result = self.http.request(request.unwrap()).await; + let result = self.transport.request(request).await; let response = match result { Ok(response) => response, @@ -371,14 +340,13 @@ mod tests { assert_eq!(sender_result.time_from_server, 1234567890000); } - fn build_event_sender( - url: String, - ) -> HyperEventSender { + fn build_event_sender(url: String) -> HttpEventSender { let url = format!("{}/bulk", &url); let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url"); - HyperEventSender::new( - hyper_util::client::legacy::connect::HttpConnector::new(), + let transport = crate::HyperTransport::new(); + HttpEventSender::new( + transport, url, "sdk-key", HashMap::<&str, String>::new(),