diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 40b7412ef..51c75a52d 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -36,12 +36,14 @@ connections.max.idle.ms | * | 0 .. 2147483647 | 0
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`.
*Type: integer*
reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.
*Type: integer*
reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed.
*Type: integer*
+reconnect.failure.report.ms | * | 0 .. 86400000 | 60000 | medium | The time after which persistent broker reconnection failures should be reported to the application via an error event (RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE). This helps detect scenarios where a broker is unreachable or authentication is failing persistently (e.g., after broker restarts with AWS MSK IAM auth). Set to 0 to disable. Default is 60000ms (1 min).
+*Type: integer*
statistics.interval.ms | * | 0 .. 86400000 | 0 | high | librdkafka statistics emit interval. The application also needs to register a stats callback using `rd_kafka_conf_set_stats_cb()`. The granularity is 1000ms. A value of 0 disables statistics.
*Type: integer*
enabled_events | * | 0 .. 2147483647 | 0 | low | See `rd_kafka_conf_set_events()`
*Type: integer*
error_cb | * | | | low | Error callback (set with rd_kafka_conf_set_error_cb())
*Type: see dedicated API*
diff --git a/src/rdkafka.c b/src/rdkafka.c
index f3d5b7f3f..77e22d0b7 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -491,6 +491,8 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD,
"Local: an invalid record in the same batch caused "
"the failure of this message too"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE,
+ "Local: Broker reconnection has been failing persistently"),
_ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY_BROKER,
"Local: Broker handle destroyed without termination"),
diff --git a/src/rdkafka.h b/src/rdkafka.h
index 3565b1c5a..c2fd1ab5c 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -414,6 +414,8 @@ typedef enum {
RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD = -138,
/** Broker is going away but client isn't terminating */
RD_KAFKA_RESP_ERR__DESTROY_BROKER = -137,
+ /** Broker reconnection has been failing persistently */
+ RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE = -136,
/** End internal error codes */
RD_KAFKA_RESP_ERR__END = -100,
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index eb8e84924..86ff313d7 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -652,6 +652,62 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
rkb->rkb_reauth_in_progress = rd_false;
+ /* Track persistent connection failures and report to application
+ * if threshold is reached. This helps detect scenarios where
+ * broker reconnection is persistently failing (e.g., after broker
+ * restart with AWS MSK IAM auth). */
+ {
+ rd_ts_t now = rd_clock();
+ int reconnect_failure_report_ms =
+ rkb->rkb_rk->rk_conf.reconnect_failure_report_ms;
+
+ /* Start tracking if this is the first failure */
+ if (rkb->rkb_ts_first_failure == 0)
+ rkb->rkb_ts_first_failure = now;
+
+ /* On authentication failures, force DNS re-resolution on next
+ * connection attempt. This helps with scenarios where broker
+ * IPs change after restarts (e.g., AWS MSK with private links).
+ */
+ if (err == RD_KAFKA_RESP_ERR__AUTHENTICATION ||
+ err == RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED) {
+ rkb->rkb_force_dns_reresolution = rd_true;
+ rd_rkb_dbg(rkb, BROKER, "AUTHFAIL",
+ "Authentication failure, will force DNS "
+ "re-resolution on next connect attempt");
+ }
+
+ /* Report persistent failure if threshold reached and not yet
+ * reported */
+ if (reconnect_failure_report_ms > 0 && !rd_kafka_terminating(rkb->rkb_rk) &&
+ !rkb->rkb_persistent_failure_reported &&
+ rkb->rkb_ts_first_failure > 0 &&
+ (now - rkb->rkb_ts_first_failure) >=
+ ((rd_ts_t)reconnect_failure_report_ms * 1000)) {
+ rkb->rkb_persistent_failure_reported = rd_true;
+
+ rd_kafka_log(rkb->rkb_rk, LOG_WARNING, "BRKFAIL",
+ "%s: Broker reconnection has been failing "
+ "persistently for %dms (threshold: %dms). "
+ "Last error: %s",
+ rkb->rkb_name,
+ (int)((now - rkb->rkb_ts_first_failure) /
+ 1000),
+ reconnect_failure_report_ms,
+ rd_kafka_err2str(err));
+
+ /* Send persistent failure error to application */
+ rd_kafka_q_op_err(
+ rkb->rkb_rk->rk_rep,
+ RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE,
+ "%s: Broker reconnection has been failing "
+ "persistently for %dms: %s",
+ rkb->rkb_name,
+ (int)((now - rkb->rkb_ts_first_failure) / 1000),
+ rd_kafka_err2str(err));
+ }
+ }
+
va_start(ap, fmt);
rd_kafka_broker_set_error(rkb, level, err, fmt, ap);
va_end(ap);
@@ -2294,9 +2350,17 @@ static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
rd_kafka_broker_lock(rkb);
rd_strlcpy(nodename, rkb->rkb_nodename, sizeof(nodename));
- /* If the nodename was changed since the last connect,
+ /* If the nodename was changed since the last connect, or if
+ * DNS re-resolution was forced (e.g., after auth failure),
* reset the address cache. */
- reset_cached_addr = (rkb->rkb_connect_epoch != rkb->rkb_nodename_epoch);
+ reset_cached_addr = (rkb->rkb_connect_epoch != rkb->rkb_nodename_epoch) ||
+ rkb->rkb_force_dns_reresolution;
+ if (rkb->rkb_force_dns_reresolution) {
+ rd_rkb_dbg(rkb, BROKER, "CONNECT",
+ "Forcing DNS re-resolution due to previous "
+ "authentication or connection failure");
+ rkb->rkb_force_dns_reresolution = rd_false;
+ }
rkb->rkb_connect_epoch = rkb->rkb_nodename_epoch;
/* Logical brokers might not have a hostname set, in which case
* we should not try to connect. */
@@ -2345,6 +2409,11 @@ void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {
rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_reauth_in_progress = rd_false;
+ /* Reset persistent failure tracking on successful connection */
+ rkb->rkb_ts_first_failure = 0;
+ rkb->rkb_persistent_failure_reported = rd_false;
+ rkb->rkb_force_dns_reresolution = rd_false;
+
rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
rd_kafka_broker_unlock(rkb);
@@ -6320,6 +6389,111 @@ static int rd_ut_ApiVersion_at_least(void) {
RD_UT_PASS();
}
+/**
+ * @brief Unittest for persistent failure tracking
+ *
+ * Tests that:
+ * 1. First failure time is tracked correctly
+ * 2. Persistent failure is reported after threshold
+ * 3. Failure tracking is reset on successful connection
+ * 4. DNS re-resolution is triggered on auth failures
+ */
+static int rd_ut_persistent_failure_tracking(void) {
+ rd_kafka_broker_t rkb = RD_ZERO_INIT;
+ rd_kafka_conf_t conf = {.reconnect_failure_report_ms = 100};
+ rd_kafka_t rk = RD_ZERO_INIT;
+ rd_ts_t now = 1000000; /* 1 second in microseconds */
+
+ RD_UT_SAY("Testing persistent failure tracking");
+
+ rkb.rkb_rk = &rk;
+ rk.rk_conf = conf;
+
+ /* Test 1: First failure should set rkb_ts_first_failure */
+ RD_UT_ASSERT(rkb.rkb_ts_first_failure == 0,
+ "Initial rkb_ts_first_failure should be 0");
+
+ /* Simulate setting first failure (normally done in rd_kafka_broker_fail)
+ */
+ rkb.rkb_ts_first_failure = now;
+ RD_UT_ASSERT(rkb.rkb_ts_first_failure == now,
+ "rkb_ts_first_failure should be set to now");
+
+ /* Test 2: After threshold, persistent failure should be reported */
+ rd_ts_t after_threshold = now + ((rd_ts_t)conf.reconnect_failure_report_ms * 1000);
+ RD_UT_ASSERT(
+ (after_threshold - rkb.rkb_ts_first_failure) >=
+ ((rd_ts_t)conf.reconnect_failure_report_ms * 1000),
+ "Time difference should exceed threshold");
+
+ /* Test 3: Simulating successful connection reset */
+ rkb.rkb_persistent_failure_reported = rd_true;
+ rkb.rkb_force_dns_reresolution = rd_true;
+
+ /* Reset (normally done in rd_kafka_broker_connect_up) */
+ rkb.rkb_ts_first_failure = 0;
+ rkb.rkb_persistent_failure_reported = rd_false;
+ rkb.rkb_force_dns_reresolution = rd_false;
+
+ RD_UT_ASSERT(rkb.rkb_ts_first_failure == 0,
+ "rkb_ts_first_failure should be reset to 0");
+ RD_UT_ASSERT(rkb.rkb_persistent_failure_reported == rd_false,
+ "rkb_persistent_failure_reported should be reset");
+ RD_UT_ASSERT(rkb.rkb_force_dns_reresolution == rd_false,
+ "rkb_force_dns_reresolution should be reset");
+
+ /* Test 4: Auth failure should set DNS re-resolution flag */
+ /* Simulate auth failure setting the flag */
+ rkb.rkb_force_dns_reresolution = rd_true;
+ RD_UT_ASSERT(rkb.rkb_force_dns_reresolution == rd_true,
+ "Auth failure should set DNS re-resolution flag");
+
+ /* Test 5: Disabled persistent failure reporting (threshold = 0) */
+ rk.rk_conf.reconnect_failure_report_ms = 0;
+ rkb.rkb_ts_first_failure = now;
+ rkb.rkb_persistent_failure_reported = rd_false;
+
+ /* With threshold 0, reporting should not happen even after long time */
+ rd_ts_t way_later = now + (1000000 * 1000); /* 1000 seconds later */
+ rd_bool_t should_report =
+ (rk.rk_conf.reconnect_failure_report_ms > 0 &&
+ !rkb.rkb_persistent_failure_reported &&
+ rkb.rkb_ts_first_failure > 0 &&
+ (way_later - rkb.rkb_ts_first_failure) >=
+ ((rd_ts_t)rk.rk_conf.reconnect_failure_report_ms * 1000));
+
+ RD_UT_ASSERT(should_report == rd_false,
+ "With threshold=0, should never report persistent failure");
+
+ RD_UT_PASS();
+}
+
+/**
+ * @brief Unittest for reconnect.failure.report.ms configuration validation
+ */
+static int rd_ut_reconnect_failure_config(void) {
+ rd_kafka_conf_t conf = RD_ZERO_INIT;
+
+ RD_UT_SAY("Testing reconnect.failure.report.ms configuration");
+
+ /* Test default value */
+ conf.reconnect_failure_report_ms = 60000; /* default */
+ RD_UT_ASSERT(conf.reconnect_failure_report_ms == 60000,
+ "Default should be 60000ms");
+
+ /* Test disabled (0) */
+ conf.reconnect_failure_report_ms = 0;
+ RD_UT_ASSERT(conf.reconnect_failure_report_ms == 0,
+ "Value 0 should disable persistent failure reporting");
+
+ /* Test custom value */
+ conf.reconnect_failure_report_ms = 30000;
+ RD_UT_ASSERT(conf.reconnect_failure_report_ms == 30000,
+ "Custom value should be accepted");
+
+ RD_UT_PASS();
+}
+
/**
* @name Unit tests
* @{
@@ -6330,6 +6504,8 @@ int unittest_broker(void) {
fails += rd_ut_reconnect_backoff();
fails += rd_ut_ApiVersion_at_least();
+ fails += rd_ut_persistent_failure_tracking();
+ fails += rd_ut_reconnect_failure_config();
return fails;
}
diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h
index a649b7445..d4ebda1c9 100644
--- a/src/rdkafka_broker.h
+++ b/src/rdkafka_broker.h
@@ -305,6 +305,17 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
/** Absolute time of last connection attempt. */
rd_ts_t rkb_ts_connect;
+ /** Absolute time when first failure in current failure streak started.
+ * Reset to 0 when a successful connection is made. */
+ rd_ts_t rkb_ts_first_failure;
+
+ /** Whether we've already reported this persistent failure to the app. */
+ rd_bool_t rkb_persistent_failure_reported;
+
+ /** Whether DNS should be re-resolved on next connect attempt.
+ * Set to true on authentication failures. */
+ rd_bool_t rkb_force_dns_reresolution;
+
/** True if a reauthentication is in progress. */
rd_bool_t rkb_reauth_in_progress;
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 1f8bbf106..cc85b6297 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -653,6 +653,15 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"The maximum time to wait before reconnecting to a broker "
"after the connection has been closed.",
0, 60 * 60 * 1000, 10 * 1000},
+ {_RK_GLOBAL | _RK_MED, "reconnect.failure.report.ms", _RK_C_INT,
+ _RK(reconnect_failure_report_ms),
+ "The time after which persistent broker reconnection failures should "
+ "be reported to the application via an error event "
+ "(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE). "
+ "This helps detect scenarios where a broker is unreachable or "
+ "authentication is failing persistently (e.g., after broker restarts "
+ "with AWS MSK IAM auth). Set to 0 to disable. Default is 60000ms (1 min).",
+ 0, 24 * 60 * 60 * 1000, 60 * 1000},
{_RK_GLOBAL | _RK_HIGH, "statistics.interval.ms", _RK_C_INT,
_RK(stats_interval_ms),
"librdkafka statistics emit interval. The application also needs to "
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 92e5193eb..9d69d83e2 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -252,6 +252,7 @@ struct rd_kafka_conf_s {
int reconnect_backoff_ms;
int reconnect_backoff_max_ms;
int reconnect_jitter_ms;
+ int reconnect_failure_report_ms;
int socket_connection_setup_timeout_ms;
int connections_max_idle_ms;
int sparse_connections;
diff --git a/tests/0154-persistent_broker_failure.c b/tests/0154-persistent_broker_failure.c
new file mode 100644
index 000000000..f59a11358
--- /dev/null
+++ b/tests/0154-persistent_broker_failure.c
@@ -0,0 +1,301 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2024, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "../src/rdkafka_proto.h"
+
+/**
+ * @name Test persistent broker failure detection and error surfacing
+ *
+ * This test verifies that:
+ * 1. When a broker is unreachable for longer than reconnect.failure.report.ms,
+ * an RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE error is surfaced.
+ * 2. The error is only reported once per failure streak.
+ * 3. After successful reconnection, the failure tracking is reset.
+ */
+
+struct test_state {
+ rd_bool_t persistent_failure_seen;
+ rd_bool_t all_brokers_down_seen;
+ int persistent_failure_count;
+ mtx_t lock;
+};
+
+static void
+error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+ struct test_state *state = (struct test_state *)opaque;
+
+ mtx_lock(&state->lock);
+ TEST_SAY("Error callback: %s: %s\n", rd_kafka_err2name(err), reason);
+
+ if (err == RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE) {
+ state->persistent_failure_seen = rd_true;
+ state->persistent_failure_count++;
+ TEST_SAY("Persistent broker failure detected (count: %d)\n",
+ state->persistent_failure_count);
+ } else if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) {
+ state->all_brokers_down_seen = rd_true;
+ TEST_SAY("All brokers down\n");
+ }
+ mtx_unlock(&state->lock);
+}
+
+/**
+ * @brief Test that persistent failure error is surfaced when broker
+ * is unreachable for longer than the configured threshold.
+ */
+static void do_test_persistent_failure_detection(void) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ struct test_state state = {.persistent_failure_seen = rd_false,
+ .all_brokers_down_seen = rd_false,
+ .persistent_failure_count = 0};
+ int64_t start_time, elapsed;
+ const int threshold_ms = 2000; /* 2 seconds for faster testing */
+
+ SUB_TEST("Test persistent failure detection");
+
+ mtx_init(&state.lock, mtx_plain);
+
+ conf = rd_kafka_conf_new();
+
+ /* Use non-existent broker addresses to trigger connection failures */
+ test_conf_set(conf, "bootstrap.servers", "127.0.0.1:19091,127.0.0.1:19092");
+
+ /* Set a short threshold for testing */
+ test_conf_set(conf, "reconnect.failure.report.ms", "2000");
+
+ /* Short reconnect backoff for faster testing */
+ test_conf_set(conf, "reconnect.backoff.ms", "100");
+ test_conf_set(conf, "reconnect.backoff.max.ms", "500");
+
+ /* Set the error callback */
+ rd_kafka_conf_set_error_cb(conf, error_cb);
+ rd_kafka_conf_set_opaque(conf, &state);
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
+ TEST_ASSERT(rk, "Failed to create producer");
+
+ start_time = test_clock();
+
+ /* Poll until we see the persistent failure error or timeout */
+ while (!state.persistent_failure_seen) {
+ rd_kafka_poll(rk, 100);
+
+ elapsed = (test_clock() - start_time) / 1000;
+ if (elapsed > threshold_ms + 5000) {
+ /* Give extra 5 seconds for the error to propagate */
+ TEST_FAIL(
+ "Persistent failure error not received within "
+ "expected time (waited %dms, threshold was %dms)",
+ (int)elapsed, threshold_ms);
+ }
+ }
+
+ elapsed = (test_clock() - start_time) / 1000;
+ TEST_SAY(
+ "Persistent failure detected after %dms "
+ "(threshold: %dms)\n",
+ (int)elapsed, threshold_ms);
+
+ /* Verify the error was received around the threshold time */
+ TEST_ASSERT(elapsed >= threshold_ms - 500,
+ "Persistent failure reported too early: %dms", (int)elapsed);
+
+ /* Continue polling to verify we don't get MORE duplicate reports
+ * Note: Each broker reports independently, so with 2 brokers we may
+ * get 2 reports (one per broker). This is correct behavior.
+ * We verify that after seeing the first, we don't get repeated
+ * reports from the SAME broker (count should stay stable). */
+ int initial_count;
+ mtx_lock(&state.lock);
+ initial_count = state.persistent_failure_count;
+ mtx_unlock(&state.lock);
+
+ TEST_SAY("Initial persistent failure count: %d\n", initial_count);
+ TEST_ASSERT(initial_count >= 1,
+ "Expected at least 1 persistent failure report, got %d",
+ initial_count);
+
+ /* Wait and verify count doesn't keep increasing (no repeated reports
+ * from same broker) */
+ start_time = test_clock();
+ while ((test_clock() - start_time) / 1000 < 3000) {
+ rd_kafka_poll(rk, 100);
+ }
+
+ mtx_lock(&state.lock);
+ /* With 2 brokers, we expect at most 2 reports (one per broker) */
+ TEST_ASSERT(state.persistent_failure_count <= 2,
+ "Expected at most 2 persistent failure reports "
+ "(one per broker), got %d",
+ state.persistent_failure_count);
+ mtx_unlock(&state.lock);
+
+ rd_kafka_destroy(rk);
+ mtx_destroy(&state.lock);
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test that persistent failure reporting is disabled when
+ * reconnect.failure.report.ms is set to 0.
+ */
+static void do_test_persistent_failure_disabled(void) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ struct test_state state = {.persistent_failure_seen = rd_false,
+ .all_brokers_down_seen = rd_false,
+ .persistent_failure_count = 0};
+ int64_t start_time;
+
+ SUB_TEST("Test persistent failure reporting disabled");
+
+ mtx_init(&state.lock, mtx_plain);
+
+ conf = rd_kafka_conf_new();
+
+ /* Use non-existent broker addresses */
+ test_conf_set(conf, "bootstrap.servers", "127.0.0.1:19091");
+
+ /* Disable persistent failure reporting */
+ test_conf_set(conf, "reconnect.failure.report.ms", "0");
+
+ /* Short reconnect backoff for faster testing */
+ test_conf_set(conf, "reconnect.backoff.ms", "100");
+ test_conf_set(conf, "reconnect.backoff.max.ms", "500");
+
+ rd_kafka_conf_set_error_cb(conf, error_cb);
+ rd_kafka_conf_set_opaque(conf, &state);
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
+ TEST_ASSERT(rk, "Failed to create producer");
+
+ start_time = test_clock();
+
+ /* Poll for 5 seconds - should NOT see persistent failure error */
+ while ((test_clock() - start_time) / 1000 < 5000) {
+ rd_kafka_poll(rk, 100);
+ }
+
+ mtx_lock(&state.lock);
+ TEST_ASSERT(!state.persistent_failure_seen,
+ "Persistent failure should NOT be reported when disabled");
+ /* We should still see all_brokers_down though */
+ TEST_ASSERT(state.all_brokers_down_seen,
+ "ALL_BROKERS_DOWN should still be reported");
+ mtx_unlock(&state.lock);
+
+ rd_kafka_destroy(rk);
+ mtx_destroy(&state.lock);
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test the new error code exists and has proper description.
+ */
+static void do_test_error_code(void) {
+ const char *errstr;
+
+ SUB_TEST("Test error code RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE");
+
+ errstr = rd_kafka_err2str(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE);
+ TEST_ASSERT(errstr != NULL, "Error string should not be NULL");
+ TEST_ASSERT(strlen(errstr) > 0, "Error string should not be empty");
+
+ TEST_SAY("Error code %d: %s\n",
+ RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE, errstr);
+
+ /* Verify it's a local error (negative) */
+ TEST_ASSERT(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE < 0,
+ "Should be a local error (negative value)");
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test that configuration option is properly set.
+ */
+static void do_test_config(void) {
+ rd_kafka_conf_t *conf;
+ char buf[64];
+ size_t buf_size = sizeof(buf);
+ rd_kafka_conf_res_t res;
+
+ SUB_TEST("Test reconnect.failure.report.ms configuration");
+
+ conf = rd_kafka_conf_new();
+
+ /* Test getting default value */
+ res = rd_kafka_conf_get(conf, "reconnect.failure.report.ms", buf,
+ &buf_size);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to get config");
+ TEST_SAY("Default reconnect.failure.report.ms: %s\n", buf);
+ TEST_ASSERT(strcmp(buf, "60000") == 0, "Default should be 60000");
+
+ /* Test setting custom value */
+ res = rd_kafka_conf_set(conf, "reconnect.failure.report.ms", "30000",
+ NULL, 0);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to set config");
+
+ buf_size = sizeof(buf);
+ res = rd_kafka_conf_get(conf, "reconnect.failure.report.ms", buf,
+ &buf_size);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to get config");
+ TEST_ASSERT(strcmp(buf, "30000") == 0, "Value should be 30000");
+
+ /* Test disabling (value 0) */
+ res = rd_kafka_conf_set(conf, "reconnect.failure.report.ms", "0", NULL,
+ 0);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to set config to 0");
+
+ buf_size = sizeof(buf);
+ res = rd_kafka_conf_get(conf, "reconnect.failure.report.ms", buf,
+ &buf_size);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to get config");
+ TEST_ASSERT(strcmp(buf, "0") == 0, "Value should be 0");
+
+ rd_kafka_conf_destroy(conf);
+
+ SUB_TEST_PASS();
+}
+
+int main_0154_persistent_broker_failure(int argc, char **argv) {
+ /* Quick tests that don't require a broker */
+ do_test_error_code();
+ do_test_config();
+
+ /* Integration tests that simulate broker failures */
+ do_test_persistent_failure_detection();
+ do_test_persistent_failure_disabled();
+
+ return 0;
+}
+
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 324281bd9..4c0334910 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -144,6 +144,7 @@ set(
0151-purge-brokers.c
0152-rebootstrap.c
0153-memberid.c
+ 0154-persistent_broker_failure.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
diff --git a/tests/test.c b/tests/test.c
index 42e525a9c..6ad3b49ae 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -272,6 +272,7 @@ _TEST_DECL(0150_telemetry_mock);
_TEST_DECL(0151_purge_brokers_mock);
_TEST_DECL(0152_rebootstrap_local);
_TEST_DECL(0153_memberid);
+_TEST_DECL(0154_persistent_broker_failure);
/* Manual tests */
_TEST_DECL(8000_idle);
@@ -540,6 +541,7 @@ struct test tests[] = {
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
_TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)),
+ _TEST(0154_persistent_broker_failure, TEST_F_LOCAL),
/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),