KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. #20665
base: trunk
Conversation
    }
} catch (final KafkaException fatal) {
    throw new StreamsException(fatal);
if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
the change that reworked that code to use cached offsets: https://github.com/apache/kafka/pull/17091/files#diff-a76674468cda8772230fb8411717cf9068b1a363a792f32c602fb2ec5ba9efd7R472
@aliehsaeedii @lucasbru, as you folks worked on it, could you please take a look? thanks
break;
return partitionsNeedCommit.stream()
    .flatMap(partition -> findOffsetAndMetadata(partition)
        .map(offsetAndMetadata -> Map.entry(partition, offsetAndMetadata))
If findOffsetAndMetadata returns an empty Optional, the partition will be dropped and does not end up in the computed "committableOffsets" map, right?
Yes. If findOffsetAndMetadata returns an empty Optional, that topic partition will not be in the result.
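The dropping behavior discussed above can be sketched in isolation. This is a minimal stand-in, not the actual StreamTask code: `findOffset` and the `String`/`Long` types are hypothetical simplifications of `findOffsetAndMetadata` and the real partition/offset types.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class DropEmptyDemo {
    // Hypothetical stand-in for findOffsetAndMetadata: only "t-0" has a cached offset.
    static Optional<Long> findOffset(String partition) {
        return partition.equals("t-0") ? Optional.of(42L) : Optional.empty();
    }

    static Map<String, Long> committableOffsets(List<String> partitionsNeedCommit) {
        return partitionsNeedCommit.stream()
            // An empty Optional becomes an empty Stream, so the partition
            // simply vanishes from the collected map.
            .flatMap(partition -> findOffset(partition)
                .map(offset -> Map.entry(partition, offset))
                .stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // t-1 has no cached offset and is silently dropped.
        System.out.println(committableOffsets(List.of("t-0", "t-1")));
    }
}
```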
// This indicates a bug and thus we rethrow it as fatal `IllegalStateException`
throw new IllegalStateException("Stream task " + id + " does not know the partition: " + partition);
    }
} catch (final KafkaException fatal) {
As there are no client calls anymore, the catch is redundant.
Nice side cleanup.
}
break;
return partitionsNeedCommit.stream()
    .flatMap(partition -> findOffsetAndMetadata(partition)
The Java streams API always confused me. Why do we use flatMap here? Don't we get a single entry in the "committableOffsets" map that we compute here? Or is it needed because we might drop some partitions?
So the path here is:
a stream of partitionsNeedCommit -> flatMap(partition -> to an offsetAndMetadata Optional -> map the Optional to a Map.entry Optional -> map it to a stream) -> collect.
The reason we map the Optional to a stream is that it looks nicer; otherwise we would need two operations: filter(Optional::isPresent).map(Optional::get).
If you prefer that syntax, I will update the code.
Subjective. :) I would personally find it easier to read using filter(...).map(...); it's more explicit and less "magic".
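For comparison, the two styles being debated can be written side by side. This is a standalone sketch with a hypothetical `lookup` helper, not the StreamTask code itself; both pipelines produce the same map.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class FilterMapDemo {
    // Hypothetical lookup: only partition "a" has a cached value.
    static Optional<Long> lookup(String partition) {
        return partition.equals("a") ? Optional.of(1L) : Optional.empty();
    }

    // Style 1: flatMap over Optional.stream() -- empty Optionals vanish silently.
    static Map<String, Long> viaFlatMap(List<String> parts) {
        return parts.stream()
            .flatMap(p -> lookup(p).map(v -> Map.entry(p, v)).stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Style 2: explicit filter(Optional::isPresent).map(Optional::get).
    static Map<String, Long> viaFilter(List<String> parts) {
        return parts.stream()
            .map(p -> lookup(p).map(v -> Map.entry(p, v)))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        List<String> parts = List.of("a", "b");
        // Both styles drop "b" and keep only "a".
        System.out.println(viaFlatMap(parts));
        System.out.println(viaFlatMap(parts).equals(viaFilter(parts)));
    }
}
```

The filter/map form spells out the drop step explicitly, at the cost of one extra stream operation.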
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
} catch (final KafkaException fatal) {
    throw new StreamsException(fatal);
if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
    final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
nit: I find the containsKey() followed by get() pattern always confusing and hard to read... How about
final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
if (offsetAndMetadata != null) {
    offset = offsetAndMetadata.offset();
    leaderEpoch = offsetAndMetadata.leaderEpoch();
}
Just a personal preference.
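The two lookup patterns compared above are behaviorally equivalent; the single-get form just saves one hash lookup. A standalone sketch (with plain `String`/`Long` standing in for the real partition/offset types):

```java
import java.util.Map;

public class SingleLookupDemo {
    // containsKey() followed by get(): two hash lookups.
    static long viaContainsKey(Map<String, Long> offsets, String partition) {
        long offset = -1L;
        if (offsets.containsKey(partition)) {
            offset = offsets.get(partition);
        }
        return offset;
    }

    // Single get() with a null check: one lookup, same result
    // (safe here because the map never stores null values).
    static long viaSingleGet(Map<String, Long> offsets, String partition) {
        final Long cached = offsets.get(partition);
        return cached != null ? cached : -1L;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = Map.of("t-0", 7L);
        System.out.println(viaContainsKey(offsets, "t-0") == viaSingleGet(offsets, "t-0"));
        System.out.println(viaContainsKey(offsets, "t-9") == viaSingleGet(offsets, "t-9"));
    }
}
```

Note the single-get form relies on the map not containing null values, which holds for a cache of offsets.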
...ion-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
…ms/integration/RegexSourceIntegrationTest.java Co-authored-by: Matthias J. Sax <[email protected]>
LGTM, thanks!
KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available.

Before we added caching for consumer next offsets, we called mainConsumer.position and always expected something back. When we added the caching, we kept the check that we always have a next offset, but as the logic changed to fetching the offsets from poll, we may not have anything for topics that have no messages. This PR accounts for that.
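The shape of the fix described above can be sketched as a guarded cache lookup. This is an illustrative simplification, not the actual StreamTask change: the map, the `findOffset` name, and the `String`/`Long` types are hypothetical stand-ins.

```java
import java.util.Map;
import java.util.Optional;

public class GuardedOffsetLookup {
    // Instead of assuming the cache (populated from poll()) always has an
    // entry and failing when it does not, return Optional.empty() for
    // partitions with no cached offset, e.g. topics that have no messages yet.
    static Optional<Long> findOffset(Map<String, Long> nextOffsetsToBeConsumed, String partition) {
        return Optional.ofNullable(nextOffsetsToBeConsumed.get(partition));
    }

    public static void main(String[] args) {
        Map<String, Long> cache = Map.of("with-data-0", 100L);
        System.out.println(findOffset(cache, "with-data-0"));   // present
        System.out.println(findOffset(cache, "empty-topic-0")); // empty, no exception
    }
}
```

Callers can then skip absent partitions (as the flatMap over the Optional does in the commit path) rather than hitting a failed precondition.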