diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e2f77e0b..732f17b60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,33 @@ +# librdkafka v2.13.0 + +librdkafka v2.13.0 is a feature release: + +* Fix memory management for interceptors in rd_kafka_conf to prevent +double-free errors (#5240). +* Fix for the pseudo-random generator seed on Windows involving as well + the uniqueness of the new consumer group protocol member id (#5265). + + +## Fixes + +### General fixes + +* Issues: #4142. + Fix memory management for interceptors in rd_kafka_conf to prevent double-free errors. + In case the client instance fails the users needs to destroy the configuration + data structure, it was causing a double-free because the interceptors were + already freed in the constructor. + Happening since 1.x (#5240). +* Issues: #5263, #3929. + Fix for the pseudo-random seed on Windows. The function `rand_r` isn't present + on Windows and the global seed wasn't based on the current microseconds and thread + id. Also it wasn't called on every thread as required on this platform but + only once per process. The fix allows on this platform the uniqueness of client side + member id generation in next-generation consumer group protocol. + Happening since 1.x (#5265). + + + # librdkafka v2.12.1 librdkafka v2.12.1 is a maintenance release: diff --git a/src/rdkafka.c b/src/rdkafka.c index f3d5b7f3f..b155ff898 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -57,6 +57,7 @@ #include "rdkafka_interceptor.h" #include "rdkafka_idempotence.h" #include "rdkafka_sasl_oauthbearer.h" +#include "rdmurmur2.h" #if WITH_OAUTHBEARER_OIDC #include "rdkafka_sasl_oauthbearer_oidc.h" #endif @@ -82,8 +83,14 @@ #endif -static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT; -static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT; +static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT; +#ifdef _WIN32 +/* On Windows srand needs to be called on each thread. */ +static RD_TLS once_flag rd_kafka_srand_once = ONCE_FLAG_INIT; +#else +static once_flag rd_kafka_srand_once = ONCE_FLAG_INIT; +#endif + /** * @brief Global counter+lock for all active librdkafka instances @@ -130,6 +137,22 @@ void rd_kafka_set_thread_name(const char *fmt, ...) { */ static char RD_TLS rd_kafka_thread_sysname[16] = "app"; +/** + * @brief Seed the PRNG with current microseconds and thread ID. + */ +static void rd_kafka_srand(void) { + unsigned int seed = 0; + struct timeval tv; + rd_gettimeofday(&tv, NULL); + seed = (unsigned int)(tv.tv_usec); + seed ^= thrd_current_id(); + + /* Apply the murmur2 hash to distribute entropy to + * the whole seed. */ + seed = (unsigned int)rd_murmur2(&seed, sizeof(seed)); + srand(seed); +} + void rd_kafka_set_thread_sysname(const char *fmt, ...) { va_list ap; @@ -141,6 +164,30 @@ void rd_kafka_set_thread_sysname(const char *fmt, ...) { thrd_setname(rd_kafka_thread_sysname); } +/** + * @brief Seed the PRNG for the current thread or for the whole process. + * Depending on the platform implementation of srand() the seed can + * be a thread local or global one. In case it's thread local we + * need to call it on each thread. + * + * @param rk Client instance. + * @param internal_thread If true, seed the PRNG if + * it's required per-thread. + */ +void rd_kafka_thread_srand(rd_kafka_t *rk, rd_bool_t internal_thread) { +#ifdef _WIN32 + rd_bool_t required_per_thread = rd_true; +#else + rd_bool_t required_per_thread = rd_false; +#endif + if ((required_per_thread && + (rk->rk_conf.enable_random_seed || internal_thread)) || + (!required_per_thread && rk->rk_conf.enable_random_seed && + !internal_thread)) { + call_once(&rd_kafka_srand_once, rd_kafka_srand); + } +} + static void rd_kafka_global_init0(void) { cJSON_Hooks json_hooks = {.malloc_fn = rd_malloc, .free_fn = rd_free}; @@ -171,18 +218,6 @@ void rd_kafka_global_init(void) { } -/** - * @brief Seed the PRNG with current_time.milliseconds - */ -static void rd_kafka_global_srand(void) { - struct timeval tv; - - rd_gettimeofday(&tv, NULL); - - srand((unsigned int)(tv.tv_usec / 1000)); -} - - /** * @returns the current number of active librdkafka instances */ @@ -2218,6 +2253,7 @@ static int rd_kafka_thread_main(void *arg) { rd_kafka_set_thread_name("main"); rd_kafka_set_thread_sysname("rdk:main"); + rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */); rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN); @@ -2367,10 +2403,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, * freed from rd_kafka_destroy_internal() * as the rk itself is destroyed. */ - /* Seed PRNG, don't bother about HAVE_RAND_R, since it is pretty cheap. - */ - if (rk->rk_conf.enable_random_seed) - call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand); + rd_kafka_thread_srand(rk, rd_false /* we're on an app thread */); /* Call on_new() interceptors */ rd_kafka_interceptors_on_new(rk, &rk->rk_conf); diff --git a/src/rdkafka_background.c b/src/rdkafka_background.c index a9c96606c..47b2d30ba 100644 --- a/src/rdkafka_background.c +++ b/src/rdkafka_background.c @@ -111,6 +111,7 @@ int rd_kafka_background_thread_main(void *arg) { rd_kafka_set_thread_name("background"); rd_kafka_set_thread_sysname("rdk:bg"); + rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */); rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BACKGROUND); diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index eb8e84924..7fc957b15 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -4512,6 +4512,7 @@ static int rd_kafka_broker_thread_main(void *arg) { rd_kafka_set_thread_name("%s", rkb->rkb_name); rd_kafka_set_thread_sysname("rdk:broker%" PRId32, rkb->rkb_nodeid); + rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */); rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BROKER); diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index d8370ff59..d9986b320 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -1122,6 +1122,7 @@ extern char RD_TLS rd_kafka_thread_name[64]; void rd_kafka_set_thread_name(const char *fmt, ...) RD_FORMAT(printf, 1, 2); void rd_kafka_set_thread_sysname(const char *fmt, ...) RD_FORMAT(printf, 1, 2); +void rd_kafka_thread_srand(rd_kafka_t *rk, rd_bool_t internal_thread); int rd_kafka_path_is_dir(const char *path); rd_bool_t rd_kafka_dir_is_empty(const char *path); diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index c8ca39e83..cdc0445f2 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -1490,6 +1490,8 @@ static int rd_kafka_mock_cluster_thread_main(void *arg) { rd_kafka_set_thread_name("mock"); rd_kafka_set_thread_sysname("rdk:mock"); + rd_kafka_thread_srand(mcluster->rk, + rd_true /* we're in an internal thread */); rd_kafka_interceptors_on_thread_start(mcluster->rk, RD_KAFKA_THREAD_BACKGROUND); rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1); diff --git a/src/rdkafka_ssl.c b/src/rdkafka_ssl.c index 6747d346e..acf5f9820 100644 --- a/src/rdkafka_ssl.c +++ b/src/rdkafka_ssl.c @@ -1993,14 +1993,7 @@ rd_kafka_transport_ssl_lock_cb(int mode, int i, const char *file, int line) { #endif static RD_UNUSED unsigned long rd_kafka_transport_ssl_threadid_cb(void) { -#ifdef _WIN32 - /* Windows makes a distinction between thread handle - * and thread id, which means we can't use the - * thrd_current() API that returns the handle. */ - return (unsigned long)GetCurrentThreadId(); -#else - return (unsigned long)(intptr_t)thrd_current(); -#endif + return thrd_current_id(); } #ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK diff --git a/src/rdrand.c b/src/rdrand.c index 104b289d6..a6a3c9002 100644 --- a/src/rdrand.c +++ b/src/rdrand.c @@ -42,7 +42,7 @@ int rd_jitter(int low, int high) { struct timeval tv; rd_gettimeofday(&tv, NULL); seed = (unsigned int)(tv.tv_usec); - seed ^= (unsigned int)(intptr_t)thrd_current(); + seed ^= thrd_current_id(); /* When many threads are created at the same time and the * thread id is different only by a few bits it's possible that diff --git a/src/tinycthread_extra.c b/src/tinycthread_extra.c index 6f6d0a595..1c62cdce2 100644 --- a/src/tinycthread_extra.c +++ b/src/tinycthread_extra.c @@ -59,6 +59,17 @@ int thrd_is_current(thrd_t thr) { #endif } +unsigned long thrd_current_id(void) { +#ifdef _WIN32 + /* Windows makes a distinction between thread handle + * and thread id, which means we can't use the + * thrd_current() API that returns the handle. */ + return (unsigned long)GetCurrentThreadId(); +#else + return (unsigned long)(intptr_t)thrd_current(); +#endif +} + #ifdef _WIN32 void cnd_wait_enter(cnd_t *cond) { diff --git a/src/tinycthread_extra.h b/src/tinycthread_extra.h index cb6b611ea..340f65ee8 100644 --- a/src/tinycthread_extra.h +++ b/src/tinycthread_extra.h @@ -55,6 +55,12 @@ int thrd_setname(const char *name); */ int thrd_is_current(thrd_t thr); +/** + * @brief Get current thread ID as an unsigned long. + * @return Current thread ID. + */ +unsigned long thrd_current_id(void); + #ifdef _WIN32 /**