diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b93834e..ad2741e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy - uses: ./.github/actions/ci @@ -49,7 +49,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy rustup target add x86_64-unknown-linux-musl diff --git a/.github/workflows/manual-publish.yml b/.github/workflows/manual-publish.yml index 9f350b0..6189f32 100644 --- a/.github/workflows/manual-publish.yml +++ b/.github/workflows/manual-publish.yml @@ -19,7 +19,7 @@ jobs: - name: Setup rust tooling run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy - uses: ./.github/actions/ci diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 2078f4c..c6dfe21 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -24,7 +24,7 @@ jobs: - name: Setup rust tooling if: ${{ steps.release.outputs['launchdarkly-server-sdk--release_created'] == 'true' }} run: | - rustup override set 1.83 + rustup override set 1.85 rustup component add rustfmt clippy - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.2.0 diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index 228327a..2e1cf4a 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -2,25 +2,27 @@ name = "contract-tests" version = "0.1.0" edition = "2021" -rust-version = "1.83.0" # MSRV +rust-version = "1.85.0" # MSRV license = "Apache-2.0" [dependencies] actix = "0.13.0" actix-web = "4.2.1" env_logger = "0.10.0" +# eventsource-client = { version = "0.16.0", default-features = false } +eventsource-client = { git = "https://github.com/launchdarkly/rust-eventsource-client", branch = "feat/hyper-as-feature" } log = "0.4.14" launchdarkly-server-sdk = { path = "../launchdarkly-server-sdk/", default-features = false, features = ["event-compression"]} serde = { version = "1.0.132", features = ["derive"] } serde_json = "1.0.73" futures = "0.3.12" -hyper = { version = "0.14.19", features = ["client"] } -hyper-rustls = { version = "0.24.1" , optional = true, features = ["http2"]} -hyper-tls = { version = "0.5.0", optional = true } +hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "tokio"] } +hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "webpki-roots"], optional = true } +hyper-tls = { version = "0.6.0", optional = true } reqwest = { version = "0.12.4", features = ["default", "blocking", "json"] } async-mutex = "1.4.0" [features] default = ["rustls"] -rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "launchdarkly-server-sdk/rustls"] +rustls = ["hyper-rustls", "launchdarkly-server-sdk/rustls"] tls = ["hyper-tls"] diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index 3538178..f4931e7 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -1,3 +1,4 @@ +use eventsource_client as es; use futures::future::FutureExt; use launchdarkly_server_sdk::{ Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference, @@ -20,6 +21,7 @@ use crate::command_params::{ MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse, }; use crate::HttpsConnector; +use crate::StreamingHttpsConnector; use crate::{ command_params::{ CommandParams, CommandResponse, EvaluateAllFlagsParams, EvaluateAllFlagsResponse, @@ -36,6 +38,7 @@ impl ClientEntity { pub async fn new( create_instance_params: CreateInstanceParams, connector: HttpsConnector, + streaming_https_connector: StreamingHttpsConnector, ) -> Result { let mut config_builder = ConfigBuilder::new(&create_instance_params.configuration.credential); @@ -71,6 +74,8 @@ impl ClientEntity { } if let Some(streaming) = create_instance_params.configuration.streaming { + let transport = + es::HyperTransport::builder().build_with_connector(streaming_https_connector); if let Some(base_uri) = streaming.base_uri { service_endpoints_builder.streaming_base_url(&base_uri); } @@ -79,7 +84,7 @@ impl ClientEntity { if let Some(delay) = streaming.initial_retry_delay_ms { streaming_builder.initial_reconnect_delay(Duration::from_millis(delay)); } - streaming_builder.https_connector(connector.clone()); + streaming_builder.transport(transport); config_builder = config_builder.data_source(&streaming_builder); } else if let Some(polling) = create_instance_params.configuration.polling { @@ -98,8 +103,10 @@ impl ClientEntity { // If we didn't specify streaming or polling, we fall back to basic streaming. The only // customization we provide is the https connector to support testing multiple // connectors. + let transport = + es::HyperTransport::builder().build_with_connector(streaming_https_connector); let mut streaming_builder = StreamingDataSourceBuilder::new(); - streaming_builder.https_connector(connector.clone()); + streaming_builder.transport(transport); config_builder = config_builder.data_source(&streaming_builder); } @@ -350,7 +357,7 @@ impl ClientEntity { )), } } - command => Err(format!("Invalid command requested: {}", command)), + command => Err(format!("Invalid command requested: {command}")), } } diff --git a/contract-tests/src/main.rs b/contract-tests/src/main.rs index 4bd4ff2..e4b1b36 100644 --- a/contract-tests/src/main.rs +++ b/contract-tests/src/main.rs @@ -7,7 +7,7 @@ use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Resu use async_mutex::Mutex; use client_entity::ClientEntity; use futures::executor; -use hyper::client::HttpConnector; +use hyper_util::client::legacy::connect::HttpConnector; use launchdarkly_server_sdk::Reference; use serde::{self, Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -132,11 +132,12 @@ async fn create_client( let client_entity = match ClientEntity::new( create_instance_params.into_inner(), app_state.https_connector.clone(), + app_state.streaming_https_connector.clone(), ) .await { Ok(ce) => ce, - Err(e) => return HttpResponse::InternalServerError().body(format!("{}", e)), + Err(e) => return HttpResponse::InternalServerError().body(format!("{e}")), }; let mut counter = app_state.counter.lock().await; @@ -206,12 +207,18 @@ struct AppState { counter: Mutex, client_entities: Mutex>, https_connector: HttpsConnector, + streaming_https_connector: StreamingHttpsConnector, } #[cfg(feature = "rustls")] type HttpsConnector = hyper_rustls::HttpsConnector; +#[cfg(feature = "rustls")] +type StreamingHttpsConnector = hyper_util::client::legacy::connect::HttpConnector; + #[cfg(feature = "tls")] type HttpsConnector = hyper_tls::HttpsConnector; +#[cfg(feature = "tls")] +type StreamingHttpsConnector = hyper_tls::HttpsConnector; #[actix_web::main] async fn main() -> std::io::Result<()> { @@ -228,14 +235,19 @@ async fn main() -> std::io::Result<()> { let (tx, rx) = mpsc::channel::<()>(); + #[cfg(feature = "rustls")] + let streaming_https_connector = hyper_util::client::legacy::connect::HttpConnector::new(); #[cfg(feature = "rustls")] let connector = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() + .expect("Failed to load native root certificates") .https_or_http() .enable_http1() .enable_http2() .build(); + #[cfg(feature = "tls")] + let streaming_https_connector = hyper_tls::HttpsConnector::new(); #[cfg(feature = "tls")] let connector = hyper_tls::HttpsConnector::new(); @@ -243,6 +255,7 @@ async fn main() -> std::io::Result<()> { counter: Mutex::new(0), client_entities: Mutex::new(HashMap::new()), https_connector: connector, + streaming_https_connector, }); let server = HttpServer::new(move || { diff --git a/launchdarkly-server-sdk/Cargo.toml b/launchdarkly-server-sdk/Cargo.toml index fb86abf..9e6e2b8 100644 --- a/launchdarkly-server-sdk/Cargo.toml +++ b/launchdarkly-server-sdk/Cargo.toml @@ -4,7 +4,7 @@ description = "LaunchDarkly Server-Side SDK" version = "2.6.2" authors = ["LaunchDarkly"] edition = "2021" -rust-version = "1.83.0" # MSRV +rust-version = "1.85.0" # MSRV license = "Apache-2.0" homepage = "https://docs.launchdarkly.com/sdk/server-side/rust" repository = "https://github.com/launchdarkly/rust-server-sdk" @@ -20,7 +20,8 @@ features = ["event-compression"] chrono = "0.4.19" crossbeam-channel = "0.5.1" data-encoding = "2.3.2" -eventsource-client = { version = "0.16.0", default-features = false } +# eventsource-client = { version = "0.16.0", default-features = false } +eventsource-client = { git = "https://github.com/launchdarkly/rust-eventsource-client", branch = "feat/hyper-as-feature" } futures = "0.3.12" lazy_static = "1.4.0" log = "0.4.14" @@ -34,8 +35,13 @@ parking_lot = "0.12.0" tokio-stream = { version = "0.1.8", features = ["sync"] } moka = { version = "0.12.1", features = ["sync"] } uuid = {version = "1.2.2", features = ["v4"] } -hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] } -hyper-rustls = { version = "0.24.1" , optional = true} +http = "1.0" +bytes = "1.11" +hyper = { version = "1.0", features = ["client", "http1", "http2"] } +hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "tokio"] } +http-body-util = { version = "0.1" } +hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "webpki-roots"], optional = true} +tower = { version = "0.4" } rand = "0.9" flate2 = { version = "1.0.35", optional = true } aws-lc-rs = "1.14.1" @@ -54,7 +60,7 @@ testing_logger = "0.1.1" [features] default = ["rustls"] -rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/rustls"] +rustls = ["hyper-rustls/http1", "hyper-rustls/http2", "eventsource-client/hyper-rustls"] event-compression = ["flate2"] [[example]] diff --git a/launchdarkly-server-sdk/examples/print_flags.rs b/launchdarkly-server-sdk/examples/print_flags.rs index 5a7c3b3..168d918 100644 --- a/launchdarkly-server-sdk/examples/print_flags.rs +++ b/launchdarkly-server-sdk/examples/print_flags.rs @@ -27,7 +27,7 @@ async fn main() { } else if let ["str", name] = bits { str_flags.push(name.to_string()); } else if let [flag_type, _] = bits { - error!("Unsupported flag type {} in {}", flag_type, flag); + error!("Unsupported flag type {flag_type} in {flag}"); exit(2); } else if let [name] = bits { bool_flags.push(name.to_string()); diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index bd3d6d9..9d2c8b3 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -294,10 +294,7 @@ impl Client { } let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await; - match initialized { - Ok(result) => Some(result), - Err(_) => None, - } + initialized.ok() } async fn initialized_async_internal(&self) -> bool { @@ -335,7 +332,7 @@ impl Client { // broadcast channel, so sending on it would always result in an error. if !self.offline && !self.daemon_mode { if let Err(e) = self.shutdown_broadcast.send(()) { - error!("Failed to shutdown client appropriately: {}", e); + error!("Failed to shutdown client appropriately: {e}"); } } @@ -380,8 +377,7 @@ impl Client { b } else { warn!( - "bool_variation called for a non-bool flag {:?} (got {:?})", - flag_key, val + "bool_variation called for a non-bool flag {flag_key:?} (got {val:?})" ); default } @@ -400,8 +396,7 @@ impl Client { s } else { warn!( - "str_variation called for a non-string flag {:?} (got {:?})", - flag_key, val + "str_variation called for a non-string flag {flag_key:?} (got {val:?})" ); default } @@ -420,8 +415,7 @@ impl Client { f } else { warn!( - "float_variation called for a non-float flag {:?} (got {:?})", - flag_key, val + "float_variation called for a non-float flag {flag_key:?} (got {val:?})" ); default } @@ -440,8 +434,7 @@ impl Client { f } else { warn!( - "int_variation called for a non-int flag {:?} (got {:?})", - flag_key, val + "int_variation called for a non-int flag {flag_key:?} (got {val:?})" ); default } @@ -761,14 +754,12 @@ impl Client { ); } Err(e) => error!( - "Failed to build migration event, no event will be sent: {}", - e + "Failed to build migration event, no event will be sent: {e}" ), } } Err(e) => error!( - "Failed to lock migration tracker, no event will be sent: {}", - e + "Failed to lock migration tracker, no event will be sent: {e}" ), } } @@ -849,7 +840,7 @@ mod tests { use crossbeam_channel::Receiver; use eval::{ContextBuilder, MultiContextBuilder}; use futures::FutureExt; - use hyper::client::HttpConnector; + use hyper_util::client::legacy::connect::HttpConnector; use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment}; use maplit::hashmap; use std::collections::HashMap; diff --git a/launchdarkly-server-sdk/src/config.rs b/launchdarkly-server-sdk/src/config.rs index e264bff..b4d21ad 100644 --- a/launchdarkly-server-sdk/src/config.rs +++ b/launchdarkly-server-sdk/src/config.rs @@ -6,6 +6,7 @@ use crate::events::processor_builders::{ }; use crate::stores::store_builders::{DataStoreFactory, InMemoryDataStoreBuilder}; use crate::{ServiceEndpointsBuilder, StreamingDataSourceBuilder}; +use eventsource_client as es; use std::borrow::Borrow; @@ -85,7 +86,7 @@ impl ApplicationInfo { match tag.is_valid() { Ok(_) => self.tags.push(tag), Err(e) => { - warn!("{}", e) + warn!("{e}") } } @@ -301,9 +302,9 @@ impl ConfigBuilder { } Some(builder) => Ok(builder), #[cfg(feature = "rustls")] - None => Ok(Box::new(StreamingDataSourceBuilder::< - hyper_rustls::HttpsConnector, - >::new())), + None => Ok(Box::new( + StreamingDataSourceBuilder::::new(), + )), #[cfg(not(feature = "rustls"))] None => Err(BuildError::InvalidConfig( "data source builder required when rustls is disabled".into(), @@ -321,7 +322,7 @@ impl ConfigBuilder { Some(builder) => Ok(builder), #[cfg(feature = "rustls")] None => Ok(Box::new(EventProcessorBuilder::< - hyper_rustls::HttpsConnector, + hyper_rustls::HttpsConnector, >::new())), #[cfg(not(feature = "rustls"))] None => Err(BuildError::InvalidConfig( diff --git a/launchdarkly-server-sdk/src/data_source.rs b/launchdarkly-server-sdk/src/data_source.rs index 55c786f..022abcc 100644 --- a/launchdarkly-server-sdk/src/data_source.rs +++ b/launchdarkly-server-sdk/src/data_source.rs @@ -7,15 +7,11 @@ use crate::LAUNCHDARKLY_TAGS_HEADER; use es::{Client, ClientBuilder, ReconnectOptionsBuilder}; use eventsource_client as es; use futures::StreamExt; -use hyper::client::connect::Connection; -use hyper::service::Service; -use hyper::Uri; use launchdarkly_server_sdk_evaluation::{Flag, Segment}; use parking_lot::RwLock; use serde::Deserialize; use std::sync::{Arc, Mutex, Once}; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::broadcast; use tokio::time; use tokio_stream::wrappers::{BroadcastStream, IntervalStream}; @@ -73,20 +69,14 @@ pub struct StreamingDataSource { } impl StreamingDataSource { - pub fn new( + pub fn new( base_url: &str, sdk_key: &str, initial_reconnect_delay: Duration, tags: &Option, - connector: C, - ) -> std::result::Result - where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into>, - { - let stream_url = format!("{}/all", base_url); + transport: T, + ) -> std::result::Result { + let stream_url = format!("{base_url}/all"); let client_builder = ClientBuilder::for_url(&stream_url)?; let mut client_builder = client_builder @@ -105,7 +95,7 @@ impl StreamingDataSource { } Ok(Self { - es_client: Box::new(client_builder.build_with_conn(connector)), + es_client: Box::new(client_builder.build_with_transport(transport)), }) } } @@ -136,7 +126,7 @@ impl DataSource for StreamingDataSource { continue; }, es::SSE::Comment(str)=> { - debug!("data source got a comment: {}", str); + debug!("data source got a comment: {str}"); continue; }, es::SSE::Event(ev) => ev, @@ -157,7 +147,7 @@ impl DataSource for StreamingDataSource { continue; } _ => { - error!("unhandled error on event stream: {:?}", e); + error!("unhandled error on event stream: {e:?}"); break; } } @@ -181,7 +171,7 @@ impl DataSource for StreamingDataSource { }; if let Err(e) = stored { init_success = false; - error!("error processing update: {:?}", e); + error!("error processing update: {e:?}"); } notify_init.call_once(|| (init_complete)(init_success)); @@ -223,12 +213,12 @@ impl DataSource for PollingDataSource { Ok(factory) => match factory.build(self.tags.clone()) { Ok(requester) => requester, Err(e) => { - error!("{:?}", e); + error!("{e:?}"); return; } }, Err(e) => { - error!("{:?}", e); + error!("{e:?}"); return; } }; @@ -376,7 +366,7 @@ mod tests { time::Duration, }; - use hyper::client::HttpConnector; + use hyper_util::client::legacy::connect::HttpConnector; use mockito::Matcher; use parking_lot::RwLock; use test_case::test_case; @@ -385,6 +375,7 @@ mod tests { use super::{DataSource, PollingDataSource, StreamingDataSource}; use crate::feature_requester_builders::HyperFeatureRequesterBuilder; use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER}; + use eventsource_client as es; #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")] #[test_case(None, Matcher::Missing)] @@ -411,7 +402,7 @@ mod tests { "sdk-key", Duration::from_secs(0), &tag, - HttpConnector::new(), + es::HyperTransport::new(), ) .unwrap(); diff --git a/launchdarkly-server-sdk/src/data_source_builders.rs b/launchdarkly-server-sdk/src/data_source_builders.rs index ec32390..02e105b 100644 --- a/launchdarkly-server-sdk/src/data_source_builders.rs +++ b/launchdarkly-server-sdk/src/data_source_builders.rs @@ -1,13 +1,13 @@ use super::service_endpoints; use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource}; use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder}; -use hyper::{client::connect::Connection, service::Service, Uri}; +use eventsource_client as es; +use http::Uri; #[cfg(feature = "rustls")] use hyper_rustls::HttpsConnectorBuilder; use std::sync::{Arc, Mutex}; use std::time::Duration; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite}; #[cfg(test)] use super::data_source; @@ -47,26 +47,25 @@ pub trait DataSourceFactory { /// Adjust the initial reconnect delay. /// ``` /// # use launchdarkly_server_sdk::{StreamingDataSourceBuilder, ConfigBuilder}; -/// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use eventsource_client as es; /// # use std::time::Duration; /// # fn main() { -/// ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::>::new() +/// ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::::new() /// .initial_reconnect_delay(Duration::from_secs(10))); /// # } /// ``` #[derive(Clone)] -pub struct StreamingDataSourceBuilder { +pub struct StreamingDataSourceBuilder { initial_reconnect_delay: Duration, - connector: Option, + transport: Option, } -impl StreamingDataSourceBuilder { +impl StreamingDataSourceBuilder { /// Create a new instance of the [StreamingDataSourceBuilder] with default values. pub fn new() -> Self { Self { initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY, - connector: None, + transport: None, } } @@ -76,60 +75,46 @@ impl StreamingDataSourceBuilder { self } - /// Sets the connector for the event source client to use. This allows for re-use of a - /// connector between multiple client instances. This is especially useful for the + /// Sets the transport for the event source client to use. This allows for re-use of a + /// transport between multiple client instances. This is especially useful for the /// `sdk-test-harness` where many client instances are created throughout the test and reading /// the native certificates is a substantial portion of the runtime. - pub fn https_connector(&mut self, connector: C) -> &mut Self { - self.connector = Some(connector); + pub fn transport(&mut self, transport: T) -> &mut Self { + self.transport = Some(transport); self } } -impl DataSourceFactory for StreamingDataSourceBuilder -where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, - C::Future: Send + 'static, - C::Error: Into>, -{ +impl DataSourceFactory for StreamingDataSourceBuilder { fn build( &self, endpoints: &service_endpoints::ServiceEndpoints, sdk_key: &str, tags: Option, ) -> Result, BuildError> { - let data_source_result = match &self.connector { + let data_source_result = match &self.transport { #[cfg(feature = "rustls")] - None => { - let connector = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - Ok(StreamingDataSource::new( - endpoints.streaming_base_url(), - sdk_key, - self.initial_reconnect_delay, - &tags, - connector, - )) - } + None => Ok(StreamingDataSource::new( + endpoints.streaming_base_url(), + sdk_key, + self.initial_reconnect_delay, + &tags, + es::HyperTransport::new_https(), + )), #[cfg(not(feature = "rustls"))] None => Err(BuildError::InvalidConfig( "https connector required when rustls is disabled".into(), )), - Some(connector) => Ok(StreamingDataSource::new( + Some(transport) => Ok(StreamingDataSource::new( endpoints.streaming_base_url(), sdk_key, self.initial_reconnect_delay, &tags, - connector.clone(), + transport.clone(), )), }; let data_source = data_source_result? - .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {:?}", e)))?; + .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {e:?}")))?; Ok(Arc::new(data_source)) } @@ -138,7 +123,7 @@ where } } -impl Default for StreamingDataSourceBuilder { +impl Default for StreamingDataSourceBuilder { fn default() -> Self { StreamingDataSourceBuilder::new() } @@ -190,7 +175,7 @@ impl Default for NullDataSourceBuilder { /// ``` /// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder}; /// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use hyper_util::client::legacy::connect::HttpConnector; /// # use std::time::Duration; /// # fn main() { /// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::>::new() @@ -221,7 +206,7 @@ pub struct PollingDataSourceBuilder { /// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder}; /// # use std::time::Duration; /// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use hyper_util::client::legacy::connect::HttpConnector; /// # fn main() { /// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::>::new() /// .poll_interval(Duration::from_secs(60))); @@ -257,8 +242,8 @@ impl PollingDataSourceBuilder { impl DataSourceFactory for PollingDataSourceBuilder where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, + C: tower::Service + Clone + Send + Sync + 'static, + C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send + Unpin + 'static, C::Error: Into>, { @@ -273,7 +258,7 @@ where #[cfg(feature = "rustls")] None => { let connector = HttpsConnectorBuilder::new() - .with_native_roots() + .with_webpki_roots() .https_or_http() .enable_http1() .enable_http2() @@ -355,13 +340,15 @@ impl DataSourceFactory for MockDataSourceBuilder { #[cfg(test)] mod tests { - use hyper::client::HttpConnector; + use eventsource_client::{HyperTransport, ResponseFuture}; + use hyper_util::client::legacy::connect::HttpConnector; use super::*; #[test] fn default_stream_builder_has_correct_defaults() { - let builder: StreamingDataSourceBuilder = StreamingDataSourceBuilder::new(); + let builder: StreamingDataSourceBuilder = + StreamingDataSourceBuilder::new(); assert_eq!( builder.initial_reconnect_delay, @@ -370,29 +357,22 @@ mod tests { } #[test] - fn stream_builder_can_use_custom_connector() { + fn stream_builder_can_use_custom_transport() { #[derive(Debug, Clone)] - struct TestConnector; - impl hyper::service::Service for TestConnector { - type Response = tokio::net::TcpStream; - type Error = std::io::Error; - type Future = futures::future::BoxFuture<'static, Result>; - - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } + struct TestTransport; - fn call(&mut self, _req: hyper::Uri) -> Self::Future { + impl es::HttpTransport for TestTransport { + fn request( + &self, + _request: eventsource_client::Request>, + ) -> ResponseFuture { // this won't be called during the test unreachable!(); } } let mut builder = StreamingDataSourceBuilder::new(); - builder.https_connector(TestConnector); + builder.transport(TestTransport); assert!(builder .build( &crate::ServiceEndpointsBuilder::new().build().unwrap(), @@ -410,7 +390,7 @@ mod tests { #[test] fn initial_reconnect_delay_for_streaming_can_be_adjusted() { - let mut builder = StreamingDataSourceBuilder::<()>::new(); + let mut builder = StreamingDataSourceBuilder::::new(); builder.initial_reconnect_delay(Duration::from_secs(1234)); assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234)); } diff --git a/launchdarkly-server-sdk/src/events/dispatcher.rs b/launchdarkly-server-sdk/src/events/dispatcher.rs index df59165..5921fb5 100644 --- a/launchdarkly-server-sdk/src/events/dispatcher.rs +++ b/launchdarkly-server-sdk/src/events/dispatcher.rs @@ -108,7 +108,7 @@ impl EventDispatcher { let rt = match rt { Ok(rt) => rt, Err(e) => { - error!("Could not start runtime for event sending: {}", e); + error!("Could not start runtime for event sending: {e}"); return; } }; @@ -128,7 +128,7 @@ impl EventDispatcher { }, Ok(_) => continue, Err(e) => { - error!("event_result_rx is disconnected. Shutting down dispatcher: {}", e); + error!("event_result_rx is disconnected. Shutting down dispatcher: {e}"); return; } }, @@ -158,7 +158,7 @@ impl EventDispatcher { return; } Err(e) => { - error!("inbox_rx is disconnected. Shutting down dispatcher: {}", e); + error!("inbox_rx is disconnected. Shutting down dispatcher: {e}"); return; } } @@ -302,11 +302,11 @@ impl EventDispatcher { let key = context.canonical_key(); if self.context_keys.get(key).is_none() { - trace!("noticing new context {:?}", key); + trace!("noticing new context {key:?}"); self.context_keys.put(key.to_owned(), ()); true } else { - trace!("ignoring already-seen context {:?}", key); + trace!("ignoring already-seen context {key:?}"); false } } diff --git a/launchdarkly-server-sdk/src/events/event.rs b/launchdarkly-server-sdk/src/events/event.rs index 741694b..665f08c 100644 --- a/launchdarkly-server-sdk/src/events/event.rs +++ b/launchdarkly-server-sdk/src/events/event.rs @@ -476,8 +476,8 @@ impl InputEvent { impl Display for InputEvent { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let json = serde_json::to_string_pretty(self) - .unwrap_or_else(|e| format!("JSON serialization failed ({}): {:?}", e, self)); - write!(f, "{}", json) + .unwrap_or_else(|e| format!("JSON serialization failed ({e}): {self:?}")); + write!(f, "{json}") } } @@ -1600,7 +1600,7 @@ mod tests { variation_index: Some(1), reason: Reason::RuleMatch { rule_index, - rule_id: format!("rule-{}", rule_index), + rule_id: format!("rule-{rule_index}"), in_experiment: rule_in_experiment, }, }; diff --git a/launchdarkly-server-sdk/src/events/processor_builders.rs b/launchdarkly-server-sdk/src/events/processor_builders.rs index 75b7e8e..9bf40f7 100644 --- a/launchdarkly-server-sdk/src/events/processor_builders.rs +++ b/launchdarkly-server-sdk/src/events/processor_builders.rs @@ -4,14 +4,11 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use hyper::client::connect::Connection; -use hyper::service::Service; -use hyper::Uri; +use http::Uri; #[cfg(feature = "rustls")] use hyper_rustls::HttpsConnectorBuilder; use launchdarkly_server_sdk_evaluation::Reference; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite}; use crate::events::sender::HyperEventSender; use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER}; @@ -66,7 +63,7 @@ pub trait EventProcessorFactory { /// ``` /// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder}; /// # use hyper_rustls::HttpsConnector; -/// # use hyper::client::HttpConnector; +/// # use hyper_util::client::legacy::connect::HttpConnector; /// # use std::time::Duration; /// # fn main() { /// ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::>::new() @@ -90,8 +87,8 @@ pub struct EventProcessorBuilder { impl EventProcessorFactory for EventProcessorBuilder where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, + C: tower::Service + Clone + Send + Sync + 'static, + C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send + Unpin + 'static, C::Error: Into>, { @@ -125,7 +122,7 @@ where #[cfg(feature = "rustls")] { let connector = HttpsConnectorBuilder::new() - .with_native_roots() + .with_webpki_roots() .https_or_http() .enable_http1() .enable_http2() @@ -133,7 +130,7 @@ where Ok(Arc::new(HyperEventSender::new( connector, - hyper::Uri::from_str(url_string.as_str()).unwrap(), + Uri::from_str(url_string.as_str()).unwrap(), sdk_key, default_headers, self.compress_events, @@ -324,7 +321,7 @@ impl Default for NullEventProcessorBuilder { #[cfg(test)] mod tests { - use hyper::client::HttpConnector; + use hyper_util::client::legacy::connect::HttpConnector; use launchdarkly_server_sdk_evaluation::ContextBuilder; use maplit::hashset; use mockito::Matcher; diff --git a/launchdarkly-server-sdk/src/events/sender.rs b/launchdarkly-server-sdk/src/events/sender.rs index f8b6a54..778442c 100644 --- a/launchdarkly-server-sdk/src/events/sender.rs +++ b/launchdarkly-server-sdk/src/events/sender.rs @@ -13,12 +13,11 @@ use flate2::Compression; #[cfg(feature = "event-compression")] use std::io::Write; +use bytes::Bytes; use futures::future::BoxFuture; -use hyper::{client::connect::Connection, service::Service, Uri}; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - time::{sleep, Duration}, -}; +use http_body_util::{BodyExt, Full}; +use hyper_util::{client::legacy::Client as HyperClient, rt::TokioExecutor}; +use tokio::time::{sleep, Duration}; use uuid::Uuid; use super::event::OutputEvent; @@ -39,9 +38,9 @@ pub trait EventSender: Send + Sync { #[derive(Clone)] pub struct HyperEventSender { - url: hyper::Uri, + url: http::Uri, sdk_key: String, - http: hyper::Client, + http: HyperClient>>, default_headers: HashMap<&'static str, String>, // used with event-compression feature @@ -51,14 +50,14 @@ pub struct HyperEventSender { impl HyperEventSender where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, + C: tower::Service + Clone + Send + Sync + 'static, + C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send + Unpin + 'static, C::Error: Into>, { pub fn new( connector: C, - url: hyper::Uri, + url: http::Uri, sdk_key: &str, default_headers: HashMap<&'static str, String>, compress_events: bool, @@ -66,13 +65,13 @@ where Self { url, sdk_key: sdk_key.to_owned(), - http: hyper::Client::builder().build(connector), + http: HyperClient::builder(TokioExecutor::new()).build(connector), default_headers, compress_events, } } - fn get_server_time_from_response(&self, response: &hyper::Response) -> u128 { + fn get_server_time_from_response(&self, response: &http::Response) -> u128 { let date_value = response .headers() .get("date") @@ -90,8 +89,8 @@ where impl EventSender for HyperEventSender where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, + C: tower::Service + Clone + Send + Sync + 'static, + C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send + Unpin + 'static, C::Error: Into>, { @@ -115,8 +114,7 @@ where Ok(json) => json, Err(e) => { error!( - "Failed to serialize event payload. Some events were dropped: {:?}", - e + "Failed to serialize event payload. Some events were dropped: {e:?}" ); return; } @@ -158,7 +156,14 @@ where request_builder = request_builder.header(*default_header.0, default_header.1.as_str()); } - let request = request_builder.body(hyper::Body::from(payload.clone())); + + // Convert Vec to BoxBody for hyper 1.0 + let body_bytes = Bytes::from(payload.clone()); + let boxed_body: http_body_util::combinators::BoxBody> = + Full::new(body_bytes) + .map_err(|e| Box::new(e) as Box) + .boxed(); + let request = request_builder.body(boxed_body); let result = self.http.request(request.unwrap()).await; @@ -168,7 +173,7 @@ where Err(e) => { // It appears this type of error will not be an HTTP error. // It will be a closed connection, aborted write, timeout, etc. - error!("Failed to send events. Some events were dropped: {:?}", e); + error!("Failed to send events. Some events were dropped: {e:?}"); result_tx .send(EventSenderResult { success: false, @@ -355,12 +360,12 @@ mod tests { assert_eq!(sender_result.time_from_server, 1234567890000); } - fn build_event_sender(url: String) -> HyperEventSender { + fn build_event_sender(url: String) -> HyperEventSender { let url = format!("{}/bulk", &url); - let url = hyper::Uri::from_str(&url).expect("Failed parsing the mock server url"); + let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url"); HyperEventSender::new( - hyper::client::HttpConnector::new(), + hyper_util::client::legacy::connect::HttpConnector::new(), url, "sdk-key", HashMap::<&str, String>::new(), diff --git a/launchdarkly-server-sdk/src/feature_requester.rs b/launchdarkly-server-sdk/src/feature_requester.rs index e435036..6d3c560 100644 --- a/launchdarkly-server-sdk/src/feature_requester.rs +++ b/launchdarkly-server-sdk/src/feature_requester.rs @@ -1,6 +1,8 @@ use crate::reqwest::is_http_error_recoverable; +use bytes::Bytes; use futures::future::BoxFuture; -use hyper::Body; +use http_body_util::{BodyExt, Empty}; +use hyper_util::client::legacy::Client as HyperClient; use std::collections::HashMap; use std::sync::Arc; @@ -21,8 +23,8 @@ pub trait FeatureRequester: Send { } pub struct HyperFeatureRequester { - http: Arc>, - url: hyper::Uri, + http: Arc>>>, + url: http::Uri, sdk_key: String, cache: Option, default_headers: HashMap<&'static str, String>, @@ -30,8 +32,8 @@ pub struct HyperFeatureRequester { impl HyperFeatureRequester { pub fn new( - http: hyper::Client, - url: hyper::Uri, + http: HyperClient>>, + url: http::Uri, sdk_key: String, default_headers: HashMap<&'static str, String>, ) -> Self { @@ -47,7 +49,7 @@ impl HyperFeatureRequester { impl FeatureRequester for HyperFeatureRequester where - C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, + C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static, { fn get_all(&mut self) -> BoxFuture, FeatureRequesterError>> { Box::pin(async { @@ -73,8 +75,14 @@ where request_builder = request_builder.header("If-None-Match", cache.1.clone()); } + // Create empty body for GET request + let empty_body: http_body_util::combinators::BoxBody> = + Empty::::new() + .map_err(|e| Box::new(e) as Box) + .boxed(); + let result = http - .request(request_builder.body(Body::empty()).unwrap()) + .request(request_builder.body(empty_body).unwrap()) .await; let response = match result { @@ -82,7 +90,7 @@ where Err(e) => { // It appears this type of error will not be an HTTP error. // It will be a closed connection, aborted write, timeout, etc. - error!("An error occurred while retrieving flag information {}", e,); + error!("An error occurred while retrieving flag information {e}",); return Err(FeatureRequesterError::Temporary); } }; @@ -101,27 +109,26 @@ where .map_or_else(|_| "".into(), |s| s.into()); if response.status().is_success() { - let bytes = hyper::body::to_bytes(response.into_body()) - .await + let body_bytes = response.into_body().collect().await .map_err(|e| { error!( - "An error occurred while reading the polling response body: {}", - e + "An error occurred while reading the polling response body: {e}" ); FeatureRequesterError::Temporary - })?; - let json = serde_json::from_slice::>(bytes.as_ref()); + })? + .to_bytes(); + let json = serde_json::from_slice::>(body_bytes.as_ref()); return match json { Ok(all_data) => { if !etag.is_empty() { - debug!("Caching data for future use with etag: {}", etag); + debug!("Caching data for future use with etag: {etag}"); self.cache = Some(CachedEntry(all_data.clone(), etag)); } Ok(all_data) } Err(e) => { - error!("An error occurred while parsing the json response: {}", e); + error!("An error occurred while parsing the json response: {e}"); Err(FeatureRequesterError::Temporary) } }; @@ -246,9 +253,11 @@ mod tests { } } - fn build_feature_requester(url: String) -> HyperFeatureRequester { - let http = hyper::Client::builder().build(hyper::client::HttpConnector::new()); - let url = hyper::Uri::from_str(&url).expect("Failed parsing the mock server url"); + fn build_feature_requester(url: String) -> HyperFeatureRequester { + use hyper_util::rt::TokioExecutor; + let connector = hyper_util::client::legacy::connect::HttpConnector::new(); + let http = HyperClient::builder(TokioExecutor::new()).build(connector); + let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url"); HyperFeatureRequester::new( http, diff --git a/launchdarkly-server-sdk/src/feature_requester_builders.rs b/launchdarkly-server-sdk/src/feature_requester_builders.rs index 1d7f1ee..f009393 100644 --- a/launchdarkly-server-sdk/src/feature_requester_builders.rs +++ b/launchdarkly-server-sdk/src/feature_requester_builders.rs @@ -1,12 +1,10 @@ use crate::feature_requester::{FeatureRequester, HyperFeatureRequester}; use crate::LAUNCHDARKLY_TAGS_HEADER; -use hyper::client::connect::Connection; -use hyper::service::Service; -use hyper::Uri; +use http::Uri; +use hyper_util::{client::legacy::Client as HyperClient, rt::TokioExecutor}; use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite}; /// Error type used to represent failures when building a [FeatureRequesterFactory] instance. #[non_exhaustive] @@ -29,19 +27,19 @@ pub trait FeatureRequesterFactory: Send { pub struct HyperFeatureRequesterBuilder { url: String, sdk_key: String, - http: hyper::Client, + http: HyperClient>>, } impl HyperFeatureRequesterBuilder where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, + C: tower::Service + Clone + Send + Sync + 'static, + C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send + Unpin + 'static, C::Error: Into>, { pub fn new(url: &str, sdk_key: &str, connector: C) -> Self { Self { - http: hyper::Client::builder().build(connector), + http: HyperClient::builder(TokioExecutor::new()).build(connector), url: url.into(), sdk_key: sdk_key.into(), } @@ -50,8 +48,8 @@ where impl FeatureRequesterFactory for HyperFeatureRequesterBuilder where - C: Service + Clone + Send + Sync + 'static, - C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin, + C: tower::Service + Clone + Send + Sync + 'static, + C::Response: hyper_util::client::legacy::connect::Connection + hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send + Unpin + 'static, C::Error: Into>, { @@ -64,7 +62,7 @@ where default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags); } - let url = hyper::Uri::from_str(url.as_str()) + let url = Uri::from_str(url.as_str()) .map_err(|_| BuildError::InvalidConfig("Invalid base url provided".into()))?; Ok(Box::new(HyperFeatureRequester::new( @@ -78,7 +76,7 @@ where #[cfg(test)] mod tests { - use hyper::client::HttpConnector; + use hyper_util::client::legacy::connect::HttpConnector; use super::*; diff --git a/launchdarkly-server-sdk/src/migrations/mod.rs b/launchdarkly-server-sdk/src/migrations/mod.rs index 91f9541..c3dea2c 100644 --- a/launchdarkly-server-sdk/src/migrations/mod.rs +++ b/launchdarkly-server-sdk/src/migrations/mod.rs @@ -79,7 +79,7 @@ impl TryFrom for Stage { "live" => Ok(Stage::Live), "rampdown" => Ok(Stage::Rampdown), "complete" => Ok(Stage::Complete), - _ => Err(format!("Invalid stage: {}", value)), + _ => Err(format!("Invalid stage: {value}")), } } else { Err("Cannot convert non-string value to Stage".to_string()) diff --git a/launchdarkly-server-sdk/src/migrations/tracker.rs b/launchdarkly-server-sdk/src/migrations/tracker.rs index 547d294..d1ec6b1 100644 --- a/launchdarkly-server-sdk/src/migrations/tracker.rs +++ b/launchdarkly-server-sdk/src/migrations/tracker.rs @@ -140,15 +140,13 @@ impl MigrationOpTracker { if self.errors.contains(origin) { return Err(format!( - "provided error for origin {:?} without recording invocation", - origin + "provided error for origin {origin:?} without recording invocation" )); } if self.latencies.contains_key(origin) { return Err(format!( - "provided latency for origin {:?} without recording invocation", - origin + "provided latency for origin {origin:?} without recording invocation" )); } } diff --git a/launchdarkly-server-sdk/src/stores/persistent_store_wrapper.rs b/launchdarkly-server-sdk/src/stores/persistent_store_wrapper.rs index a2831e6..f54feea 100644 --- a/launchdarkly-server-sdk/src/stores/persistent_store_wrapper.rs +++ b/launchdarkly-server-sdk/src/stores/persistent_store_wrapper.rs @@ -145,14 +145,14 @@ impl Store for PersistentDataStoreWrapper { item.into() } Err(e) => { - warn!("failed to convert serialized item into flag: {}", e); + warn!("failed to convert serialized item into flag: {e}"); None } } } Ok(None) => None, Err(e) => { - warn!("persistent store failed to retrieve flag: {}", e); + warn!("persistent store failed to retrieve flag: {e}"); None } } @@ -173,14 +173,14 @@ impl Store for PersistentDataStoreWrapper { item.into() } Err(e) => { - warn!("failed to convert serialized item into segment: {}", e); + warn!("failed to convert serialized item into segment: {e}"); None } } } Ok(None) => None, Err(e) => { - warn!("persistent store failed to retrieve segment: {}", e); + warn!("persistent store failed to retrieve segment: {e}"); None } } @@ -196,8 +196,7 @@ impl DataStore for PersistentDataStoreWrapper { match serialized_data { Err(e) => warn!( - "failed to deserialize payload; cannot initialize store {}", - e + "failed to deserialize payload; cannot initialize store {e}" ), Ok(data) => { let result = self.store.init(data); @@ -208,7 +207,7 @@ impl DataStore for PersistentDataStoreWrapper { self.cache_items(all_data.into()); } Err(e) => { - error!("failed to init store: {}", e); + error!("failed to init store: {e}"); if self.flags.cache_is_infinite() { debug!("updating non-expiring cache"); self.cache_items(all_data.into()) @@ -249,13 +248,13 @@ impl DataStore for PersistentDataStoreWrapper { HashMap::from_iter(flag_iter) } Err(e) => { - warn!("failed to convert serialized items into flags: {}", e); + warn!("failed to convert serialized items into flags: {e}"); HashMap::new() } } } Err(e) => { - warn!("persistent store failed to retrieve all flags: {}", e); + warn!("persistent store failed to retrieve all flags: {e}"); HashMap::new() } } @@ -267,7 +266,7 @@ impl DataStore for PersistentDataStoreWrapper { PatchTarget::Segment(item) => self.upsert_segment(key, item), PatchTarget::Other(v) => Err(UpdateError::InvalidTarget( "flag or segment".to_string(), - format!("{:?}", v), + format!("{v:?}"), )), } } diff --git a/launchdarkly-server-sdk/src/stores/store.rs b/launchdarkly-server-sdk/src/stores/store.rs index 28d87c2..a4e4d4b 100644 --- a/launchdarkly-server-sdk/src/stores/store.rs +++ b/launchdarkly-server-sdk/src/stores/store.rs @@ -106,7 +106,7 @@ impl DataStore for InMemoryDataStore { } PatchTarget::Other(v) => Err(UpdateError::InvalidTarget( "flag or segment".to_string(), - format!("{:?}", v), + format!("{v:?}"), )), } }