Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ openssl-probe = { version = "0.1.6", default-features = false }
ordered-float.workspace = true
percent-encoding = { version = "2.3.1", default-features = false }
postgres-openssl = { version = "0.5.1", default-features = false, features = ["runtime"], optional = true }
pulsar = { version = "6.3.1", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
pulsar = { version = "6.5.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
quick-junit = { version = "0.5.1" }
rand.workspace = true
rand_distr.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/pulsar/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ impl PulsarSinkConfig {
batch_size: self.batch.max_events,
batch_byte_size: self.batch.max_bytes,
compression: None,
batch_timeout: None,
block_queue_if_full: false,
routing_policy: None,
};

match &self.compression {
Expand Down
5 changes: 5 additions & 0 deletions src/sinks/pulsar/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl MetaDescriptive for PulsarRequest {
}
}

#[derive(Clone)]
pub struct PulsarService<Exe: Executor> {
// NOTE: the reason for the Mutex here is because the `Producer` from the pulsar crate
// needs to be `mut`, and the `Service::call()` returns a Future.
Expand All @@ -81,6 +82,10 @@ impl<Exe: Executor> PulsarService<Exe> {
producer: Arc::new(Mutex::new(producer)),
}
}

pub(crate) fn get_producer(&self) -> Arc<Mutex<MultiTopicProducer<Exe>>> {
Arc::clone(&self.producer)
}
}

impl<Exe: Executor> Service<PulsarRequest> for PulsarService<Exe> {
Expand Down
21 changes: 19 additions & 2 deletions src/sinks/pulsar/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,14 @@ impl PulsarSink {
}

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let service = ServiceBuilder::new().service(self.service);
let service = ServiceBuilder::new().service(self.service.clone());
let request_builder = PulsarRequestBuilder {
encoder: PulsarEncoder {
transformer: self.transformer.clone(),
encoder: self.encoder.clone(),
},
};

let sink = input
.filter_map(|event| {
std::future::ready(util::make_pulsar_event(
Expand All @@ -124,7 +125,23 @@ impl PulsarSink {
.into_driver(service)
.protocol("tcp");

sink.run().await
let result = sink.run().await;

debug!(message = "Receiver exhausted, closing Pulsar producer.");

// Fermer tous les producers pour chaque topic
let producer_arc = self.service.get_producer();
let mut producer = producer_arc.lock().await;
let topics = producer.topics();
for topic in topics {
if let Err(e) = producer.close_producer(&topic).await {
warn!(message = "Error closing Pulsar producer for topic.", topic = %topic, error = ?e);
} else {
debug!(message = "Pulsar producer closed successfully for topic.", topic = %topic);
}
}

result
}
}

Expand Down
Loading