Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions c_src/erlkaf_nif.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ static ErlNifFunc nif_funcs[] =
{"producer_new", 2, enif_producer_new},
{"producer_set_owner", 2, enif_producer_set_owner},
{"producer_topic_new", 3, enif_producer_topic_new},
{"producer_topic_delete", 3, enif_producer_topic_delete},
{"producer_cleanup", 1, enif_producer_cleanup},
{"produce", 7, enif_produce},
{"get_metadata", 1, enif_get_metadata, ERL_NIF_DIRTY_JOB_IO_BOUND},
Expand Down
32 changes: 32 additions & 0 deletions c_src/erlkaf_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,38 @@ ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TER
return ATOMS.atomOk;
}

ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
UNUSED(argc);

std::string topic_name;
enif_producer* producer;

erlkaf_data* data = static_cast<erlkaf_data*>(enif_priv_data(env));

if(!enif_get_resource(env, argv[0], data->res_producer, reinterpret_cast<void**>(&producer)))
return make_badarg(env);

if(!get_string(env, argv[1], &topic_name))
return make_badarg(env);

rd_kafka_DeleteTopic_t **del_topics;
del_topics = reinterpret_cast<rd_kafka_DeleteTopic_t **>(malloc(sizeof(*del_topics)));
del_topics[0] = rd_kafka_DeleteTopic_new(topic_name.data());

bool not_found;

producer->topics->DeleteTopic(topic_name, *del_topics, &not_found);

if(not_found)
return make_error(env, "topic not found");

rd_kafka_DeleteTopic_destroy_array(del_topics, 1);
free(del_topics);

return ATOMS.atomOk;
}

ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
UNUSED(argc);
Expand Down
1 change: 1 addition & 0 deletions c_src/erlkaf_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ void enif_producer_free(ErlNifEnv* env, void* obj);

ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_producer_set_owner(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_producer_cleanup(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
Expand Down
27 changes: 27 additions & 0 deletions c_src/topicmanager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,33 @@ rd_kafka_topic_t* TopicManager::AddTopic(const std::string& name, rd_kafka_topic
return topic;
}

void* TopicManager::DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* del_topics, bool* not_found)
{
CritScope ss(&crt_);

auto it = topics_.find(name);

if(it == topics_.end())
{
*not_found = true;
return NULL;
}

rd_kafka_AdminOptions_t *options;
options = rd_kafka_AdminOptions_new(rk_, RD_KAFKA_ADMIN_OP_DELETETOPICS);

rd_kafka_queue_t *rkqu;
rkqu = rd_kafka_queue_new(rk_);

*not_found = false;
rd_kafka_DeleteTopics(rk_, &del_topics, 1, options, rkqu);

rd_kafka_AdminOptions_destroy(options);
rd_kafka_queue_destroy(rkqu);

return NULL;
}

void TopicManager::Cleanup()
{
CritScope ss(&crt_);
Expand Down
2 changes: 2 additions & 0 deletions c_src/topicmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "macros.h"
#include "critical_section.h"
#include "rdkafka.h"

#include <map>
#include <string>
Expand All @@ -19,6 +20,7 @@ class TopicManager
~TopicManager();

rd_kafka_topic_t* AddTopic(const std::string& name, rd_kafka_topic_conf_t* conf, bool* already_exist);
void* DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* del_topics, bool* not_found);
rd_kafka_topic_t* GetOrCreateTopic(const std::string& name);

private:
Expand Down
12 changes: 12 additions & 0 deletions src/erlkaf.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
create_topic/2,
create_topic/3,

delete_topic/2,

produce/4,
produce/5,
produce/6,
Expand Down Expand Up @@ -109,6 +111,16 @@ create_topic(ClientId, TopicName, TopicConfig) ->
{error, ?ERR_UNDEFINED_CLIENT}
end.

-spec delete_topic(client_id(), binary()) ->
ok | {error, reason()}.
delete_topic(ClientId, TopicName) ->
case erlkaf_cache_client:get(ClientId) of
{ok, ClientRef, _ClientPid} ->
erlkaf_manager:delete_topic(ClientRef, TopicName);
_ ->
{error, ?ERR_UNDEFINED_CLIENT}
end.

-spec get_metadata(client_id()) ->
{ok, map()} | {error, reason()}.
get_metadata(ClientId) ->
Expand Down
9 changes: 8 additions & 1 deletion src/erlkaf_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
start_consumer_group/5,
stop_client/1,
create_topic/3,
delete_topic/2,

% gen_server

Expand Down Expand Up @@ -43,6 +44,9 @@ stop_client(ClientId) ->
create_topic(ClientRef, TopicName, TopicConfig) ->
erlkaf_utils:safe_call(?MODULE, {create_topic, ClientRef, TopicName, TopicConfig}).

delete_topic(ClientRef, TopicName) ->
erlkaf_utils:safe_call(?MODULE, {delete_topic, ClientRef, TopicName}).

%gen server

init([]) ->
Expand All @@ -51,6 +55,9 @@ init([]) ->
handle_call({create_topic, ClientRef, TopicName, TopicConfig}, _From, State) ->
{reply, erlkaf_nif:producer_topic_new(ClientRef, TopicName, TopicConfig), State};

handle_call({delete_topic, ClientRef, TopicName}, _From, State) ->
{reply, erlkaf_nif:producer_topic_new(ClientRef, TopicName), State};

handle_call({start_producer, ClientId, ErlkafConfig, LibRdkafkaConfig}, _From, State) ->
case internal_start_producer(ClientId, ErlkafConfig, LibRdkafkaConfig) of
{ok, _Pid} ->
Expand Down Expand Up @@ -149,4 +156,4 @@ valid_consumer_topics([H|T]) ->
{error, {invalid_topic, H}}
end;
valid_consumer_topics([]) ->
ok.
ok.