From f2525c554105ff43e499d8a1f7e7d8fe2d3bbc35 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Thu, 27 Nov 2025 11:14:05 +0530 Subject: [PATCH 1/3] upgrade version from v2 to v3 --- src/rdkafka_offset.c | 4 +++- src/rdkafka_request.c | 7 ++++++- src/rdkafka_request.h | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index cf21d60c55..b9182f1725 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -1118,6 +1118,7 @@ static rd_kafka_op_res_t rd_kafka_offset_validate_op_cb(rd_kafka_t *rk, void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) { rd_kafka_topic_partition_list_t *parts; rd_kafka_topic_partition_t *rktpar; + int32_t ReplicaId = -1; /* Consumer replica id */ char reason[512]; va_list ap; @@ -1219,7 +1220,8 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) { rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos), reason); rd_kafka_OffsetForLeaderEpochRequest( - rktp->rktp_leader, parts, RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), + rktp->rktp_leader, ReplicaId, parts, + RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), rd_kafka_toppar_handle_OffsetForLeaderEpoch, rktp); rd_kafka_topic_partition_list_destroy(parts); } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 663a07eae3..88a424d015 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1118,6 +1118,7 @@ rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch( */ void rd_kafka_OffsetForLeaderEpochRequest( rd_kafka_broker_t *rkb, + int32_t replica_id, rd_kafka_topic_partition_list_t *parts, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, @@ -1126,7 +1127,7 @@ void rd_kafka_OffsetForLeaderEpochRequest( int16_t ApiVersion; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 2, NULL); + rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 3, NULL); /* If the supported ApiVersions are not yet known, * or this broker doesn't support it, we let this request * succeed or fail later from the broker thread where the @@ -1149,6 +1150,10 @@ void rd_kafka_OffsetForLeaderEpochRequest( /* LeaderEpoch */ RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + + if (ApiVersion >= 3) { + rd_kafka_buf_write_i32(rkbuf, replica_id); + } rd_kafka_buf_write_topic_partitions( rkbuf, parts, rd_false /*include invalid offsets*/, rd_false /*skip valid offsets*/, rd_false /*don't use topic id*/, diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index c508ffdaaf..a2392ea639 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -240,6 +240,7 @@ rd_kafka_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **offsets); void rd_kafka_OffsetForLeaderEpochRequest( rd_kafka_broker_t *rkb, + int32_t replica_id, rd_kafka_topic_partition_list_t *parts, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, From cf6579686c3b7f500d72bc52f5b7fc8739c4e260 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Thu, 27 Nov 2025 14:37:26 +0530 Subject: [PATCH 2/3] Upgrade from v3 -> v4 --- src/rdkafka_mock_handlers.c | 34 +++++++++++++++++++++++++++++++++- src/rdkafka_request.c | 10 +++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index ad509ecceb..6e9cad7997 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2460,6 +2460,14 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf); rd_kafka_resp_err_t err; int32_t TopicsCnt, i; + int32_t ReplicaId = -1; + int32_t ApiVersion; + + ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; + + if (ApiVersion >= 3) { + rd_kafka_buf_read_i32(rkbuf, &ReplicaId); + } /* Response: ThrottleTimeMs */ rd_kafka_buf_write_i32(resp, 0); @@ -2505,6 +2513,10 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, /* LeaderEpoch */ rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch); + /* Skip partition tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); + mpart = rd_kafka_mock_partition_find(mtopic, Partition); if (!err && !mpart) err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; @@ -2528,9 +2540,29 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_i32(resp, LeaderEpoch); /* Response: Partition */ rd_kafka_buf_write_i64(resp, EndOffset); + + /* Response: Partition tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); } + + /* Skip topic tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); + + /* Response: Topic tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); } + /* Skip top-level tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); + + /* Response: Top-level tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); + rd_kafka_mock_connection_send_response(mconn, resp); return 0; @@ -3022,7 +3054,7 @@ const struct rd_kafka_mock_api_handler rd_kafka_mock_handle_TxnOffsetCommit}, [RD_KAFKAP_EndTxn] = {0, 1, -1, rd_kafka_mock_handle_EndTxn}, [RD_KAFKAP_OffsetForLeaderEpoch] = - {2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch}, + {2, 4, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch}, [RD_KAFKAP_ConsumerGroupHeartbeat] = {1, 1, 1, rd_kafka_mock_handle_ConsumerGroupHeartbeat}, [RD_KAFKAP_GetTelemetrySubscriptions] = diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 88a424d015..ffe63bd9d4 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1101,6 +1101,10 @@ rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch( if (!*offsets) goto err_parse; + if (ApiVersion >= 4) { + rd_kafka_buf_skip_tags(rkbuf); + } + return RD_KAFKA_RESP_ERR_NO_ERROR; err: @@ -1127,7 +1131,7 @@ void rd_kafka_OffsetForLeaderEpochRequest( int16_t ApiVersion; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 3, NULL); + rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 4, NULL); /* If the supported ApiVersions are not yet known, * or this broker doesn't support it, we let this request * succeed or fail later from the broker thread where the @@ -1159,6 +1163,10 @@ void rd_kafka_OffsetForLeaderEpochRequest( rd_false /*skip valid offsets*/, rd_false /*don't use topic id*/, rd_true /*use topic name*/, fields); + if (ApiVersion >= 4) { + rd_kafka_buf_write_tags_empty(rkbuf); + } + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); /* Let caller perform retries */ From e9126293e572e70bc339db98d2d81b73f95517ab Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Thu, 27 Nov 2025 15:01:01 +0530 Subject: [PATCH 3/3] version change minor --- src/rdkafka_mock_handlers.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 6e9cad7997..f0c246b223 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3054,7 +3054,7 @@ const struct rd_kafka_mock_api_handler rd_kafka_mock_handle_TxnOffsetCommit}, [RD_KAFKAP_EndTxn] = {0, 1, -1, rd_kafka_mock_handle_EndTxn}, [RD_KAFKAP_OffsetForLeaderEpoch] = - {2, 4, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch}, + {2, 4, 4, rd_kafka_mock_handle_OffsetForLeaderEpoch}, [RD_KAFKAP_ConsumerGroupHeartbeat] = {1, 1, 1, rd_kafka_mock_handle_ConsumerGroupHeartbeat}, [RD_KAFKAP_GetTelemetrySubscriptions] =