Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,9 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rko = (rd_kafka_op_t *)rkmessages[i]->_private;
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rkmessages[i]->offset + 1;
if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset)))
/* Only update position for messages that are not EOF */
if (unlikely(rktp && (rktp->rktp_app_pos.offset < offset) &&
(rkmessages[i]->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)))
rd_kafka_update_app_pos(
rk, rktp,
RD_KAFKA_FETCH_POS(
Expand Down
129 changes: 129 additions & 0 deletions tests/0137-barrier_batch_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,141 @@ static void do_test_consume_batch_control_msgs(void) {
}


/**
* @brief Test that rd_kafka_consume_batch_queue correctly updates consumer
* position when EOF messages are received with enable.partition.eof=true.
*
* This is a regression test for the bug where EOF messages incorrectly
* advanced the consumer position by 2 instead of 1 (last_offset + 2 instead
* of last_offset + 1).
*/
static void do_test_consume_batch_eof_position(void) {
const char *topic;
rd_kafka_t *consumer;
rd_kafka_conf_t *conf;
rd_kafka_queue_t *rkq;
uint64_t testid;
const int partition_cnt = 1;
const int partition = 0;
const int produce_msg_cnt = 5;
const int consume_msg_cnt = 10;
const int timeout_ms = 5000;
const int session_timeout_s = 60;
const int replication_factor = -1;
const int topic_creation_timeout_ms = 5000;
rd_kafka_message_t **rkmessages;
int msg_cnt, i;
int64_t last_real_offset = -1;
int64_t eof_offset = -1;
int64_t position_after_eof;
rd_kafka_topic_partition_list_t *positions;
rd_kafka_resp_err_t err;
int eof_received = 0;

SUB_TEST("Testing EOF position with consume_batch_queue");

/* Create consumer configuration with enable.partition.eof=true */
test_conf_init(&conf, NULL, session_timeout_s);
test_conf_set(conf, "enable.auto.commit", "false");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "enable.partition.eof", "true");

testid = test_id_generate();

topic = test_mk_topic_name("0137-barrier_batch_consume", 1);

/* Create topic */
test_create_topic_wait_exists(NULL, topic, partition_cnt,
replication_factor,
topic_creation_timeout_ms);
test_produce_msgs_easy(topic, testid, partition, produce_msg_cnt);

consumer = test_create_consumer(topic, NULL, conf, NULL);
test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);

/* Consume messages in batches until we get EOF */
rkmessages = malloc(consume_msg_cnt * sizeof(*rkmessages));

while (!eof_received) {
msg_cnt = (int)rd_kafka_consume_batch_queue(
rkq, timeout_ms, rkmessages, consume_msg_cnt);

TEST_ASSERT(msg_cnt >= 0, "consume_batch_queue failed");

if (msg_cnt == 0) {
TEST_WARN("No messages received, retrying...");
continue;
}

/* Check if EOF messages are received */
for (i = 0; i < msg_cnt; i++) {
rd_kafka_message_t *rkm = rkmessages[i];

if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
eof_received = 1;
eof_offset = rkm->offset;
TEST_SAY("Received EOF at offset %" PRId64 "\n",
eof_offset);
} else if (!rkm->err) {
last_real_offset = rkm->offset;
}
}

/* Destroy messages */
for (i = 0; i < msg_cnt; i++)
rd_kafka_message_destroy(rkmessages[i]);
}

rd_free(rkmessages);

/* Test that the last real message offset is the expected value */
TEST_ASSERT(last_real_offset == produce_msg_cnt - 1,
"Expected last message offset %" PRId64 ", got %" PRId64,
(int64_t)(produce_msg_cnt - 1), last_real_offset);

/* Get consumer position after EOF */
positions = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(positions, topic, partition);

err = rd_kafka_position(consumer, positions);
TEST_ASSERT(!err, "rd_kafka_position failed: %s", rd_kafka_err2str(err));

/* Get consumer position after EOF */
position_after_eof = positions->elems[0].offset;

TEST_SAY(
"Last real message offset: %" PRId64 "\n"
"EOF offset: %" PRId64 "\n"
"Position after EOF: %" PRId64 "\n",
last_real_offset, eof_offset, position_after_eof);

TEST_ASSERT(
position_after_eof == last_real_offset + 1,
"Position after EOF should be %" PRId64 " (last_offset + 1), "
"but got %" PRId64 ". This indicates the EOF offset bug where "
"position incorrectly advances by +2 instead of +1",
last_real_offset + 1, position_after_eof);

rd_kafka_topic_partition_list_destroy(positions);
rd_kafka_queue_destroy(rkq);
test_consumer_close(consumer);
rd_kafka_destroy(consumer);

SUB_TEST_PASS();
}


int main_0137_barrier_batch_consume(int argc, char **argv) {
do_test_consume_batch_with_seek();
do_test_consume_batch_store_offset();
do_test_consume_batch_with_pause_and_resume_different_batch();
do_test_consume_batch_with_pause_and_resume_same_batch();
do_test_consume_batch_control_msgs();
do_test_consume_batch_eof_position();

return 0;
}