From f3c72599101330c3c1d45b9533b3493033812ad3 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 28 Nov 2024 21:12:46 +0530 Subject: [PATCH 01/10] first pass --- examples/consumer.c | 7 + examples/describe_consumer_groups.c | 18 +- src/rdkafka.c | 1 + src/rdkafka.h | 45 ++++ src/rdkafka_admin.c | 376 +++++++++++++++++++++++----- src/rdkafka_admin.h | 4 + src/rdkafka_op.c | 6 +- src/rdkafka_op.h | 3 + src/rdkafka_proto.h | 1 + src/rdkafka_request.c | 59 +++++ src/rdkafka_request.h | 10 + 11 files changed, 468 insertions(+), 62 deletions(-) diff --git a/examples/consumer.c b/examples/consumer.c index dad3efc43b..76ebfbd633 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -139,6 +139,13 @@ int main(int argc, char **argv) { return 1; } + if(rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + /* * Create consumer instance. * diff --git a/examples/describe_consumer_groups.c b/examples/describe_consumer_groups.c index daacc1d021..d58116cf94 100644 --- a/examples/describe_consumer_groups.c +++ b/examples/describe_consumer_groups.c @@ -174,6 +174,18 @@ print_group_member_info(const rd_kafka_MemberDescription_t *member) { printf(" Assignment:\n"); print_partition_list(stdout, topic_partitions, 0, " "); } + const rd_kafka_MemberAssignment_t *target_assignment = + rd_kafka_MemberDescription_target_assignment(member); + const rd_kafka_topic_partition_list_t *target_topic_partitions = + rd_kafka_MemberAssignment_partitions(target_assignment); + if (!target_topic_partitions) { + printf(" No target assignment\n"); + } else if (target_topic_partitions->cnt == 0) { + printf(" Empty target assignment\n"); + } else { + printf(" Target assignment:\n"); + print_partition_list(stdout, target_topic_partitions, 0, " "); + } } @@ -194,6 +206,8 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { rd_kafka_ConsumerGroupDescription_partition_assignor(group); rd_kafka_consumer_group_state_t state = rd_kafka_ConsumerGroupDescription_state(group); + rd_kafka_consumer_group_type_t type = + rd_kafka_ConsumerGroupDescription_type(group); authorized_operations = rd_kafka_ConsumerGroupDescription_authorized_operations( group, &authorized_operations_cnt); @@ -212,9 +226,9 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { rd_kafka_Node_port(coordinator)); } printf( - "Group \"%s\", partition assignor \"%s\", " + "Group \"%s\", partition assignor \"%s\", type \"%s\" " " state %s%s, with %" PRId32 " member(s)\n", - group_id, partition_assignor, + group_id, partition_assignor, rd_kafka_consumer_group_type_name(type), rd_kafka_consumer_group_state_name(state), coordinator_desc, member_cnt); for (j = 0; j < authorized_operations_cnt; j++) { diff --git a/src/rdkafka.c b/src/rdkafka.c index 656076df1b..b874f9634a 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -1660,6 +1660,7 @@ static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st, [RD_KAFKAP_AlterClientQuotas] = rd_true, [RD_KAFKAP_DescribeUserScramCredentials] = rd_true, [RD_KAFKAP_AlterUserScramCredentials] = rd_true, + [RD_KAFKAP_ConsumerGroupDescribe] = rd_true, }}; int i; int cnt = 0; diff --git a/src/rdkafka.h b/src/rdkafka.h index e5d47d3264..7724c490e8 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -5593,6 +5593,8 @@ typedef int rd_kafka_event_type_t; #define RD_KAFKA_EVENT_LISTOFFSETS_RESULT 0x400000 /** ElectLeaders_result_t */ #define RD_KAFKA_EVENT_ELECTLEADERS_RESULT 0x800000 +/** ConsumerGroupDescribe_result_t */ +#define RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT 0x1000000 /** * @returns the event type for the given event. @@ -5752,6 +5754,7 @@ int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev); * - RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT * - RD_KAFKA_EVENT_LISTOFFSETS_RESULT * - RD_KAFKA_EVENT_ELECTLEADERS_RESULT + * - RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT */ RD_EXPORT void *rd_kafka_event_opaque(rd_kafka_event_t *rkev); @@ -8881,6 +8884,20 @@ RD_EXPORT const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator( const rd_kafka_ConsumerGroupDescription_t *grpdesc); +/** + * @brief Gets type for the \p grpdesc group. + * + * @param grpdesc The group description. + * + * @return A group type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p grpdesc object. + */ +RD_EXPORT +rd_kafka_consumer_group_type_t rd_kafka_ConsumerGroupDescription_type( + const rd_kafka_ConsumerGroupDescription_t *grpdesc); + /** * @brief Gets the members count of \p grpdesc group. * @@ -8993,6 +9010,34 @@ RD_EXPORT const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions( const rd_kafka_MemberAssignment_t *assignment); +/** + * @brief Gets target assignment of \p member. + * + * @param member The group member. + * + * @return The target assignment. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p member object. + */ +RD_EXPORT +const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_target_assignment( + const rd_kafka_MemberDescription_t *member); + +/** + * @brief Gets target assigned partitions of a member \p assignment. + * + * @param assignment The group member assignment. + * + * @return The target assigned partitions. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p assignment object. + */ +RD_EXPORT +const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_target_partitions( + const rd_kafka_MemberAssignment_t *assignment); + /**@}*/ /** diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 41934e349d..7d96b6e876 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -7748,7 +7748,8 @@ static rd_kafka_MemberDescription_t *rd_kafka_MemberDescription_new( const char *consumer_id, const char *group_instance_id, const char *host, - const rd_kafka_topic_partition_list_t *assignment) { + const rd_kafka_topic_partition_list_t *assignment, + const rd_kafka_topic_partition_list_t *target_assignment) { rd_kafka_MemberDescription_t *member; member = rd_calloc(1, sizeof(*member)); member->client_id = rd_strdup(client_id); @@ -7762,6 +7763,12 @@ static rd_kafka_MemberDescription_t *rd_kafka_MemberDescription_new( else member->assignment.partitions = rd_kafka_topic_partition_list_new(0); + if (target_assignment) + member->target_assignment.partitions = + rd_kafka_topic_partition_list_copy(target_assignment); + else + member->target_assignment.partitions = + rd_kafka_topic_partition_list_new(0); return member; } @@ -7777,7 +7784,8 @@ static rd_kafka_MemberDescription_t * rd_kafka_MemberDescription_copy(const rd_kafka_MemberDescription_t *src) { return rd_kafka_MemberDescription_new(src->client_id, src->consumer_id, src->group_instance_id, src->host, - src->assignment.partitions); + src->assignment.partitions, + src->target_assignment.partitions); } /** @@ -7801,6 +7809,9 @@ rd_kafka_MemberDescription_destroy(rd_kafka_MemberDescription_t *member) { if (member->assignment.partitions) rd_kafka_topic_partition_list_destroy( member->assignment.partitions); + if(member->target_assignment.partitions) + rd_kafka_topic_partition_list_destroy( + member->target_assignment.partitions); rd_free(member); } @@ -7838,6 +7849,16 @@ const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions( return assignment->partitions; } +const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_target_assignment( + const rd_kafka_MemberDescription_t *member) { + return &member->target_assignment; +} + +const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_target_partitions( + const rd_kafka_MemberAssignment_t *assignment) { + return assignment->partitions; +} + /** * @brief Create a new ConsumerGroupDescription object. @@ -7864,7 +7885,8 @@ rd_kafka_ConsumerGroupDescription_new( int authorized_operations_cnt, rd_kafka_consumer_group_state_t state, const rd_kafka_Node_t *coordinator, - rd_kafka_error_t *error) { + rd_kafka_error_t *error, + rd_kafka_consumer_group_type_t type) { rd_kafka_ConsumerGroupDescription_t *grpdesc; grpdesc = rd_calloc(1, sizeof(*grpdesc)); grpdesc->group_id = rd_strdup(group_id); @@ -7892,6 +7914,7 @@ rd_kafka_ConsumerGroupDescription_new( error != NULL ? rd_kafka_error_new(rd_kafka_error_code(error), "%s", rd_kafka_error_string(error)) : NULL; + grpdesc->type = type; return grpdesc; } @@ -7905,10 +7928,11 @@ rd_kafka_ConsumerGroupDescription_new( */ static rd_kafka_ConsumerGroupDescription_t * rd_kafka_ConsumerGroupDescription_new_error(const char *group_id, - rd_kafka_error_t *error) { + rd_kafka_error_t *error, + rd_kafka_consumer_group_type_t type) { return rd_kafka_ConsumerGroupDescription_new( group_id, rd_false, NULL, NULL, NULL, 0, - RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN, NULL, error); + RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN, NULL, error, type); } /** @@ -7924,7 +7948,7 @@ rd_kafka_ConsumerGroupDescription_copy( grpdesc->group_id, grpdesc->is_simple_consumer_group, &grpdesc->members, grpdesc->partition_assignor, grpdesc->authorized_operations, grpdesc->authorized_operations_cnt, - grpdesc->state, grpdesc->coordinator, grpdesc->error); + grpdesc->state, grpdesc->coordinator, grpdesc->error, grpdesc->type); } /** @@ -7996,6 +8020,11 @@ const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator( return grpdesc->coordinator; } +rd_kafka_consumer_group_type_t rd_kafka_ConsumerGroupDescription_type( + const rd_kafka_ConsumerGroupDescription_t *grpdesc) { + return grpdesc->type; +} + size_t rd_kafka_ConsumerGroupDescription_member_count( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { return rd_list_cnt(&grpdesc->members); @@ -8015,51 +8044,6 @@ static int rd_kafka_DescribeConsumerGroups_cmp(const void *a, const void *b) { return strcmp(a, b); } -/** @brief Merge the DescribeConsumerGroups response from a single broker - * into the user response list. - */ -static void rd_kafka_DescribeConsumerGroups_response_merge( - rd_kafka_op_t *rko_fanout, - const rd_kafka_op_t *rko_partial) { - rd_kafka_ConsumerGroupDescription_t *groupres = NULL; - rd_kafka_ConsumerGroupDescription_t *newgroupres; - const char *grp = rko_partial->rko_u.admin_result.opaque; - int orig_pos; - - rd_assert(rko_partial->rko_evtype == - RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT); - - if (!rko_partial->rko_err) { - /* Proper results. - * We only send one group per request, make sure it matches */ - groupres = - rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); - rd_assert(groupres); - rd_assert(!strcmp(groupres->group_id, grp)); - newgroupres = rd_kafka_ConsumerGroupDescription_copy(groupres); - } else { - /* Op errored, e.g. timeout */ - rd_kafka_error_t *error = - rd_kafka_error_new(rko_partial->rko_err, NULL); - newgroupres = - rd_kafka_ConsumerGroupDescription_new_error(grp, error); - rd_kafka_error_destroy(error); - } - - /* As a convenience to the application we insert group result - * in the same order as they were requested. */ - orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, - rd_kafka_DescribeConsumerGroups_cmp); - rd_assert(orig_pos != -1); - - /* Make sure result is not already set */ - rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, - orig_pos) == NULL); - - rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, - newgroupres); -} - /** * @brief Construct and send DescribeConsumerGroupsRequest to \p rkb @@ -8250,7 +8234,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, member = rd_kafka_MemberDescription_new( client_id, member_id, group_instance_id, - client_host, partitions); + client_host, partitions, NULL); if (partitions) rd_kafka_topic_partition_list_destroy( partitions); @@ -8279,10 +8263,10 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, group_id, is_simple_consumer_group, &members, proto, operations, operation_cnt, rd_kafka_consumer_group_state_code(group_state), - node, error); + node, error, RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC); } else grpdesc = rd_kafka_ConsumerGroupDescription_new_error( - group_id, error); + group_id, error, RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC); rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc); @@ -8336,6 +8320,280 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, return reply->rkbuf_err; } +static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( + rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + int16_t api_version; + int32_t cnt; + rd_kafka_op_t *rko_result = NULL; + rd_kafka_broker_t *rkb = reply->rkbuf_rkb; + rd_kafka_error_t *error = NULL; + char *group_id = NULL, *group_state = NULL, *assignor_name = NULL, *error_str = NULL, *host = NULL; + int32_t group_epoch, assignment_epoch; + rd_kafka_AclOperation_t *operations = NULL; + rd_kafka_Node_t *node = NULL; + int32_t nodeid; + uint16_t port; + int operation_cnt = -1; + printf("rd_kafka_ConsumerGroupDescribeResponseParse\n"); + + api_version = rd_kafka_buf_ApiVersion(reply); + rd_kafka_buf_read_throttle_time(reply); + + rd_kafka_buf_read_arraycnt(reply, &cnt, 100000); + + rd_kafka_broker_lock(rkb); + nodeid = rkb->rkb_nodeid; + host = rd_strdup(rkb->rkb_origname); + port = rkb->rkb_port; + rd_kafka_broker_unlock(rkb); + + node = rd_kafka_Node_new(nodeid, host, port, NULL); + + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, cnt, + rd_kafka_ConsumerGroupDescription_free); + + for(int i = 0; i < cnt; i++) { + int16_t error_code; + int32_t authorized_operations = -1; + int32_t member_cnt; + rd_kafkap_str_t GroupId, GroupState, AssignorName, ErrorString; + rd_list_t members; + rd_kafka_ConsumerGroupDescription_t *grpdesc = NULL; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &ErrorString); + rd_kafka_buf_read_str(reply, &GroupId); + rd_kafka_buf_read_str(reply, &GroupState); + rd_kafka_buf_read_i32(reply, &group_epoch); + rd_kafka_buf_read_i32(reply, &assignment_epoch); + rd_kafka_buf_read_str(reply, &AssignorName); + rd_kafka_buf_read_arraycnt(reply, &member_cnt, 100000); + + group_id = RD_KAFKAP_STR_DUP(&GroupId); + group_state = RD_KAFKAP_STR_DUP(&GroupState); + assignor_name = RD_KAFKAP_STR_DUP(&AssignorName); + error_str = RD_KAFKAP_STR_DUP(&ErrorString); + printf("error_code: %d\n", error_code); + printf("error string: %s\n", error_str); + printf("group_id: %s\n", group_id); + printf("group_state: %s\n", group_state); + printf("group_epoch: %d\n", group_epoch); + printf("assignment_epoch: %d\n", assignment_epoch); + printf("assignor_name: %s\n", assignor_name); + printf("member_cnt: %d\n", member_cnt); + + if(error_code) { + error = rd_kafka_error_new(error_code, "ConsumerGroupDescribe: %s", error_str); + } + + rd_list_init(&members, 0, rd_kafka_MemberDescription_free); + + for(int j = 0; j < member_cnt; j++) { + rd_kafkap_str_t MemberId, InstanceId, RackId, ClientId, ClientHost, SubscribedTopicNames, SubscribedTopicRegex; + int32_t MemberEpoch; + char *member_id, *instance_id, *rack_id, + *client_id, *client_host, *subscribed_topic_names, + *subscribed_topic_regex = NULL; + rd_kafka_MemberDescription_t *member = NULL; + rd_kafka_topic_partition_list_t *assignment = NULL, *target_assignment = NULL; + + rd_kafka_buf_read_str(reply, &MemberId); + rd_kafka_buf_read_str(reply, &InstanceId); + rd_kafka_buf_read_str(reply, &RackId); + rd_kafka_buf_read_i32(reply, &MemberEpoch); + rd_kafka_buf_read_str(reply, &ClientId); + rd_kafka_buf_read_str(reply, &ClientHost); + rd_kafka_buf_read_str(reply, &SubscribedTopicNames); + rd_kafka_buf_read_str(reply, &SubscribedTopicRegex); + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + rd_kafka_buf_skip_tags(reply); + target_assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + rd_kafka_buf_skip_tags(reply); + + member_id = RD_KAFKAP_STR_DUP(&MemberId); + instance_id = RD_KAFKAP_STR_DUP(&InstanceId); + rack_id = RD_KAFKAP_STR_DUP(&RackId); + client_id = RD_KAFKAP_STR_DUP(&ClientId); + client_host = RD_KAFKAP_STR_DUP(&ClientHost); + subscribed_topic_names = RD_KAFKAP_STR_DUP(&SubscribedTopicNames); + subscribed_topic_regex = RD_KAFKAP_STR_DUP(&SubscribedTopicRegex); + + printf("member_id: %s\n", member_id); + printf("instance_id: %s\n", instance_id); + printf("rack_id: %s\n", rack_id); + printf("member_epoch: %d\n", MemberEpoch); + printf("client_id: %s\n", client_id); + printf("client_host: %s\n", client_host); + printf("subscribed_topic_names: %s\n", subscribed_topic_names); + printf("subscribed_topic_regex: %s\n", subscribed_topic_regex); + printf("assignment: topic: %s partition: %d\n", assignment->elems[0].topic, assignment->elems[0].partition); + printf("target_assignment: topic: %s partition: %d\n", target_assignment->elems[0].topic, target_assignment->elems[0].partition); + member = rd_kafka_MemberDescription_new( + client_id, member_id, instance_id, client_host, assignment, target_assignment); + + rd_list_add(&members, member); + rd_kafka_buf_skip_tags(reply); + + if(assignment) + rd_kafka_topic_partition_list_destroy(assignment); + if(target_assignment) + rd_kafka_topic_partition_list_destroy(target_assignment); + rd_free(member_id); + rd_free(instance_id); + rd_free(rack_id); + rd_free(client_id); + rd_free(client_host); + rd_free(subscribed_topic_names); + rd_free(subscribed_topic_regex); + } + rd_kafka_buf_read_i32(reply, &authorized_operations); + operations = rd_kafka_AuthorizedOperations_parse( + authorized_operations, &operation_cnt); + rd_kafka_buf_skip_tags(reply); + + if(error == NULL) { + grpdesc = rd_kafka_ConsumerGroupDescription_new( + group_id, rd_false, &members, assignor_name, operations, operation_cnt, + rd_kafka_consumer_group_state_code(group_state), + node, error, RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); + } else { + grpdesc = rd_kafka_ConsumerGroupDescription_new_error( + group_id, error , RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); + } + rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc); + + rd_list_destroy(&members); + rd_free(group_id); + rd_free(group_state); + rd_free(assignor_name); + rd_free(error_str); + RD_IF_FREE(error, rd_kafka_error_destroy); + RD_IF_FREE(operations, rd_free); + + error = NULL; + group_id = NULL; + group_state = NULL; + assignor_name = NULL; + error_str = NULL; + operations = NULL; + } + rd_kafka_buf_skip_tags(reply); + *rko_resultp = rko_result; + printf("rd_kafka_ConsumerGroupDescribeResponseParse end\n"); + return RD_KAFKA_RESP_ERR_NO_ERROR; +err_parse: + if(group_id) + rd_free(group_id); + if(group_state) + rd_free(group_state); + if(assignor_name) + rd_free(assignor_name); + if(error_str) + rd_free(error_str); + if(error) + rd_kafka_error_destroy(error); + RD_IF_FREE(operations, rd_free); + if(rko_result) + rd_kafka_op_destroy(rko_result); + rd_snprintf(errstr, errstr_size, + "DescribeConsumerGroups response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + return reply->rkbuf_err; + +} + +/** @brief Merge the DescribeConsumerGroups response from a single broker + * into the user response list. + */ +static void rd_kafka_DescribeConsumerGroups_response_merge( + rd_kafka_op_t *rko_fanout, + const rd_kafka_op_t *rko_partial) { + rd_kafka_ConsumerGroupDescription_t *groupres = NULL; + rd_kafka_ConsumerGroupDescription_t *newgroupres; + const char *grp = rko_partial->rko_u.admin_result.opaque; + int orig_pos; + + rd_assert(rko_partial->rko_evtype == + RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT || rko_partial->rko_evtype == RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT); + + if (!rko_partial->rko_err) { + /* Proper results. + * We only send one group per request, make sure it matches */ + groupres = + rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); + rd_assert(groupres); + rd_assert(!strcmp(groupres->group_id, grp)); + newgroupres = rd_kafka_ConsumerGroupDescription_copy(groupres); + } else { + /* Op errored, e.g. timeout */ + rd_kafka_error_t *error = + rd_kafka_error_new(rko_partial->rko_err, NULL); + newgroupres = + rd_kafka_ConsumerGroupDescription_new_error(grp, error, RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN); + rd_kafka_error_destroy(error); + } + + if(groupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER && + (groupres->error->code == RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND || groupres->error->code == RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)) { + rko_fanout->rko_u.admin_request.fanout.outstanding++; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_admin_DescribeConsumerGroupsRequest, + rd_kafka_DescribeConsumerGroupsResponse_parse, + }; + rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( + rko_fanout->rko_rk, RD_KAFKA_OP_CONSUMERGROUPDESCRIBE, + RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT, &cbs, &rko_fanout->rko_u.admin_request.options, + rko_fanout->rko_rk->rk_ops); + + rko->rko_u.admin_request.fanout_parent = rko_fanout; + rko->rko_u.admin_request.broker_id = + RD_KAFKA_ADMIN_TARGET_COORDINATOR; + rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; + rko->rko_u.admin_request.coordkey = rd_strdup(grp); + + /* Set the group name as the opaque so the fanout worker use it + * to fill in errors. + * References rko_fanout's memory, which will always outlive + * the fanned out op. */ + rd_kafka_AdminOptions_set_opaque( + &rko->rko_u.admin_request.options, grp); + + rd_list_init(&rko->rko_u.admin_request.args, 1, rd_free); + rd_list_add(&rko->rko_u.admin_request.args, + rd_strdup(grp)); + + rd_kafka_q_enq(rko_fanout->rko_rk->rk_ops, rko); + } + else { + /* As a convenience to the application we insert group result + * in the same order as they were requested. */ + orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, + rd_kafka_DescribeConsumerGroups_cmp); + rd_assert(orig_pos != -1); + + /* Make sure result is not already set */ + rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, + orig_pos) == NULL); + + rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, + newgroupres); + } +} + void rd_kafka_DescribeConsumerGroups(rd_kafka_t *rk, const char **groups, size_t groups_cnt, @@ -8405,14 +8663,14 @@ void rd_kafka_DescribeConsumerGroups(rd_kafka_t *rk, * coordinator into one op. */ for (i = 0; i < groups_cnt; i++) { static const struct rd_kafka_admin_worker_cbs cbs = { - rd_kafka_admin_DescribeConsumerGroupsRequest, - rd_kafka_DescribeConsumerGroupsResponse_parse, + rd_kafka_ConsumerGroupDescribeRequest, + rd_kafka_ConsumerGroupDescribeResponseParse, }; char *grp = rd_list_elem(&rko_fanout->rko_u.admin_request.args, (int)i); rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( - rk, RD_KAFKA_OP_DESCRIBECONSUMERGROUPS, - RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT, &cbs, options, + rk, RD_KAFKA_OP_CONSUMERGROUPDESCRIBE, + RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT, &cbs, options, rk->rk_ops); rko->rko_u.admin_request.fanout_parent = rko_fanout; @@ -8443,7 +8701,7 @@ rd_kafka_DescribeConsumerGroups_result_groups( const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; - rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS); + rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS || reqtype == RD_KAFKA_OP_CONSUMERGROUPDESCRIBE); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_ConsumerGroupDescription_t **) diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index ad58fe5cc2..f215501fec 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -495,6 +495,8 @@ struct rd_kafka_MemberDescription_s { char *group_instance_id; /**< Group instance id */ char *host; /**< Group member host */ rd_kafka_MemberAssignment_t assignment; /**< Member assignment */ + rd_kafka_MemberAssignment_t + target_assignment; /**< Target assignment */ }; /** @@ -524,6 +526,8 @@ struct rd_kafka_ConsumerGroupDescription_s { rd_kafka_AclOperation_t *authorized_operations; /** Group specific error. */ rd_kafka_error_t *error; + /**< Consumer group type. */ + rd_kafka_consumer_group_type_t type; }; /**@}*/ diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 60076e835d..6aae7f6a57 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -123,6 +123,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", + [RD_KAFKA_OP_CONSUMERGROUPDESCRIBE] = + "REPLY:CONSUMERGROUPDESCRIBE", }; if (type & RD_KAFKA_OP_REPLY) @@ -287,6 +289,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.telemetry_broker), [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_CONSUMERGROUPDESCRIBE] = + sizeof(rko->rko_u.admin_request), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -440,7 +444,7 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_LISTOFFSETS: - case RD_KAFKA_OP_ELECTLEADERS: + case RD_KAFKA_OP_ELECTLEADERS: rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq); rd_list_destroy(&rko->rko_u.admin_request.args); if (rko->rko_u.admin_request.options.match_consumer_group_states diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 3af8a5f395..521cb46484 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -189,6 +189,9 @@ typedef enum { RD_KAFKA_OP_ELECTLEADERS, /**< Admin: * ElectLeaders * u.admin_request */ + RD_KAFKA_OP_CONSUMERGROUPDESCRIBE, /**< Admin: + * ConsumerGroupDescribe + * u.admin_request */ RD_KAFKA_OP__END } rd_kafka_op_type_t; diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 895e338c83..a1fae6bd6a 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -156,6 +156,7 @@ static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) { "DescribeUserScramCredentialsRequest", [RD_KAFKAP_AlterUserScramCredentials] = "AlterUserScramCredentialsRequest", + [RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribeRequest", [RD_KAFKAP_Vote] = "VoteRequest", [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest", [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest", diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index ac04343fac..7ba447b15b 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -6003,6 +6003,65 @@ rd_kafka_resp_err_t rd_kafka_ElectLeadersRequest( return RD_KAFKA_RESP_ERR_NO_ERROR; } + +/** + * @brief Construct and send ConsumerGroupDescribe Request to broker. + * + * @return RD_KAFKA_RESP_ERR_NO_ERROR on success, a new error instance that + * must be released with rd_kafka_error_destroy() in case of error. + */ +rd_kafka_resp_err_t +rd_kafka_ConsumerGroupDescribeRequest(rd_kafka_broker_t *rkb, + const rd_list_t *groups /*(char*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t maxApiVersion = 0; + int16_t ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_ConsumerGroupDescribe, 0, maxApiVersion, NULL); + size_t ofGroupsArrayCnt; + int grp_ids_cnt = rd_list_cnt(groups); + int i, include_authorized_operations; + char *group; + + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "Broker does not support ConsumerGroupDescribe"); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + include_authorized_operations = + rd_kafka_confval_get_int(&options->include_authorized_operations); + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_ConsumerGroupDescribe, 1, + 4 /* rd_kafka_buf_write_arraycnt_pos */ + + 1 /* IncludeAuthorizedOperations */ + 1 /* tags */ + + 32 * grp_ids_cnt /* Groups */, + rd_true /* flexver */); + + ofGroupsArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); + rd_kafka_buf_finalize_arraycnt(rkbuf, ofGroupsArrayCnt, grp_ids_cnt); + printf("grp_ids_cnt: %d\n", grp_ids_cnt); + printf("include_authorized_operations: %d\n", include_authorized_operations); + RD_LIST_FOREACH(group, groups, i) { + group = rd_list_elem(groups, i); + printf("group: %s\n", group); + rd_kafka_buf_write_str(rkbuf, group, -1); + } + + rd_kafka_buf_write_bool(rkbuf, include_authorized_operations); + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + printf("rd_kafka_ConsumerGroupDescribeRequest: done\n"); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + /** * @brief Parses and handles an InitProducerId reply. * diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 932b301acd..ed8693adbc 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -639,6 +639,16 @@ rd_kafka_resp_err_t rd_kafka_ElectLeadersRequest( rd_kafka_resp_cb_t *resp_cb, void *opaque); +rd_kafka_resp_err_t +rd_kafka_ConsumerGroupDescribeRequest(rd_kafka_broker_t *rkb, + const rd_list_t *groups /*(char*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque); + void rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers( rd_tmpabuf_t *tbuf, rd_kafkap_NodeEndpoints_t *NodeEndpoints); From 5246d91d75b78f418d8f8a2ca583580d2cddd8cc Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Fri, 29 Nov 2024 14:31:33 +0530 Subject: [PATCH 02/10] debugging --- src/rdkafka_admin.c | 80 ++++++++++++++++++++++++++++++++++----------- src/rdkafka_event.c | 4 ++- src/rdkafka_event.h | 1 + src/rdkafka_op.c | 3 +- 4 files changed, 67 insertions(+), 21 deletions(-) diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 7d96b6e876..61a6592836 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -7944,6 +7944,7 @@ rd_kafka_ConsumerGroupDescription_new_error(const char *group_id, static rd_kafka_ConsumerGroupDescription_t * rd_kafka_ConsumerGroupDescription_copy( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { + printf("rd_kafka_ConsumerGroupDescription_copy\n"); return rd_kafka_ConsumerGroupDescription_new( grpdesc->group_id, grpdesc->is_simple_consumer_group, &grpdesc->members, grpdesc->partition_assignor, @@ -8396,12 +8397,15 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( for(int j = 0; j < member_cnt; j++) { rd_kafkap_str_t MemberId, InstanceId, RackId, ClientId, ClientHost, SubscribedTopicNames, SubscribedTopicRegex; - int32_t MemberEpoch; + int32_t MemberEpoch, idx; char *member_id, *instance_id, *rack_id, *client_id, *client_host, *subscribed_topic_names, *subscribed_topic_regex = NULL; rd_kafka_MemberDescription_t *member = NULL; rd_kafka_topic_partition_list_t *assignment = NULL, *target_assignment = NULL; + int8_t are_assignments_present = 0, are_target_assignments_present = 0; + char **subscribed_topic_names_array = NULL; + int32_t subscribed_topic_names_array_cnt; rd_kafka_buf_read_str(reply, &MemberId); rd_kafka_buf_read_str(reply, &InstanceId); @@ -8409,27 +8413,43 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( rd_kafka_buf_read_i32(reply, &MemberEpoch); rd_kafka_buf_read_str(reply, &ClientId); rd_kafka_buf_read_str(reply, &ClientHost); - rd_kafka_buf_read_str(reply, &SubscribedTopicNames); + rd_kafka_buf_read_arraycnt(reply, &subscribed_topic_names_array_cnt, 100000); + printf("subscribed_topic_names_array_cnt: %d\n", subscribed_topic_names_array_cnt); + subscribed_topic_names_array = rd_calloc(subscribed_topic_names_array_cnt, sizeof(char*)); + for(idx=0; idx < subscribed_topic_names_array_cnt; idx++) { + rd_kafkap_str_t SubscribedTopicName; + rd_kafka_buf_read_str(reply, &SubscribedTopicName); + char *subscribed_topic_name = RD_KAFKAP_STR_DUP(&SubscribedTopicName); + subscribed_topic_names_array[idx] = subscribed_topic_name; + printf("subscribed_topic_name: %s\n", subscribed_topic_name); + } rd_kafka_buf_read_str(reply, &SubscribedTopicRegex); const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - assignment = rd_kafka_buf_read_topic_partitions( - reply, rd_true /* use topic_id */, - rd_true /* use topic name*/, 0, fields); - rd_kafka_buf_skip_tags(reply); - target_assignment = rd_kafka_buf_read_topic_partitions( - reply, rd_true /* use topic_id */, - rd_true /* use topic name*/, 0, fields); - rd_kafka_buf_skip_tags(reply); + // rd_kafka_buf_read_i8(reply, &are_assignments_present); + // printf("are_assignments_present: %d\n", are_assignments_present); + // if(are_assignments_present == 1) { + assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + rd_kafka_buf_skip_tags(reply); + // } + // rd_kafka_buf_read_i8(reply, &are_target_assignments_present); + // printf("are_target_assignments_present: %d\n", are_target_assignments_present); + // if(are_target_assignments_present == 1) { + + target_assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + //} + rd_kafka_buf_skip_tags(reply); member_id = RD_KAFKAP_STR_DUP(&MemberId); instance_id = RD_KAFKAP_STR_DUP(&InstanceId); rack_id = RD_KAFKAP_STR_DUP(&RackId); client_id = RD_KAFKAP_STR_DUP(&ClientId); client_host = RD_KAFKAP_STR_DUP(&ClientHost); - subscribed_topic_names = RD_KAFKAP_STR_DUP(&SubscribedTopicNames); subscribed_topic_regex = RD_KAFKAP_STR_DUP(&SubscribedTopicRegex); printf("member_id: %s\n", member_id); @@ -8438,15 +8458,28 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( printf("member_epoch: %d\n", MemberEpoch); printf("client_id: %s\n", client_id); printf("client_host: %s\n", client_host); - printf("subscribed_topic_names: %s\n", subscribed_topic_names); + printf("subscribed topic names: "); + for(int i=0;ielems[0].topic, assignment->elems[0].partition); - printf("target_assignment: topic: %s partition: %d\n", target_assignment->elems[0].topic, target_assignment->elems[0].partition); + if(assignment) { + printf("assignment: topic: %s partition: %d\n", assignment->elems[0].topic, assignment->elems[0].partition); + } + else { + printf("assignment is null\n"); + } + if(target_assignment) { + printf("target_assignment: topic: %s partition: %d\n", target_assignment->elems[0].topic, target_assignment->elems[0].partition); + } + else { + printf("target_assignment is null\n"); + } member = rd_kafka_MemberDescription_new( client_id, member_id, instance_id, client_host, assignment, target_assignment); rd_list_add(&members, member); - rd_kafka_buf_skip_tags(reply); if(assignment) rd_kafka_topic_partition_list_destroy(assignment); @@ -8526,9 +8559,11 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_kafka_ConsumerGroupDescription_t *newgroupres; const char *grp = rko_partial->rko_u.admin_result.opaque; int orig_pos; - + printf("rd_kafka_DescribeConsumerGroups_response_merge\n"); rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT || rko_partial->rko_evtype == RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT); + printf("rko_partial->rko_evtype: %d\n", rko_partial->rko_evtype); + printf("processing group: %s\n", grp); if (!rko_partial->rko_err) { /* Proper results. @@ -8538,6 +8573,7 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_assert(groupres); rd_assert(!strcmp(groupres->group_id, grp)); newgroupres = rd_kafka_ConsumerGroupDescription_copy(groupres); + printf("newgroupres->type: %d\n", newgroupres->type); } else { /* Op errored, e.g. timeout */ rd_kafka_error_t *error = @@ -8546,9 +8582,11 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_kafka_ConsumerGroupDescription_new_error(grp, error, RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN); rd_kafka_error_destroy(error); } + printf("newgroupres->type: %d\n", newgroupres->type); - if(groupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER && - (groupres->error->code == RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND || groupres->error->code == RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)) { + if(newgroupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER && + (newgroupres->error->code == RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND || newgroupres->error->code == RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)) { + printf("group not found\n"); rko_fanout->rko_u.admin_request.fanout.outstanding++; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_admin_DescribeConsumerGroupsRequest, @@ -8579,6 +8617,7 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_kafka_q_enq(rko_fanout->rko_rk->rk_ops, rko); } else { + printf("group found\n"); /* As a convenience to the application we insert group result * in the same order as they were requested. */ orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, @@ -8591,7 +8630,9 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, newgroupres); + printf("processed group: %s\n", grp); } + printf("rd_kafka_DescribeConsumerGroups_response_merge end\n"); } void rd_kafka_DescribeConsumerGroups(rd_kafka_t *rk, @@ -8699,6 +8740,7 @@ rd_kafka_DescribeConsumerGroups_result_groups( const rd_kafka_DescribeConsumerGroups_result_t *result, size_t *cntp) { const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; + printf("rd_kafka_DescribeConsumerGroups_result_groups\n"); rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS || reqtype == RD_KAFKA_OP_CONSUMERGROUPDESCRIBE); diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index 7e8cd200ae..41a98f83cf 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -99,6 +99,8 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { return "ListOffsetsResult"; case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: return "ElectLeadersResult"; + case RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT: + return "ConsumerGroupDescribeResult"; default: return "?unknown?"; } @@ -382,7 +384,7 @@ rd_kafka_event_ListConsumerGroups_result(rd_kafka_event_t *rkev) { const rd_kafka_DescribeConsumerGroups_result_t * rd_kafka_event_DescribeConsumerGroups_result(rd_kafka_event_t *rkev) { if (!rkev || - rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) + (rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT && rkev->rko_evtype != RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT)) return NULL; else return (const rd_kafka_DescribeConsumerGroups_result_t *)rkev; diff --git a/src/rdkafka_event.h b/src/rdkafka_event.h index cf63e414eb..7a049602f0 100644 --- a/src/rdkafka_event.h +++ b/src/rdkafka_event.h @@ -118,6 +118,7 @@ static RD_UNUSED RD_INLINE int rd_kafka_event_setup(rd_kafka_t *rk, case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: case RD_KAFKA_EVENT_LISTOFFSETS_RESULT: case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: + case RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT: return 1; default: diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 6aae7f6a57..c8c72a99d7 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -444,7 +444,8 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_LISTOFFSETS: - case RD_KAFKA_OP_ELECTLEADERS: + case RD_KAFKA_OP_ELECTLEADERS: + case RD_KAFKA_OP_CONSUMERGROUPDESCRIBE: rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq); rd_list_destroy(&rko->rko_u.admin_request.args); if (rko->rko_u.admin_request.options.match_consumer_group_states From 0d16888fc03f3ef9d7f6e2e1c61fabeac824c99d Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 2 Dec 2024 18:25:07 +0530 Subject: [PATCH 03/10] API working now --- src/rdkafka_admin.c | 80 ++++++++----------------------------------- src/rdkafka_request.c | 5 +-- 2 files changed, 16 insertions(+), 69 deletions(-) diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 61a6592836..99a2de4b02 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -7944,7 +7944,6 @@ rd_kafka_ConsumerGroupDescription_new_error(const char *group_id, static rd_kafka_ConsumerGroupDescription_t * rd_kafka_ConsumerGroupDescription_copy( const rd_kafka_ConsumerGroupDescription_t *grpdesc) { - printf("rd_kafka_ConsumerGroupDescription_copy\n"); return rd_kafka_ConsumerGroupDescription_new( grpdesc->group_id, grpdesc->is_simple_consumer_group, &grpdesc->members, grpdesc->partition_assignor, @@ -8340,7 +8339,6 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( int32_t nodeid; uint16_t port; int operation_cnt = -1; - printf("rd_kafka_ConsumerGroupDescribeResponseParse\n"); api_version = rd_kafka_buf_ApiVersion(reply); rd_kafka_buf_read_throttle_time(reply); @@ -8380,14 +8378,6 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( group_state = RD_KAFKAP_STR_DUP(&GroupState); assignor_name = RD_KAFKAP_STR_DUP(&AssignorName); error_str = RD_KAFKAP_STR_DUP(&ErrorString); - printf("error_code: %d\n", error_code); - printf("error string: %s\n", error_str); - printf("group_id: %s\n", group_id); - printf("group_state: %s\n", group_state); - printf("group_epoch: %d\n", group_epoch); - printf("assignment_epoch: %d\n", assignment_epoch); - printf("assignor_name: %s\n", assignor_name); - printf("member_cnt: %d\n", member_cnt); if(error_code) { error = rd_kafka_error_new(error_code, "ConsumerGroupDescribe: %s", error_str); @@ -8414,35 +8404,28 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( rd_kafka_buf_read_str(reply, &ClientId); rd_kafka_buf_read_str(reply, &ClientHost); rd_kafka_buf_read_arraycnt(reply, &subscribed_topic_names_array_cnt, 100000); - printf("subscribed_topic_names_array_cnt: %d\n", subscribed_topic_names_array_cnt); + subscribed_topic_names_array = rd_calloc(subscribed_topic_names_array_cnt, sizeof(char*)); for(idx=0; idx < subscribed_topic_names_array_cnt; idx++) { rd_kafkap_str_t SubscribedTopicName; rd_kafka_buf_read_str(reply, &SubscribedTopicName); char *subscribed_topic_name = RD_KAFKAP_STR_DUP(&SubscribedTopicName); subscribed_topic_names_array[idx] = subscribed_topic_name; - printf("subscribed_topic_name: %s\n", subscribed_topic_name); } rd_kafka_buf_read_str(reply, &SubscribedTopicRegex); const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - // rd_kafka_buf_read_i8(reply, &are_assignments_present); - // printf("are_assignments_present: %d\n", are_assignments_present); - // if(are_assignments_present == 1) { - assignment = rd_kafka_buf_read_topic_partitions( - reply, rd_true /* use topic_id */, - rd_true /* use topic name*/, 0, fields); - rd_kafka_buf_skip_tags(reply); - // } - // rd_kafka_buf_read_i8(reply, &are_target_assignments_present); - // printf("are_target_assignments_present: %d\n", are_target_assignments_present); - // if(are_target_assignments_present == 1) { - - target_assignment = rd_kafka_buf_read_topic_partitions( - reply, rd_true /* use topic_id */, - rd_true /* use topic name*/, 0, fields); - //} + + assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + rd_kafka_buf_skip_tags(reply); + + target_assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + rd_kafka_buf_skip_tags(reply); member_id = RD_KAFKAP_STR_DUP(&MemberId); @@ -8452,33 +8435,10 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( client_host = RD_KAFKAP_STR_DUP(&ClientHost); subscribed_topic_regex = RD_KAFKAP_STR_DUP(&SubscribedTopicRegex); - printf("member_id: %s\n", member_id); - printf("instance_id: %s\n", instance_id); - printf("rack_id: %s\n", rack_id); - printf("member_epoch: %d\n", MemberEpoch); - printf("client_id: %s\n", client_id); - printf("client_host: %s\n", client_host); - printf("subscribed topic names: "); - for(int i=0;ielems[0].topic, assignment->elems[0].partition); - } - else { - printf("assignment is null\n"); - } - if(target_assignment) { - printf("target_assignment: topic: %s partition: %d\n", target_assignment->elems[0].topic, target_assignment->elems[0].partition); - } - else { - printf("target_assignment is null\n"); - } member = rd_kafka_MemberDescription_new( client_id, member_id, instance_id, client_host, assignment, target_assignment); - + + rd_kafka_buf_skip_tags(reply); rd_list_add(&members, member); if(assignment) @@ -8526,7 +8486,6 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( } rd_kafka_buf_skip_tags(reply); *rko_resultp = rko_result; - printf("rd_kafka_ConsumerGroupDescribeResponseParse end\n"); return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: if(group_id) @@ -8559,11 +8518,9 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_kafka_ConsumerGroupDescription_t *newgroupres; const char *grp = rko_partial->rko_u.admin_result.opaque; int orig_pos; - printf("rd_kafka_DescribeConsumerGroups_response_merge\n"); + rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT || rko_partial->rko_evtype == RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT); - printf("rko_partial->rko_evtype: %d\n", rko_partial->rko_evtype); - printf("processing group: %s\n", grp); if (!rko_partial->rko_err) { /* Proper results. @@ -8573,7 +8530,6 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_assert(groupres); rd_assert(!strcmp(groupres->group_id, grp)); newgroupres = rd_kafka_ConsumerGroupDescription_copy(groupres); - printf("newgroupres->type: %d\n", newgroupres->type); } else { /* Op errored, e.g. timeout */ rd_kafka_error_t *error = @@ -8582,11 +8538,9 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_kafka_ConsumerGroupDescription_new_error(grp, error, RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN); rd_kafka_error_destroy(error); } - printf("newgroupres->type: %d\n", newgroupres->type); - if(newgroupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER && + if(newgroupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER && newgroupres->error && (newgroupres->error->code == RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND || newgroupres->error->code == RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)) { - printf("group not found\n"); rko_fanout->rko_u.admin_request.fanout.outstanding++; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_admin_DescribeConsumerGroupsRequest, @@ -8617,7 +8571,6 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_kafka_q_enq(rko_fanout->rko_rk->rk_ops, rko); } else { - printf("group found\n"); /* As a convenience to the application we insert group result * in the same order as they were requested. */ orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, @@ -8630,9 +8583,7 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, newgroupres); - printf("processed group: %s\n", grp); } - printf("rd_kafka_DescribeConsumerGroups_response_merge end\n"); } void rd_kafka_DescribeConsumerGroups(rd_kafka_t *rk, @@ -8740,7 +8691,6 @@ rd_kafka_DescribeConsumerGroups_result_groups( const rd_kafka_DescribeConsumerGroups_result_t *result, size_t *cntp) { const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; - printf("rd_kafka_DescribeConsumerGroups_result_groups\n"); rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS || reqtype == RD_KAFKA_OP_CONSUMERGROUPDESCRIBE); diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 7ba447b15b..41398b899e 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -6046,18 +6046,15 @@ rd_kafka_ConsumerGroupDescribeRequest(rd_kafka_broker_t *rkb, ofGroupsArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); rd_kafka_buf_finalize_arraycnt(rkbuf, ofGroupsArrayCnt, grp_ids_cnt); - printf("grp_ids_cnt: %d\n", grp_ids_cnt); - printf("include_authorized_operations: %d\n", include_authorized_operations); + RD_LIST_FOREACH(group, groups, i) { group = rd_list_elem(groups, i); - printf("group: %s\n", group); rd_kafka_buf_write_str(rkbuf, group, -1); } rd_kafka_buf_write_bool(rkbuf, include_authorized_operations); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - printf("rd_kafka_ConsumerGroupDescribeRequest: done\n"); return RD_KAFKA_RESP_ERR_NO_ERROR; } From 041a3068745bf2b301181e32937b53cd864b6f3c Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 3 Dec 2024 13:28:53 +0530 Subject: [PATCH 04/10] Api working now, tests left --- src/rdkafka_event.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index 41a98f83cf..eb1330bc02 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -384,7 +384,7 @@ rd_kafka_event_ListConsumerGroups_result(rd_kafka_event_t *rkev) { const rd_kafka_DescribeConsumerGroups_result_t * rd_kafka_event_DescribeConsumerGroups_result(rd_kafka_event_t *rkev) { if (!rkev || - (rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT && rkev->rko_evtype != RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT)) + (rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT)) return NULL; else return (const rd_kafka_DescribeConsumerGroups_result_t *)rkev; From 56dcce6c6c9e497cb1ba4a14cd8b056884f619e7 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 4 Dec 2024 15:11:43 +0530 Subject: [PATCH 05/10] Wrote tests --- tests/0081-admin.c | 201 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 9144c400c9..80431be1ee 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -3137,6 +3137,203 @@ static void do_test_DescribeConsumerGroups(const char *what, SUB_TEST_PASS(); } +static void do_test_DescribeConsumerGroups_Compatibility(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int request_timeout) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; +#define TEST_DESCRIBE_CONSUMER_GROUPS_CNT 4 + int i; + const int partitions_cnt = 1; + const int msgs_cnt = 100; + char *topic; + rd_kafka_metadata_topic_t exp_mdtopic = {0}; + int64_t testid = test_id_generate(); + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + const rd_kafka_ConsumerGroupDescription_t **results = NULL; + expected_DescribeConsumerGroups_result_t + expected[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = RD_ZERO_INIT; + const char *describe_groups[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; + char group_instance_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; + char client_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; + rd_kafka_t *rks[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; + const rd_kafka_DescribeConsumerGroups_result_t *res; + char *protocols[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = { + "classic", "classic", "consumer", "consumer"}; + size_t authorized_operation_cnt; + rd_bool_t has_group_instance_id = + test_broker_version >= TEST_BRKVER(2, 4, 0, 0); + + SUB_TEST_QUICK("%s DescribeConsumerGroups Compatibility Test with %s, request_timeout %d", + rd_kafka_name(rk), what, request_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (request_timeout != -1) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS); + + err = rd_kafka_AdminOptions_set_request_timeout( + options, request_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + exp_mdtopic.topic = topic; + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); + + /* Produce 100 msgs */ + test_produce_msgs_easy(topic, testid, 0, msgs_cnt); + + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + rd_kafka_conf_t *conf; + char *group_id = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + if (i < 2) { + /* Classic Protocol */ + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "group.protocol", "classic"); + } else { + /* Consumer Protocol */ + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "group.protocol", "consumer"); + } + + snprintf(client_ids[i], sizeof(client_ids[i]), "client_id_%" PRId32, i); + + test_conf_set(conf, "client.id", client_ids[i]); + test_conf_set(conf, "session.timeout.ms", "5000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + rks[i] = test_create_consumer(group_id, NULL, conf, NULL); + test_consumer_subscribe(rks[i], topic); + /* Consume messages */ + test_consumer_poll("consumer", rks[i], testid, -1, -1, msgs_cnt, NULL); + + expected[i].group_id = group_id; + expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; + describe_groups[i] = group_id; + } + + TIMING_START(&timing, "DescribeConsumerGroups"); + TEST_SAY("Call DescribeConsumerGroups\n"); + rd_kafka_DescribeConsumerGroups( + rk, describe_groups, TEST_DESCRIBE_CONSUMER_GROUPS_CNT, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + TIMING_START(&timing, "DescribeConsumerGroups.queue_poll"); + + /* Poll result queue for DescribeConsumerGroups result. */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("DescribeConsumerGroups: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) { + break; + } + + rd_kafka_event_destroy(rkev); + } + + /* Convert event to proper result */ + res = rd_kafka_event_DescribeConsumerGroups_result(rkev); + TEST_ASSERT(res, "expected DescribeConsumerGroups_result, got %s", + rd_kafka_event_name(rkev)); + + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected DescribeConsumerGroups to return %s, got %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("DescribeConsumerGroups: returned %s (%s)\n", + rd_kafka_err2str(err), err ? errstr2 : "n/a"); + + size_t cnt = 0; + results = rd_kafka_DescribeConsumerGroups_result_groups(res, &cnt); + + TEST_ASSERT( + TEST_DESCRIBE_CONSUMER_GROUPS_CNT == cnt, + "expected DescribeConsumerGroups_result_groups to return %d items, " + "got %" PRIusz, + TEST_DESCRIBE_CONSUMER_GROUPS_CNT, cnt); + + /* Verify results */ + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + expected_DescribeConsumerGroups_result_t *exp = &expected[i]; + rd_kafka_resp_err_t exp_err = exp->err; + const rd_kafka_ConsumerGroupDescription_t *act = results[i]; + rd_kafka_resp_err_t act_err = rd_kafka_error_code( + rd_kafka_ConsumerGroupDescription_error(act)); + rd_kafka_consumer_group_state_t state = + rd_kafka_ConsumerGroupDescription_state(act); + + TEST_ASSERT( + strcmp(exp->group_id, + rd_kafka_ConsumerGroupDescription_group_id(act)) == 0, + "Result order mismatch at #%d: expected group id to be %s, got %s", + i, exp->group_id, + rd_kafka_ConsumerGroupDescription_group_id(act)); + + if (strcmp(protocols[i], "classic") == 0) { + TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_STABLE || + state == RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY, + "Expected Stable or Empty state for classic protocol, got %s.", + rd_kafka_consumer_group_state_name(state)); + } else if (strcmp(protocols[i], "consumer") == 0) { + TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_STABLE, + "Expected Stable state, got %s.", + rd_kafka_consumer_group_state_name(state)); + } + + TEST_ASSERT(exp_err == act_err, + "expected err=%d for group %s, got %d (%s)", + exp_err, exp->group_id, act_err, + rd_kafka_err2str(act_err)); + } + + rd_kafka_event_destroy(rkev); + + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + test_consumer_close(rks[i]); + rd_kafka_destroy(rks[i]); + rd_free(expected[i].group_id); + } + + test_DeleteTopics_simple(rk, NULL, &topic, 1, NULL); + rd_free(topic); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); + +#undef TEST_DESCRIBE_CONSUMER_GROUPS_CNT + + SUB_TEST_PASS(); +} + /** @brief Helper function to check whether \p expected and \p actual contain * the same values. */ static void @@ -5274,6 +5471,10 @@ static void do_test_apis(rd_kafka_type_t cltype) { do_test_DescribeConsumerGroups("temp queue", rk, NULL, -1); do_test_DescribeConsumerGroups("main queue", rk, mainq, 1500); } + else { + do_test_DescribeConsumerGroups_Compatibility("temp queue", rk, NULL, -1); + do_test_DescribeConsumerGroups_Compatibility("main queue", rk, mainq, 1500); + } /* Describe topics */ do_test_DescribeTopics("temp queue", rk, NULL, 15000, rd_false); From f4adaf840ea3f4ca5f6dc1f4b0ab649dc6aca21f Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 4 Dec 2024 15:21:46 +0530 Subject: [PATCH 06/10] style fix --- examples/consumer.c | 7 - examples/describe_consumer_groups.c | 12 +- src/rdkafka.h | 21 +- src/rdkafka_admin.c | 385 +++++++++++++++------------- src/rdkafka_admin.h | 13 +- src/rdkafka_op.c | 5 +- src/rdkafka_proto.h | 2 +- tests/0081-admin.c | 359 +++++++++++++------------- 8 files changed, 418 insertions(+), 386 deletions(-) diff --git a/examples/consumer.c b/examples/consumer.c index 76ebfbd633..dad3efc43b 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -139,13 +139,6 @@ int main(int argc, char **argv) { return 1; } - if(rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr, - sizeof(errstr)) != RD_KAFKA_CONF_OK) { - fprintf(stderr, "%s\n", errstr); - rd_kafka_conf_destroy(conf); - return 1; - } - /* * Create consumer instance. * diff --git a/examples/describe_consumer_groups.c b/examples/describe_consumer_groups.c index d58116cf94..529cee7e69 100644 --- a/examples/describe_consumer_groups.c +++ b/examples/describe_consumer_groups.c @@ -175,16 +175,17 @@ print_group_member_info(const rd_kafka_MemberDescription_t *member) { print_partition_list(stdout, topic_partitions, 0, " "); } const rd_kafka_MemberAssignment_t *target_assignment = - rd_kafka_MemberDescription_target_assignment(member); + rd_kafka_MemberDescription_target_assignment(member); const rd_kafka_topic_partition_list_t *target_topic_partitions = - rd_kafka_MemberAssignment_partitions(target_assignment); + rd_kafka_MemberAssignment_partitions(target_assignment); if (!target_topic_partitions) { printf(" No target assignment\n"); } else if (target_topic_partitions->cnt == 0) { printf(" Empty target assignment\n"); } else { printf(" Target assignment:\n"); - print_partition_list(stdout, target_topic_partitions, 0, " "); + print_partition_list(stdout, target_topic_partitions, 0, + " "); } } @@ -207,7 +208,7 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { rd_kafka_consumer_group_state_t state = rd_kafka_ConsumerGroupDescription_state(group); rd_kafka_consumer_group_type_t type = - rd_kafka_ConsumerGroupDescription_type(group); + rd_kafka_ConsumerGroupDescription_type(group); authorized_operations = rd_kafka_ConsumerGroupDescription_authorized_operations( group, &authorized_operations_cnt); @@ -228,7 +229,8 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { printf( "Group \"%s\", partition assignor \"%s\", type \"%s\" " " state %s%s, with %" PRId32 " member(s)\n", - group_id, partition_assignor, rd_kafka_consumer_group_type_name(type), + group_id, partition_assignor, + rd_kafka_consumer_group_type_name(type), rd_kafka_consumer_group_state_name(state), coordinator_desc, member_cnt); for (j = 0; j < authorized_operations_cnt; j++) { diff --git a/src/rdkafka.h b/src/rdkafka.h index 7724c490e8..6f6c7f3dc8 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -8886,11 +8886,11 @@ const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator( /** * @brief Gets type for the \p grpdesc group. - * + * * @param grpdesc The group description. - * + * * @return A group type. - * + * * @remark The lifetime of the returned memory is the same * as the lifetime of the \p grpdesc object. */ @@ -9012,11 +9012,11 @@ const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions( /** * @brief Gets target assignment of \p member. - * + * * @param member The group member. - * + * * @return The target assignment. - * + * * @remark The lifetime of the returned memory is the same * as the lifetime of the \p member object. */ @@ -9026,16 +9026,17 @@ const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_target_assignment( /** * @brief Gets target assigned partitions of a member \p assignment. - * + * * @param assignment The group member assignment. - * + * * @return The target assigned partitions. - * + * * @remark The lifetime of the returned memory is the same * as the lifetime of the \p assignment object. */ RD_EXPORT -const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_target_partitions( +const rd_kafka_topic_partition_list_t * +rd_kafka_MemberAssignment_target_partitions( const rd_kafka_MemberAssignment_t *assignment); /**@}*/ diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 99a2de4b02..6f7900480c 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -7782,10 +7782,9 @@ static rd_kafka_MemberDescription_t *rd_kafka_MemberDescription_new( */ static rd_kafka_MemberDescription_t * rd_kafka_MemberDescription_copy(const rd_kafka_MemberDescription_t *src) { - return rd_kafka_MemberDescription_new(src->client_id, src->consumer_id, - src->group_instance_id, src->host, - src->assignment.partitions, - src->target_assignment.partitions); + return rd_kafka_MemberDescription_new( + src->client_id, src->consumer_id, src->group_instance_id, src->host, + src->assignment.partitions, src->target_assignment.partitions); } /** @@ -7809,7 +7808,7 @@ rd_kafka_MemberDescription_destroy(rd_kafka_MemberDescription_t *member) { if (member->assignment.partitions) rd_kafka_topic_partition_list_destroy( member->assignment.partitions); - if(member->target_assignment.partitions) + if (member->target_assignment.partitions) rd_kafka_topic_partition_list_destroy( member->target_assignment.partitions); rd_free(member); @@ -7854,7 +7853,8 @@ const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_target_assignment( return &member->target_assignment; } -const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_target_partitions( +const rd_kafka_topic_partition_list_t * +rd_kafka_MemberAssignment_target_partitions( const rd_kafka_MemberAssignment_t *assignment) { return assignment->partitions; } @@ -7927,9 +7927,10 @@ rd_kafka_ConsumerGroupDescription_new( * Use rd_kafka_ConsumerGroupDescription_destroy() to free when done. */ static rd_kafka_ConsumerGroupDescription_t * -rd_kafka_ConsumerGroupDescription_new_error(const char *group_id, - rd_kafka_error_t *error, - rd_kafka_consumer_group_type_t type) { +rd_kafka_ConsumerGroupDescription_new_error( + const char *group_id, + rd_kafka_error_t *error, + rd_kafka_consumer_group_type_t type) { return rd_kafka_ConsumerGroupDescription_new( group_id, rd_false, NULL, NULL, NULL, 0, RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN, NULL, error, type); @@ -7948,7 +7949,8 @@ rd_kafka_ConsumerGroupDescription_copy( grpdesc->group_id, grpdesc->is_simple_consumer_group, &grpdesc->members, grpdesc->partition_assignor, grpdesc->authorized_operations, grpdesc->authorized_operations_cnt, - grpdesc->state, grpdesc->coordinator, grpdesc->error, grpdesc->type); + grpdesc->state, grpdesc->coordinator, grpdesc->error, + grpdesc->type); } /** @@ -8266,7 +8268,8 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, node, error, RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC); } else grpdesc = rd_kafka_ConsumerGroupDescription_new_error( - group_id, error, RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC); + group_id, error, + RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC); rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc); @@ -8320,22 +8323,23 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, return reply->rkbuf_err; } -static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( - rd_kafka_op_t *rko_req, - rd_kafka_op_t **rko_resultp, - rd_kafka_buf_t *reply, - char *errstr, - size_t errstr_size) { +static rd_kafka_resp_err_t +rd_kafka_ConsumerGroupDescribeResponseParse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { const int log_decode_errors = LOG_ERR; int16_t api_version; int32_t cnt; rd_kafka_op_t *rko_result = NULL; rd_kafka_broker_t *rkb = reply->rkbuf_rkb; rd_kafka_error_t *error = NULL; - char *group_id = NULL, *group_state = NULL, *assignor_name = NULL, *error_str = NULL, *host = NULL; + char *group_id = NULL, *group_state = NULL, *assignor_name = NULL, + *error_str = NULL, *host = NULL; int32_t group_epoch, assignment_epoch; rd_kafka_AclOperation_t *operations = NULL; - rd_kafka_Node_t *node = NULL; + rd_kafka_Node_t *node = NULL; int32_t nodeid; uint16_t port; int operation_cnt = -1; @@ -8357,155 +8361,173 @@ static rd_kafka_resp_err_t rd_kafka_ConsumerGroupDescribeResponseParse( rd_list_init(&rko_result->rko_u.admin_result.results, cnt, rd_kafka_ConsumerGroupDescription_free); - for(int i = 0; i < cnt; i++) { - int16_t error_code; - int32_t authorized_operations = -1; - int32_t member_cnt; - rd_kafkap_str_t GroupId, GroupState, AssignorName, ErrorString; - rd_list_t members; - rd_kafka_ConsumerGroupDescription_t *grpdesc = NULL; - - rd_kafka_buf_read_i16(reply, &error_code); - rd_kafka_buf_read_str(reply, &ErrorString); - rd_kafka_buf_read_str(reply, &GroupId); - rd_kafka_buf_read_str(reply, &GroupState); - rd_kafka_buf_read_i32(reply, &group_epoch); - rd_kafka_buf_read_i32(reply, &assignment_epoch); - rd_kafka_buf_read_str(reply, &AssignorName); - rd_kafka_buf_read_arraycnt(reply, &member_cnt, 100000); - - group_id = RD_KAFKAP_STR_DUP(&GroupId); - group_state = RD_KAFKAP_STR_DUP(&GroupState); - assignor_name = RD_KAFKAP_STR_DUP(&AssignorName); - error_str = RD_KAFKAP_STR_DUP(&ErrorString); - - if(error_code) { - error = rd_kafka_error_new(error_code, "ConsumerGroupDescribe: %s", error_str); - } - - rd_list_init(&members, 0, rd_kafka_MemberDescription_free); - - for(int j = 0; j < member_cnt; j++) { - rd_kafkap_str_t MemberId, InstanceId, RackId, ClientId, ClientHost, SubscribedTopicNames, SubscribedTopicRegex; - int32_t MemberEpoch, idx; - char *member_id, *instance_id, *rack_id, - *client_id, *client_host, *subscribed_topic_names, - *subscribed_topic_regex = NULL; - rd_kafka_MemberDescription_t *member = NULL; - rd_kafka_topic_partition_list_t *assignment = NULL, *target_assignment = NULL; - int8_t are_assignments_present = 0, are_target_assignments_present = 0; - char **subscribed_topic_names_array = NULL; - int32_t subscribed_topic_names_array_cnt; - - rd_kafka_buf_read_str(reply, &MemberId); - rd_kafka_buf_read_str(reply, &InstanceId); - rd_kafka_buf_read_str(reply, &RackId); - rd_kafka_buf_read_i32(reply, &MemberEpoch); - rd_kafka_buf_read_str(reply, &ClientId); - rd_kafka_buf_read_str(reply, &ClientHost); - rd_kafka_buf_read_arraycnt(reply, &subscribed_topic_names_array_cnt, 100000); - - subscribed_topic_names_array = rd_calloc(subscribed_topic_names_array_cnt, sizeof(char*)); - for(idx=0; idx < subscribed_topic_names_array_cnt; idx++) { - rd_kafkap_str_t SubscribedTopicName; - rd_kafka_buf_read_str(reply, &SubscribedTopicName); - char *subscribed_topic_name = RD_KAFKAP_STR_DUP(&SubscribedTopicName); - subscribed_topic_names_array[idx] = subscribed_topic_name; + for (int i = 0; i < cnt; i++) { + int16_t error_code; + int32_t authorized_operations = -1; + int32_t member_cnt; + rd_kafkap_str_t GroupId, GroupState, AssignorName, ErrorString; + rd_list_t members; + rd_kafka_ConsumerGroupDescription_t *grpdesc = NULL; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &ErrorString); + rd_kafka_buf_read_str(reply, &GroupId); + rd_kafka_buf_read_str(reply, &GroupState); + rd_kafka_buf_read_i32(reply, &group_epoch); + rd_kafka_buf_read_i32(reply, &assignment_epoch); + rd_kafka_buf_read_str(reply, &AssignorName); + rd_kafka_buf_read_arraycnt(reply, &member_cnt, 100000); + + group_id = RD_KAFKAP_STR_DUP(&GroupId); + group_state = RD_KAFKAP_STR_DUP(&GroupState); + assignor_name = RD_KAFKAP_STR_DUP(&AssignorName); + error_str = RD_KAFKAP_STR_DUP(&ErrorString); + + if (error_code) { + error = rd_kafka_error_new( + error_code, "ConsumerGroupDescribe: %s", error_str); } - rd_kafka_buf_read_str(reply, &SubscribedTopicRegex); - const rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - - assignment = rd_kafka_buf_read_topic_partitions( - reply, rd_true /* use topic_id */, - rd_true /* use topic name*/, 0, fields); - rd_kafka_buf_skip_tags(reply); - - target_assignment = rd_kafka_buf_read_topic_partitions( - reply, rd_true /* use topic_id */, - rd_true /* use topic name*/, 0, fields); - - rd_kafka_buf_skip_tags(reply); - - member_id = RD_KAFKAP_STR_DUP(&MemberId); - instance_id = RD_KAFKAP_STR_DUP(&InstanceId); - rack_id = RD_KAFKAP_STR_DUP(&RackId); - client_id = RD_KAFKAP_STR_DUP(&ClientId); - client_host = RD_KAFKAP_STR_DUP(&ClientHost); - subscribed_topic_regex = RD_KAFKAP_STR_DUP(&SubscribedTopicRegex); - - member = rd_kafka_MemberDescription_new( - client_id, member_id, instance_id, client_host, assignment, target_assignment); - + + rd_list_init(&members, 0, rd_kafka_MemberDescription_free); + + for (int j = 0; j < member_cnt; j++) { + rd_kafkap_str_t MemberId, InstanceId, RackId, ClientId, + ClientHost, SubscribedTopicNames, + SubscribedTopicRegex; + int32_t MemberEpoch, idx; + char *member_id, *instance_id, *rack_id, *client_id, + *client_host, *subscribed_topic_names, + *subscribed_topic_regex = NULL; + rd_kafka_MemberDescription_t *member = NULL; + rd_kafka_topic_partition_list_t *assignment = NULL, + *target_assignment = + NULL; + int8_t are_assignments_present = 0, + are_target_assignments_present = 0; + char **subscribed_topic_names_array = NULL; + int32_t subscribed_topic_names_array_cnt; + + rd_kafka_buf_read_str(reply, &MemberId); + rd_kafka_buf_read_str(reply, &InstanceId); + rd_kafka_buf_read_str(reply, &RackId); + rd_kafka_buf_read_i32(reply, &MemberEpoch); + rd_kafka_buf_read_str(reply, &ClientId); + rd_kafka_buf_read_str(reply, &ClientHost); + rd_kafka_buf_read_arraycnt( + reply, &subscribed_topic_names_array_cnt, 100000); + + subscribed_topic_names_array = rd_calloc( + subscribed_topic_names_array_cnt, sizeof(char *)); + for (idx = 0; idx < subscribed_topic_names_array_cnt; + idx++) { + rd_kafkap_str_t SubscribedTopicName; + rd_kafka_buf_read_str(reply, + &SubscribedTopicName); + char *subscribed_topic_name = + RD_KAFKAP_STR_DUP(&SubscribedTopicName); + subscribed_topic_names_array[idx] = + subscribed_topic_name; + } + rd_kafka_buf_read_str(reply, &SubscribedTopicRegex); + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + + assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + rd_kafka_buf_skip_tags(reply); + + target_assignment = rd_kafka_buf_read_topic_partitions( + reply, rd_true /* use topic_id */, + rd_true /* use topic name*/, 0, fields); + + rd_kafka_buf_skip_tags(reply); + + member_id = RD_KAFKAP_STR_DUP(&MemberId); + instance_id = RD_KAFKAP_STR_DUP(&InstanceId); + rack_id = RD_KAFKAP_STR_DUP(&RackId); + client_id = RD_KAFKAP_STR_DUP(&ClientId); + client_host = RD_KAFKAP_STR_DUP(&ClientHost); + subscribed_topic_regex = + RD_KAFKAP_STR_DUP(&SubscribedTopicRegex); + + member = rd_kafka_MemberDescription_new( + client_id, member_id, instance_id, client_host, + assignment, target_assignment); + + rd_kafka_buf_skip_tags(reply); + rd_list_add(&members, member); + + if (assignment) + rd_kafka_topic_partition_list_destroy( + assignment); + if (target_assignment) + rd_kafka_topic_partition_list_destroy( + target_assignment); + rd_free(member_id); + rd_free(instance_id); + rd_free(rack_id); + rd_free(client_id); + rd_free(client_host); + rd_free(subscribed_topic_names); + rd_free(subscribed_topic_regex); + } + rd_kafka_buf_read_i32(reply, &authorized_operations); + operations = rd_kafka_AuthorizedOperations_parse( + authorized_operations, &operation_cnt); rd_kafka_buf_skip_tags(reply); - rd_list_add(&members, member); - - if(assignment) - rd_kafka_topic_partition_list_destroy(assignment); - if(target_assignment) - rd_kafka_topic_partition_list_destroy(target_assignment); - rd_free(member_id); - rd_free(instance_id); - rd_free(rack_id); - rd_free(client_id); - rd_free(client_host); - rd_free(subscribed_topic_names); - rd_free(subscribed_topic_regex); - } - rd_kafka_buf_read_i32(reply, &authorized_operations); - operations = rd_kafka_AuthorizedOperations_parse( - authorized_operations, &operation_cnt); - rd_kafka_buf_skip_tags(reply); - - if(error == NULL) { - grpdesc = rd_kafka_ConsumerGroupDescription_new( - group_id, rd_false, &members, assignor_name, operations, operation_cnt, - rd_kafka_consumer_group_state_code(group_state), - node, error, RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); - } else { - grpdesc = rd_kafka_ConsumerGroupDescription_new_error( - group_id, error , RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); - } - rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc); - - rd_list_destroy(&members); - rd_free(group_id); - rd_free(group_state); - rd_free(assignor_name); - rd_free(error_str); - RD_IF_FREE(error, rd_kafka_error_destroy); - RD_IF_FREE(operations, rd_free); - - error = NULL; - group_id = NULL; - group_state = NULL; - assignor_name = NULL; - error_str = NULL; - operations = NULL; + + if (error == NULL) { + grpdesc = rd_kafka_ConsumerGroupDescription_new( + group_id, rd_false, &members, assignor_name, + operations, operation_cnt, + rd_kafka_consumer_group_state_code(group_state), + node, error, RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); + } else { + grpdesc = rd_kafka_ConsumerGroupDescription_new_error( + group_id, error, + RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); + } + rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc); + + rd_list_destroy(&members); + rd_free(group_id); + rd_free(group_state); + rd_free(assignor_name); + rd_free(error_str); + RD_IF_FREE(error, rd_kafka_error_destroy); + RD_IF_FREE(operations, rd_free); + + error = NULL; + group_id = NULL; + group_state = NULL; + assignor_name = NULL; + error_str = NULL; + operations = NULL; } rd_kafka_buf_skip_tags(reply); *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: - if(group_id) - rd_free(group_id); - if(group_state) - rd_free(group_state); - if(assignor_name) - rd_free(assignor_name); - if(error_str) - rd_free(error_str); - if(error) - rd_kafka_error_destroy(error); + if (group_id) + rd_free(group_id); + if (group_state) + rd_free(group_state); + if (assignor_name) + rd_free(assignor_name); + if (error_str) + rd_free(error_str); + if (error) + rd_kafka_error_destroy(error); RD_IF_FREE(operations, rd_free); - if(rko_result) - rd_kafka_op_destroy(rko_result); - rd_snprintf(errstr, errstr_size, - "DescribeConsumerGroups response protocol parse failure: %s", - rd_kafka_err2str(reply->rkbuf_err)); + if (rko_result) + rd_kafka_op_destroy(rko_result); + rd_snprintf( + errstr, errstr_size, + "DescribeConsumerGroups response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); return reply->rkbuf_err; - } /** @brief Merge the DescribeConsumerGroups response from a single broker @@ -8520,7 +8542,9 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( int orig_pos; rd_assert(rko_partial->rko_evtype == - RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT || rko_partial->rko_evtype == RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT); + RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT || + rko_partial->rko_evtype == + RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT); if (!rko_partial->rko_err) { /* Proper results. @@ -8534,13 +8558,16 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( /* Op errored, e.g. timeout */ rd_kafka_error_t *error = rd_kafka_error_new(rko_partial->rko_err, NULL); - newgroupres = - rd_kafka_ConsumerGroupDescription_new_error(grp, error, RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN); + newgroupres = rd_kafka_ConsumerGroupDescription_new_error( + grp, error, RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN); rd_kafka_error_destroy(error); } - if(newgroupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER && newgroupres->error && - (newgroupres->error->code == RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND || newgroupres->error->code == RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)) { + if (newgroupres->type == RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER && + newgroupres->error && + (newgroupres->error->code == RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND || + newgroupres->error->code == + RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)) { rko_fanout->rko_u.admin_request.fanout.outstanding++; static const struct rd_kafka_admin_worker_cbs cbs = { rd_kafka_admin_DescribeConsumerGroupsRequest, @@ -8548,7 +8575,8 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( }; rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( rko_fanout->rko_rk, RD_KAFKA_OP_CONSUMERGROUPDESCRIBE, - RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT, &cbs, &rko_fanout->rko_u.admin_request.options, + RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT, &cbs, + &rko_fanout->rko_u.admin_request.options, rko_fanout->rko_rk->rk_ops); rko->rko_u.admin_request.fanout_parent = rko_fanout; @@ -8563,26 +8591,26 @@ static void rd_kafka_DescribeConsumerGroups_response_merge( * the fanned out op. */ rd_kafka_AdminOptions_set_opaque( &rko->rko_u.admin_request.options, grp); - + rd_list_init(&rko->rko_u.admin_request.args, 1, rd_free); - rd_list_add(&rko->rko_u.admin_request.args, - rd_strdup(grp)); + rd_list_add(&rko->rko_u.admin_request.args, rd_strdup(grp)); rd_kafka_q_enq(rko_fanout->rko_rk->rk_ops, rko); - } - else { + } else { /* As a convenience to the application we insert group result - * in the same order as they were requested. */ - orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, - rd_kafka_DescribeConsumerGroups_cmp); + * in the same order as they were requested. */ + orig_pos = + rd_list_index(&rko_fanout->rko_u.admin_request.args, grp, + rd_kafka_DescribeConsumerGroups_cmp); rd_assert(orig_pos != -1); /* Make sure result is not already set */ - rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.fanout.results, - orig_pos) == NULL); + rd_assert(rd_list_elem( + &rko_fanout->rko_u.admin_request.fanout.results, + orig_pos) == NULL); - rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, orig_pos, - newgroupres); + rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, + orig_pos, newgroupres); } } @@ -8693,7 +8721,8 @@ rd_kafka_DescribeConsumerGroups_result_groups( const rd_kafka_op_t *rko = (const rd_kafka_op_t *)result; rd_kafka_op_type_t reqtype = rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; - rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS || reqtype == RD_KAFKA_OP_CONSUMERGROUPDESCRIBE); + rd_assert(reqtype == RD_KAFKA_OP_DESCRIBECONSUMERGROUPS || + reqtype == RD_KAFKA_OP_CONSUMERGROUPDESCRIBE); *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); return (const rd_kafka_ConsumerGroupDescription_t **) diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index f215501fec..4cbbe7c227 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -490,13 +490,12 @@ struct rd_kafka_MemberAssignment_s { * */ struct rd_kafka_MemberDescription_s { - char *client_id; /**< Client id */ - char *consumer_id; /**< Consumer id */ - char *group_instance_id; /**< Group instance id */ - char *host; /**< Group member host */ - rd_kafka_MemberAssignment_t assignment; /**< Member assignment */ - rd_kafka_MemberAssignment_t - target_assignment; /**< Target assignment */ + char *client_id; /**< Client id */ + char *consumer_id; /**< Consumer id */ + char *group_instance_id; /**< Group instance id */ + char *host; /**< Group member host */ + rd_kafka_MemberAssignment_t assignment; /**< Member assignment */ + rd_kafka_MemberAssignment_t target_assignment; /**< Target assignment */ }; /** diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index c8c72a99d7..6edb28527b 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -122,9 +122,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:RD_KAFKA_OP_SET_TELEMETRY_BROKER", [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", - [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", - [RD_KAFKA_OP_CONSUMERGROUPDESCRIBE] = - "REPLY:CONSUMERGROUPDESCRIBE", + [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", + [RD_KAFKA_OP_CONSUMERGROUPDESCRIBE] = "REPLY:CONSUMERGROUPDESCRIBE", }; if (type & RD_KAFKA_OP_REPLY) diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index a1fae6bd6a..161532e8fa 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -156,7 +156,7 @@ static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) { "DescribeUserScramCredentialsRequest", [RD_KAFKAP_AlterUserScramCredentials] = "AlterUserScramCredentialsRequest", - [RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribeRequest", + [RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribeRequest", [RD_KAFKAP_Vote] = "VoteRequest", [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest", [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest", diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 80431be1ee..493d5b4a7a 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -3138,200 +3138,208 @@ static void do_test_DescribeConsumerGroups(const char *what, } static void do_test_DescribeConsumerGroups_Compatibility(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int request_timeout) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - char errstr[512]; - const char *errstr2; + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int request_timeout) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; #define TEST_DESCRIBE_CONSUMER_GROUPS_CNT 4 - int i; - const int partitions_cnt = 1; - const int msgs_cnt = 100; - char *topic; - rd_kafka_metadata_topic_t exp_mdtopic = {0}; - int64_t testid = test_id_generate(); - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - const rd_kafka_ConsumerGroupDescription_t **results = NULL; - expected_DescribeConsumerGroups_result_t - expected[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = RD_ZERO_INIT; - const char *describe_groups[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; - char group_instance_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; - char client_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; - rd_kafka_t *rks[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; - const rd_kafka_DescribeConsumerGroups_result_t *res; - char *protocols[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = { - "classic", "classic", "consumer", "consumer"}; - size_t authorized_operation_cnt; - rd_bool_t has_group_instance_id = - test_broker_version >= TEST_BRKVER(2, 4, 0, 0); - - SUB_TEST_QUICK("%s DescribeConsumerGroups Compatibility Test with %s, request_timeout %d", - rd_kafka_name(rk), what, request_timeout); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (request_timeout != -1) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS); + int i; + const int partitions_cnt = 1; + const int msgs_cnt = 100; + char *topic; + rd_kafka_metadata_topic_t exp_mdtopic = {0}; + int64_t testid = test_id_generate(); + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + const rd_kafka_ConsumerGroupDescription_t **results = NULL; + expected_DescribeConsumerGroups_result_t + expected[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = RD_ZERO_INIT; + const char *describe_groups[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; + char group_instance_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; + char client_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; + rd_kafka_t *rks[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; + const rd_kafka_DescribeConsumerGroups_result_t *res; + char *protocols[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = { + "classic", "classic", "consumer", "consumer"}; + size_t authorized_operation_cnt; + rd_bool_t has_group_instance_id = + test_broker_version >= TEST_BRKVER(2, 4, 0, 0); - err = rd_kafka_AdminOptions_set_request_timeout( - options, request_timeout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } + SUB_TEST_QUICK( + "%s DescribeConsumerGroups Compatibility Test with %s, " + "request_timeout %d", + rd_kafka_name(rk), what, request_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (request_timeout != -1) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS); + + err = rd_kafka_AdminOptions_set_request_timeout( + options, request_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } - topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - exp_mdtopic.topic = topic; + topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + exp_mdtopic.topic = topic; - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); - /* Produce 100 msgs */ - test_produce_msgs_easy(topic, testid, 0, msgs_cnt); + /* Produce 100 msgs */ + test_produce_msgs_easy(topic, testid, 0, msgs_cnt); - for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { - rd_kafka_conf_t *conf; - char *group_id = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - if (i < 2) { - /* Classic Protocol */ - test_conf_init(&conf, NULL, 0); - test_conf_set(conf, "group.protocol", "classic"); - } else { - /* Consumer Protocol */ - test_conf_init(&conf, NULL, 0); - test_conf_set(conf, "group.protocol", "consumer"); + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + rd_kafka_conf_t *conf; + char *group_id = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + if (i < 2) { + /* Classic Protocol */ + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "group.protocol", "classic"); + } else { + /* Consumer Protocol */ + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "group.protocol", "consumer"); + } + + snprintf(client_ids[i], sizeof(client_ids[i]), + "client_id_%" PRId32, i); + + test_conf_set(conf, "client.id", client_ids[i]); + test_conf_set(conf, "session.timeout.ms", "5000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + rks[i] = test_create_consumer(group_id, NULL, conf, NULL); + test_consumer_subscribe(rks[i], topic); + /* Consume messages */ + test_consumer_poll("consumer", rks[i], testid, -1, -1, msgs_cnt, + NULL); + + expected[i].group_id = group_id; + expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; + describe_groups[i] = group_id; } - snprintf(client_ids[i], sizeof(client_ids[i]), "client_id_%" PRId32, i); - - test_conf_set(conf, "client.id", client_ids[i]); - test_conf_set(conf, "session.timeout.ms", "5000"); - test_conf_set(conf, "auto.offset.reset", "earliest"); - rks[i] = test_create_consumer(group_id, NULL, conf, NULL); - test_consumer_subscribe(rks[i], topic); - /* Consume messages */ - test_consumer_poll("consumer", rks[i], testid, -1, -1, msgs_cnt, NULL); - - expected[i].group_id = group_id; - expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; - describe_groups[i] = group_id; - } - - TIMING_START(&timing, "DescribeConsumerGroups"); - TEST_SAY("Call DescribeConsumerGroups\n"); - rd_kafka_DescribeConsumerGroups( - rk, describe_groups, TEST_DESCRIBE_CONSUMER_GROUPS_CNT, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - TIMING_START(&timing, "DescribeConsumerGroups.queue_poll"); - - /* Poll result queue for DescribeConsumerGroups result. */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("DescribeConsumerGroups: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) { - break; + TIMING_START(&timing, "DescribeConsumerGroups"); + TEST_SAY("Call DescribeConsumerGroups\n"); + rd_kafka_DescribeConsumerGroups( + rk, describe_groups, TEST_DESCRIBE_CONSUMER_GROUPS_CNT, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + TIMING_START(&timing, "DescribeConsumerGroups.queue_poll"); + + /* Poll result queue for DescribeConsumerGroups result. */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("DescribeConsumerGroups: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) { + break; + } + + rd_kafka_event_destroy(rkev); } - rd_kafka_event_destroy(rkev); - } - - /* Convert event to proper result */ - res = rd_kafka_event_DescribeConsumerGroups_result(rkev); - TEST_ASSERT(res, "expected DescribeConsumerGroups_result, got %s", - rd_kafka_event_name(rkev)); - - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected DescribeConsumerGroups to return %s, got %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("DescribeConsumerGroups: returned %s (%s)\n", - rd_kafka_err2str(err), err ? errstr2 : "n/a"); - - size_t cnt = 0; - results = rd_kafka_DescribeConsumerGroups_result_groups(res, &cnt); - - TEST_ASSERT( - TEST_DESCRIBE_CONSUMER_GROUPS_CNT == cnt, - "expected DescribeConsumerGroups_result_groups to return %d items, " - "got %" PRIusz, - TEST_DESCRIBE_CONSUMER_GROUPS_CNT, cnt); - - /* Verify results */ - for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { - expected_DescribeConsumerGroups_result_t *exp = &expected[i]; - rd_kafka_resp_err_t exp_err = exp->err; - const rd_kafka_ConsumerGroupDescription_t *act = results[i]; - rd_kafka_resp_err_t act_err = rd_kafka_error_code( - rd_kafka_ConsumerGroupDescription_error(act)); - rd_kafka_consumer_group_state_t state = - rd_kafka_ConsumerGroupDescription_state(act); + /* Convert event to proper result */ + res = rd_kafka_event_DescribeConsumerGroups_result(rkev); + TEST_ASSERT(res, "expected DescribeConsumerGroups_result, got %s", + rd_kafka_event_name(rkev)); + + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected DescribeConsumerGroups to return %s, got %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("DescribeConsumerGroups: returned %s (%s)\n", + rd_kafka_err2str(err), err ? errstr2 : "n/a"); + + size_t cnt = 0; + results = rd_kafka_DescribeConsumerGroups_result_groups(res, &cnt); TEST_ASSERT( - strcmp(exp->group_id, - rd_kafka_ConsumerGroupDescription_group_id(act)) == 0, - "Result order mismatch at #%d: expected group id to be %s, got %s", - i, exp->group_id, - rd_kafka_ConsumerGroupDescription_group_id(act)); - - if (strcmp(protocols[i], "classic") == 0) { - TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_STABLE || - state == RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY, - "Expected Stable or Empty state for classic protocol, got %s.", - rd_kafka_consumer_group_state_name(state)); - } else if (strcmp(protocols[i], "consumer") == 0) { - TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_STABLE, - "Expected Stable state, got %s.", - rd_kafka_consumer_group_state_name(state)); - } + TEST_DESCRIBE_CONSUMER_GROUPS_CNT == cnt, + "expected DescribeConsumerGroups_result_groups to return %d items, " + "got %" PRIusz, + TEST_DESCRIBE_CONSUMER_GROUPS_CNT, cnt); + + /* Verify results */ + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + expected_DescribeConsumerGroups_result_t *exp = &expected[i]; + rd_kafka_resp_err_t exp_err = exp->err; + const rd_kafka_ConsumerGroupDescription_t *act = results[i]; + rd_kafka_resp_err_t act_err = rd_kafka_error_code( + rd_kafka_ConsumerGroupDescription_error(act)); + rd_kafka_consumer_group_state_t state = + rd_kafka_ConsumerGroupDescription_state(act); + + TEST_ASSERT(strcmp(exp->group_id, + rd_kafka_ConsumerGroupDescription_group_id( + act)) == 0, + "Result order mismatch at #%d: expected group id " + "to be %s, got %s", + i, exp->group_id, + rd_kafka_ConsumerGroupDescription_group_id(act)); - TEST_ASSERT(exp_err == act_err, - "expected err=%d for group %s, got %d (%s)", - exp_err, exp->group_id, act_err, - rd_kafka_err2str(act_err)); - } + if (strcmp(protocols[i], "classic") == 0) { + TEST_ASSERT( + state == RD_KAFKA_CONSUMER_GROUP_STATE_STABLE || + state == RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY, + "Expected Stable or Empty state for classic " + "protocol, got %s.", + rd_kafka_consumer_group_state_name(state)); + } else if (strcmp(protocols[i], "consumer") == 0) { + TEST_ASSERT(state == + RD_KAFKA_CONSUMER_GROUP_STATE_STABLE, + "Expected Stable state, got %s.", + rd_kafka_consumer_group_state_name(state)); + } + + TEST_ASSERT(exp_err == act_err, + "expected err=%d for group %s, got %d (%s)", + exp_err, exp->group_id, act_err, + rd_kafka_err2str(act_err)); + } - rd_kafka_event_destroy(rkev); + rd_kafka_event_destroy(rkev); - for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { - test_consumer_close(rks[i]); - rd_kafka_destroy(rks[i]); - rd_free(expected[i].group_id); - } + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + test_consumer_close(rks[i]); + rd_kafka_destroy(rks[i]); + rd_free(expected[i].group_id); + } - test_DeleteTopics_simple(rk, NULL, &topic, 1, NULL); - rd_free(topic); + test_DeleteTopics_simple(rk, NULL, &topic, 1, NULL); + rd_free(topic); - if (options) - rd_kafka_AdminOptions_destroy(options); + if (options) + rd_kafka_AdminOptions_destroy(options); - if (!useq) - rd_kafka_queue_destroy(q); + if (!useq) + rd_kafka_queue_destroy(q); - TEST_LATER_CHECK(); + TEST_LATER_CHECK(); #undef TEST_DESCRIBE_CONSUMER_GROUPS_CNT - SUB_TEST_PASS(); + SUB_TEST_PASS(); } /** @brief Helper function to check whether \p expected and \p actual contain @@ -5470,10 +5478,11 @@ static void do_test_apis(rd_kafka_type_t cltype) { /* Describe groups */ do_test_DescribeConsumerGroups("temp queue", rk, NULL, -1); do_test_DescribeConsumerGroups("main queue", rk, mainq, 1500); - } - else { - do_test_DescribeConsumerGroups_Compatibility("temp queue", rk, NULL, -1); - do_test_DescribeConsumerGroups_Compatibility("main queue", rk, mainq, 1500); + } else { + do_test_DescribeConsumerGroups_Compatibility("temp queue", rk, + NULL, -1); + do_test_DescribeConsumerGroups_Compatibility("main queue", rk, + mainq, 1500); } /* Describe topics */ From 01a16280de8454f6cec2c3455fd3849b34ab3aca Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 4 Dec 2024 15:34:27 +0530 Subject: [PATCH 07/10] style fix --- src/rdkafka_op.h | 14 +++++++------- src/rdkafka_proto.h | 34 +++++++++++++++++----------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 521cb46484..a735d94bc3 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -182,13 +182,13 @@ typedef enum { u.admin_request >*/ RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/ RD_KAFKA_OP_METADATA_UPDATE, /**< Metadata update (KIP 951) **/ - RD_KAFKA_OP_SET_TELEMETRY_BROKER, /**< Set preferred broker for - telemetry. */ - RD_KAFKA_OP_TERMINATE_TELEMETRY, /**< Start termination sequence for - telemetry. */ - RD_KAFKA_OP_ELECTLEADERS, /**< Admin: - * ElectLeaders - * u.admin_request */ + RD_KAFKA_OP_SET_TELEMETRY_BROKER, /**< Set preferred broker for + telemetry. */ + RD_KAFKA_OP_TERMINATE_TELEMETRY, /**< Start termination sequence for + telemetry. */ + RD_KAFKA_OP_ELECTLEADERS, /**< Admin: + * ElectLeaders + * u.admin_request */ RD_KAFKA_OP_CONSUMERGROUPDESCRIBE, /**< Admin: * ConsumerGroupDescribe * u.admin_request */ diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 161532e8fa..96637ab466 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -156,23 +156,23 @@ static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) { "DescribeUserScramCredentialsRequest", [RD_KAFKAP_AlterUserScramCredentials] = "AlterUserScramCredentialsRequest", - [RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribeRequest", - [RD_KAFKAP_Vote] = "VoteRequest", - [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest", - [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest", - [RD_KAFKAP_DescribeQuorum] = "DescribeQuorumRequest", - [RD_KAFKAP_AlterIsr] = "AlterIsrRequest", - [RD_KAFKAP_UpdateFeatures] = "UpdateFeaturesRequest", - [RD_KAFKAP_Envelope] = "EnvelopeRequest", - [RD_KAFKAP_FetchSnapshot] = "FetchSnapshot", - [RD_KAFKAP_DescribeCluster] = "DescribeCluster", - [RD_KAFKAP_DescribeProducers] = "DescribeProducers", - [RD_KAFKAP_BrokerHeartbeat] = "BrokerHeartbeat", - [RD_KAFKAP_UnregisterBroker] = "UnregisterBroker", - [RD_KAFKAP_DescribeTransactions] = "DescribeTransactions", - [RD_KAFKAP_ListTransactions] = "ListTransactions", - [RD_KAFKAP_AllocateProducerIds] = "AllocateProducerIds", - [RD_KAFKAP_ConsumerGroupHeartbeat] = "ConsumerGroupHeartbeat", + [RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribeRequest", + [RD_KAFKAP_Vote] = "VoteRequest", + [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest", + [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest", + [RD_KAFKAP_DescribeQuorum] = "DescribeQuorumRequest", + [RD_KAFKAP_AlterIsr] = "AlterIsrRequest", + [RD_KAFKAP_UpdateFeatures] = "UpdateFeaturesRequest", + [RD_KAFKAP_Envelope] = "EnvelopeRequest", + [RD_KAFKAP_FetchSnapshot] = "FetchSnapshot", + [RD_KAFKAP_DescribeCluster] = "DescribeCluster", + [RD_KAFKAP_DescribeProducers] = "DescribeProducers", + [RD_KAFKAP_BrokerHeartbeat] = "BrokerHeartbeat", + [RD_KAFKAP_UnregisterBroker] = "UnregisterBroker", + [RD_KAFKAP_DescribeTransactions] = "DescribeTransactions", + [RD_KAFKAP_ListTransactions] = "ListTransactions", + [RD_KAFKAP_AllocateProducerIds] = "AllocateProducerIds", + [RD_KAFKAP_ConsumerGroupHeartbeat] = "ConsumerGroupHeartbeat", [RD_KAFKAP_GetTelemetrySubscriptions] = "GetTelemetrySubscriptions", [RD_KAFKAP_PushTelemetry] = "PushTelemetry", From 5488ec74b31107585cebc8e64b7713267dd4c4bb Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 4 Dec 2024 15:39:50 +0530 Subject: [PATCH 08/10] removing unused variables --- src/rdkafka_admin.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 6f7900480c..9512c49c64 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -8402,8 +8402,6 @@ rd_kafka_ConsumerGroupDescribeResponseParse(rd_kafka_op_t *rko_req, rd_kafka_topic_partition_list_t *assignment = NULL, *target_assignment = NULL; - int8_t are_assignments_present = 0, - are_target_assignments_present = 0; char **subscribed_topic_names_array = NULL; int32_t subscribed_topic_names_array_cnt; From 4863104cdcafb30f0c609c0e36a6bcd92fba870e Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 4 Dec 2024 16:49:11 +0530 Subject: [PATCH 09/10] trying to fix build --- src/rdkafka_admin.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 9512c49c64..5bffc30a72 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -8343,6 +8343,7 @@ rd_kafka_ConsumerGroupDescribeResponseParse(rd_kafka_op_t *rko_req, int32_t nodeid; uint16_t port; int operation_cnt = -1; + int32_t i; api_version = rd_kafka_buf_ApiVersion(reply); rd_kafka_buf_read_throttle_time(reply); @@ -8361,10 +8362,10 @@ rd_kafka_ConsumerGroupDescribeResponseParse(rd_kafka_op_t *rko_req, rd_list_init(&rko_result->rko_u.admin_result.results, cnt, rd_kafka_ConsumerGroupDescription_free); - for (int i = 0; i < cnt; i++) { + for (i = 0; i < cnt; i++) { int16_t error_code; int32_t authorized_operations = -1; - int32_t member_cnt; + int32_t member_cnt, j; rd_kafkap_str_t GroupId, GroupState, AssignorName, ErrorString; rd_list_t members; rd_kafka_ConsumerGroupDescription_t *grpdesc = NULL; @@ -8390,7 +8391,7 @@ rd_kafka_ConsumerGroupDescribeResponseParse(rd_kafka_op_t *rko_req, rd_list_init(&members, 0, rd_kafka_MemberDescription_free); - for (int j = 0; j < member_cnt; j++) { + for (j = 0; j < member_cnt; j++) { rd_kafkap_str_t MemberId, InstanceId, RackId, ClientId, ClientHost, SubscribedTopicNames, SubscribedTopicRegex; @@ -8402,7 +8403,7 @@ rd_kafka_ConsumerGroupDescribeResponseParse(rd_kafka_op_t *rko_req, rd_kafka_topic_partition_list_t *assignment = NULL, *target_assignment = NULL; - char **subscribed_topic_names_array = NULL; + char **subscribed_topic_names_array = NULL; int32_t subscribed_topic_names_array_cnt; rd_kafka_buf_read_str(reply, &MemberId); From 71d02368fc0997bb961a7c20cffe124b511c7aa9 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Fri, 27 Dec 2024 16:59:09 +0530 Subject: [PATCH 10/10] changelog change --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d0e8491fd..00d0b61759 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ librdkafka v2.6.3 is a maintenance release: +* DescribeConsumerGroup now supports new consumer protocol groups (#4922). * Socket options are now all set before connection (#4893). * Client certificate chain is now sent when using `ssl.certificate.pem` or `ssl_certificate` or `ssl.keystore.location` (#4894).