Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions contract-tests/src/client_entity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use eventsource_client as es;
use futures::future::FutureExt;
use launchdarkly_server_sdk::{
Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference,
Expand All @@ -20,8 +19,8 @@ use crate::command_params::{
ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse,
MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse,
};
use crate::HttpsConnector;
use crate::StreamingHttpsConnector;
use crate::HttpsTransport;
use crate::StreamingHttpsTransport;
use crate::{
command_params::{
CommandParams, CommandResponse, EvaluateAllFlagsParams, EvaluateAllFlagsResponse,
Expand All @@ -37,8 +36,8 @@ pub struct ClientEntity {
impl ClientEntity {
pub async fn new(
create_instance_params: CreateInstanceParams,
connector: HttpsConnector,
streaming_https_connector: StreamingHttpsConnector,
transport: HttpsTransport,
streaming_https_transport: StreamingHttpsTransport,
) -> Result<Self, BuildError> {
let mut config_builder =
ConfigBuilder::new(&create_instance_params.configuration.credential);
Expand Down Expand Up @@ -74,8 +73,6 @@ impl ClientEntity {
}

if let Some(streaming) = create_instance_params.configuration.streaming {
let transport =
es::HyperTransport::builder().build_with_connector(streaming_https_connector);
if let Some(base_uri) = streaming.base_uri {
service_endpoints_builder.streaming_base_url(&base_uri);
}
Expand All @@ -84,7 +81,7 @@ impl ClientEntity {
if let Some(delay) = streaming.initial_retry_delay_ms {
streaming_builder.initial_reconnect_delay(Duration::from_millis(delay));
}
streaming_builder.transport(transport);
streaming_builder.transport(streaming_https_transport.clone());

config_builder = config_builder.data_source(&streaming_builder);
} else if let Some(polling) = create_instance_params.configuration.polling {
Expand All @@ -96,18 +93,15 @@ impl ClientEntity {
if let Some(delay) = polling.poll_interval_ms {
polling_builder.poll_interval(Duration::from_millis(delay));
}
let transport = launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
polling_builder.transport(transport);
polling_builder.transport(transport.clone());

config_builder = config_builder.data_source(&polling_builder);
} else {
// If we didn't specify streaming or polling, we fall back to basic streaming. The only
// customization we provide is the https connector to support testing multiple
// connectors.
let transport =
es::HyperTransport::builder().build_with_connector(streaming_https_connector);
// customization we provide is the transport to support testing multiple
// transport implementations.
let mut streaming_builder = StreamingDataSourceBuilder::new();
streaming_builder.transport(transport);
streaming_builder.transport(streaming_https_transport);
config_builder = config_builder.data_source(&streaming_builder);
}

Expand All @@ -132,7 +126,6 @@ impl ClientEntity {
if let Some(attributes) = events.global_private_attributes {
processor_builder.private_attributes(attributes);
}
let transport = launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
processor_builder.transport(transport);
processor_builder.omit_anonymous_contexts(events.omit_anonymous_contexts);

Expand Down
52 changes: 30 additions & 22 deletions contract-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Resu
use async_mutex::Mutex;
use client_entity::ClientEntity;
use futures::executor;
use hyper_util::client::legacy::connect::HttpConnector;
use launchdarkly_server_sdk::Reference;
use launchdarkly_server_sdk::{HyperTransport, Reference};
use serde::{self, Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::mpsc;
Expand Down Expand Up @@ -131,8 +130,8 @@ async fn create_client(
) -> HttpResponse {
let client_entity = match ClientEntity::new(
create_instance_params.into_inner(),
app_state.https_connector.clone(),
app_state.streaming_https_connector.clone(),
app_state.https_transport.clone(),
app_state.streaming_https_transport.clone(),
)
.await
{
Expand Down Expand Up @@ -206,19 +205,22 @@ async fn stop_client(req: HttpRequest, app_state: web::Data<AppState>) -> HttpRe
struct AppState {
counter: Mutex<u32>,
client_entities: Mutex<HashMap<u32, ClientEntity>>,
https_connector: HttpsConnector,
streaming_https_connector: StreamingHttpsConnector,
https_transport: HttpsTransport,
streaming_https_transport: StreamingHttpsTransport,
}

#[cfg(feature = "hyper-rustls")]
type HttpsConnector = hyper_rustls::HttpsConnector<HttpConnector>;
type HttpsTransport = HyperTransport<
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
>;
#[cfg(feature = "hyper-rustls")]
type StreamingHttpsConnector = hyper_util::client::legacy::connect::HttpConnector;
type StreamingHttpsTransport = eventsource_client::HyperTransport;

#[cfg(feature = "tls")]
type HttpsConnector = hyper_tls::HttpsConnector<HttpConnector>;
type HttpsTransport =
HyperTransport<hyper_tls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>>;
#[cfg(feature = "tls")]
type StreamingHttpsConnector = hyper_tls::HttpsConnector<HttpConnector>;
type StreamingHttpsTransport = eventsource_client::HyperTransport;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
Expand All @@ -238,26 +240,32 @@ async fn main() -> std::io::Result<()> {
let (tx, rx) = mpsc::channel::<()>();

#[cfg(feature = "hyper-rustls")]
let streaming_https_connector = hyper_util::client::legacy::connect::HttpConnector::new();
let streaming_https_transport = eventsource_client::HyperTransport::new();
#[cfg(feature = "hyper-rustls")]
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("Failed to load native root certificates")
.https_or_http()
.enable_http1()
.enable_http2()
.build();
let https_transport = {
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("Failed to load native root certificates")
.https_or_http()
.enable_http1()
.enable_http2()
.build();
HyperTransport::new_with_connector(connector)
};

#[cfg(feature = "tls")]
let streaming_https_connector = hyper_tls::HttpsConnector::new();
let streaming_https_transport = eventsource_client::HyperTransport::new();
#[cfg(feature = "tls")]
let connector = hyper_tls::HttpsConnector::new();
let https_transport = {
let connector = hyper_tls::HttpsConnector::new();
HyperTransport::new_with_connector(connector)
};

let state = web::Data::new(AppState {
counter: Mutex::new(0),
client_entities: Mutex::new(HashMap::new()),
https_connector: connector,
streaming_https_connector,
https_transport,
streaming_https_transport,
});

let server = HttpServer::new(move || {
Expand Down
120 changes: 120 additions & 0 deletions launchdarkly-server-sdk/examples/custom_transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use bytes::Bytes;
use http::Request;
use launchdarkly_server_sdk::{ConfigBuilder, EventProcessorBuilder, HttpTransport, ResponseFuture};
use std::time::Instant;

/// Example of a custom transport that wraps another transport and adds logging.
///
/// This demonstrates how to implement the HttpTransport trait to add middleware
/// functionality like logging, metrics, retries, circuit breakers, etc.
#[derive(Clone)]
struct LoggingTransport<T: HttpTransport> {
inner: T,
}

impl<T: HttpTransport> LoggingTransport<T> {
fn new(inner: T) -> Self {
Self { inner }
}
}

impl<T: HttpTransport> HttpTransport for LoggingTransport<T> {
fn request(&self, request: Request<Bytes>) -> ResponseFuture {
let method = request.method().clone();
let uri = request.uri().clone();
let start = Instant::now();

println!("[REQUEST] {method} {uri}");

let inner = self.inner.clone();
Box::pin(async move {
let result = inner.request(request).await;
let elapsed = start.elapsed();

match &result {
Ok(response) => {
println!(
"[RESPONSE] {} {} - Status: {} - Duration: {:?}",
method,
uri,
response.status(),
elapsed
);
}
Err(e) => {
println!(
"[ERROR] {method} {uri} - Error: {e} - Duration: {elapsed:?}"
);
}
}

result
})
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Get SDK key from environment
let sdk_key = std::env::var("LAUNCHDARKLY_SDK_KEY")
.unwrap_or_else(|_| "your-sdk-key".to_string());

if sdk_key == "your-sdk-key" {
eprintln!("Please set LAUNCHDARKLY_SDK_KEY environment variable");
std::process::exit(1);
}

// Create the base HTTPS transport
let base_transport = launchdarkly_server_sdk::HyperTransport::new_https();

// Wrap it with logging middleware
let logging_transport = LoggingTransport::new(base_transport);

// Configure the SDK to use the custom transport
let config = ConfigBuilder::new(&sdk_key)
.event_processor(
EventProcessorBuilder::new()
.transport(logging_transport.clone())
.flush_interval(std::time::Duration::from_secs(5)),
)
.build()?;

// Create the client - you'll see all HTTP requests logged
println!("Initializing LaunchDarkly client with logging transport...");
let client = launchdarkly_server_sdk::Client::build(config)?;

// Wait for initialization
println!("Waiting for client initialization...");
match client
.wait_for_initialization(std::time::Duration::from_secs(10))
.await
{
Some(true) => {
println!("Client initialized successfully!");

// Evaluate a flag (will trigger HTTP events)
let context = launchdarkly_server_sdk::ContextBuilder::new("example-user-key")
.build()
.expect("Failed to create context");

let flag_value = client.bool_variation(&context, "example-flag", false);
println!("Flag 'example-flag' evaluated to: {flag_value}");

// Wait a bit to see event flushing
println!("Waiting to observe event flushing...");
tokio::time::sleep(std::time::Duration::from_secs(6)).await;
}
Some(false) => {
eprintln!("Client failed to initialize");
}
None => {
eprintln!("Client initialization timed out");
}
}

// Shutdown the client
println!("Shutting down client...");
client.close();

Ok(())
}