From 542d58ac76051d7d8ab6b04a859bd09797bea5d9 Mon Sep 17 00:00:00 2001 From: VitorTrin Date: Tue, 21 Nov 2023 15:37:08 -0300 Subject: [PATCH 1/5] Create function in topic manager --- c_src/erlkaf_producer.cc | 31 +++++++++++++++++++++++++++++++ c_src/topicmanager.cc | 24 ++++++++++++++++++++++++ c_src/topicmanager.h | 1 + 3 files changed, 56 insertions(+) diff --git a/c_src/erlkaf_producer.cc b/c_src/erlkaf_producer.cc index 749585c..ca4befb 100644 --- a/c_src/erlkaf_producer.cc +++ b/c_src/erlkaf_producer.cc @@ -146,6 +146,37 @@ ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TER return ATOMS.atomOk; } +// ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +// { +// UNUSED(argc); + +// std::string topic_name; +// enif_producer* producer; + +// erlkaf_data* data = static_cast(enif_priv_data(env)); + +// if(!enif_get_resource(env, argv[0], data->res_producer, reinterpret_cast(&producer))) +// return make_badarg(env); + +// if(!get_string(env, argv[1], &topic_name)) +// return make_badarg(env); + +// scoped_ptr(config, rd_kafka_topic_conf_t, rd_kafka_topic_conf_new(), rd_kafka_topic_conf_destroy); + +// ERL_NIF_TERM parse_result = parse_topic_config(env, argv[2], config.get()); + +// if(parse_result != ATOMS.atomOk) +// return parse_result; + +// bool already_exist; + +// if(!producer->topics->AddTopic(topic_name, config.get(), &already_exist)) +// return make_error(env, already_exist ? "topic already exist" : "failed to create topic"); + +// config.release(); +// return ATOMS.atomOk; +// } + ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { UNUSED(argc); diff --git a/c_src/topicmanager.cc b/c_src/topicmanager.cc index 1e91998..0ff7b03 100644 --- a/c_src/topicmanager.cc +++ b/c_src/topicmanager.cc @@ -1,4 +1,6 @@ #include "topicmanager.h" +#include "macros.h" +#include #include "rdkafka.h" TopicManager::TopicManager(rd_kafka_t *rk) : rk_(rk) { } @@ -30,6 +32,28 @@ rd_kafka_topic_t* TopicManager::AddTopic(const std::string& name, rd_kafka_topic return topic; } +void* TopicManager::DeleteTopic(const std::string& name, bool* not_found) +{ + CritScope ss(&crt_); + + auto it = topics_.find(name); + + if(it == topics_.end()) + { + *not_found = true; + return NULL; + } + + scoped_ptr(del_topics, rd_kafka_DeleteTopic_t*, rd_kafka_DeleteTopic_new(name.c_str()), rd_kafka_DeleteTopic_destroy); + + rd_kafka_DeleteTopic_t* del_topics = rd_kafka_DeleteTopic_new(name.c_str()); + + *not_found = false; + rd_kafka_DeleteTopics(rk_, &del_topics, 1, NULL, NULL); + + return NULL; +} + void TopicManager::Cleanup() { CritScope ss(&crt_); diff --git a/c_src/topicmanager.h b/c_src/topicmanager.h index 767cc6b..f81536d 100644 --- a/c_src/topicmanager.h +++ b/c_src/topicmanager.h @@ -19,6 +19,7 @@ class TopicManager ~TopicManager(); rd_kafka_topic_t* AddTopic(const std::string& name, rd_kafka_topic_conf_t* conf, bool* already_exist); + void* DeleteTopic(const std::string& name, bool* not_found); rd_kafka_topic_t* GetOrCreateTopic(const std::string& name); private: From e41d61a9936de967034ce267dc7ac922e5eca12c Mon Sep 17 00:00:00 2001 From: VitorTrin Date: Tue, 21 Nov 2023 17:08:39 -0300 Subject: [PATCH 2/5] Receive the struct --- c_src/erlkaf_producer.cc | 12 +++++++----- c_src/topicmanager.cc | 8 +++----- c_src/topicmanager.h | 3 ++- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/c_src/erlkaf_producer.cc b/c_src/erlkaf_producer.cc index ca4befb..e951390 100644 --- a/c_src/erlkaf_producer.cc +++ b/c_src/erlkaf_producer.cc @@ -161,19 +161,21 @@ ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TER // if(!get_string(env, argv[1], &topic_name)) // return make_badarg(env); -// scoped_ptr(config, rd_kafka_topic_conf_t, rd_kafka_topic_conf_new(), rd_kafka_topic_conf_destroy); +// scoped_ptr(del_topics, rd_kafka_DeleteTopic_t*, rd_kafka_DeleteTopic_new(name.c_str()), rd_kafka_DeleteTopic_destroy); // ERL_NIF_TERM parse_result = parse_topic_config(env, argv[2], config.get()); // if(parse_result != ATOMS.atomOk) // return parse_result; -// bool already_exist; +// bool not_found; -// if(!producer->topics->AddTopic(topic_name, config.get(), &already_exist)) -// return make_error(env, already_exist ? "topic already exist" : "failed to create topic"); +// producer->topics->AddTopic(topic_name, del_topics.get(), ¬_found); -// config.release(); +// if(not_found) +// return make_error(env,"topic not found"); + +// del_topics.release(); // return ATOMS.atomOk; // } diff --git a/c_src/topicmanager.cc b/c_src/topicmanager.cc index 0ff7b03..5dd9d2b 100644 --- a/c_src/topicmanager.cc +++ b/c_src/topicmanager.cc @@ -1,6 +1,4 @@ #include "topicmanager.h" -#include "macros.h" -#include #include "rdkafka.h" TopicManager::TopicManager(rd_kafka_t *rk) : rk_(rk) { } @@ -32,7 +30,7 @@ rd_kafka_topic_t* TopicManager::AddTopic(const std::string& name, rd_kafka_topic return topic; } -void* TopicManager::DeleteTopic(const std::string& name, bool* not_found) +void* TopicManager::DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* del_topics, bool* not_found) { CritScope ss(&crt_); @@ -44,9 +42,9 @@ void* TopicManager::DeleteTopic(const std::string& name, bool* not_found) return NULL; } - scoped_ptr(del_topics, rd_kafka_DeleteTopic_t*, rd_kafka_DeleteTopic_new(name.c_str()), rd_kafka_DeleteTopic_destroy); + // scoped_ptr(del_topics, rd_kafka_DeleteTopic_t*, rd_kafka_DeleteTopic_new(name.c_str()), rd_kafka_DeleteTopic_destroy); - rd_kafka_DeleteTopic_t* del_topics = rd_kafka_DeleteTopic_new(name.c_str()); + // rd_kafka_DeleteTopic_t* del_topics = rd_kafka_DeleteTopic_new(name.c_str()); *not_found = false; rd_kafka_DeleteTopics(rk_, &del_topics, 1, NULL, NULL); diff --git a/c_src/topicmanager.h b/c_src/topicmanager.h index f81536d..a00fe84 100644 --- a/c_src/topicmanager.h +++ b/c_src/topicmanager.h @@ -3,6 +3,7 @@ #include "macros.h" #include "critical_section.h" +#include "rdkafka.h" #include #include @@ -19,7 +20,7 @@ class TopicManager ~TopicManager(); rd_kafka_topic_t* AddTopic(const std::string& name, rd_kafka_topic_conf_t* conf, bool* already_exist); - void* DeleteTopic(const std::string& name, bool* not_found); + void* DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* del_topics, bool* not_found); rd_kafka_topic_t* GetOrCreateTopic(const std::string& name); private: From f7c3e0655e4ad45fb7a3a819e5ac37fc65a55a0f Mon Sep 17 00:00:00 2001 From: VitorTrin Date: Wed, 22 Nov 2023 12:30:33 -0300 Subject: [PATCH 3/5] Create enif_producer_topic_delete function --- c_src/erlkaf_producer.cc | 43 ++++++++++++++++++++-------------------- c_src/erlkaf_producer.h | 1 + c_src/topicmanager.cc | 10 ++++++---- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/c_src/erlkaf_producer.cc b/c_src/erlkaf_producer.cc index e951390..cf7c7cc 100644 --- a/c_src/erlkaf_producer.cc +++ b/c_src/erlkaf_producer.cc @@ -146,38 +146,37 @@ ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TER return ATOMS.atomOk; } -// ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -// { -// UNUSED(argc); - -// std::string topic_name; -// enif_producer* producer; +ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + UNUSED(argc); -// erlkaf_data* data = static_cast(enif_priv_data(env)); + std::string topic_name; + enif_producer* producer; -// if(!enif_get_resource(env, argv[0], data->res_producer, reinterpret_cast(&producer))) -// return make_badarg(env); + erlkaf_data* data = static_cast(enif_priv_data(env)); -// if(!get_string(env, argv[1], &topic_name)) -// return make_badarg(env); + if(!enif_get_resource(env, argv[0], data->res_producer, reinterpret_cast(&producer))) + return make_badarg(env); -// scoped_ptr(del_topics, rd_kafka_DeleteTopic_t*, rd_kafka_DeleteTopic_new(name.c_str()), rd_kafka_DeleteTopic_destroy); + if(!get_string(env, argv[1], &topic_name)) + return make_badarg(env); -// ERL_NIF_TERM parse_result = parse_topic_config(env, argv[2], config.get()); + rd_kafka_DeleteTopic_t **del_topics; + del_topics = (rd_kafka_DeleteTopic_t **)malloc(sizeof(*del_topics)); + del_topics[0] = rd_kafka_DeleteTopic_new(topic_name.data()); -// if(parse_result != ATOMS.atomOk) -// return parse_result; + bool not_found; -// bool not_found; + producer->topics->DeleteTopic(topic_name, *del_topics, ¬_found); -// producer->topics->AddTopic(topic_name, del_topics.get(), ¬_found); + if(not_found) + return make_error(env,"topic not found"); -// if(not_found) -// return make_error(env,"topic not found"); + rd_kafka_DeleteTopic_destroy_array(del_topics, 1); + free(del_topics); -// del_topics.release(); -// return ATOMS.atomOk; -// } + return ATOMS.atomOk; +} ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { diff --git a/c_src/erlkaf_producer.h b/c_src/erlkaf_producer.h index a51ae93..8235dc1 100644 --- a/c_src/erlkaf_producer.h +++ b/c_src/erlkaf_producer.h @@ -7,6 +7,7 @@ void enif_producer_free(ErlNifEnv* env, void* obj); ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_producer_set_owner(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_producer_cleanup(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/topicmanager.cc b/c_src/topicmanager.cc index 5dd9d2b..e425980 100644 --- a/c_src/topicmanager.cc +++ b/c_src/topicmanager.cc @@ -42,12 +42,14 @@ void* TopicManager::DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* return NULL; } - // scoped_ptr(del_topics, rd_kafka_DeleteTopic_t*, rd_kafka_DeleteTopic_new(name.c_str()), rd_kafka_DeleteTopic_destroy); - - // rd_kafka_DeleteTopic_t* del_topics = rd_kafka_DeleteTopic_new(name.c_str()); + rd_kafka_AdminOptions_t *options; + options = rd_kafka_AdminOptions_new(rk_, RD_KAFKA_ADMIN_OP_DELETETOPICS); + *not_found = false; - rd_kafka_DeleteTopics(rk_, &del_topics, 1, NULL, NULL); + rd_kafka_DeleteTopics(rk_, &del_topics, 1, options, NULL); + + rd_kafka_AdminOptions_destroy(options); return NULL; } From 43d34c1150a25589cc72257464aae719118914fd Mon Sep 17 00:00:00 2001 From: VitorTrin Date: Wed, 22 Nov 2023 12:48:36 -0300 Subject: [PATCH 4/5] Expose new nif function --- c_src/erlkaf_nif.cc | 1 + c_src/topicmanager.cc | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/c_src/erlkaf_nif.cc b/c_src/erlkaf_nif.cc index 41b0b01..0e7d309 100755 --- a/c_src/erlkaf_nif.cc +++ b/c_src/erlkaf_nif.cc @@ -119,6 +119,7 @@ static ErlNifFunc nif_funcs[] = {"producer_new", 2, enif_producer_new}, {"producer_set_owner", 2, enif_producer_set_owner}, {"producer_topic_new", 3, enif_producer_topic_new}, + {"producer_topic_delete", 3, enif_producer_topic_delete}, {"producer_cleanup", 1, enif_producer_cleanup}, {"produce", 7, enif_produce}, {"get_metadata", 1, enif_get_metadata, ERL_NIF_DIRTY_JOB_IO_BOUND}, diff --git a/c_src/topicmanager.cc b/c_src/topicmanager.cc index e425980..d5e7b4b 100644 --- a/c_src/topicmanager.cc +++ b/c_src/topicmanager.cc @@ -45,11 +45,14 @@ void* TopicManager::DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* rd_kafka_AdminOptions_t *options; options = rd_kafka_AdminOptions_new(rk_, RD_KAFKA_ADMIN_OP_DELETETOPICS); + rd_kafka_queue_t *rkqu; + rkqu = rd_kafka_queue_new(rk_); *not_found = false; - rd_kafka_DeleteTopics(rk_, &del_topics, 1, options, NULL); + rd_kafka_DeleteTopics(rk_, &del_topics, 1, options, rkqu); rd_kafka_AdminOptions_destroy(options); + rd_kafka_queue_destroy(rkqu); return NULL; } From b2ed9836bc278bbeacf7cc2a06e006f5777f4d5c Mon Sep 17 00:00:00 2001 From: VitorTrin Date: Wed, 22 Nov 2023 15:37:51 -0300 Subject: [PATCH 5/5] Add functions to public api --- c_src/erlkaf_producer.cc | 4 ++-- src/erlkaf.erl | 12 ++++++++++++ src/erlkaf_manager.erl | 9 ++++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/c_src/erlkaf_producer.cc b/c_src/erlkaf_producer.cc index cf7c7cc..03253ce 100644 --- a/c_src/erlkaf_producer.cc +++ b/c_src/erlkaf_producer.cc @@ -162,7 +162,7 @@ ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_ return make_badarg(env); rd_kafka_DeleteTopic_t **del_topics; - del_topics = (rd_kafka_DeleteTopic_t **)malloc(sizeof(*del_topics)); + del_topics = reinterpret_cast(malloc(sizeof(*del_topics))); del_topics[0] = rd_kafka_DeleteTopic_new(topic_name.data()); bool not_found; @@ -170,7 +170,7 @@ ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_ producer->topics->DeleteTopic(topic_name, *del_topics, ¬_found); if(not_found) - return make_error(env,"topic not found"); + return make_error(env, "topic not found"); rd_kafka_DeleteTopic_destroy_array(del_topics, 1); free(del_topics); diff --git a/src/erlkaf.erl b/src/erlkaf.erl index e78ff12..0da3ca4 100644 --- a/src/erlkaf.erl +++ b/src/erlkaf.erl @@ -16,6 +16,8 @@ create_topic/2, create_topic/3, + delete_topic/2, + produce/4, produce/5, produce/6, @@ -109,6 +111,16 @@ create_topic(ClientId, TopicName, TopicConfig) -> {error, ?ERR_UNDEFINED_CLIENT} end. +-spec delete_topic(client_id(), binary()) -> + ok | {error, reason()}. +delete_topic(ClientId, TopicName) -> + case erlkaf_cache_client:get(ClientId) of + {ok, ClientRef, _ClientPid} -> + erlkaf_manager:delete_topic(ClientRef, TopicName); + _ -> + {error, ?ERR_UNDEFINED_CLIENT} + end. + -spec get_metadata(client_id()) -> {ok, map()} | {error, reason()}. get_metadata(ClientId) -> diff --git a/src/erlkaf_manager.erl b/src/erlkaf_manager.erl index 2ef9b4c..d9f25db 100644 --- a/src/erlkaf_manager.erl +++ b/src/erlkaf_manager.erl @@ -13,6 +13,7 @@ start_consumer_group/5, stop_client/1, create_topic/3, + delete_topic/2, % gen_server @@ -43,6 +44,9 @@ stop_client(ClientId) -> create_topic(ClientRef, TopicName, TopicConfig) -> erlkaf_utils:safe_call(?MODULE, {create_topic, ClientRef, TopicName, TopicConfig}). +delete_topic(ClientRef, TopicName) -> + erlkaf_utils:safe_call(?MODULE, {delete_topic, ClientRef, TopicName}). + %gen server init([]) -> @@ -51,6 +55,9 @@ init([]) -> handle_call({create_topic, ClientRef, TopicName, TopicConfig}, _From, State) -> {reply, erlkaf_nif:producer_topic_new(ClientRef, TopicName, TopicConfig), State}; +handle_call({delete_topic, ClientRef, TopicName}, _From, State) -> + {reply, erlkaf_nif:producer_topic_new(ClientRef, TopicName), State}; + handle_call({start_producer, ClientId, ErlkafConfig, LibRdkafkaConfig}, _From, State) -> case internal_start_producer(ClientId, ErlkafConfig, LibRdkafkaConfig) of {ok, _Pid} -> @@ -149,4 +156,4 @@ valid_consumer_topics([H|T]) -> {error, {invalid_topic, H}} end; valid_consumer_topics([]) -> - ok. \ No newline at end of file + ok.