From d304929878b2dfb1fa6daad0ea1de7a8fa498142 Mon Sep 17 00:00:00 2001
From: Shubham Sinha
Date: Mon, 8 Dec 2025 09:22:56 +0530
Subject: [PATCH 1/2] Implement persistent connection failure tracking and DNS
 re-resolution on authentication errors

This commit introduces a mechanism to track persistent broker
reconnection failures and report them to the application once a
configured threshold is exceeded. It also forces DNS re-resolution on
authentication failures, to handle scenarios where broker IPs may
change after a restart. A new configuration option and error code are
added to support these features.
---
 src/rdkafka.c        |  2 ++
 src/rdkafka.h        |  2 ++
 src/rdkafka_broker.c | 73 ++++++++++++++++++++++++++++++++++++++++++--
 src/rdkafka_broker.h | 11 +++++++
 src/rdkafka_conf.c   |  9 ++++++
 src/rdkafka_conf.h   |  1 +
 6 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/src/rdkafka.c b/src/rdkafka.c
index f3d5b7f3fc..77e22d0b7f 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -491,6 +491,8 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
         _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD,
                   "Local: an invalid record in the same batch caused "
                   "the failure of this message too"),
+        _ERR_DESC(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE,
+                  "Local: Broker reconnection has been failing persistently"),
         _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY_BROKER,
                   "Local: Broker handle destroyed without termination"),

diff --git a/src/rdkafka.h b/src/rdkafka.h
index 3565b1c5a8..c2fd1ab5c5 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -414,6 +414,8 @@ typedef enum {
         RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD = -138,
         /** Broker is going away but client isn't terminating */
         RD_KAFKA_RESP_ERR__DESTROY_BROKER = -137,
+        /** Broker reconnection has been failing persistently */
+        RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE = -136,

         /** End internal error codes */
         RD_KAFKA_RESP_ERR__END = -100,

diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index eb8e849240..4bb03e24d1 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -652,6 +652,62 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,

         rkb->rkb_reauth_in_progress = rd_false;

+        /* Track persistent connection failures and report them to the
+         * application once the configured threshold is reached. This helps
+         * detect scenarios where broker reconnection is persistently
+         * failing (e.g., after a broker restart with AWS MSK IAM auth). */
+        {
+                rd_ts_t now = rd_clock();
+                int reconnect_failure_report_ms =
+                    rkb->rkb_rk->rk_conf.reconnect_failure_report_ms;
+
+                /* Start tracking if this is the first failure */
+                if (rkb->rkb_ts_first_failure == 0)
+                        rkb->rkb_ts_first_failure = now;
+
+                /* On authentication failures, force DNS re-resolution on
+                 * the next connection attempt, to handle broker IPs that
+                 * change after a restart (e.g., AWS MSK with private links).
+                 */
+                if (err == RD_KAFKA_RESP_ERR__AUTHENTICATION ||
+                    err == RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED) {
+                        rkb->rkb_force_dns_reresolution = rd_true;
+                        rd_rkb_dbg(rkb, BROKER, "AUTHFAIL",
+                                   "Authentication failure, will force DNS "
+                                   "re-resolution on next connect attempt");
+                }
+
+                /* Report a persistent failure once the threshold is
+                 * reached, if not already reported */
+                if (reconnect_failure_report_ms > 0 &&
+                    !rkb->rkb_persistent_failure_reported &&
+                    rkb->rkb_ts_first_failure > 0 &&
+                    (now - rkb->rkb_ts_first_failure) >=
+                        ((rd_ts_t)reconnect_failure_report_ms * 1000)) {
+                        rkb->rkb_persistent_failure_reported = rd_true;
+
+                        rd_kafka_log(rkb->rkb_rk, LOG_WARNING, "BRKFAIL",
+                                     "%s: Broker reconnection has been "
+                                     "failing persistently for %dms "
+                                     "(threshold: %dms). Last error: %s",
+                                     rkb->rkb_name,
+                                     (int)((now - rkb->rkb_ts_first_failure) /
+                                           1000),
+                                     reconnect_failure_report_ms,
+                                     rd_kafka_err2str(err));
+
+                        /* Send the persistent failure error to the
+                         * application */
+                        rd_kafka_q_op_err(
+                            rkb->rkb_rk->rk_rep,
+                            RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE,
+                            "%s: Broker reconnection has been failing "
+                            "persistently for %dms: %s",
+                            rkb->rkb_name,
+                            (int)((now - rkb->rkb_ts_first_failure) / 1000),
+                            rd_kafka_err2str(err));
+                }
+        }
+
         va_start(ap, fmt);
         rd_kafka_broker_set_error(rkb, level, err, fmt, ap);
         va_end(ap);
@@ -2294,9 +2350,17 @@ static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
         rd_kafka_broker_lock(rkb);
         rd_strlcpy(nodename, rkb->rkb_nodename, sizeof(nodename));

-        /* If the nodename was changed since the last connect,
+        /* If the nodename was changed since the last connect, or if DNS
+         * re-resolution was forced (e.g., after an auth failure),
          * reset the address cache. */
-        reset_cached_addr = (rkb->rkb_connect_epoch != rkb->rkb_nodename_epoch);
+        reset_cached_addr = (rkb->rkb_connect_epoch != rkb->rkb_nodename_epoch) ||
+                            rkb->rkb_force_dns_reresolution;
+        if (rkb->rkb_force_dns_reresolution) {
+                rd_rkb_dbg(rkb, BROKER, "CONNECT",
+                           "Forcing DNS re-resolution due to previous "
+                           "authentication failure");
+                rkb->rkb_force_dns_reresolution = rd_false;
+        }
         rkb->rkb_connect_epoch = rkb->rkb_nodename_epoch;
         /* Logical brokers might not have a hostname set, in which case
          * we should not try to connect. */
@@ -2345,6 +2409,11 @@ void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {
         rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
         rkb->rkb_reauth_in_progress = rd_false;

+        /* Reset persistent failure tracking on a successful connection */
+        rkb->rkb_ts_first_failure = 0;
+        rkb->rkb_persistent_failure_reported = rd_false;
+        rkb->rkb_force_dns_reresolution = rd_false;
+
         rd_kafka_broker_lock(rkb);
         rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
         rd_kafka_broker_unlock(rkb);

diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h
index a649b7445e..d4ebda1c91 100644
--- a/src/rdkafka_broker.h
+++ b/src/rdkafka_broker.h
@@ -305,6 +305,17 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
         /** Absolute time of last connection attempt. */
         rd_ts_t rkb_ts_connect;

+        /** Absolute time when the first failure in the current failure
+         *  streak started. Reset to 0 when a successful connection is
+         *  made. */
+        rd_ts_t rkb_ts_first_failure;
+
+        /** Whether this persistent failure has already been reported to
+         *  the application. */
+        rd_bool_t rkb_persistent_failure_reported;
+
+        /** Whether DNS should be re-resolved on the next connect attempt.
+         *  Set to true on authentication failures. */
+        rd_bool_t rkb_force_dns_reresolution;
+
         /** True if a reauthentication is in progress. */
         rd_bool_t rkb_reauth_in_progress;

diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 1f8bbf106b..cc85b6297f 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -653,6 +653,15 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
      "The maximum time to wait before reconnecting to a broker "
      "after the connection has been closed.",
      0, 60 * 60 * 1000, 10 * 1000},
+    {_RK_GLOBAL | _RK_MED, "reconnect.failure.report.ms", _RK_C_INT,
+     _RK(reconnect_failure_report_ms),
+     "The time after which persistent broker reconnection failures are "
+     "reported to the application via an error event "
+     "(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE). "
+     "This helps detect scenarios where a broker is unreachable or "
+     "authentication is failing persistently (e.g., after a broker restart "
+     "with AWS MSK IAM auth). Set to 0 to disable.",
+     0, 24 * 60 * 60 * 1000, 60 * 1000},
     {_RK_GLOBAL | _RK_HIGH, "statistics.interval.ms", _RK_C_INT,
      _RK(stats_interval_ms),
      "librdkafka statistics emit interval. The application also needs to "

diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 92e5193eb7..9d69d83e26 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -252,6 +252,7 @@ struct rd_kafka_conf_s {
         int reconnect_backoff_ms;
         int reconnect_backoff_max_ms;
         int reconnect_jitter_ms;
+        int reconnect_failure_report_ms;
         int socket_connection_setup_timeout_ms;
         int connections_max_idle_ms;
         int sparse_connections;

From c1dd65f4dffd60e9f2cad562c839f30f6ee27028 Mon Sep 17 00:00:00 2001
From: Shubham Sinha
Date: Mon, 8 Dec 2025 09:31:38 +0530
Subject: [PATCH 2/2] Add reconnect.failure.report.ms configuration and unit
 tests for persistent failure tracking

This commit documents the `reconnect.failure.report.ms` configuration
option, which lets applications be notified of persistent broker
reconnection failures after a specified duration. It adds unit tests
that validate persistent failure tracking, covering failure detection,
reporting, and configuration validation, along with a new test that
verifies the error handling when brokers are unreachable for extended
periods.
---
 CONFIGURATION.md                       |   1 +
 src/rdkafka_broker.c                   | 107 +++++++++
 tests/0154-persistent_broker_failure.c | 301 +++++++++++++++++++++++++
 tests/CMakeLists.txt                   |   1 +
 tests/test.c                           |   2 +
 5 files changed, 412 insertions(+)
 create mode 100644 tests/0154-persistent_broker_failure.c

diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 40b7412efd..51c75a52d6 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -36,6 +36,7 @@
 connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info).<br>*Type: integer*
 reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`.<br>*Type: integer*
 reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.<br>*Type: integer*
 reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed.<br>*Type: integer*
+reconnect.failure.report.ms | * | 0 .. 86400000 | 60000 | medium | The time after which persistent broker reconnection failures are reported to the application via an error event (RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE). This helps detect scenarios where a broker is unreachable or authentication is failing persistently (e.g., after a broker restart with AWS MSK IAM auth). Set to 0 to disable.<br>*Type: integer*
 statistics.interval.ms | * | 0 .. 86400000 | 0 | high | librdkafka statistics emit interval. The application also needs to register a stats callback using `rd_kafka_conf_set_stats_cb()`. The granularity is 1000ms. A value of 0 disables statistics.<br>*Type: integer*
 enabled_events | * | 0 .. 2147483647 | 0 | low | See `rd_kafka_conf_set_events()`<br>*Type: integer*
 error_cb | * |  |  | low | Error callback (set with rd_kafka_conf_set_error_cb())<br>*Type: see dedicated API*

diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 4bb03e24d1..86ff313d77 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -6389,6 +6389,111 @@ static int rd_ut_ApiVersion_at_least(void) {
         RD_UT_PASS();
 }

+/**
+ * @brief Unittest for persistent failure tracking.
+ *
+ * Tests that:
+ * 1. The first failure time is tracked correctly.
+ * 2. A persistent failure is reported after the threshold.
+ * 3. Failure tracking is reset on a successful connection.
+ * 4. DNS re-resolution is triggered on auth failures.
+ */
+static int rd_ut_persistent_failure_tracking(void) {
+        rd_kafka_broker_t rkb = RD_ZERO_INIT;
+        rd_kafka_conf_t conf = {.reconnect_failure_report_ms = 100};
+        rd_kafka_t rk = RD_ZERO_INIT;
+        rd_ts_t now = 1000000; /* 1 second in microseconds */
+        rd_ts_t after_threshold;
+        rd_ts_t way_later;
+        rd_bool_t should_report;
+
+        RD_UT_SAY("Testing persistent failure tracking");
+
+        rkb.rkb_rk = &rk;
+        rk.rk_conf = conf;
+
+        /* Test 1: The first failure should set rkb_ts_first_failure */
+        RD_UT_ASSERT(rkb.rkb_ts_first_failure == 0,
+                     "Initial rkb_ts_first_failure should be 0");
+
+        /* Simulate the first failure (normally done in
+         * rd_kafka_broker_fail()) */
+        rkb.rkb_ts_first_failure = now;
+        RD_UT_ASSERT(rkb.rkb_ts_first_failure == now,
+                     "rkb_ts_first_failure should be set to now");
+
+        /* Test 2: After the threshold, a persistent failure should be
+         * reported */
+        after_threshold = now + (conf.reconnect_failure_report_ms * 1000);
+        RD_UT_ASSERT((after_threshold - rkb.rkb_ts_first_failure) >=
+                         ((rd_ts_t)conf.reconnect_failure_report_ms * 1000),
+                     "Time difference should exceed threshold");
+
+        /* Test 3: A successful connection should reset the tracking state */
+        rkb.rkb_persistent_failure_reported = rd_true;
+        rkb.rkb_force_dns_reresolution = rd_true;
+
+        /* Reset (normally done in rd_kafka_broker_connect_up()) */
+        rkb.rkb_ts_first_failure = 0;
+        rkb.rkb_persistent_failure_reported = rd_false;
+        rkb.rkb_force_dns_reresolution = rd_false;
+
+        RD_UT_ASSERT(rkb.rkb_ts_first_failure == 0,
+                     "rkb_ts_first_failure should be reset to 0");
+        RD_UT_ASSERT(rkb.rkb_persistent_failure_reported == rd_false,
+                     "rkb_persistent_failure_reported should be reset");
+        RD_UT_ASSERT(rkb.rkb_force_dns_reresolution == rd_false,
+                     "rkb_force_dns_reresolution should be reset");
+
+        /* Test 4: An auth failure should set the DNS re-resolution flag */
+        rkb.rkb_force_dns_reresolution = rd_true;
+        RD_UT_ASSERT(rkb.rkb_force_dns_reresolution == rd_true,
+                     "Auth failure should set DNS re-resolution flag");
+
+        /* Test 5: Disabled persistent failure reporting (threshold = 0) */
+        rk.rk_conf.reconnect_failure_report_ms = 0;
+        rkb.rkb_ts_first_failure = now;
+        rkb.rkb_persistent_failure_reported = rd_false;
+
+        /* With threshold 0, reporting should never happen, no matter how
+         * much time has passed */
+        way_later = now + (1000000 * 1000); /* 1000 seconds later */
+        should_report =
+            (rk.rk_conf.reconnect_failure_report_ms > 0 &&
+             !rkb.rkb_persistent_failure_reported &&
+             rkb.rkb_ts_first_failure > 0 &&
+             (way_later - rkb.rkb_ts_first_failure) >=
+                 ((rd_ts_t)rk.rk_conf.reconnect_failure_report_ms * 1000));
+
+        RD_UT_ASSERT(should_report == rd_false,
+                     "With threshold=0, should never report a persistent "
+                     "failure");
+
+        RD_UT_PASS();
+}
+
+/**
+ * @brief Unittest for reconnect.failure.report.ms configuration validation.
+ */
+static int rd_ut_reconnect_failure_config(void) {
+        rd_kafka_conf_t conf = RD_ZERO_INIT;
+
+        RD_UT_SAY("Testing reconnect.failure.report.ms configuration");
+
+        /* Test the default value */
conf.reconnect_failure_report_ms = 60000; /* default */ + RD_UT_ASSERT(conf.reconnect_failure_report_ms == 60000, + "Default should be 60000ms"); + + /* Test disabled (0) */ + conf.reconnect_failure_report_ms = 0; + RD_UT_ASSERT(conf.reconnect_failure_report_ms == 0, + "Value 0 should disable persistent failure reporting"); + + /* Test custom value */ + conf.reconnect_failure_report_ms = 30000; + RD_UT_ASSERT(conf.reconnect_failure_report_ms == 30000, + "Custom value should be accepted"); + + RD_UT_PASS(); +} + /** * @name Unit tests * @{ @@ -6399,6 +6504,8 @@ int unittest_broker(void) { fails += rd_ut_reconnect_backoff(); fails += rd_ut_ApiVersion_at_least(); + fails += rd_ut_persistent_failure_tracking(); + fails += rd_ut_reconnect_failure_config(); return fails; } diff --git a/tests/0154-persistent_broker_failure.c b/tests/0154-persistent_broker_failure.c new file mode 100644 index 0000000000..f59a113589 --- /dev/null +++ b/tests/0154-persistent_broker_failure.c @@ -0,0 +1,301 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2024, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "../src/rdkafka_proto.h" + +/** + * @name Test persistent broker failure detection and error surfacing + * + * This test verifies that: + * 1. When a broker is unreachable for longer than reconnect.failure.report.ms, + * an RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE error is surfaced. + * 2. The error is only reported once per failure streak. + * 3. After successful reconnection, the failure tracking is reset. 
+ */
+
+struct test_state {
+        rd_bool_t persistent_failure_seen;
+        rd_bool_t all_brokers_down_seen;
+        int persistent_failure_count;
+        mtx_t lock;
+};
+
+static void
+error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+        struct test_state *state = (struct test_state *)opaque;
+
+        mtx_lock(&state->lock);
+        TEST_SAY("Error callback: %s: %s\n", rd_kafka_err2name(err), reason);
+
+        if (err == RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE) {
+                state->persistent_failure_seen = rd_true;
+                state->persistent_failure_count++;
+                TEST_SAY("Persistent broker failure detected (count: %d)\n",
+                         state->persistent_failure_count);
+        } else if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) {
+                state->all_brokers_down_seen = rd_true;
+                TEST_SAY("All brokers down\n");
+        }
+        mtx_unlock(&state->lock);
+}
+
+/**
+ * @brief Test that the persistent failure error is surfaced when a broker
+ *        is unreachable for longer than the configured threshold.
+ */
+static void do_test_persistent_failure_detection(void) {
+        rd_kafka_conf_t *conf;
+        rd_kafka_t *rk;
+        struct test_state state = {.persistent_failure_seen = rd_false,
+                                   .all_brokers_down_seen = rd_false,
+                                   .persistent_failure_count = 0};
+        int64_t start_time, elapsed;
+        int initial_count;
+        const int threshold_ms = 2000; /* 2 seconds for faster testing */
+
+        SUB_TEST("Test persistent failure detection");
+
+        mtx_init(&state.lock, mtx_plain);
+
+        conf = rd_kafka_conf_new();
+
+        /* Use non-existent broker addresses to trigger connection failures */
+        test_conf_set(conf, "bootstrap.servers",
+                      "127.0.0.1:19091,127.0.0.1:19092");
+
+        /* Set a short threshold for testing */
+        test_conf_set(conf, "reconnect.failure.report.ms", "2000");
+
+        /* Use a short reconnect backoff for faster testing */
+        test_conf_set(conf, "reconnect.backoff.ms", "100");
+        test_conf_set(conf, "reconnect.backoff.max.ms", "500");
+
+        /* Set the error callback */
+        rd_kafka_conf_set_error_cb(conf, error_cb);
+        rd_kafka_conf_set_opaque(conf, &state);
+
+        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
+        TEST_ASSERT(rk, "Failed to create producer");
+
+        start_time = test_clock();
+
+        /* Poll until we see the persistent failure error or time out */
+        while (!state.persistent_failure_seen) {
+                rd_kafka_poll(rk, 100);
+
+                elapsed = (test_clock() - start_time) / 1000;
+                if (elapsed > threshold_ms + 5000) {
+                        /* Give an extra 5 seconds for the error to
+                         * propagate */
+                        TEST_FAIL(
+                            "Persistent failure error not received within "
+                            "expected time (waited %dms, threshold was %dms)",
+                            (int)elapsed, threshold_ms);
+                }
+        }
+
+        elapsed = (test_clock() - start_time) / 1000;
+        TEST_SAY("Persistent failure detected after %dms (threshold: %dms)\n",
+                 (int)elapsed, threshold_ms);
+
+        /* Verify the error was received around the threshold time */
+        TEST_ASSERT(elapsed >= threshold_ms - 500,
+                    "Persistent failure reported too early: %dms",
+                    (int)elapsed);
+
+        /* Keep polling to verify that reports are not repeated.
+         * Each broker reports independently, so with 2 brokers we may get
+         * up to 2 reports (one per broker), which is correct behavior.
+         * We verify that after the first report the count stays stable,
+         * i.e. no repeated reports from the same broker.
+         */
+        mtx_lock(&state.lock);
+        initial_count = state.persistent_failure_count;
+        mtx_unlock(&state.lock);
+
+        TEST_SAY("Initial persistent failure count: %d\n", initial_count);
+        TEST_ASSERT(initial_count >= 1,
+                    "Expected at least 1 persistent failure report, got %d",
+                    initial_count);
+
+        /* Wait and verify the count doesn't keep increasing (no repeated
+         * reports from the same broker) */
+        start_time = test_clock();
+        while ((test_clock() - start_time) / 1000 < 3000) {
+                rd_kafka_poll(rk, 100);
+        }
+
+        mtx_lock(&state.lock);
+        /* With 2 brokers, we expect at most 2 reports (one per broker) */
+        TEST_ASSERT(state.persistent_failure_count <= 2,
+                    "Expected at most 2 persistent failure reports "
+                    "(one per broker), got %d",
+                    state.persistent_failure_count);
+        mtx_unlock(&state.lock);
+
+        rd_kafka_destroy(rk);
+        mtx_destroy(&state.lock);
+
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test that persistent failure reporting is disabled when
+ *        reconnect.failure.report.ms is set to 0.
+ */
+static void do_test_persistent_failure_disabled(void) {
+        rd_kafka_conf_t *conf;
+        rd_kafka_t *rk;
+        struct test_state state = {.persistent_failure_seen = rd_false,
+                                   .all_brokers_down_seen = rd_false,
+                                   .persistent_failure_count = 0};
+        int64_t start_time;
+
+        SUB_TEST("Test persistent failure reporting disabled");
+
+        mtx_init(&state.lock, mtx_plain);
+
+        conf = rd_kafka_conf_new();
+
+        /* Use a non-existent broker address */
+        test_conf_set(conf, "bootstrap.servers", "127.0.0.1:19091");
+
+        /* Disable persistent failure reporting */
+        test_conf_set(conf, "reconnect.failure.report.ms", "0");
+
+        /* Use a short reconnect backoff for faster testing */
+        test_conf_set(conf, "reconnect.backoff.ms", "100");
+        test_conf_set(conf, "reconnect.backoff.max.ms", "500");
+
+        rd_kafka_conf_set_error_cb(conf, error_cb);
+        rd_kafka_conf_set_opaque(conf, &state);
+
+        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
+        TEST_ASSERT(rk, "Failed to create producer");
+
+        start_time = test_clock();
+
+        /* Poll for 5 seconds: the persistent failure error should NOT be
+         * seen */
+        while ((test_clock() - start_time) / 1000 < 5000) {
+                rd_kafka_poll(rk, 100);
+        }
+
+        mtx_lock(&state.lock);
+        TEST_ASSERT(!state.persistent_failure_seen,
+                    "Persistent failure should NOT be reported when disabled");
+        /* We should still see ALL_BROKERS_DOWN, though */
+        TEST_ASSERT(state.all_brokers_down_seen,
+                    "ALL_BROKERS_DOWN should still be reported");
+        mtx_unlock(&state.lock);
+
+        rd_kafka_destroy(rk);
+        mtx_destroy(&state.lock);
+
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test that the new error code exists and has a proper description.
+ */
+static void do_test_error_code(void) {
+        const char *errstr;
+
+        SUB_TEST("Test error code RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE");
+
+        errstr = rd_kafka_err2str(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE);
+        TEST_ASSERT(errstr != NULL, "Error string should not be NULL");
+        TEST_ASSERT(strlen(errstr) > 0, "Error string should not be empty");
+
+        TEST_SAY("Error code %d: %s\n",
+                 RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE, errstr);
+
+        /* Verify it's a local error (negative) */
+        TEST_ASSERT(RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE < 0,
+                    "Should be a local error (negative value)");
+
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test that the configuration option is properly set and read back.
+ */
+static void do_test_config(void) {
+        rd_kafka_conf_t *conf;
+        char buf[64];
+        size_t buf_size = sizeof(buf);
+        rd_kafka_conf_res_t res;
+
+        SUB_TEST("Test reconnect.failure.report.ms configuration");
+
+        conf = rd_kafka_conf_new();
+
+        /* Test getting the default value */
+        res = rd_kafka_conf_get(conf, "reconnect.failure.report.ms", buf,
+                                &buf_size);
+        TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to get config");
+        TEST_SAY("Default reconnect.failure.report.ms: %s\n", buf);
+        TEST_ASSERT(strcmp(buf, "60000") == 0, "Default should be 60000");
+
+        /* Test setting a custom value */
+        res = rd_kafka_conf_set(conf, "reconnect.failure.report.ms", "30000",
+                                NULL, 0);
+        TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to set config");
+
+        buf_size = sizeof(buf);
+        res = rd_kafka_conf_get(conf, "reconnect.failure.report.ms", buf,
+                                &buf_size);
+        TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to get config");
+        TEST_ASSERT(strcmp(buf, "30000") == 0, "Value should be 30000");
+
+        /* Test disabling (value 0) */
+        res = rd_kafka_conf_set(conf, "reconnect.failure.report.ms", "0",
+                                NULL, 0);
+        TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to set config to 0");
+
+        buf_size = sizeof(buf);
+        res = rd_kafka_conf_get(conf, "reconnect.failure.report.ms", buf,
+                                &buf_size);
+        TEST_ASSERT(res == RD_KAFKA_CONF_OK, "Failed to get config");
+        TEST_ASSERT(strcmp(buf, "0") == 0, "Value should be 0");
+
+        rd_kafka_conf_destroy(conf);
+
+        SUB_TEST_PASS();
+}
+
+int main_0154_persistent_broker_failure(int argc, char **argv) {
+        /* Quick tests that don't require a broker */
+        do_test_error_code();
+        do_test_config();
+
+        /* Local tests that simulate broker failures with unreachable
+         * addresses */
+        do_test_persistent_failure_detection();
+        do_test_persistent_failure_disabled();
+
+        return 0;
+}
+

diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 324281bd99..4c03349106 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -144,6 +144,7 @@ set(
   0151-purge-brokers.c
   0152-rebootstrap.c
   0153-memberid.c
+  0154-persistent_broker_failure.c
   8000-idle.cpp
   8001-fetch_from_follower_mock_manual.c
   test.c

diff --git a/tests/test.c b/tests/test.c
index 42e525a9cc..6ad3b49ae4 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -272,6 +272,7 @@ _TEST_DECL(0150_telemetry_mock);
 _TEST_DECL(0151_purge_brokers_mock);
 _TEST_DECL(0152_rebootstrap_local);
 _TEST_DECL(0153_memberid);
+_TEST_DECL(0154_persistent_broker_failure);

 /* Manual tests */
 _TEST_DECL(8000_idle);
@@ -540,6 +541,7 @@ struct test tests[] = {
     _TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
     _TEST(0152_rebootstrap_local, TEST_F_LOCAL),
     _TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)),
+    _TEST(0154_persistent_broker_failure, TEST_F_LOCAL),

     /* Manual tests */
     _TEST(8000_idle, TEST_F_MANUAL),
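
Usage note (reviewer sketch, not part of the patches): an application opts in
by setting `reconnect.failure.report.ms` and handling
RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE in its error callback, which
rd_kafka_poll() serves; DNS re-resolution after authentication failures needs
no application involvement. This is a minimal sketch assuming both patches are
applied; the broker address, threshold value and callback name are
illustrative.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical application-level error callback. */
static void app_error_cb(rd_kafka_t *rk, int err, const char *reason,
                         void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__BROKER_PERSISTENT_FAILURE) {
                /* A broker has been failing to reconnect for longer than
                 * reconnect.failure.report.ms: alert, re-bootstrap or
                 * recreate the client as appropriate. */
                fprintf(stderr, "Persistent broker failure: %s\n", reason);
        }
}

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk;

        rd_kafka_conf_set(conf, "bootstrap.servers", "broker1:9092", errstr,
                          sizeof(errstr));
        /* Report any broker that keeps failing to reconnect for 30s;
         * "0" disables the report (the default is "60000"). */
        rd_kafka_conf_set(conf, "reconnect.failure.report.ms", "30000",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set_error_cb(conf, app_error_cb);

        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "rd_kafka_new() failed: %s\n", errstr);
                return 1;
        }

        for (;;)
                rd_kafka_poll(rk, 1000); /* serves the error callback */
}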