Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a52d29c
Prototype code to obtain assignments
milindl Jul 14, 2025
aed6043
Add broker-op for share fetching
milindl Oct 6, 2025
24acb72
Add fields to op
milindl Oct 13, 2025
847d04b
Add preliminary polling
milindl Oct 17, 2025
8cfb428
Start returning messages to user
milindl Oct 17, 2025
e6b512a
Add retry for share fetches
milindl Oct 22, 2025
98b5dfe
Unify error handling and add records conf
milindl Oct 27, 2025
39ba841
Allow poll to exit immediately on message
milindl Oct 28, 2025
24ada52
Fix invalid reads and compiler warnings
milindl Oct 28, 2025
279edb9
Add missing backoff for share fetch retries
milindl Oct 28, 2025
d5d50ad
Added Share fetch request and response parsing
pranavrth Oct 23, 2025
5f6e43f
Something working with lots of Segfault
pranavrth Oct 23, 2025
a96e267
Fixed Acknowledgement and acknowledgement type array count
pranavrth Oct 23, 2025
7cbcb9d
Fixed incrementing epoch. Fixed Uuid url encoding issue temporarily. …
pranavrth Oct 23, 2025
49a628b
Added timing around poll, fixed a memory leak
pranavrth Oct 28, 2025
089d6ed
Added multiple broker handing of should not fetch case
pranavrth Oct 28, 2025
5b4d9c5
Call sharegroupheartbeat when leaving group (#5247)
PratRanj07 Nov 20, 2025
dbee935
Add session partition information
pranavrth Nov 13, 2025
3a644ec
Added Share partition level operations
pranavrth Nov 17, 2025
8795346
Working with session partition management
pranavrth Nov 17, 2025
f0f85be
Deduplicated partition management.
pranavrth Nov 17, 2025
4fad8e5
toppars in session and memory fix related to freeing Error Message
pranavrth Nov 18, 2025
06a4832
Fix refcount issue with session partition management
pranavrth Nov 20, 2025
6775e7d
Improve ref counting and ref counting for toppars
pranavrth Nov 20, 2025
6f4837d
Add session leave. Not working properly. Improve this.
pranavrth Nov 20, 2025
04500ea
Improve acknowledgement to include all batches instead of just the la…
pranavrth Nov 20, 2025
acaf938
Improve consumer printing
pranavrth Nov 20, 2025
adf1808
Fix partition acknowledgement not being sent as it was referring to t…
pranavrth Nov 21, 2025
77e19db
More debug logging
pranavrth Nov 21, 2025
c1c811e
Fix incorrectly adding partitions to send in ShareFetch request even …
pranavrth Nov 25, 2025
241d833
Fix incorrectly parsing ShareFetch Response if MessageSetSize was 0
pranavrth Nov 25, 2025
d26d3a5
Fix incorrectly assigning memory to rktp_share_acknowledge even if Ac…
pranavrth Nov 25, 2025
6c6d281
nit: correct naming for partitions being sent to the ShareFetch request
pranavrth Nov 25, 2025
342fd04
added tests
PratRanj07 Nov 18, 2025
511d424
Added more tests
PratRanj07 Nov 24, 2025
a4b1fc7
Modified tests
PratRanj07 Nov 26, 2025
869479d
Created seperate file for tests and added more tests
PratRanj07 Nov 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ offset_commit_cb | C | |
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
max.poll.records | C | 1 .. 2147483647 | 500 | low | tba description, <br>*Type: integer*
transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible. <br>*Type: boolean*
Expand Down
125 changes: 86 additions & 39 deletions examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@
* (https://github.com/confluentinc/librdkafka)
*/

#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 199309L
#endif

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
#include <time.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
Expand Down Expand Up @@ -149,6 +154,29 @@ int main(int argc, char **argv) {
return 1;
}

if (rd_kafka_conf_set(conf, "share.consumer", "true", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}


if (rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}


// if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) !=
// RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// rd_kafka_conf_destroy(conf);
// return 1;
// }

/*
* Create consumer instance.
*
Expand Down Expand Up @@ -213,49 +241,68 @@ int main(int argc, char **argv) {
* since a rebalance may happen at any time.
* Start polling for messages. */

rd_kafka_message_t *rkmessages[500];
while (run) {
rd_kafka_message_t *rkm;

rkm = rd_kafka_consumer_poll(rk, 100);
if (!rkm)
continue; /* Timeout: no message within 100ms,
* try again. This short timeout allows
* checking for `run` at frequent intervals.
*/

/* consumer_poll() will return either a proper message
* or a consumer error (rkm->err is set). */
if (rkm->err) {
/* Consumer errors are generally to be considered
* informational as the consumer will automatically
* try to recover from all types of errors. */
fprintf(stderr, "%% Consumer error: %s\n",
rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
rd_kafka_message_t *rkm = NULL;
size_t rcvd_msgs = 0;
int i;

// rkm = rd_kafka_consumer_poll(rk, 100);
rd_kafka_error_t *error;

// fprintf(stderr, "Calling consume_batch\n");
struct timespec __t0, __t1;
if (clock_gettime(CLOCK_MONOTONIC, &__t0) != 0)
perror("clock_gettime");
error = rd_kafka_share_consume_batch(rk, 500, rkmessages, &rcvd_msgs);
if (clock_gettime(CLOCK_MONOTONIC, &__t1) != 0)
perror("clock_gettime");
double __elapsed_ms =
(__t1.tv_sec - __t0.tv_sec) * 1000.0 + (__t1.tv_nsec - __t0.tv_nsec) / 1e6;
// fprintf(stdout, "%% rd_kafka_share_consume_batch() took %.3f ms\n", __elapsed_ms);

if (error) {
fprintf(stderr, "%% Consume error: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
continue;
}

/* Proper message. */
printf("Message on %s [%" PRId32 "] at offset %" PRId64
" (leader epoch %" PRId32 "):\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset, rd_kafka_message_leader_epoch(rkm));

/* Print the message key. */
if (rkm->key && is_printable(rkm->key, rkm->key_len))
printf(" Key: %.*s\n", (int)rkm->key_len,
(const char *)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);

/* Print the message value/payload. */
if (rkm->payload && is_printable(rkm->payload, rkm->len))
printf(" Value: %.*s\n", (int)rkm->len,
(const char *)rkm->payload);
else if (rkm->payload)
printf(" Value: (%d bytes)\n", (int)rkm->len);

rd_kafka_message_destroy(rkm);
// fprintf(stderr, "%% Received %zu messages\n", rcvd_msgs);
for (i = 0; i < (int)rcvd_msgs; i++) {
rkm = rkmessages[i];

if (rkm->err) {
fprintf(stderr, "%% Consumer error: %d: %s\n",
rkm->err, rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
continue;
}

// if((int)rcvd_msgs < -1) {
/* Proper message. */
printf("Message received on %s [%" PRId32 "] at offset %" PRId64,
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset);

/* Print the message key. */
if (rkm->key && is_printable(rkm->key, rkm->key_len))
printf(" Key: %.*s\n", (int)rkm->key_len,
(const char *)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);

/* Print the message value/payload. */
if (rkm->payload &&
is_printable(rkm->payload, rkm->len))
printf(" - Value: %.*s\n", (int)rkm->len,
(const char *)rkm->payload);
else if (rkm->payload)
printf(" - Value: (%d bytes)\n", (int)rkm->len);
// }

rd_kafka_message_destroy(rkm);
}
}


Expand Down
7 changes: 5 additions & 2 deletions src/rd.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,14 +406,17 @@ static RD_INLINE RD_UNUSED int rd_refcnt_get(rd_refcnt_t *R) {
rd_refcnt_get(R), (R), WHAT, __FUNCTION__, __LINE__), \
rd_refcnt_sub0(R))

#define rd_refcnt_sub(R) \
#define rd_refcnt_sub_fl(FUNC, LINE, R) \
(fprintf(stderr, "REFCNT DEBUG: %-35s %d -1: %16p: %s:%d\n", #R, \
rd_refcnt_get(R), (R), __FUNCTION__, __LINE__), \
rd_refcnt_get(R), (R), (FUNC), (LINE)), \
rd_refcnt_sub0(R))

#define rd_refcnt_sub(R) rd_refcnt_sub_fl(__FUNCTION__, __LINE__, R)

#else
#define rd_refcnt_add_fl(FUNC, LINE, R) rd_refcnt_add0(R)
#define rd_refcnt_add(R) rd_refcnt_add0(R)
#define rd_refcnt_sub_fl(FUNC, LINE, R) rd_refcnt_sub0(R)
#define rd_refcnt_sub(R) rd_refcnt_sub0(R)
#endif

Expand Down
Loading