diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index cd8aa62..41e9d19 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -1,4 +1,3 @@ -use eventsource_client as es; use futures::future::FutureExt; use launchdarkly_server_sdk::{ Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference, @@ -20,8 +19,8 @@ use crate::command_params::{ ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse, MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse, }; -use crate::HttpsConnector; -use crate::StreamingHttpsConnector; +use crate::HttpsTransport; +use crate::StreamingHttpsTransport; use crate::{ command_params::{ CommandParams, CommandResponse, EvaluateAllFlagsParams, EvaluateAllFlagsResponse, @@ -37,8 +36,8 @@ pub struct ClientEntity { impl ClientEntity { pub async fn new( create_instance_params: CreateInstanceParams, - connector: HttpsConnector, - streaming_https_connector: StreamingHttpsConnector, + transport: HttpsTransport, + streaming_https_transport: StreamingHttpsTransport, ) -> Result { let mut config_builder = ConfigBuilder::new(&create_instance_params.configuration.credential); @@ -74,8 +73,6 @@ impl ClientEntity { } if let Some(streaming) = create_instance_params.configuration.streaming { - let transport = - es::HyperTransport::builder().build_with_connector(streaming_https_connector); if let Some(base_uri) = streaming.base_uri { service_endpoints_builder.streaming_base_url(&base_uri); } @@ -84,7 +81,7 @@ impl ClientEntity { if let Some(delay) = streaming.initial_retry_delay_ms { streaming_builder.initial_reconnect_delay(Duration::from_millis(delay)); } - streaming_builder.transport(transport); + streaming_builder.transport(streaming_https_transport.clone()); config_builder = config_builder.data_source(&streaming_builder); } else if let Some(polling) = create_instance_params.configuration.polling { @@ -96,18 +93,15 @@ impl ClientEntity { if let Some(delay) = polling.poll_interval_ms { polling_builder.poll_interval(Duration::from_millis(delay)); } - let transport = launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone()); - polling_builder.transport(transport); + polling_builder.transport(transport.clone()); config_builder = config_builder.data_source(&polling_builder); } else { // If we didn't specify streaming or polling, we fall back to basic streaming. The only - // customization we provide is the https connector to support testing multiple - // connectors. - let transport = - es::HyperTransport::builder().build_with_connector(streaming_https_connector); + // customization we provide is the transport to support testing multiple + // transport implementations. let mut streaming_builder = StreamingDataSourceBuilder::new(); - streaming_builder.transport(transport); + streaming_builder.transport(streaming_https_transport); config_builder = config_builder.data_source(&streaming_builder); } @@ -132,7 +126,6 @@ impl ClientEntity { if let Some(attributes) = events.global_private_attributes { processor_builder.private_attributes(attributes); } - let transport = launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone()); processor_builder.transport(transport); processor_builder.omit_anonymous_contexts(events.omit_anonymous_contexts); diff --git a/contract-tests/src/main.rs b/contract-tests/src/main.rs index 7d5e0f3..3e73f2b 100644 --- a/contract-tests/src/main.rs +++ b/contract-tests/src/main.rs @@ -7,8 +7,7 @@ use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Resu use async_mutex::Mutex; use client_entity::ClientEntity; use futures::executor; -use hyper_util::client::legacy::connect::HttpConnector; -use launchdarkly_server_sdk::Reference; +use launchdarkly_server_sdk::{HyperTransport, Reference}; use serde::{self, Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::sync::mpsc; @@ -131,8 +130,8 @@ async fn create_client( ) -> HttpResponse { let client_entity = match ClientEntity::new( create_instance_params.into_inner(), - app_state.https_connector.clone(), - app_state.streaming_https_connector.clone(), + app_state.https_transport.clone(), + app_state.streaming_https_transport.clone(), ) .await { @@ -206,19 +205,22 @@ async fn stop_client(req: HttpRequest, app_state: web::Data) -> HttpRe struct AppState { counter: Mutex, client_entities: Mutex>, - https_connector: HttpsConnector, - streaming_https_connector: StreamingHttpsConnector, + https_transport: HttpsTransport, + streaming_https_transport: StreamingHttpsTransport, } #[cfg(feature = "hyper-rustls")] -type HttpsConnector = hyper_rustls::HttpsConnector; +type HttpsTransport = HyperTransport< + hyper_rustls::HttpsConnector, +>; #[cfg(feature = "hyper-rustls")] -type StreamingHttpsConnector = hyper_util::client::legacy::connect::HttpConnector; +type StreamingHttpsTransport = eventsource_client::HyperTransport; #[cfg(feature = "tls")] -type HttpsConnector = hyper_tls::HttpsConnector; +type HttpsTransport = + HyperTransport>; #[cfg(feature = "tls")] -type StreamingHttpsConnector = hyper_tls::HttpsConnector; +type StreamingHttpsTransport = eventsource_client::HyperTransport; #[actix_web::main] async fn main() -> std::io::Result<()> { @@ -238,26 +240,32 @@ async fn main() -> std::io::Result<()> { let (tx, rx) = mpsc::channel::<()>(); #[cfg(feature = "hyper-rustls")] - let streaming_https_connector = hyper_util::client::legacy::connect::HttpConnector::new(); + let streaming_https_transport = eventsource_client::HyperTransport::new(); #[cfg(feature = "hyper-rustls")] - let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() - .expect("Failed to load native root certificates") - .https_or_http() - .enable_http1() - .enable_http2() - .build(); + let https_transport = { + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .expect("Failed to load native root certificates") + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + HyperTransport::new_with_connector(connector) + }; #[cfg(feature = "tls")] - let streaming_https_connector = hyper_tls::HttpsConnector::new(); + let streaming_https_transport = eventsource_client::HyperTransport::new(); #[cfg(feature = "tls")] - let connector = hyper_tls::HttpsConnector::new(); + let https_transport = { + let connector = hyper_tls::HttpsConnector::new(); + HyperTransport::new_with_connector(connector) + }; let state = web::Data::new(AppState { counter: Mutex::new(0), client_entities: Mutex::new(HashMap::new()), - https_connector: connector, - streaming_https_connector, + https_transport, + streaming_https_transport, }); let server = HttpServer::new(move || { diff --git a/launchdarkly-server-sdk/examples/custom_transport.rs b/launchdarkly-server-sdk/examples/custom_transport.rs new file mode 100644 index 0000000..88f39dd --- /dev/null +++ b/launchdarkly-server-sdk/examples/custom_transport.rs @@ -0,0 +1,120 @@ +use bytes::Bytes; +use http::Request; +use launchdarkly_server_sdk::{ConfigBuilder, EventProcessorBuilder, HttpTransport, ResponseFuture}; +use std::time::Instant; + +/// Example of a custom transport that wraps another transport and adds logging. +/// +/// This demonstrates how to implement the HttpTransport trait to add middleware +/// functionality like logging, metrics, retries, circuit breakers, etc. +#[derive(Clone)] +struct LoggingTransport { + inner: T, +} + +impl LoggingTransport { + fn new(inner: T) -> Self { + Self { inner } + } +} + +impl HttpTransport for LoggingTransport { + fn request(&self, request: Request) -> ResponseFuture { + let method = request.method().clone(); + let uri = request.uri().clone(); + let start = Instant::now(); + + println!("[REQUEST] {method} {uri}"); + + let inner = self.inner.clone(); + Box::pin(async move { + let result = inner.request(request).await; + let elapsed = start.elapsed(); + + match &result { + Ok(response) => { + println!( + "[RESPONSE] {} {} - Status: {} - Duration: {:?}", + method, + uri, + response.status(), + elapsed + ); + } + Err(e) => { + println!( + "[ERROR] {method} {uri} - Error: {e} - Duration: {elapsed:?}" + ); + } + } + + result + }) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Get SDK key from environment + let sdk_key = std::env::var("LAUNCHDARKLY_SDK_KEY") + .unwrap_or_else(|_| "your-sdk-key".to_string()); + + if sdk_key == "your-sdk-key" { + eprintln!("Please set LAUNCHDARKLY_SDK_KEY environment variable"); + std::process::exit(1); + } + + // Create the base HTTPS transport + let base_transport = launchdarkly_server_sdk::HyperTransport::new_https(); + + // Wrap it with logging middleware + let logging_transport = LoggingTransport::new(base_transport); + + // Configure the SDK to use the custom transport + let config = ConfigBuilder::new(&sdk_key) + .event_processor( + EventProcessorBuilder::new() + .transport(logging_transport.clone()) + .flush_interval(std::time::Duration::from_secs(5)), + ) + .build()?; + + // Create the client - you'll see all HTTP requests logged + println!("Initializing LaunchDarkly client with logging transport..."); + let client = launchdarkly_server_sdk::Client::build(config)?; + + // Wait for initialization + println!("Waiting for client initialization..."); + match client + .wait_for_initialization(std::time::Duration::from_secs(10)) + .await + { + Some(true) => { + println!("Client initialized successfully!"); + + // Evaluate a flag (will trigger HTTP events) + let context = launchdarkly_server_sdk::ContextBuilder::new("example-user-key") + .build() + .expect("Failed to create context"); + + let flag_value = client.bool_variation(&context, "example-flag", false); + println!("Flag 'example-flag' evaluated to: {flag_value}"); + + // Wait a bit to see event flushing + println!("Waiting to observe event flushing..."); + tokio::time::sleep(std::time::Duration::from_secs(6)).await; + } + Some(false) => { + eprintln!("Client failed to initialize"); + } + None => { + eprintln!("Client initialization timed out"); + } + } + + // Shutdown the client + println!("Shutting down client..."); + client.close(); + + Ok(()) +}