From fa789d64c2c7772dcd1381eee613a50943f9c05a Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 16 Dec 2024 13:38:26 -0800 Subject: [PATCH 1/2] ref(kafka): Remove incremental rebalancing --- src/consumer/kafka.rs | 29 ++++++----------------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index c63d5a5..b6ce692 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -359,32 +359,15 @@ pub async fn handle_events( unreachable!("Got partition revocation before the consumer has started") } (ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped, - (ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => { - assert!( - tpl.is_disjoint(&assigned), - "Newly assigned TPL should be disjoint from TPL we're consuming from" - ); - tpl.append(&mut assigned); - debug!( - "{} additional topic partitions added after assignment", - assigned.len() - ); - handles.shutdown(CALLBACK_DURATION).await; - ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) + (ConsumerState::Consuming(_, _), Event::Assign(_)) => { + unreachable!("Got partition assignment after the consumer has started") } - (ConsumerState::Consuming(handles, mut tpl), Event::Revoke(revoked)) => { + (ConsumerState::Consuming(_, tpl), Event::Revoke(revoked)) => { assert!( - tpl.is_subset(&revoked), - "Revoked TPL should be a subset of TPL we're consuming from" + tpl == revoked, + "Revoked TPL should be equal to the subset of TPL we're consuming from" ); - tpl.retain(|e| !revoked.contains(e)); - debug!("{} topic partitions remaining after revocation", tpl.len()); - handles.shutdown(CALLBACK_DURATION).await; - if tpl.is_empty() { - ConsumerState::Ready - } else { - ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) - } + ConsumerState::Ready } (ConsumerState::Consuming(handles, _), Event::Shutdown) => { handles.shutdown(CALLBACK_DURATION).await; From 9b0bf20c5e3d5948de346d43d08989c908dc1eab Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 16 Dec 2024 13:59:46 -0800 Subject: [PATCH 2/2] oops, forgot to shutdown on revoke --- src/consumer/kafka.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index b6ce692..5f7a310 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -362,11 +362,12 @@ pub async fn handle_events( (ConsumerState::Consuming(_, _), Event::Assign(_)) => { unreachable!("Got partition assignment after the consumer has started") } - (ConsumerState::Consuming(_, tpl), Event::Revoke(revoked)) => { + (ConsumerState::Consuming(handles, tpl), Event::Revoke(revoked)) => { assert!( tpl == revoked, "Revoked TPL should be equal to the subset of TPL we're consuming from" ); + handles.shutdown(CALLBACK_DURATION).await; ConsumerState::Ready } (ConsumerState::Consuming(handles, _), Event::Shutdown) => {