Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/rdkafka_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) {

if (rd_kafka_buf_ApiVersion(rkbuf) >= 9) {
/* CurrentLeaderEpoch */
rd_kafka_toppar_lock(rktp);
if (rktp->rktp_leader_epoch < 0 &&
rd_kafka_has_reliable_leader_epochs(rkb)) {
/* If current leader epoch is set to -1 and
Comment on lines +1109 to 1112
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is released before writing to the buffer at line 1128 (FetchOffset), which may also access rktp fields. Verify that the fetch offset write at line 1128 doesn't require the same lock protection, or if it does, the unlock should be moved after that operation.

Copilot uses AI. Check for mistakes.
Expand All @@ -1121,6 +1122,7 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) {
rd_kafka_buf_write_i32(rkbuf,
rktp->rktp_leader_epoch);
}
rd_kafka_toppar_unlock(rktp);
Comment on lines 1122 to +1125
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is released before writing to the buffer at line 1128 (FetchOffset), which may also access rktp fields. Verify that the fetch offset write at line 1128 doesn't require the same lock protection, or if it does, the unlock should be moved after that operation.

Copilot uses AI. Check for mistakes.
}
/* FetchOffset */
rd_kafka_buf_write_i64(rkbuf,
Expand Down