
Add support for listing Kafka offsets in bulk #26168


Open
wants to merge 2 commits into master

Conversation


@pmw-rp pmw-rp commented Jul 10, 2025

Description

This PR modifies how the Trino Kafka integration translates timestamps into offsets.

The current implementation makes one Kafka API call per partition to translate a timestamp. However, the API accepts a list of partitions in a single call, allowing a bulk translation.

By switching to a bulk operation, the number of API calls is significantly reduced, improving query startup time.
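For illustration, here is a minimal sketch of the per-partition versus bulk lookup, assuming the standard KafkaConsumer.offsetsForTimes API (the class and method names below are illustrative, not the PR's actual code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

final class BulkOffsetLookupSketch
{
    // Before: one offsetsForTimes() network round trip per partition.
    static Map<TopicPartition, OffsetAndTimestamp> perPartition(
            KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> partitions, long timestampMillis)
    {
        Map<TopicPartition, OffsetAndTimestamp> offsets = new HashMap<>();
        for (TopicPartition partition : partitions) {
            // Each call here is a separate request to the brokers
            offsets.putAll(consumer.offsetsForTimes(Map.of(partition, timestampMillis)));
        }
        return offsets;
    }

    // After: a single offsetsForTimes() call covering all partitions.
    static Map<TopicPartition, OffsetAndTimestamp> bulk(
            KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> partitions, long timestampMillis)
    {
        Map<TopicPartition, Long> query = new HashMap<>();
        partitions.forEach(partition -> query.put(partition, timestampMillis));
        return consumer.offsetsForTimes(query);
    }
}

For a topic with N partitions, the bulk variant collapses N broker requests into a single one.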

Release notes

(X) This is not user-visible or is docs only, and no release notes are required.

Since the only impact on end users is improved query performance, release notes are probably optional.


cla-bot bot commented Jul 10, 2025

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@github-actions github-actions bot added the kafka Kafka connector label Jul 10, 2025
@findinpath findinpath requested a review from wendigo July 10, 2025 20:41
Contributor

Change the commit message from "pull all partition offsets in a single call to Kafka." to "Retrieve partition offsets in bulk".

@@ -37,6 +37,7 @@
import org.apache.kafka.common.config.ConfigResource;
Contributor

Squash the two commits into one

@@ -37,6 +37,7 @@
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Contributor

In the description

"By changing the call to a bulk operation, the number of API calls can be significantly reduced, improving query startup time."

please add some specific numbers to help reviewers understand the impact of this change.

Contributor

findinpath commented Jul 10, 2025

https://github.com/trinodb/trino/actions/runs/16201941001/job/45742893962?pr=26168

Commit 97525136936f7faffd10b4ed3519939d170416e1 is a merge commit: https://api.github.com/repos/trinodb/trino/commits/97525136936f7faffd10b4ed3519939d170416e1
Error: PR requires a rebase. Found: 1 merge commits.
git rebase origin/master

Comment on lines +189 to +195
Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
topicPartitionOffsetAndTimestamps.forEach((topicPartition, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) {
        topicPartitionOffsets.put(topicPartition, offsetAndTimestamp.offset());
    }
});
return topicPartitionOffsets;
Contributor

return topicPartitionOffsetAndTimestamps.entrySet().stream()
        .filter(entry -> entry.getValue() != null)
        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
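The filter step is also load-bearing here: offsetsForTimes returns a null OffsetAndTimestamp for partitions with no matching offset, and Collectors.toMap throws NullPointerException on null values.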

Comment on lines +126 to +129
Map<TopicPartition, Long> partitionBeginTimestamps = new HashMap<>();
partitionBeginOffsets.forEach((partition, partitionIndex) -> {
    partitionBeginTimestamps.put(partition, offsetTimestampRanged.get().begin());
});
Contributor

long partitionBeginTimestamp = floorDiv(offsetTimestampRanged.get().begin(), MICROSECONDS_PER_MILLISECOND);
Map<TopicPartition, Long> partitionBeginTimestamps = partitionBeginOffsets.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, _ -> partitionBeginTimestamp));
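(Presumably the floorDiv by MICROSECONDS_PER_MILLISECOND is needed because offsetsForTimes expects epoch milliseconds, while the pushed-down Trino timestamp is in microseconds.)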

Contributor

With this change there is no need to mutate the map anymore with

timestamps.replaceAll((k, v) -> floorDiv(v, MICROSECONDS_PER_MILLISECOND));

in the findOffsetsForTimestampGreaterOrEqual method, since the values are already converted to milliseconds when the map is built.

@@ -172,11 +182,17 @@ private boolean isTimestampUpperBoundPushdownEnabled(ConnectorSession session, S
return KafkaSessionProperties.isTimestampUpperBoundPushdownEnabled(session);
}

-    private static Optional<Long> findOffsetsForTimestampGreaterOrEqual(KafkaConsumer<byte[], byte[]> kafkaConsumer, TopicPartition topicPartition, long timestamp)
+    private static Map<TopicPartition, Long> findOffsetsForTimestampGreaterOrEqual(KafkaConsumer<byte[], byte[]> kafkaConsumer, Map<TopicPartition, Long> timestamps)
Contributor

Optional: maybe we could consider returning Map<TopicPartition, Optional<Long>> instead.

It is better to avoid having null values.
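As a sketch of that alternative (illustrative, not the reviewer's code), the null check could move into the collected value, since an Optional is never itself null:

return topicPartitionOffsetAndTimestamps.entrySet().stream()
        .collect(Collectors.toMap(
                Map.Entry::getKey,
                entry -> Optional.ofNullable(entry.getValue()).map(OffsetAndTimestamp::offset)));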

Labels
kafka Kafka connector