diff --git a/lading/src/generator/splunk_hec/acknowledgements.rs b/lading/src/generator/splunk_hec/acknowledgements.rs index ab13629d7..2c9a9301d 100644 --- a/lading/src/generator/splunk_hec/acknowledgements.rs +++ b/lading/src/generator/splunk_hec/acknowledgements.rs @@ -1,9 +1,14 @@ use core::slice; use std::time::Duration; +use bytes::Bytes; use futures::Future; use http::{header::AUTHORIZATION, Method, Request, StatusCode, Uri}; -use hyper::{body::HttpBody, client::HttpConnector, Body, Client}; +use http_body_util::{combinators::BoxBody, BodyExt}; +use hyper_util::{ + client::legacy::{connect::HttpConnector, Client}, + rt::TokioExecutor, +}; use metrics::counter; use rustc_hash::FxHashMap; use serde::Deserialize; @@ -87,7 +92,7 @@ impl Channels { token: String, ack_settings: AckSettings, ) { - let client: Client = Client::builder() + let client = Client::builder(TokioExecutor::new()) .retry_canceled_requests(false) .build_http(); @@ -130,7 +135,7 @@ impl<'a, V> Iterator for Iter<'a, V> { struct AckService { pub(crate) ack_uri: Uri, pub(crate) token: String, - pub(crate) client: Client, + pub(crate) client: Client>, pub(crate) ack_settings: AckSettings, } @@ -167,11 +172,11 @@ impl AckService { if ack_ids.is_empty() { debug!("tick expired with no acks"); } else { - let body = Body::from( + let body = crate::full( serde_json::json!({ "acks": ack_ids.keys().collect::>() }) .to_string(), ); - let request: Request = Request::builder() + let request = Request::builder() .method(Method::POST) .uri(self.ack_uri.clone()) .header(AUTHORIZATION, format!("Splunk {}", self.token)) @@ -191,8 +196,8 @@ impl AckService { } async fn ack_request( - client: Client, - request: Request, + client: Client>, + request: Request>, channel_id: String, ack_ids: &mut FxHashMap, ) -> Result<(), Error> { @@ -202,7 +207,7 @@ async fn ack_request( let status = parts.status; counter!("ack_status_request_ok", "channel_id" => channel_id.clone(), "status" => status.to_string()).increment(1); if status == StatusCode::OK { - let body = body.collect().await?.to_bytes(); + let body = body.boxed().collect().await?.to_bytes(); let ack_status = serde_json::from_slice::(&body)?; let mut ack_ids_acked: u32 = 0;