You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hi all!
I am quite new to Rust, so please excuse my (possibly) fairly obvious question.
Currently I am trying to design an application that processes Kafka messages as fast as possible.
Using a simple StreamConsumer I can read about 800 messages per second:
async fn consume_messages<'a>(
num_messages: u32,
mut stream: MessageStream<'a, DefaultConsumerContext>,
) {
for _ in 0..num_messages {
let message = stream.next().await.unwrap().unwrap();
let key: &str = message.key_view().unwrap().unwrap();
println!("ID {} with offset {}", key, message.offset())
}
}
async fn main() {
dotenv().ok();
let num_messages: u32 = 10000;
let consumer = configure_consumer();
let now = Instant::now();
consume_messages(num_messages, consumer.stream()).await;
let elapsed = now.elapsed();
println!("Elapsed: {:.2?}", elapsed);
}
The processing I want to do is very light (atm I am only printing the keys out), I feel like there should be a lot of potential left on the table. E.g. with a comparable Java application I was able to consume over 4000 messages per second.
I've been trying to leverage parallelism and concurrency using threads and Tokio tasks, but without success.
From the documentation I deduce that I should (in theory) be able to create multiple worker threads that all share one MessageStream:
"It is legal to have multiple live message streams for the same consumer, and to move those message streams across threads. Note, however, that the message streams share the same underlying state. A message received by the consumer will be delivered to only one of the live message streams. If you seek the underlying consumer, all message streams created from the consumer will begin to draw messages from the new position of the consumer."
I have tried all sorts of things like:
#[tokio::main]
async fn main() {
dotenv().ok();
let num_messages: u32 = 10000;
let consumer = configure_consumer();
let now = Instant::now();
let mut handles = JoinSet::new();
for i in 0..6 {
handles.spawn(consume_messages(num_messages, consumer.stream()));
}
while let Some(res) = handles.join_next().await {}
let elapsed = now.elapsed();
println!("Elapsed: {:.2?}", elapsed);
}
But nothing seems to work, for one reason or another. In the example above, Rusts lifetime rules rules prevent it from compiling (and that's a good thing, if you know how to work with it properly - which apparently I don't).
Could someone give me an example of a proper multi-threaded consumer application or point me in the right direction on how to achieve better performance?
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hi all!
I am quite new to Rust, so please excuse my (possibly) fairly obvious question.
Currently I am trying to design an application that processes Kafka messages as fast as possible.
Using a simple StreamConsumer I can read about 800 messages per second:
The processing I want to do is very light (atm I am only printing the keys out), I feel like there should be a lot of potential left on the table. E.g. with a comparable Java application I was able to consume over 4000 messages per second.
I've been trying to leverage parallelism and concurrency using threads and Tokio tasks, but without success.
From the documentation I deduce that I should (in theory) be able to create multiple worker threads that all share one MessageStream:
"It is legal to have multiple live message streams for the same consumer, and to move those message streams across threads. Note, however, that the message streams share the same underlying state. A message received by the consumer will be delivered to only one of the live message streams. If you seek the underlying consumer, all message streams created from the consumer will begin to draw messages from the new position of the consumer."
I have tried all sorts of things like:
But nothing seems to work, for one reason or another. In the example above, Rusts lifetime rules rules prevent it from compiling (and that's a good thing, if you know how to work with it properly - which apparently I don't).
Could someone give me an example of a proper multi-threaded consumer application or point me in the right direction on how to achieve better performance?
Thank you in advance!
Beta Was this translation helpful? Give feedback.
All reactions