diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index ad509ecceb..f0c246b223 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2460,6 +2460,14 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf); rd_kafka_resp_err_t err; int32_t TopicsCnt, i; + int32_t ReplicaId = -1; + int32_t ApiVersion; + + ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; + + if (ApiVersion >= 3) { + rd_kafka_buf_read_i32(rkbuf, &ReplicaId); + } /* Response: ThrottleTimeMs */ rd_kafka_buf_write_i32(resp, 0); @@ -2505,6 +2513,10 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, /* LeaderEpoch */ rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch); + /* Skip partition tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); + mpart = rd_kafka_mock_partition_find(mtopic, Partition); if (!err && !mpart) err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; @@ -2528,9 +2540,29 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_i32(resp, LeaderEpoch); /* Response: Partition */ rd_kafka_buf_write_i64(resp, EndOffset); + + /* Response: Partition tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); } + + /* Skip topic tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); + + /* Response: Topic tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); } + /* Skip top-level tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); + + /* Response: Top-level tags for v4+ */ + if (ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); + rd_kafka_mock_connection_send_response(mconn, resp); return 0; @@ -3022,7 +3054,7 @@ const struct rd_kafka_mock_api_handler rd_kafka_mock_handle_TxnOffsetCommit}, [RD_KAFKAP_EndTxn] = {0, 1, -1, rd_kafka_mock_handle_EndTxn}, [RD_KAFKAP_OffsetForLeaderEpoch] = - {2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch}, + {2, 4, 4, rd_kafka_mock_handle_OffsetForLeaderEpoch}, [RD_KAFKAP_ConsumerGroupHeartbeat] = {1, 1, 1, rd_kafka_mock_handle_ConsumerGroupHeartbeat}, [RD_KAFKAP_GetTelemetrySubscriptions] = diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index cf21d60c55..b9182f1725 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -1118,6 +1118,7 @@ static rd_kafka_op_res_t rd_kafka_offset_validate_op_cb(rd_kafka_t *rk, void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) { rd_kafka_topic_partition_list_t *parts; rd_kafka_topic_partition_t *rktpar; + int32_t ReplicaId = -1; /* Consumer replica id */ char reason[512]; va_list ap; @@ -1219,7 +1220,8 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) { rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos), reason); rd_kafka_OffsetForLeaderEpochRequest( - rktp->rktp_leader, parts, RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), + rktp->rktp_leader, ReplicaId, parts, + RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), rd_kafka_toppar_handle_OffsetForLeaderEpoch, rktp); rd_kafka_topic_partition_list_destroy(parts); } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 663a07eae3..ffe63bd9d4 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1101,6 +1101,10 @@ rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch( if (!*offsets) goto err_parse; + if (ApiVersion >= 4) { + rd_kafka_buf_skip_tags(rkbuf); + } + return RD_KAFKA_RESP_ERR_NO_ERROR; err: @@ -1118,6 +1122,7 @@ rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch( */ void rd_kafka_OffsetForLeaderEpochRequest( rd_kafka_broker_t *rkb, + int32_t replica_id, rd_kafka_topic_partition_list_t *parts, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, @@ -1126,7 +1131,7 @@ void rd_kafka_OffsetForLeaderEpochRequest( int16_t ApiVersion; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 2, NULL); + rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 4, NULL); /* If the supported ApiVersions are not yet known, * or this broker doesn't support it, we let this request * succeed or fail later from the broker thread where the @@ -1149,11 +1154,19 @@ void rd_kafka_OffsetForLeaderEpochRequest( /* LeaderEpoch */ RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + + if (ApiVersion >= 3) { + rd_kafka_buf_write_i32(rkbuf, replica_id); + } rd_kafka_buf_write_topic_partitions( rkbuf, parts, rd_false /*include invalid offsets*/, rd_false /*skip valid offsets*/, rd_false /*don't use topic id*/, rd_true /*use topic name*/, fields); + if (ApiVersion >= 4) { + rd_kafka_buf_write_tags_empty(rkbuf); + } + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); /* Let caller perform retries */ diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index c508ffdaaf..a2392ea639 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -240,6 +240,7 @@ rd_kafka_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **offsets); void rd_kafka_OffsetForLeaderEpochRequest( rd_kafka_broker_t *rkb, + int32_t replica_id, rd_kafka_topic_partition_list_t *parts, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb,