-
-
Notifications
You must be signed in to change notification settings - Fork 0
wip(taskbroker): Add consumer step to resend delayed task to kafka #297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #297 +/- ##
==========================================
- Coverage 84.30% 82.22% -2.08%
==========================================
Files 20 21 +1
Lines 3771 3866 +95
==========================================
Hits 3179 3179
- Misses 592 687 +95 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
c4367d5
to
1aa01ea
Compare
1aa01ea
to
9c42502
Compare
store: Arc<InflightActivationStore>, | ||
batch: Option<Vec<InflightActivation>>, | ||
} | ||
|
||
impl InflightActivationWriter { | ||
pub fn new(store: Arc<InflightActivationStore>, config: ActivationWriterConfig) -> Self { | ||
impl ActivationStoreWriter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should deprecate 'inflightactivation' 🤔 we'll need a term to disambiguate the protobuf form and the sqlite mapped struct though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree
let (ready_activations, delayed_activations): (Vec<_>, Vec<_>) = take(&mut self.batch) | ||
.unwrap() | ||
.into_iter() | ||
.partition(|_| true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need a field in the protobuf to make this work for real. Options I've thought of are countdown
and runafter
🤷
let deliveries = payloads | ||
.into_iter() | ||
.map(|payload| { | ||
let producer = self.producer.clone(); | ||
let config = self.config.clone(); | ||
async move { | ||
producer | ||
.send( | ||
FutureRecord::<(), Vec<u8>>::to(&config.kafka_topic) | ||
.payload(&payload), | ||
Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)), | ||
) | ||
.await | ||
} | ||
}) | ||
.collect::<FuturesUnordered<_>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have this pattern in a few places now, should we try making a function for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should, they're all slightly different tho. So we need to be clever with our abstraction.
TODO: