diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b93834e..ad2741e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy - uses: ./.github/actions/ci @@ -49,7 +49,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy rustup target add x86_64-unknown-linux-musl diff --git a/.github/workflows/manual-publish.yml b/.github/workflows/manual-publish.yml index 9f350b0..6189f32 100644 --- a/.github/workflows/manual-publish.yml +++ b/.github/workflows/manual-publish.yml @@ -19,7 +19,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy - uses: ./.github/actions/ci diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 2078f4c..c6dfe21 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -24,7 +24,7 @@ jobs: - name: Setup rust tooling if: ${{ steps.release.outputs['launchdarkly-server-sdk--release_created'] == 'true' }} run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.2.0 diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index 228327a..1135866 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -2,13 +2,15 @@ name = "contract-tests" version = "0.1.0" edition = "2021" -rust-version = "1.83.0" # MSRV +rust-version = "1.85.0" # MSRV license = "Apache-2.0" [dependencies] actix = "0.13.0" actix-web = "4.2.1" env_logger = "0.10.0" +# eventsource-client = { version = "0.16.0", default-features = false } +eventsource-client = { git = "https://github.com/launchdarkly/rust-eventsource-client", branch = "feat/hyper-as-feature" } log = "0.4.14" launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false, features = ["event-compression"]} serde = { version = "1.0.132", features = ["derive"] } @@ -17,10 +19,12 @@ futures = "0.3.12" hyper = { version = "0.14.19", features = ["client"] } hyper-rustls = { version = "0.24.1" , optional = true, features = ["http2"]} hyper-tls = { version = "0.5.0", optional = true } +hyper1-tls = { package = "hyper-tls", version = "0.6.0", optional = true } +hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "tokio"], optional = true } reqwest = { version = "0.12.4", features = ["default", "blocking", "json"] } async-mutex = "1.4.0" [features] default = ["rustls"] -rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "launchdarkly-server-sdk/rustls"] -tls = ["hyper-tls"] +rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "launchdarkly-server-sdk/rustls", "hyper-util"] +tls = ["hyper-tls", "hyper1-tls", "hyper-util"] diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index 3538178..0bcb72e 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -1,3 +1,4 @@ +use eventsource_client as es; use futures::future::FutureExt; use launchdarkly_server_sdk::{ Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference, @@ -20,6 +21,7 @@ use crate::command_params::{ MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse, }; use crate::HttpsConnector; +use crate::StreamingHttpsConnector; use crate::{ command_params::{ CommandParams, CommandResponse, EvaluateAllFlagsParams, EvaluateAllFlagsResponse, @@ -36,6 +38,7 @@ impl ClientEntity { pub async fn new( create_instance_params: CreateInstanceParams, connector: HttpsConnector, + streaming_https_connector: StreamingHttpsConnector, ) -> Result { let mut config_builder = ConfigBuilder::new(&create_instance_params.configuration.credential); @@ -71,6 +74,8 @@ impl ClientEntity { } if let Some(streaming) = create_instance_params.configuration.streaming { + let transport = + es::HyperTransport::builder().build_with_connector(streaming_https_connector); if let Some(base_uri) = streaming.base_uri { service_endpoints_builder.streaming_base_url(&base_uri); } @@ -79,7 +84,7 @@ impl ClientEntity { if let Some(delay) = streaming.initial_retry_delay_ms { streaming_builder.initial_reconnect_delay(Duration::from_millis(delay)); } - streaming_builder.https_connector(connector.clone()); + streaming_builder.transport(transport); config_builder = config_builder.data_source(&streaming_builder); } else if let Some(polling) = create_instance_params.configuration.polling { @@ -98,8 +103,10 @@ impl ClientEntity { // If we didn't specify streaming or polling, we fall back to basic streaming. The only // customization we provide is the https connector to support testing multiple // connectors. + let transport = + es::HyperTransport::builder().build_with_connector(streaming_https_connector); let mut streaming_builder = StreamingDataSourceBuilder::new(); - streaming_builder.https_connector(connector.clone()); + streaming_builder.transport(transport); config_builder = config_builder.data_source(&streaming_builder); } diff --git a/contract-tests/src/main.rs b/contract-tests/src/main.rs index 4bd4ff2..ad400f0 100644 --- a/contract-tests/src/main.rs +++ b/contract-tests/src/main.rs @@ -132,6 +132,7 @@ async fn create_client( let client_entity = match ClientEntity::new( create_instance_params.into_inner(), app_state.https_connector.clone(), + app_state.streaming_https_connector.clone(), ) .await { @@ -206,12 +207,19 @@ struct AppState { counter: Mutex, client_entities: Mutex>, https_connector: HttpsConnector, + streaming_https_connector: StreamingHttpsConnector, } #[cfg(feature = "rustls")] type HttpsConnector = hyper_rustls::HttpsConnector; +#[cfg(feature = "rustls")] +type StreamingHttpsConnector = hyper_util::client::legacy::connect::HttpConnector; + #[cfg(feature = "tls")] type HttpsConnector = hyper_tls::HttpsConnector; +#[cfg(feature = "tls")] +type StreamingHttpsConnector = + hyper1_tls::HttpsConnector; #[actix_web::main] async fn main() -> std::io::Result<()> { @@ -228,6 +236,8 @@ async fn main() -> std::io::Result<()> { let (tx, rx) = mpsc::channel::<()>(); + #[cfg(feature = "rustls")] + let streaming_https_connector = hyper_util::client::legacy::connect::HttpConnector::new(); #[cfg(feature = "rustls")] let connector = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() @@ -236,6 +246,8 @@ async fn main() -> std::io::Result<()> { .enable_http2() .build(); + #[cfg(feature = "tls")] + let streaming_https_connector = hyper1_tls::HttpsConnector::new(); #[cfg(feature = "tls")] let connector = hyper_tls::HttpsConnector::new(); @@ -243,6 +255,7 @@ async fn main() -> std::io::Result<()> { counter: Mutex::new(0), client_entities: Mutex::new(HashMap::new()), https_connector: connector, + streaming_https_connector: streaming_https_connector, }); let server = HttpServer::new(move || { diff --git a/launchdarkly-server-sdk/Cargo.toml b/launchdarkly-server-sdk/Cargo.toml index fb86abf..55bae78 100644 --- a/launchdarkly-server-sdk/Cargo.toml +++ b/launchdarkly-server-sdk/Cargo.toml @@ -4,7 +4,7 @@ description = "LaunchDarkly Server-Side SDK" version = "2.6.2" authors = ["LaunchDarkly"] edition = "2021" -rust-version = "1.83.0" # MSRV +rust-version = "1.85.0" # MSRV license = "Apache-2.0" homepage = "https://docs.launchdarkly.com/sdk/server-side/rust" repository = "https://github.com/launchdarkly/rust-server-sdk" @@ -20,7 +20,8 @@ features = ["event-compression"] chrono = "0.4.19" crossbeam-channel = "0.5.1" data-encoding = "2.3.2" -eventsource-client = { version = "0.16.0", default-features = false } +# eventsource-client = { version = "0.16.0", default-features = false } +eventsource-client = { git = "https://github.com/launchdarkly/rust-eventsource-client", branch = "feat/hyper-as-feature" } futures = "0.3.12" lazy_static = "1.4.0" log = "0.4.14" @@ -54,7 +55,7 @@ testing_logger = "0.1.1" [features] default = ["rustls"] -rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/rustls"] +rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/hyper-rustls"] event-compression = ["flate2"] [[example]] diff --git a/launchdarkly-server-sdk/src/config.rs b/launchdarkly-server-sdk/src/config.rs index e264bff..8e738df 100644 --- a/launchdarkly-server-sdk/src/config.rs +++ b/launchdarkly-server-sdk/src/config.rs @@ -6,6 +6,7 @@ use crate::events::processor_builders::{ }; use crate::stores::store_builders::{DataStoreFactory, InMemoryDataStoreBuilder}; use crate::{ServiceEndpointsBuilder, StreamingDataSourceBuilder}; +use eventsource_client as es; use std::borrow::Borrow; @@ -301,9 +302,9 @@ impl ConfigBuilder { } Some(builder) => Ok(builder), #[cfg(feature = "rustls")] - None => Ok(Box::new(StreamingDataSourceBuilder::< - hyper_rustls::HttpsConnector, - >::new())), + None => Ok(Box::new( + StreamingDataSourceBuilder::::new(), + )), #[cfg(not(feature = "rustls"))] None => Err(BuildError::InvalidConfig( "data source builder required when rustls is disabled".into(), diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 55c786f..a533281 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -7,15 +7,11 @@ use crate::LAUNCHDARKLY_TAGS_HEADER; use es::{Client, ClientBuilder, ReconnectOptionsBuilder}; use eventsource_client as es; use futures::StreamExt; -use hyper::client::connect::Connection; -use hyper::service::Service; -use hyper::Uri; use launchdarkly_server_sdk_evaluation::{Flag, Segment}; use parking_lot::RwLock; use serde::Deserialize; use std::sync::{Arc, Mutex, Once}; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::broadcast; use tokio::time; use tokio_stream::wrappers::{BroadcastStream, IntervalStream}; @@ -73,19 +69,13 @@ pub struct StreamingDataSource { } impl StreamingDataSource { - pub fn new( + pub fn new( base_url: &str, sdk_key: &str, initial_reconnect_delay: Duration, tags: &Option, - connector: C, - ) -> std::result::Result - where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into>, - { + transport: T, + ) -> std::result::Result { let stream_url = format!("{}/all", base_url); let client_builder = ClientBuilder::for_url(&stream_url)?; @@ -105,7 +95,7 @@ impl StreamingDataSource { } Ok(Self { - es_client: Box::new(client_builder.build_with_conn(connector)), + es_client: Box::new(client_builder.build_with_transport(transport)), }) } } @@ -385,6 +375,7 @@ mod tests { use super::{DataSource, PollingDataSource, StreamingDataSource}; use crate::feature_requester_builders::HyperFeatureRequesterBuilder; use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER}; + use eventsource_client as es; #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")] #[test_case(None, Matcher::Missing)] @@ -411,7 +402,7 @@ mod tests { "sdk-key", Duration::from_secs(0), &tag, - HttpConnector::new(), + es::HyperTransport::new(), ) .unwrap(); diff --git a/launchdarkly-server-sdk/src/data_source_builders.rs b/launchdarkly-server-sdk/src/data_source_builders.rs index ec32390..2d1d566 100644 --- a/launchdarkly-server-sdk/src/data_source_builders.rs +++ b/launchdarkly-server-sdk/src/data_source_builders.rs @@ -1,6 +1,7 @@ use super::service_endpoints; use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource}; use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder}; +use eventsource_client as es; use hyper::{client::connect::Connection, service::Service, Uri}; #[cfg(feature = "rustls")] use hyper_rustls::HttpsConnectorBuilder; @@ -47,26 +48,25 @@ pub trait DataSourceFactory { /// Adjust the initial reconnect delay. /// ``` /// # use launchdarkly_server_sdk::{StreamingDataSourceBuilder, ConfigBuilder}; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use eventsource_client as es; /// # use std::time::Duration; /// # fn main() { -/// ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::>::new() +/// ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::::new() /// .initial_reconnect_delay(Duration::from_secs(10))); /// # } /// ``` #[derive(Clone)] -pub struct StreamingDataSourceBuilder { +pub struct StreamingDataSourceBuilder { initial_reconnect_delay: Duration, - connector: Option, + transport: Option, } -impl StreamingDataSourceBuilder { +impl StreamingDataSourceBuilder { /// Create a new instance of the [StreamingDataSourceBuilder] with default values. pub fn new() -> Self { Self { initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY, - connector: None, + transport: None, } } @@ -76,56 +76,42 @@ impl StreamingDataSourceBuilder { self } - /// Sets the connector for the event source client to use. This allows for re-use of a - /// connector between multiple client instances. This is especially useful for the + /// Sets the transport for the event source client to use. This allows for re-use of a + /// transport between multiple client instances. This is especially useful for the /// `sdk-test-harness` where many client instances are created throughout the test and reading /// the native certificates is a substantial portion of the runtime. - pub fn https_connector(&mut self, connector: C) -> &mut Self { - self.connector = Some(connector); + pub fn transport(&mut self, transport: T) -> &mut Self { + self.transport = Some(transport); self } } -impl DataSourceFactory for StreamingDataSourceBuilder -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into>, -{ +impl DataSourceFactory for StreamingDataSourceBuilder { fn build( &self, endpoints: &service_endpoints::ServiceEndpoints, sdk_key: &str, tags: Option, ) -> Result, BuildError> { - let data_source_result = match &self.connector { + let data_source_result = match &self.transport { #[cfg(feature = "rustls")] - None => { - let connector = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - Ok(StreamingDataSource::new( - endpoints.streaming_base_url(), - sdk_key, - self.initial_reconnect_delay, - &tags, - connector, - )) - } + None => Ok(StreamingDataSource::new( + endpoints.streaming_base_url(), + sdk_key, + self.initial_reconnect_delay, + &tags, + es::HyperTransport::new_https(), + )), #[cfg(not(feature = "rustls"))] None => Err(BuildError::InvalidConfig( "https connector required when rustls is disabled".into(), )), - Some(connector) => Ok(StreamingDataSource::new( + Some(transport) => Ok(StreamingDataSource::new( endpoints.streaming_base_url(), sdk_key, self.initial_reconnect_delay, &tags, - connector.clone(), + transport.clone(), )), }; let data_source = data_source_result? @@ -138,7 +124,7 @@ where } } -impl Default for StreamingDataSourceBuilder { +impl Default for StreamingDataSourceBuilder { fn default() -> Self { StreamingDataSourceBuilder::new() } @@ -355,13 +341,15 @@ impl DataSourceFactory for MockDataSourceBuilder { #[cfg(test)] mod tests { + use eventsource_client::{HyperTransport, ResponseFuture}; use hyper::client::HttpConnector; use super::*; #[test] fn default_stream_builder_has_correct_defaults() { - let builder: StreamingDataSourceBuilder = StreamingDataSourceBuilder::new(); + let builder: StreamingDataSourceBuilder = + StreamingDataSourceBuilder::new(); assert_eq!( builder.initial_reconnect_delay, @@ -370,29 +358,22 @@ mod tests { } #[test] - fn stream_builder_can_use_custom_connector() { + fn stream_builder_can_use_custom_transport() { #[derive(Debug, Clone)] - struct TestConnector; - impl hyper::service::Service for TestConnector { - type Response = tokio::net::TcpStream; - type Error = std::io::Error; - type Future = futures::future::BoxFuture<'static, Result>; - - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } + struct TestTransport; - fn call(&mut self, _req: hyper::Uri) -> Self::Future { + impl es::HttpTransport for TestTransport { + fn request( + &self, + _request: eventsource_client::Request>, + ) -> ResponseFuture { // this won't be called during the test unreachable!(); } } let mut builder = StreamingDataSourceBuilder::new(); - builder.https_connector(TestConnector); + builder.transport(TestTransport); assert!(builder .build( &crate::ServiceEndpointsBuilder::new().build().unwrap(), @@ -410,7 +391,7 @@ mod tests { #[test] fn initial_reconnect_delay_for_streaming_can_be_adjusted() { - let mut builder = StreamingDataSourceBuilder::<()>::new(); + let mut builder = StreamingDataSourceBuilder::::new(); builder.initial_reconnect_delay(Duration::from_secs(1234)); assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234)); }