diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index 15cd941..cd8aa62 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -96,7 +96,8 @@ impl ClientEntity { if let Some(delay) = polling.poll_interval_ms { polling_builder.poll_interval(Duration::from_millis(delay)); } - polling_builder.https_connector(connector.clone()); + let transport = launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone()); + polling_builder.transport(transport); config_builder = config_builder.data_source(&polling_builder); } else { diff --git a/launchdarkly-server-sdk/src/config.rs b/launchdarkly-server-sdk/src/config.rs index 136b792..21f4411 100644 --- a/launchdarkly-server-sdk/src/config.rs +++ b/launchdarkly-server-sdk/src/config.rs @@ -310,7 +310,7 @@ impl ConfigBuilder { } #[cfg(not(feature = "hyper-rustls"))] None => Err(BuildError::InvalidConfig( - "data source builder required when hyper feature is disabled".into(), + "data source builder required when hyper-rustls feature is disabled".into(), )), }; let data_source_builder = data_source_builder_result?; diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 022abcc..9fefcec 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -366,14 +366,13 @@ mod tests { time::Duration, }; - use hyper_util::client::legacy::connect::HttpConnector; use mockito::Matcher; use parking_lot::RwLock; use test_case::test_case; use tokio::sync::broadcast; use super::{DataSource, PollingDataSource, StreamingDataSource}; - use crate::feature_requester_builders::HyperFeatureRequesterBuilder; + use crate::feature_requester_builders::HttpFeatureRequesterBuilder; use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER}; use eventsource_client as es; @@ -453,8 +452,9 @@ mod tests { let (shutdown_tx, _) = broadcast::channel::<()>(1); let initialized = Arc::new(AtomicBool::new(false)); + let transport = crate::HyperTransport::new(); let hyper_builder = - HyperFeatureRequesterBuilder::new(&server.url(), "sdk-key", HttpConnector::new()); + HttpFeatureRequesterBuilder::new(&server.url(), "sdk-key", transport); let polling = PollingDataSource::new( Arc::new(Mutex::new(Box::new(hyper_builder))), diff --git a/launchdarkly-server-sdk/src/data_source_builders.rs b/launchdarkly-server-sdk/src/data_source_builders.rs index 384d5c6..39ed7d7 100644 --- a/launchdarkly-server-sdk/src/data_source_builders.rs +++ b/launchdarkly-server-sdk/src/data_source_builders.rs @@ -1,10 +1,8 @@ use super::service_endpoints; use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource}; -use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder}; +use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder}; +use crate::transport::HttpTransport; use eventsource_client as es; -use http::Uri; -#[cfg(feature = "hyper-rustls")] -use hyper_rustls::HttpsConnectorBuilder; use std::sync::{Arc, Mutex}; use std::time::Duration; use thiserror::Error; @@ -173,19 +171,17 @@ impl Default for NullDataSourceBuilder { /// /// Adjust the initial reconnect delay. /// ``` -/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder}; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper_util::client::legacy::connect::HttpConnector; +/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder, HyperTransport}; /// # use std::time::Duration; /// # fn main() { -/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::>::new() +/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::::new() /// .poll_interval(Duration::from_secs(60))); /// # } /// ``` #[derive(Clone)] -pub struct PollingDataSourceBuilder { +pub struct PollingDataSourceBuilder { poll_interval: Duration, - connector: Option, + transport: Option, } /// Contains methods for configuring the polling data source. @@ -203,21 +199,19 @@ pub struct PollingDataSourceBuilder { /// /// Adjust the poll interval. /// ``` -/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder}; +/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder, HyperTransport}; /// # use std::time::Duration; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper_util::client::legacy::connect::HttpConnector; /// # fn main() { -/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::>::new() +/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::::new() /// .poll_interval(Duration::from_secs(60))); /// # } /// ``` -impl PollingDataSourceBuilder { +impl PollingDataSourceBuilder { /// Create a new instance of the [PollingDataSourceBuilder] with default values. pub fn new() -> Self { Self { poll_interval: MINIMUM_POLL_INTERVAL, - connector: None, + transport: None, } } @@ -230,27 +224,17 @@ impl PollingDataSourceBuilder { self } - /// Sets the connector for the polling client to use. This allows for re-use of a connector + /// Sets the transport for the polling client to use. This allows for re-use of a transport /// between multiple client instances. This is especially useful for the `sdk-test-harness` /// where many client instances are created throughout the test and reading the native /// certificates is a substantial portion of the runtime. - pub fn https_connector(&mut self, connector: C) -> &mut Self { - self.connector = Some(connector); + pub fn transport(&mut self, transport: T) -> &mut Self { + self.transport = Some(transport); self } } -impl DataSourceFactory for PollingDataSourceBuilder -where - C: tower::Service + Clone + Send + Sync + 'static, - C::Response: hyper_util::client::legacy::connect::Connection - + hyper::rt::Read - + hyper::rt::Write - + Send - + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl DataSourceFactory for PollingDataSourceBuilder { fn build( &self, endpoints: &service_endpoints::ServiceEndpoints, @@ -258,30 +242,25 @@ where tags: Option, ) -> Result, BuildError> { let feature_requester_builder: Result, BuildError> = - match &self.connector { + match &self.transport { #[cfg(feature = "hyper-rustls")] None => { - let connector = HttpsConnectorBuilder::new() - .with_webpki_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - Ok(Box::new(HyperFeatureRequesterBuilder::new( + let transport = crate::HyperTransport::new_https(); + + Ok(Box::new(HttpFeatureRequesterBuilder::new( endpoints.polling_base_url(), sdk_key, - connector, + transport, ))) } #[cfg(not(feature = "hyper-rustls"))] None => Err(BuildError::InvalidConfig( - "https connector required when rustls is disabled".into(), + "transport is required when hyper-rustls feature is disabled".into(), )), - Some(connector) => Ok(Box::new(HyperFeatureRequesterBuilder::new( + Some(transport) => Ok(Box::new(HttpFeatureRequesterBuilder::new( endpoints.polling_base_url(), sdk_key, - connector.clone(), + transport.clone(), ))), }; @@ -298,7 +277,7 @@ where } } -impl Default for PollingDataSourceBuilder { +impl Default for PollingDataSourceBuilder { fn default() -> Self { PollingDataSourceBuilder::new() } @@ -345,7 +324,6 @@ impl DataSourceFactory for MockDataSourceBuilder { #[cfg(test)] mod tests { use eventsource_client::{HyperTransport, ResponseFuture}; - use hyper_util::client::legacy::connect::HttpConnector; use super::*; @@ -388,7 +366,7 @@ mod tests { #[test] fn default_polling_builder_has_correct_defaults() { - let builder = PollingDataSourceBuilder::::new(); + let builder = PollingDataSourceBuilder::::new(); assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL,); } diff --git a/launchdarkly-server-sdk/src/feature_requester.rs b/launchdarkly-server-sdk/src/feature_requester.rs index 6d3c560..4ab5068 100644 --- a/launchdarkly-server-sdk/src/feature_requester.rs +++ b/launchdarkly-server-sdk/src/feature_requester.rs @@ -1,10 +1,9 @@ use crate::reqwest::is_http_error_recoverable; +use crate::transport::HttpTransport; use bytes::Bytes; use futures::future::BoxFuture; -use http_body_util::{BodyExt, Empty}; -use hyper_util::client::legacy::Client as HyperClient; +use futures::stream::StreamExt; use std::collections::HashMap; -use std::sync::Arc; use super::stores::store_types::AllData; use launchdarkly_server_sdk_evaluation::{Flag, Segment}; @@ -22,23 +21,23 @@ pub trait FeatureRequester: Send { fn get_all(&mut self) -> BoxFuture, FeatureRequesterError>>; } -pub struct HyperFeatureRequester { - http: Arc>>>, +pub struct HttpFeatureRequester { + transport: T, url: http::Uri, sdk_key: String, cache: Option, default_headers: HashMap<&'static str, String>, } -impl HyperFeatureRequester { +impl HttpFeatureRequester { pub fn new( - http: HyperClient>>, + transport: T, url: http::Uri, sdk_key: String, default_headers: HashMap<&'static str, String>, ) -> Self { Self { - http: Arc::new(http), + transport, url, sdk_key, cache: None, @@ -47,19 +46,16 @@ impl HyperFeatureRequester { } } -impl FeatureRequester for HyperFeatureRequester -where - C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static, -{ +impl FeatureRequester for HttpFeatureRequester { fn get_all(&mut self) -> BoxFuture, FeatureRequesterError>> { Box::pin(async { let uri = self.url.clone(); let key = self.sdk_key.clone(); - let http = self.http.clone(); + let transport = self.transport.clone(); let cache = self.cache.clone(); - let mut request_builder = hyper::http::Request::builder() + let mut request_builder = http::Request::builder() .uri(uri) .method("GET") .header("Content-Type", "application/json") @@ -76,14 +72,9 @@ where } // Create empty body for GET request - let empty_body: http_body_util::combinators::BoxBody> = - Empty::::new() - .map_err(|e| Box::new(e) as Box) - .boxed(); + let request = request_builder.body(Bytes::new()).unwrap(); - let result = http - .request(request_builder.body(empty_body).unwrap()) - .await; + let result = transport.request(request).await; let response = match result { Ok(response) => response, @@ -109,15 +100,19 @@ where .map_or_else(|_| "".into(), |s| s.into()); if response.status().is_success() { - let body_bytes = response.into_body().collect().await - .map_err(|e| { + // Collect streaming body + let mut body_bytes = Vec::new(); + let mut stream = response.into_body(); + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| { error!( "An error occurred while reading the polling response body: {e}" ); FeatureRequesterError::Temporary - })? - .to_bytes(); - let json = serde_json::from_slice::>(body_bytes.as_ref()); + })?; + body_bytes.extend_from_slice(&chunk); + } + let json = serde_json::from_slice::>(&body_bytes); return match json { Ok(all_data) => { @@ -253,14 +248,12 @@ mod tests { } } - fn build_feature_requester(url: String) -> HyperFeatureRequester { - use hyper_util::rt::TokioExecutor; - let connector = hyper_util::client::legacy::connect::HttpConnector::new(); - let http = HyperClient::builder(TokioExecutor::new()).build(connector); + fn build_feature_requester(url: String) -> HttpFeatureRequester { let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url"); + let transport = crate::HyperTransport::new(); - HyperFeatureRequester::new( - http, + HttpFeatureRequester::new( + transport, url, "sdk-key".to_string(), HashMap::<&str, String>::new(), diff --git a/launchdarkly-server-sdk/src/feature_requester_builders.rs b/launchdarkly-server-sdk/src/feature_requester_builders.rs index f009393..448fbd0 100644 --- a/launchdarkly-server-sdk/src/feature_requester_builders.rs +++ b/launchdarkly-server-sdk/src/feature_requester_builders.rs @@ -1,7 +1,7 @@ -use crate::feature_requester::{FeatureRequester, HyperFeatureRequester}; +use crate::feature_requester::{FeatureRequester, HttpFeatureRequester}; +use crate::transport::HttpTransport; use crate::LAUNCHDARKLY_TAGS_HEADER; use http::Uri; -use hyper_util::{client::legacy::Client as HyperClient, rt::TokioExecutor}; use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; @@ -24,35 +24,23 @@ pub trait FeatureRequesterFactory: Send { fn build(&self, tags: Option) -> Result, BuildError>; } -pub struct HyperFeatureRequesterBuilder { +pub struct HttpFeatureRequesterBuilder { url: String, sdk_key: String, - http: HyperClient>>, + transport: T, } -impl HyperFeatureRequesterBuilder -where - C: tower::Service + Clone + Send + Sync + 'static, - C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ - pub fn new(url: &str, sdk_key: &str, connector: C) -> Self { +impl HttpFeatureRequesterBuilder { + pub fn new(url: &str, sdk_key: &str, transport: T) -> Self { Self { - http: HyperClient::builder(TokioExecutor::new()).build(connector), + transport, url: url.into(), sdk_key: sdk_key.into(), } } } -impl FeatureRequesterFactory for HyperFeatureRequesterBuilder -where - C: tower::Service + Clone + Send + Sync + 'static, - C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, - C::Future: Send + Unpin + 'static, - C::Error: Into>, -{ +impl FeatureRequesterFactory for HttpFeatureRequesterBuilder { fn build(&self, tags: Option) -> Result, BuildError> { let url = format!("{}/sdk/latest-all", self.url); @@ -65,8 +53,8 @@ where let url = Uri::from_str(url.as_str()) .map_err(|_| BuildError::InvalidConfig("Invalid base url provided".into()))?; - Ok(Box::new(HyperFeatureRequester::new( - self.http.clone(), + Ok(Box::new(HttpFeatureRequester::new( + self.transport.clone(), url, self.sdk_key.clone(), default_headers, @@ -76,16 +64,15 @@ where #[cfg(test)] mod tests { - use hyper_util::client::legacy::connect::HttpConnector; - use super::*; #[test] fn factory_handles_url_parsing_failure() { - let builder = HyperFeatureRequesterBuilder::new( + let transport = crate::HyperTransport::new(); + let builder = HttpFeatureRequesterBuilder::new( "This is clearly not a valid URL", "sdk-key", - HttpConnector::new(), + transport, ); let result = builder.build(None);