Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contract-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ async-mutex = "1.4.0"
[features]
default = ["hyper-rustls"]
hyper-rustls = ["dep:hyper-rustls", "launchdarkly-server-sdk/hyper-rustls"]
tls = ["hyper-tls"]
tls = ["hyper-tls", "launchdarkly-server-sdk/hyper"]
3 changes: 2 additions & 1 deletion contract-tests/src/client_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ impl ClientEntity {
if let Some(attributes) = events.global_private_attributes {
processor_builder.private_attributes(attributes);
}
processor_builder.https_connector(connector.clone());
let transport = launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
processor_builder.transport(transport);
processor_builder.omit_anonymous_contexts(events.omit_anonymous_contexts);

config_builder.event_processor(&processor_builder)
Expand Down
3 changes: 1 addition & 2 deletions launchdarkly-server-sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,6 @@ mod tests {
use crossbeam_channel::Receiver;
use eval::{ContextBuilder, MultiContextBuilder};
use futures::FutureExt;
use hyper_util::client::legacy::connect::HttpConnector;
use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
use maplit::hashmap;
use std::collections::HashMap;
Expand Down Expand Up @@ -2612,7 +2611,7 @@ mod tests {
.daemon_mode(daemon_mode)
.data_source(MockDataSourceBuilder::new().data_source(updates))
.event_processor(
EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
EventProcessorBuilder::<crate::HyperTransport>::new().event_sender(Arc::new(event_sender)),
)
.build()
.expect("config should build");
Expand Down
24 changes: 14 additions & 10 deletions launchdarkly-server-sdk/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,15 @@ impl ConfigBuilder {
}
Some(builder) => Ok(builder),
#[cfg(feature = "hyper-rustls")]
None => Ok(Box::new(
StreamingDataSourceBuilder::<es::HyperTransport>::new(),
)),
None => {
let transport = es::HyperTransport::new_https();
let mut builder = StreamingDataSourceBuilder::new();
builder.transport(transport);
Ok(Box::new(builder))
}
#[cfg(not(feature = "hyper-rustls"))]
None => Err(BuildError::InvalidConfig(
"data source builder required when rustls is disabled".into(),
"data source builder required when hyper feature is disabled".into(),
)),
};
let data_source_builder = data_source_builder_result?;
Expand All @@ -321,14 +324,15 @@ impl ConfigBuilder {
}
Some(builder) => Ok(builder),
#[cfg(feature = "hyper-rustls")]
None => Ok(Box::new(EventProcessorBuilder::<
hyper_rustls::HttpsConnector<
hyper_util::client::legacy::connect::HttpConnector,
>,
>::new())),
None => {
let transport = crate::HyperTransport::new_https();
let mut builder = EventProcessorBuilder::new();
builder.transport(transport);
Ok(Box::new(builder))
}
#[cfg(not(feature = "hyper-rustls"))]
None => Err(BuildError::InvalidConfig(
"event processor factory required when rustls is disabled".into(),
"event processor factory required when hyper-rustls feature is disabled".into(),
)),
};
let event_processor_builder = event_processor_builder_result?;
Expand Down
78 changes: 29 additions & 49 deletions launchdarkly-server-sdk/src/events/processor_builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use std::sync::Arc;
use std::time::Duration;

use http::Uri;
#[cfg(feature = "hyper-rustls")]
use hyper_rustls::HttpsConnectorBuilder;
use launchdarkly_server_sdk_evaluation::Reference;
use thiserror::Error;

use crate::events::sender::HyperEventSender;
use crate::events::sender::HttpEventSender;
use crate::transport::HttpTransport;
use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};

use super::processor::{
Expand Down Expand Up @@ -61,41 +60,29 @@ pub trait EventProcessorFactory {
///
/// Adjust the flush interval
/// ```
/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder};
/// # use hyper_rustls::HttpsConnector;
/// # use hyper_util::client::legacy::connect::HttpConnector;
/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder, HyperTransport};
/// # use std::time::Duration;
/// # fn main() {
/// ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::<HttpsConnector<HttpConnector>>::new()
/// ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::<HyperTransport>::new()
/// .flush_interval(Duration::from_secs(10)));
/// # }
/// ```
#[derive(Clone)]
pub struct EventProcessorBuilder<C> {
pub struct EventProcessorBuilder<T: HttpTransport = crate::HyperTransport> {
capacity: usize,
flush_interval: Duration,
context_keys_capacity: NonZeroUsize,
context_keys_flush_interval: Duration,
event_sender: Option<Arc<dyn EventSender>>,
all_attributes_private: bool,
private_attributes: HashSet<Reference>,
connector: Option<C>,
transport: Option<T>,
omit_anonymous_contexts: bool,
compress_events: bool,
// diagnostic_recording_interval: Duration
}

impl<C> EventProcessorFactory for EventProcessorBuilder<C>
where
C: tower::Service<Uri> + Clone + Send + Sync + 'static,
C::Response: hyper_util::client::legacy::connect::Connection
+ hyper::rt::Read
+ hyper::rt::Write
+ Send
+ Unpin,
C::Future: Send + Unpin + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
impl<T: HttpTransport> EventProcessorFactory for EventProcessorBuilder<T> {
fn build(
&self,
endpoints: &service_endpoints::ServiceEndpoints,
Expand All @@ -114,26 +101,20 @@ where
// NOTE: This would only be possible under unit testing conditions.
if let Some(event_sender) = &self.event_sender {
Ok(event_sender.clone())
} else if let Some(connector) = &self.connector {
Ok(Arc::new(HyperEventSender::new(
connector.clone(),
hyper::Uri::from_str(url_string.as_str()).unwrap(),
} else if let Some(transport) = &self.transport {
Ok(Arc::new(HttpEventSender::new(
transport.clone(),
Uri::from_str(url_string.as_str()).unwrap(),
sdk_key,
default_headers,
self.compress_events,
)))
} else {
#[cfg(feature = "hyper-rustls")]
{
let connector = HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();

Ok(Arc::new(HyperEventSender::new(
connector,
let transport = crate::HyperTransport::new_https();
Ok(Arc::new(HttpEventSender::new(
transport,
Uri::from_str(url_string.as_str()).unwrap(),
sdk_key,
default_headers,
Expand All @@ -142,7 +123,7 @@ where
}
#[cfg(not(feature = "hyper-rustls"))]
Err(BuildError::InvalidConfig(
"https connector is required when rustls is disabled".into(),
"transport is required when hyper-rustls feature is disabled".into(),
))
};
let event_sender = event_sender_result?;
Expand All @@ -169,7 +150,7 @@ where
}
}

impl<C> EventProcessorBuilder<C> {
impl<T: HttpTransport> EventProcessorBuilder<T> {
/// Create a new [EventProcessorBuilder] with all default values.
pub fn new() -> Self {
Self {
Expand All @@ -182,7 +163,7 @@ impl<C> EventProcessorBuilder<C> {
all_attributes_private: false,
private_attributes: HashSet::new(),
omit_anonymous_contexts: false,
connector: None,
transport: None,
compress_events: false,
}
}
Expand Down Expand Up @@ -246,12 +227,12 @@ impl<C> EventProcessorBuilder<C> {
self
}

/// Sets the connector for the event sender to use. This allows for re-use of a connector
/// Sets the transport for the event sender to use. This allows for re-use of a transport
/// between multiple client instances. This is especially useful for the `sdk-test-harness`
/// where many client instances are created throughout the test and reading the native
/// certificates is a substantial portion of the runtime.
pub fn https_connector(&mut self, connector: C) -> &mut Self {
self.connector = Some(connector);
pub fn transport(&mut self, transport: T) -> &mut Self {
self.transport = Some(transport);
self
}

Expand Down Expand Up @@ -284,7 +265,7 @@ impl<C> EventProcessorBuilder<C> {
}
}

impl<C> Default for EventProcessorBuilder<C> {
impl<T: HttpTransport> Default for EventProcessorBuilder<T> {
fn default() -> Self {
Self::new()
}
Expand Down Expand Up @@ -325,7 +306,6 @@ impl Default for NullEventProcessorBuilder {

#[cfg(test)]
mod tests {
use hyper_util::client::legacy::connect::HttpConnector;
use launchdarkly_server_sdk_evaluation::ContextBuilder;
use maplit::hashset;
use mockito::Matcher;
Expand All @@ -337,36 +317,36 @@ mod tests {

#[test]
fn default_builder_has_correct_defaults() {
let builder = EventProcessorBuilder::<HttpConnector>::new();
let builder = EventProcessorBuilder::<crate::HyperTransport>::new();
assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
}

#[test]
fn capacity_can_be_adjusted() {
let mut builder = EventProcessorBuilder::<HttpConnector>::new();
let mut builder = EventProcessorBuilder::<crate::HyperTransport>::new();
builder.capacity(1234);
assert_eq!(builder.capacity, 1234);
}

#[test]
fn flush_interval_can_be_adjusted() {
let mut builder = EventProcessorBuilder::<HttpConnector>::new();
let mut builder = EventProcessorBuilder::<crate::HyperTransport>::new();
builder.flush_interval(Duration::from_secs(1234));
assert_eq!(builder.flush_interval, Duration::from_secs(1234));
}

#[test]
fn context_keys_capacity_can_be_adjusted() {
let mut builder = EventProcessorBuilder::<HttpConnector>::new();
let mut builder = EventProcessorBuilder::<crate::HyperTransport>::new();
let cap = NonZeroUsize::new(1234).expect("1234 > 0");
builder.context_keys_capacity(cap);
assert_eq!(builder.context_keys_capacity, cap);
}

#[test]
fn context_keys_flush_interval_can_be_adjusted() {
let mut builder = EventProcessorBuilder::<HttpConnector>::new();
let mut builder = EventProcessorBuilder::<crate::HyperTransport>::new();
builder.context_keys_flush_interval(Duration::from_secs(1000));
assert_eq!(
builder.context_keys_flush_interval,
Expand All @@ -376,7 +356,7 @@ mod tests {

#[test]
fn all_attribute_private_can_be_adjusted() {
let mut builder = EventProcessorBuilder::<HttpConnector>::new();
let mut builder = EventProcessorBuilder::<crate::HyperTransport>::new();

assert!(!builder.all_attributes_private);
builder.all_attributes_private(true);
Expand All @@ -385,7 +365,7 @@ mod tests {

#[test]
fn attribte_names_can_be_adjusted() {
let mut builder = EventProcessorBuilder::<HttpConnector>::new();
let mut builder = EventProcessorBuilder::<crate::HyperTransport>::new();

assert!(builder.private_attributes.is_empty());
builder.private_attributes(hashset!["name"]);
Expand All @@ -410,7 +390,7 @@ mod tests {
.build()
.expect("Service endpoints failed to be created");

let builder = EventProcessorBuilder::<HttpConnector>::new();
let builder = EventProcessorBuilder::<crate::HyperTransport>::new();
let processor = builder
.build(&service_endpoints, "sdk-key", tag)
.expect("Processor failed to build");
Expand Down
Loading