Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Brian L. Troutwine <[email protected]>
  • Loading branch information
blt committed Dec 27, 2024
1 parent 0130119 commit ff1d9d9
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions lading/src/generator/splunk_hec/acknowledgements.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use core::slice;
use std::time::Duration;

use bytes::Bytes;
use futures::Future;
use http::{header::AUTHORIZATION, Method, Request, StatusCode, Uri};
use hyper::{body::HttpBody, client::HttpConnector, Body, Client};
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use metrics::counter;
use rustc_hash::FxHashMap;
use serde::Deserialize;
Expand Down Expand Up @@ -87,7 +92,7 @@ impl Channels {
token: String,
ack_settings: AckSettings,
) {
let client: Client<HttpConnector, Body> = Client::builder()
let client = Client::builder(TokioExecutor::new())
.retry_canceled_requests(false)
.build_http();

Expand Down Expand Up @@ -130,7 +135,7 @@ impl<'a, V> Iterator for Iter<'a, V> {
struct AckService {
pub(crate) ack_uri: Uri,
pub(crate) token: String,
pub(crate) client: Client<HttpConnector, Body>,
pub(crate) client: Client<HttpConnector, BoxBody<Bytes, hyper::Error>>,
pub(crate) ack_settings: AckSettings,
}

Expand Down Expand Up @@ -167,11 +172,11 @@ impl AckService {
if ack_ids.is_empty() {
debug!("tick expired with no acks");
} else {
let body = Body::from(
let body = crate::full(
serde_json::json!({ "acks": ack_ids.keys().collect::<Vec<&u64>>() })
.to_string(),
);
let request: Request<Body> = Request::builder()
let request = Request::builder()
.method(Method::POST)
.uri(self.ack_uri.clone())
.header(AUTHORIZATION, format!("Splunk {}", self.token))
Expand All @@ -191,8 +196,8 @@ impl AckService {
}

async fn ack_request(
client: Client<HttpConnector>,
request: Request<Body>,
client: Client<HttpConnector, BoxBody<Bytes, hyper::Error>>,
request: Request<BoxBody<Bytes, hyper::Error>>,
channel_id: String,
ack_ids: &mut FxHashMap<AckId, u64>,
) -> Result<(), Error> {
Expand All @@ -202,7 +207,7 @@ async fn ack_request(
let status = parts.status;
counter!("ack_status_request_ok", "channel_id" => channel_id.clone(), "status" => status.to_string()).increment(1);
if status == StatusCode::OK {
let body = body.collect().await?.to_bytes();
let body = body.boxed().collect().await?.to_bytes();
let ack_status = serde_json::from_slice::<HecAckStatusResponse>(&body)?;

let mut ack_ids_acked: u32 = 0;
Expand Down

0 comments on commit ff1d9d9

Please sign in to comment.