Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2460,6 +2460,14 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf);
rd_kafka_resp_err_t err;
int32_t TopicsCnt, i;
int32_t ReplicaId = -1;
int32_t ApiVersion;

ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;

if (ApiVersion >= 3) {
rd_kafka_buf_read_i32(rkbuf, &ReplicaId);
}

/* Response: ThrottleTimeMs */
rd_kafka_buf_write_i32(resp, 0);
Expand Down Expand Up @@ -2505,6 +2513,10 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
/* LeaderEpoch */
rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch);

/* Skip partition tags for v4+ */
if (ApiVersion >= 4)
rd_kafka_buf_skip_tags(rkbuf);

mpart = rd_kafka_mock_partition_find(mtopic, Partition);
if (!err && !mpart)
err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
Expand All @@ -2528,9 +2540,29 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_write_i32(resp, LeaderEpoch);
/* Response: Partition */
rd_kafka_buf_write_i64(resp, EndOffset);

/* Response: Partition tags for v4+ */
if (ApiVersion >= 4)
rd_kafka_buf_write_tags_empty(resp);
}

/* Skip topic tags for v4+ */
if (ApiVersion >= 4)
rd_kafka_buf_skip_tags(rkbuf);

/* Response: Topic tags for v4+ */
if (ApiVersion >= 4)
rd_kafka_buf_write_tags_empty(resp);
}

/* Skip top-level tags for v4+ */
if (ApiVersion >= 4)
rd_kafka_buf_skip_tags(rkbuf);

/* Response: Top-level tags for v4+ */
if (ApiVersion >= 4)
rd_kafka_buf_write_tags_empty(resp);

rd_kafka_mock_connection_send_response(mconn, resp);

return 0;
Expand Down Expand Up @@ -3022,7 +3054,7 @@ const struct rd_kafka_mock_api_handler
rd_kafka_mock_handle_TxnOffsetCommit},
[RD_KAFKAP_EndTxn] = {0, 1, -1, rd_kafka_mock_handle_EndTxn},
[RD_KAFKAP_OffsetForLeaderEpoch] =
{2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch},
{2, 4, 4, rd_kafka_mock_handle_OffsetForLeaderEpoch},
[RD_KAFKAP_ConsumerGroupHeartbeat] =
{1, 1, 1, rd_kafka_mock_handle_ConsumerGroupHeartbeat},
[RD_KAFKAP_GetTelemetrySubscriptions] =
Expand Down
4 changes: 3 additions & 1 deletion src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ static rd_kafka_op_res_t rd_kafka_offset_validate_op_cb(rd_kafka_t *rk,
void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) {
rd_kafka_topic_partition_list_t *parts;
rd_kafka_topic_partition_t *rktpar;
int32_t ReplicaId = -1; /* Consumer replica id */
char reason[512];
va_list ap;

Expand Down Expand Up @@ -1219,7 +1220,8 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) {
rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos), reason);

rd_kafka_OffsetForLeaderEpochRequest(
rktp->rktp_leader, parts, RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
rktp->rktp_leader, ReplicaId, parts,
RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
rd_kafka_toppar_handle_OffsetForLeaderEpoch, rktp);
rd_kafka_topic_partition_list_destroy(parts);
}
Expand Down
15 changes: 14 additions & 1 deletion src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,10 @@ rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch(
if (!*offsets)
goto err_parse;

if (ApiVersion >= 4) {
rd_kafka_buf_skip_tags(rkbuf);
}

return RD_KAFKA_RESP_ERR_NO_ERROR;

err:
Expand All @@ -1118,6 +1122,7 @@ rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch(
*/
void rd_kafka_OffsetForLeaderEpochRequest(
rd_kafka_broker_t *rkb,
int32_t replica_id,
rd_kafka_topic_partition_list_t *parts,
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
Expand All @@ -1126,7 +1131,7 @@ void rd_kafka_OffsetForLeaderEpochRequest(
int16_t ApiVersion;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 2, NULL);
rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 4, NULL);
/* If the supported ApiVersions are not yet known,
* or this broker doesn't support it, we let this request
* succeed or fail later from the broker thread where the
Expand All @@ -1149,11 +1154,19 @@ void rd_kafka_OffsetForLeaderEpochRequest(
/* LeaderEpoch */
RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};

if (ApiVersion >= 3) {
rd_kafka_buf_write_i32(rkbuf, replica_id);
}
rd_kafka_buf_write_topic_partitions(
rkbuf, parts, rd_false /*include invalid offsets*/,
rd_false /*skip valid offsets*/, rd_false /*don't use topic id*/,
rd_true /*use topic name*/, fields);

if (ApiVersion >= 4) {
rd_kafka_buf_write_tags_empty(rkbuf);
}

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

/* Let caller perform retries */
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ rd_kafka_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t **offsets);
void rd_kafka_OffsetForLeaderEpochRequest(
rd_kafka_broker_t *rkb,
int32_t replica_id,
rd_kafka_topic_partition_list_t *parts,
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
Expand Down