1 change: 1 addition & 0 deletions CONFIGURATION.md
@@ -36,6 +36,7 @@ connections.max.idle.ms | * | 0 .. 2147483647 | 0
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`. <br>*Type: integer*
reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately. <br>*Type: integer*
reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed. <br>*Type: integer*
reconnect.failure.report.ms | * | 0 .. 86400000 | 60000 | medium | The time after which persistent broker reconnection failures should be reported to the application via an error event (RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE). This helps detect scenarios where a broker is unreachable or authentication is failing persistently (e.g., after broker restarts with AWS MSK IAM auth). Set to 0 to disable. Default is 60000ms (1 min). <br>*Type: integer*
statistics.interval.ms | * | 0 .. 86400000 | 0 | high | librdkafka statistics emit interval. The application also needs to register a stats callback using `rd_kafka_conf_set_stats_cb()`. The granularity is 1000ms. A value of 0 disables statistics. <br>*Type: integer*
enabled_events | * | 0 .. 2147483647 | 0 | low | See `rd_kafka_conf_set_events()` <br>*Type: integer*
error_cb | * | | | low | Error callback (set with rd_kafka_conf_set_error_cb()) <br>*Type: see dedicated API*
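As a usage sketch (not part of this diff), an application can opt into the new report through the standard configuration and error-callback APIs; the 30000 ms threshold and the callback body are illustrative only:

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Invoked from rd_kafka_poll(); err carries an rd_kafka_resp_err_t. */
static void error_cb(rd_kafka_t *rk, int err, const char *reason,
                     void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE) {
                /* Reconnection has been failing for longer than
                 * reconnect.failure.report.ms: alert an operator, or
                 * tear down and recreate the client. */
                fprintf(stderr, "Persistent broker failure: %s\n", reason);
        }
}

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk;

        if (rd_kafka_conf_set(conf, "reconnect.failure.report.ms", "30000",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }
        rd_kafka_conf_set_error_cb(conf, error_cb);

        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        rd_kafka_poll(rk, 1000); /* serves error_cb among other callbacks */
        rd_kafka_destroy(rk);
        return 0;
}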
2 changes: 2 additions & 0 deletions src/rdkafka.c
@@ -491,6 +491,8 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD,
"Local: an invalid record in the same batch caused "
"the failure of this message too"),
_ERR_DESC(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE,
"Local: Broker reconnection has been failing persistently"),
_ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY_BROKER,
"Local: Broker handle destroyed without termination"),

2 changes: 2 additions & 0 deletions src/rdkafka.h
@@ -414,6 +414,8 @@ typedef enum {
RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD = -138,
/** Broker is going away but client isn't terminating */
RD_KAFKA_RESP_ERR__DESTROY_BROKER = -137,
/** Broker reconnection has been failing persistently */
RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE = -136,

/** End internal error codes */
RD_KAFKA_RESP_ERR__END = -100,
180 changes: 178 additions & 2 deletions src/rdkafka_broker.c
@@ -652,6 +652,62 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,

rkb->rkb_reauth_in_progress = rd_false;

/* Track persistent connection failures and report to application
* if threshold is reached. This helps detect scenarios where
* broker reconnection is persistently failing (e.g., after broker
* restart with AWS MSK IAM auth). */
{
rd_ts_t now = rd_clock();
int reconnect_failure_report_ms =
rkb->rkb_rk->rk_conf.reconnect_failure_report_ms;

/* Start tracking if this is the first failure */
if (rkb->rkb_ts_first_failure == 0)
rkb->rkb_ts_first_failure = now;

/* On authentication failures, force DNS re-resolution on next
* connection attempt. This helps with scenarios where broker
* IPs change after restarts (e.g., AWS MSK with private links).
*/
if (err == RD_KAFKA_RESP_ERR__AUTHENTICATION ||
err == RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED) {
rkb->rkb_force_dns_reresolution = rd_true;
rd_rkb_dbg(rkb, BROKER, "AUTHFAIL",
"Authentication failure, will force DNS "
"re-resolution on next connect attempt");
}

/* Report persistent failure if threshold reached and not yet
* reported */
if (reconnect_failure_report_ms > 0 &&
!rkb->rkb_persistent_failure_reported &&
rkb->rkb_ts_first_failure > 0 &&
(now - rkb->rkb_ts_first_failure) >=
((rd_ts_t)reconnect_failure_report_ms * 1000)) {
rkb->rkb_persistent_failure_reported = rd_true;

rd_kafka_log(rkb->rkb_rk, LOG_WARNING, "BRKFAIL",
"%s: Broker reconnection has been failing "
"persistently for %dms (threshold: %dms). "
"Last error: %s",
rkb->rkb_name,
(int)((now - rkb->rkb_ts_first_failure) /
1000),
reconnect_failure_report_ms,
rd_kafka_err2str(err));

/* Send persistent failure error to application */
rd_kafka_q_op_err(
rkb->rkb_rk->rk_rep,
RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE,
"%s: Broker reconnection has been failing "
"persistently for %dms: %s",
rkb->rkb_name,
(int)((now - rkb->rkb_ts_first_failure) / 1000),
rd_kafka_err2str(err));
}
}

va_start(ap, fmt);
rd_kafka_broker_set_error(rkb, level, err, fmt, ap);
va_end(ap);
@@ -2294,9 +2350,17 @@ static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
rd_kafka_broker_lock(rkb);
rd_strlcpy(nodename, rkb->rkb_nodename, sizeof(nodename));

/* If the nodename was changed since the last connect,
/* If the nodename was changed since the last connect, or if
* DNS re-resolution was forced (e.g., after auth failure),
* reset the address cache. */
reset_cached_addr = (rkb->rkb_connect_epoch != rkb->rkb_nodename_epoch);
reset_cached_addr = (rkb->rkb_connect_epoch != rkb->rkb_nodename_epoch) ||
rkb->rkb_force_dns_reresolution;
if (rkb->rkb_force_dns_reresolution) {
rd_rkb_dbg(rkb, BROKER, "CONNECT",
"Forcing DNS re-resolution due to previous "
"authentication or connection failure");
rkb->rkb_force_dns_reresolution = rd_false;
}
rkb->rkb_connect_epoch = rkb->rkb_nodename_epoch;
/* Logical brokers might not have a hostname set, in which case
* we should not try to connect. */
@@ -2345,6 +2409,11 @@ void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {
rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_reauth_in_progress = rd_false;

/* Reset persistent failure tracking on successful connection */
rkb->rkb_ts_first_failure = 0;
rkb->rkb_persistent_failure_reported = rd_false;
rkb->rkb_force_dns_reresolution = rd_false;

rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
rd_kafka_broker_unlock(rkb);
@@ -6320,6 +6389,111 @@ static int rd_ut_ApiVersion_at_least(void) {
RD_UT_PASS();
}

/**
* @brief Unittest for persistent failure tracking
*
* Tests that:
* 1. First failure time is tracked correctly
* 2. Persistent failure is reported after threshold
* 3. Failure tracking is reset on successful connection
* 4. DNS re-resolution is triggered on auth failures
*/
static int rd_ut_persistent_failure_tracking(void) {
rd_kafka_broker_t rkb = RD_ZERO_INIT;
rd_kafka_conf_t conf = {.reconnect_failure_report_ms = 100};
rd_kafka_t rk = RD_ZERO_INIT;
rd_ts_t now = 1000000; /* 1 second in microseconds */

RD_UT_SAY("Testing persistent failure tracking");

rkb.rkb_rk = &rk;
rk.rk_conf = conf;

/* Test 1: First failure should set rkb_ts_first_failure */
RD_UT_ASSERT(rkb.rkb_ts_first_failure == 0,
"Initial rkb_ts_first_failure should be 0");

/* Simulate setting first failure (normally done in rd_kafka_broker_fail)
*/
rkb.rkb_ts_first_failure = now;
RD_UT_ASSERT(rkb.rkb_ts_first_failure == now,
"rkb_ts_first_failure should be set to now");

/* Test 2: After threshold, persistent failure should be reported */
rd_ts_t after_threshold = now + (conf.reconnect_failure_report_ms * 1000);
RD_UT_ASSERT(
(after_threshold - rkb.rkb_ts_first_failure) >=
((rd_ts_t)conf.reconnect_failure_report_ms * 1000),
"Time difference should reach the threshold");

/* Test 3: Simulating successful connection reset */
rkb.rkb_persistent_failure_reported = rd_true;
rkb.rkb_force_dns_reresolution = rd_true;

/* Reset (normally done in rd_kafka_broker_connect_up) */
rkb.rkb_ts_first_failure = 0;
rkb.rkb_persistent_failure_reported = rd_false;
rkb.rkb_force_dns_reresolution = rd_false;

RD_UT_ASSERT(rkb.rkb_ts_first_failure == 0,
"rkb_ts_first_failure should be reset to 0");
RD_UT_ASSERT(rkb.rkb_persistent_failure_reported == rd_false,
"rkb_persistent_failure_reported should be reset");
RD_UT_ASSERT(rkb.rkb_force_dns_reresolution == rd_false,
"rkb_force_dns_reresolution should be reset");

/* Test 4: Auth failure should set DNS re-resolution flag */
/* Simulate auth failure setting the flag */
rkb.rkb_force_dns_reresolution = rd_true;
RD_UT_ASSERT(rkb.rkb_force_dns_reresolution == rd_true,
"Auth failure should set DNS re-resolution flag");

/* Test 5: Disabled persistent failure reporting (threshold = 0) */
rk.rk_conf.reconnect_failure_report_ms = 0;
rkb.rkb_ts_first_failure = now;
rkb.rkb_persistent_failure_reported = rd_false;

/* With threshold 0, reporting should not happen even after long time */
rd_ts_t way_later = now + (1000000 * 1000); /* 1000 seconds later */
rd_bool_t should_report =
(rk.rk_conf.reconnect_failure_report_ms > 0 &&
!rkb.rkb_persistent_failure_reported &&
rkb.rkb_ts_first_failure > 0 &&
(way_later - rkb.rkb_ts_first_failure) >=
((rd_ts_t)rk.rk_conf.reconnect_failure_report_ms * 1000));

RD_UT_ASSERT(should_report == rd_false,
"With threshold=0, should never report persistent failure");

RD_UT_PASS();
}

/**
* @brief Unittest for reconnect.failure.report.ms configuration validation
*/
static int rd_ut_reconnect_failure_config(void) {
rd_kafka_conf_t conf = RD_ZERO_INIT;

RD_UT_SAY("Testing reconnect.failure.report.ms configuration");

/* Test default value */
conf.reconnect_failure_report_ms = 60000; /* default */
RD_UT_ASSERT(conf.reconnect_failure_report_ms == 60000,
"Default should be 60000ms");

/* Test disabled (0) */
conf.reconnect_failure_report_ms = 0;
RD_UT_ASSERT(conf.reconnect_failure_report_ms == 0,
"Value 0 should disable persistent failure reporting");

/* Test custom value */
conf.reconnect_failure_report_ms = 30000;
RD_UT_ASSERT(conf.reconnect_failure_report_ms == 30000,
"Custom value should be accepted");

RD_UT_PASS();
}

/**
* @name Unit tests
* @{
@@ -6330,6 +6504,8 @@ int unittest_broker(void) {

fails += rd_ut_reconnect_backoff();
fails += rd_ut_ApiVersion_at_least();
fails += rd_ut_persistent_failure_tracking();
fails += rd_ut_reconnect_failure_config();

return fails;
}
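rd_clock() returns microseconds while the threshold is configured in milliseconds, so the report condition above scales the threshold by 1000 before comparing. A minimal standalone restatement of that check, with a hypothetical helper name:

#include <stdbool.h>
#include <stdint.h>

typedef int64_t rd_ts_t; /* microseconds, matching rd_clock() */

/* Hypothetical helper restating the condition in rd_kafka_broker_fail():
 * true once an unbroken failure streak has lasted at least report_ms
 * milliseconds and has not yet been reported. */
static bool persistent_failure_due(rd_ts_t now, rd_ts_t ts_first_failure,
                                   bool already_reported, int report_ms) {
        return report_ms > 0 && !already_reported && ts_first_failure > 0 &&
               now - ts_first_failure >= (rd_ts_t)report_ms * 1000;
}

/* With the default report_ms = 60000 the streak must last
 * 60000 * 1000 = 60,000,000 us (one minute) before the event fires. */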
11 changes: 11 additions & 0 deletions src/rdkafka_broker.h
@@ -305,6 +305,17 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
/** Absolute time of last connection attempt. */
rd_ts_t rkb_ts_connect;

/** Absolute time when first failure in current failure streak started.
* Reset to 0 when a successful connection is made. */
rd_ts_t rkb_ts_first_failure;

/** Whether we've already reported this persistent failure to the app. */
rd_bool_t rkb_persistent_failure_reported;

/** Whether DNS should be re-resolved on next connect attempt.
* Set to true on authentication failures. */
rd_bool_t rkb_force_dns_reresolution;

/** True if a reauthentication is in progress. */
rd_bool_t rkb_reauth_in_progress;

9 changes: 9 additions & 0 deletions src/rdkafka_conf.c
@@ -653,6 +653,15 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"The maximum time to wait before reconnecting to a broker "
"after the connection has been closed.",
0, 60 * 60 * 1000, 10 * 1000},
{_RK_GLOBAL | _RK_MED, "reconnect.failure.report.ms", _RK_C_INT,
_RK(reconnect_failure_report_ms),
"The time after which persistent broker reconnection failures should "
"be reported to the application via an error event "
"(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE). "
"This helps detect scenarios where a broker is unreachable or "
"authentication is failing persistently (e.g., after broker restarts "
"with AWS MSK IAM auth). Set to 0 to disable. Default is 60000ms (1 min).",
0, 24 * 60 * 60 * 1000, 60 * 1000},
{_RK_GLOBAL | _RK_HIGH, "statistics.interval.ms", _RK_C_INT,
_RK(stats_interval_ms),
"librdkafka statistics emit interval. The application also needs to "
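For applications that consume errors through the event API instead of a callback, the same report arrives as an RD_KAFKA_EVENT_ERROR. A hedged sketch, assuming rk was created from a conf with rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_ERROR):

#include <librdkafka/rdkafka.h>
#include <stdio.h>

static void drain_error_events(rd_kafka_t *rk) {
        rd_kafka_queue_t *mainq = rd_kafka_queue_get_main(rk);
        rd_kafka_event_t *ev;

        /* rd_kafka_queue_poll() returns NULL once the 100 ms timeout
         * expires with no event pending. */
        while ((ev = rd_kafka_queue_poll(mainq, 100))) {
                if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_ERROR &&
                    rd_kafka_event_error(ev) ==
                        RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE)
                        fprintf(stderr, "Persistent broker failure: %s\n",
                                rd_kafka_event_error_string(ev));
                rd_kafka_event_destroy(ev);
        }
        rd_kafka_queue_destroy(mainq);
}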
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
@@ -252,6 +252,7 @@ struct rd_kafka_conf_s {
int reconnect_backoff_ms;
int reconnect_backoff_max_ms;
int reconnect_jitter_ms;
int reconnect_failure_report_ms;
int socket_connection_setup_timeout_ms;
int connections_max_idle_ms;
int sparse_connections;