Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion contract-tests/src/client_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ impl ClientEntity {
if let Some(delay) = polling.poll_interval_ms {
polling_builder.poll_interval(Duration::from_millis(delay));
}
polling_builder.https_connector(connector.clone());
let transport = launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
polling_builder.transport(transport);

config_builder = config_builder.data_source(&polling_builder);
} else {
Expand Down
2 changes: 1 addition & 1 deletion launchdarkly-server-sdk/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl ConfigBuilder {
}
#[cfg(not(feature = "hyper-rustls"))]
None => Err(BuildError::InvalidConfig(
"data source builder required when hyper feature is disabled".into(),
"data source builder required when hyper-rustls feature is disabled".into(),
)),
};
let data_source_builder = data_source_builder_result?;
Expand Down
6 changes: 3 additions & 3 deletions launchdarkly-server-sdk/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,13 @@ mod tests {
time::Duration,
};

use hyper_util::client::legacy::connect::HttpConnector;
use mockito::Matcher;
use parking_lot::RwLock;
use test_case::test_case;
use tokio::sync::broadcast;

use super::{DataSource, PollingDataSource, StreamingDataSource};
use crate::feature_requester_builders::HyperFeatureRequesterBuilder;
use crate::feature_requester_builders::HttpFeatureRequesterBuilder;
use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER};
use eventsource_client as es;

Expand Down Expand Up @@ -453,8 +452,9 @@ mod tests {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let initialized = Arc::new(AtomicBool::new(false));

let transport = crate::HyperTransport::new();
let hyper_builder =
HyperFeatureRequesterBuilder::new(&server.url(), "sdk-key", HttpConnector::new());
HttpFeatureRequesterBuilder::new(&server.url(), "sdk-key", transport);

let polling = PollingDataSource::new(
Arc::new(Mutex::new(Box::new(hyper_builder))),
Expand Down
70 changes: 24 additions & 46 deletions launchdarkly-server-sdk/src/data_source_builders.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use super::service_endpoints;
use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder};
use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder};
use crate::transport::HttpTransport;
use eventsource_client as es;
use http::Uri;
#[cfg(feature = "hyper-rustls")]
use hyper_rustls::HttpsConnectorBuilder;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use thiserror::Error;
Expand Down Expand Up @@ -173,19 +171,17 @@ impl Default for NullDataSourceBuilder {
///
/// Adjust the initial reconnect delay.
/// ```
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
/// # use hyper_rustls::HttpsConnector;
/// # use hyper_util::client::legacy::connect::HttpConnector;
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder, HyperTransport};
/// # use std::time::Duration;
/// # fn main() {
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HttpsConnector<HttpConnector>>::new()
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
/// .poll_interval(Duration::from_secs(60)));
/// # }
/// ```
#[derive(Clone)]
pub struct PollingDataSourceBuilder<C> {
pub struct PollingDataSourceBuilder<T: HttpTransport = crate::HyperTransport> {
poll_interval: Duration,
connector: Option<C>,
transport: Option<T>,
}

/// Contains methods for configuring the polling data source.
Expand All @@ -203,21 +199,19 @@ pub struct PollingDataSourceBuilder<C> {
///
/// Adjust the poll interval.
/// ```
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder, HyperTransport};
/// # use std::time::Duration;
/// # use hyper_rustls::HttpsConnector;
/// # use hyper_util::client::legacy::connect::HttpConnector;
/// # fn main() {
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HttpsConnector<HttpConnector>>::new()
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
/// .poll_interval(Duration::from_secs(60)));
/// # }
/// ```
impl<C> PollingDataSourceBuilder<C> {
impl<T: HttpTransport> PollingDataSourceBuilder<T> {
/// Create a new instance of the [PollingDataSourceBuilder] with default values.
pub fn new() -> Self {
Self {
poll_interval: MINIMUM_POLL_INTERVAL,
connector: None,
transport: None,
}
}

Expand All @@ -230,58 +224,43 @@ impl<C> PollingDataSourceBuilder<C> {
self
}

/// Sets the connector for the polling client to use. This allows for re-use of a connector
/// Sets the transport for the polling client to use. This allows for re-use of a transport
/// between multiple client instances. This is especially useful for the `sdk-test-harness`
/// where many client instances are created throughout the test and reading the native
/// certificates is a substantial portion of the runtime.
pub fn https_connector(&mut self, connector: C) -> &mut Self {
self.connector = Some(connector);
pub fn transport(&mut self, transport: T) -> &mut Self {
self.transport = Some(transport);
self
}
}

impl<C> DataSourceFactory for PollingDataSourceBuilder<C>
where
C: tower::Service<Uri> + Clone + Send + Sync + 'static,
C::Response: hyper_util::client::legacy::connect::Connection
+ hyper::rt::Read
+ hyper::rt::Write
+ Send
+ Unpin,
C::Future: Send + Unpin + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
impl<T: HttpTransport> DataSourceFactory for PollingDataSourceBuilder<T> {
fn build(
&self,
endpoints: &service_endpoints::ServiceEndpoints,
sdk_key: &str,
tags: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError> {
let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
match &self.connector {
match &self.transport {
#[cfg(feature = "hyper-rustls")]
None => {
let connector = HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();

Ok(Box::new(HyperFeatureRequesterBuilder::new(
let transport = crate::HyperTransport::new_https();

Ok(Box::new(HttpFeatureRequesterBuilder::new(
endpoints.polling_base_url(),
sdk_key,
connector,
transport,
)))
}
#[cfg(not(feature = "hyper-rustls"))]
None => Err(BuildError::InvalidConfig(
"https connector required when rustls is disabled".into(),
"transport is required when hyper-rustls feature is disabled".into(),
)),
Some(connector) => Ok(Box::new(HyperFeatureRequesterBuilder::new(
Some(transport) => Ok(Box::new(HttpFeatureRequesterBuilder::new(
endpoints.polling_base_url(),
sdk_key,
connector.clone(),
transport.clone(),
))),
};

Expand All @@ -298,7 +277,7 @@ where
}
}

impl<C> Default for PollingDataSourceBuilder<C> {
impl<T: HttpTransport> Default for PollingDataSourceBuilder<T> {
fn default() -> Self {
PollingDataSourceBuilder::new()
}
Expand Down Expand Up @@ -345,7 +324,6 @@ impl DataSourceFactory for MockDataSourceBuilder {
#[cfg(test)]
mod tests {
use eventsource_client::{HyperTransport, ResponseFuture};
use hyper_util::client::legacy::connect::HttpConnector;

use super::*;

Expand Down Expand Up @@ -388,7 +366,7 @@ mod tests {

#[test]
fn default_polling_builder_has_correct_defaults() {
let builder = PollingDataSourceBuilder::<HttpConnector>::new();
let builder = PollingDataSourceBuilder::<crate::HyperTransport>::new();
assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL,);
}

Expand Down
57 changes: 25 additions & 32 deletions launchdarkly-server-sdk/src/feature_requester.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::reqwest::is_http_error_recoverable;
use crate::transport::HttpTransport;
use bytes::Bytes;
use futures::future::BoxFuture;
use http_body_util::{BodyExt, Empty};
use hyper_util::client::legacy::Client as HyperClient;
use futures::stream::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;

use super::stores::store_types::AllData;
use launchdarkly_server_sdk_evaluation::{Flag, Segment};
Expand All @@ -22,23 +21,23 @@ pub trait FeatureRequester: Send {
fn get_all(&mut self) -> BoxFuture<Result<AllData<Flag, Segment>, FeatureRequesterError>>;
}

pub struct HyperFeatureRequester<C> {
http: Arc<HyperClient<C, http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>>>,
pub struct HttpFeatureRequester<T: HttpTransport> {
transport: T,
url: http::Uri,
sdk_key: String,
cache: Option<CachedEntry>,
default_headers: HashMap<&'static str, String>,
}

impl<C> HyperFeatureRequester<C> {
impl<T: HttpTransport> HttpFeatureRequester<T> {
pub fn new(
http: HyperClient<C, http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>>,
transport: T,
url: http::Uri,
sdk_key: String,
default_headers: HashMap<&'static str, String>,
) -> Self {
Self {
http: Arc::new(http),
transport,
url,
sdk_key,
cache: None,
Expand All @@ -47,19 +46,16 @@ impl<C> HyperFeatureRequester<C> {
}
}

impl<C> FeatureRequester for HyperFeatureRequester<C>
where
C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static,
{
impl<T: HttpTransport> FeatureRequester for HttpFeatureRequester<T> {
fn get_all(&mut self) -> BoxFuture<Result<AllData<Flag, Segment>, FeatureRequesterError>> {
Box::pin(async {
let uri = self.url.clone();
let key = self.sdk_key.clone();

let http = self.http.clone();
let transport = self.transport.clone();
let cache = self.cache.clone();

let mut request_builder = hyper::http::Request::builder()
let mut request_builder = http::Request::builder()
.uri(uri)
.method("GET")
.header("Content-Type", "application/json")
Expand All @@ -76,14 +72,9 @@ where
}

// Create empty body for GET request
let empty_body: http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>> =
Empty::<Bytes>::new()
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
.boxed();
let request = request_builder.body(Bytes::new()).unwrap();

let result = http
.request(request_builder.body(empty_body).unwrap())
.await;
let result = transport.request(request).await;

let response = match result {
Ok(response) => response,
Expand All @@ -109,15 +100,19 @@ where
.map_or_else(|_| "".into(), |s| s.into());

if response.status().is_success() {
let body_bytes = response.into_body().collect().await
.map_err(|e| {
// Collect streaming body
let mut body_bytes = Vec::new();
let mut stream = response.into_body();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|e| {
error!(
"An error occurred while reading the polling response body: {e}"
);
FeatureRequesterError::Temporary
})?
.to_bytes();
let json = serde_json::from_slice::<AllData<Flag, Segment>>(body_bytes.as_ref());
})?;
body_bytes.extend_from_slice(&chunk);
}
let json = serde_json::from_slice::<AllData<Flag, Segment>>(&body_bytes);

return match json {
Ok(all_data) => {
Expand Down Expand Up @@ -253,14 +248,12 @@ mod tests {
}
}

fn build_feature_requester(url: String) -> HyperFeatureRequester<hyper_util::client::legacy::connect::HttpConnector> {
use hyper_util::rt::TokioExecutor;
let connector = hyper_util::client::legacy::connect::HttpConnector::new();
let http = HyperClient::builder(TokioExecutor::new()).build(connector);
fn build_feature_requester(url: String) -> HttpFeatureRequester<crate::HyperTransport> {
let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url");
let transport = crate::HyperTransport::new();

HyperFeatureRequester::new(
http,
HttpFeatureRequester::new(
transport,
url,
"sdk-key".to_string(),
HashMap::<&str, String>::new(),
Expand Down
Loading