Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
branches:
- master
env:
OTP_VERSION: "27.2"
OTP_VERSION: "27"
REBAR_VERSION: "3.24.0"

jobs:
Expand All @@ -18,9 +18,11 @@ jobs:
- name: Checkout
uses: actions/checkout@v2
- name: OTP
uses: erlef/setup-beam@v1.20.4
uses: erlef/setup-beam@v1
with:
version-type: strict
# NOTE: The Erlang/OTP version is set to `27` without pinning a specific release,
# so `version-type` must be `loose`.
version-type: loose
otp-version: ${{ env.OTP_VERSION }}
rebar3-version: ${{ env.REBAR_VERSION }}
- name: Cache Build
Expand Down Expand Up @@ -57,7 +59,7 @@ jobs:
- name: Install Erlang
uses: erlef/setup-beam@v1
with:
version-type: strict
version-type: loose
otp-version: ${{matrix.vsn[0]}}
rebar3-version: ${{ env.REBAR_VERSION }}
- name: Compile
Expand Down Expand Up @@ -96,7 +98,7 @@ jobs:
- name: OTP
uses: erlef/setup-beam@v1
with:
version-type: strict
version-type: loose
otp-version: ${{ env.OTP_VERSION }}
rebar3-version: ${{ env.REBAR_VERSION }}
- name: Build Documentation
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

- 4.4.5
- Start producers for newly added topic partitions in the `do_get_metadata` function.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the goal is to automatically start producers for newly discovered partitions, maybe brod_client should start a timer to periodically refresh partition counts for each topic which has a producers_sup started

Copy link
Contributor Author

@wiserfz wiserfz Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I did consider this: starting a process that automatically fetches the Kafka metadata and then starts producers for new partitions. I also noticed that PR #623 already implements this feature.

However, two reasons led me to open this PR:

  1. There is a bug in brod_client:get_metadata/2: it only updates the topic's partition count in the ETS table, but does not start the partition producers.
  2. In my use case, I fetch the Kafka metadata and compute the partition index I need to produce to; with the Support Create Partitions #623 feature, there may be a delay during which the partition producer has not yet been started when I need to produce.

Of course, the #623 feature is good, and I don't think it conflicts with this PR.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i think i can agree to this.
but should also stop producers if some partitions are deleted ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i think i can agree to this. but should also stop producers if some partitions are deleted ?

Kafka topic partitions can only be increased, never reduced, so the producers do not need to be stopped.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i'm referring to the case when a topic is deleted then created.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just getting back around to #623, will get it moving again sorry for the delay


- 4.4.4
- Fixed `ListGroups` API request for Kafka Protocol API version 3.

Expand Down
75 changes: 72 additions & 3 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ do_get_metadata(FetchMetadataFor, Topics,
#state{ client_id = ClientId
, workers_tab = Ets
, config = Config
, producers_sup = ProducersSup
} = State0) ->
FetchTopics = case FetchMetadataFor of
all -> all; %% in case no topic is given, get all
Expand All @@ -633,11 +634,18 @@ do_get_metadata(FetchMetadataFor, Topics,
Request = brod_kafka_request:metadata(Conn, FetchTopics),
case request_sync(State, Request) of
{ok, #kpro_rsp{api = metadata, msg = Metadata}} ->
TopicMetadataArray = kf(topics, Metadata),
TopicsMetadata = kf(topics, Metadata),
ok = maybe_start_partition_producer(
filter_topics(TopicsMetadata),
brod_producers_sup:count_started_children(ProducersSup),
Topics,
ProducersSup
),

UnknownTopicCacheTtl = config(unknown_topic_cache_ttl, Config,
?DEFAULT_UNKNOWN_TOPIC_CACHE_TTL),
ok = update_partitions_count_cache(Ets, TopicMetadataArray, UnknownTopicCacheTtl),
ok = maybe_cache_unknown_topic_partition(Ets, Topics, TopicMetadataArray,
ok = update_partitions_count_cache(Ets, TopicsMetadata, UnknownTopicCacheTtl),
ok = maybe_cache_unknown_topic_partition(Ets, Topics, TopicsMetadata,
UnknownTopicCacheTtl),
{{ok, Metadata}, State};
{error, Reason} ->
Expand Down Expand Up @@ -981,6 +989,67 @@ ensure_partition_workers(TopicName, State, F) ->
end
end).

%% Walk the requested topics and, for each one present in both maps, start
%% producers for the partitions that the metadata reports but that are not
%% yet covered by a started producer.
%%   TopicsMetadata - topic => partition count taken from the metadata
%%   Producers      - topic => number of partition producers already started
%% Topics missing from either map, or whose partition count did not grow,
%% are left untouched.
-spec maybe_start_partition_producer(TopicsMetadata, Producers, Topics, ProducerSup) -> ok
  when
    TopicsMetadata :: #{Key => non_neg_integer()},
    Producers :: #{Key => non_neg_integer()},
    Topics :: [topic()],
    ProducerSup :: pid(),
    Key :: topic().
maybe_start_partition_producer(_TopicsMetadata, _Producers, [], _ProducerSup) ->
  ok;
maybe_start_partition_producer(TopicsMetadata, Producers, [Topic | Rest], ProducerSup) ->
  case {TopicsMetadata, Producers} of
    {#{Topic := NewCnt}, #{Topic := OldCnt}} when NewCnt > OldCnt ->
      %% partitions OldCnt..NewCnt-1 have no producer yet
      do_start_partition_producer(ProducerSup, Topic, OldCnt, NewCnt);
    _ ->
      ok
  end,
  maybe_start_partition_producer(TopicsMetadata, Producers, Rest, ProducerSup).

%% Start one producer for each partition in
%% OldPartitionCnt .. NewPartitionCnt - 1 of Topic.
%% The producer config is copied from the producer of partition 0; when that
%% config cannot be obtained nothing is started. The result of each
%% individual start call is discarded, so this function always returns ok.
-spec do_start_partition_producer(pid(), topic(), non_neg_integer(), non_neg_integer()) -> ok.
do_start_partition_producer(ProducerSup, Topic, OldPartitionCnt, NewPartitionCnt) ->
  case brod_producers_sup:get_producer_config(ProducerSup, Topic, 0) of
    {ok, Config} ->
      ClientPid = self(),
      NewPartitions = lists:seq(OldPartitionCnt, NewPartitionCnt - 1),
      lists:foreach(
        fun(Partition) ->
          _ = brod_producers_sup:start_producer(
                ProducerSup, ClientPid, Topic, Partition, Config),
          ok
        end,
        NewPartitions);
    _ ->
      %% no reference config available: do not start new producers
      ok
  end.

%% Build a map of topic name => partition count from the topic entries of a
%% metadata response. Entries whose `error_code' is an error are left out,
%% as are entries whose name is not a binary or whose partitions is not a list.
-spec filter_topics([kpro:struct()]) -> #{topic() => non_neg_integer()}.
filter_topics(TopicsMetadataArray) ->
  Healthy = lists:filter(fun(#{error_code := EC}) -> not (?IS_ERROR(EC)) end,
                         TopicsMetadataArray),
  maps:from_list([{Name, length(Partitions)}
                  || #{name := <<Name/binary>>, partitions := Partitions} <- Healthy,
                     is_list(Partitions)]).

%% Catches exit exceptions when making gen_server:call.
-spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return
when Call :: term(),
Expand Down
60 changes: 60 additions & 0 deletions src/brod_producers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
, start_link/0
, find_producer/3
, start_producer/4
, start_producer/5
, stop_producer/2
, get_producer_config/3
, count_started_children/1
]).

-include("brod_int.hrl").
Expand Down Expand Up @@ -60,6 +63,13 @@ start_producer(SupPid, ClientPid, TopicName, Config) ->
Spec = producers_sup_spec(ClientPid, TopicName, Config),
brod_supervisor3:start_child(SupPid, Spec).

%% @doc Dynamically start a partition producer.
%% `SupPid' is the top-level producers supervisor. The producer is started
%% under the per-topic supervisor registered there for `Topic', which is the
%% same place `get_producer_config/3' and `count_started_children/1' look for
%% partition producers. Starting it directly under `SupPid' would hide it
%% from those lookups.
%% Returns `{error, {producer_not_found, Topic}}' when no per-topic supervisor
%% exists for `Topic'; otherwise the result of `brod_supervisor3:start_child/2'.
%% NOTE(review): assumes producer_spec/4 builds a child spec identified by the
%% partition number, as the lookup in get_producer_config/3 implies - confirm.
-spec start_producer(pid(), pid(), brod:topic(), brod:partition(), brod:producer_config()) ->
        {ok, pid()} | {error, any()}.
start_producer(SupPid, ClientPid, Topic, Partition, Config) ->
  case brod_supervisor3:find_child(SupPid, Topic) of
    [] ->
      {error, {producer_not_found, Topic}};
    [PartitionsSupPid] ->
      Spec = producer_spec(ClientPid, Topic, Partition, Config),
      brod_supervisor3:start_child(PartitionsSupPid, Spec)
  end.

%% @doc Dynamically stop a per-topic supervisor
-spec stop_producer(pid(), brod:topic()) -> ok | {}.
stop_producer(SupPid, TopicName) ->
Expand Down Expand Up @@ -92,6 +102,56 @@ find_producer(SupPid, Topic, Partition) ->
end
end.

%% @doc Read the config a partition producer was started with from its child
%% specification.
%% The per-topic supervisor is looked up under `SupPid' first; the child spec
%% of `Partition' is then fetched from it. An exit raised while talking to the
%% per-topic supervisor is reported as `{producer_down, Reason}'.
-spec get_producer_config(pid(), brod:topic(), brod:partition()) ->
        {ok, brod_producer:config()} | {error, Reason} when
    Reason :: {producer_not_found, brod:topic()}
            | {not_found, brod:topic(), brod:partition()}
            | {producer_down, any()}.
get_producer_config(SupPid, Topic, Partition) ->
  case brod_supervisor3:find_child(SupPid, Topic) of
    [PartitionsSupPid] ->
      try brod_supervisor3:get_childspec(PartitionsSupPid, Partition) of
        {ok, {_Id, {brod_producer, start_link, [_, _, _, Config]}, _, _, _, _}}
            when is_list(Config) ->
          {ok, Config};
        _ ->
          {error, {not_found, Topic, Partition}}
      catch
        exit:{Reason, _} ->
          {error, {producer_down, Reason}}
      end;
    [] ->
      {error, {producer_not_found, Topic}}
  end.

%% @doc Count the children of every per-topic supervisor under `SupRef'.
%% Returns a map of topic => number of children. Only children whose id is a
%% binary and whose pid is a live reference (an actual pid) are inspected, and
%% topics with no children are omitted from the result.
-spec count_started_children(pid()) -> #{brod:topic() => non_neg_integer()}.
count_started_children(SupRef) ->
  maps:from_list(
    [{Topic, Count}
     || {Topic, Pid, _Type, _Mods} <- which_producers(SupRef),
        is_binary(Topic),
        is_pid(Pid),
        Count <- [length(which_producers(Pid))],
        Count > 0]).

%% List the children of supervisor `Sup'. Best effort: any exception raised by
%% the call (of any class) is turned into an empty list.
which_producers(Sup) ->
  try brod_supervisor3:which_children(Sup) of
    Children -> Children
  catch
    _:_ -> []
  end.

%% @doc brod_supervisor3 callback.
init(?TOPICS_SUP) ->
{ok, {{one_for_one, 0, 1}, []}};
Expand Down
39 changes: 37 additions & 2 deletions src/brod_supervisor3.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
which_children/1, count_children/1,
find_child/2, check_childspecs/1]).
find_child/2, check_childspecs/1,
get_childspec/2]).

%% Internal exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand Down Expand Up @@ -297,6 +298,21 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
end;
check_childspecs(X) -> {error, {badarg, X}}.

%%-----------------------------------------------------------------
%% Func: get_childspec/2
%% Returns: {ok, child_spec()} | {error, not_found}
%% The child specification of the child identified by `Name' under
%% supervisor `SupRef'. The specification is returned as a
%% {Name, StartFunc, Restart, Shutdown, Type, Modules} tuple (not a map).
%% `{error, not_found}' is returned when no child with that name is known
%% to the supervisor.
%%-----------------------------------------------------------------
-spec get_childspec(SupRef, Name) -> Result when
SupRef :: sup_ref(),
Name :: child_id(),
Result :: {'ok', child_spec()} | {'error', Error},
Error :: 'not_found'.
get_childspec(Supervisor, Name) ->
call(Supervisor, {get_childspec, Name}).

%%%-----------------------------------------------------------------
%%% Called by timer:apply_after from restart/2
-spec try_again_restart(SupRef, Child, Reason) -> ok when
Expand Down Expand Up @@ -586,7 +602,15 @@ handle_call(count_children, _From, State) ->
%% Reformat counts to a property list.
Reply = [{specs, Specs}, {active, Active},
{supervisors, Supers}, {workers, Workers}],
{reply, Reply, State}.
{reply, Reply, State};

handle_call({get_childspec, Name}, _From, State) ->
case get_child(Name, State) of
{value, Child} ->
{reply, {ok, child_to_spec(Child)}, State};
_ ->
{reply, {error, not_found}, State}
end.

count_if_alive(Pid, Alive, Total) ->
case is_pid(Pid) andalso is_process_alive(Pid) of
Expand All @@ -612,6 +636,17 @@ count_child(#child{pid = Pid, child_type = supervisor},
false -> {Specs + 1, Active, Supers + 1, Workers}
end.

%% Convert an internal #child{} record back into the 6-tuple child
%% specification {Name, StartFunc, Restart, Shutdown, Type, Modules}.
%% The child's pid is not part of the result.
-spec child_to_spec(child_rec()) -> child_spec().
child_to_spec(#child{} = Child) ->
  {Child#child.name,
   Child#child.mfargs,
   Child#child.restart_type,
   Child#child.shutdown,
   Child#child.child_type,
   Child#child.modules}.

%%% If a restart attempt failed, this message is sent via
%%% timer:apply_after(0,...) in order to give gen_server the chance to
Expand Down
46 changes: 46 additions & 0 deletions test/brod_producer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
, t_produce_fire_n_forget/1
, t_configure_produce_api_vsn/1
, t_produce_pre_defined_partitioner/1
, t_producer_increased_partition/1
]).

-include_lib("common_test/include/ct.hrl").
Expand Down Expand Up @@ -204,6 +205,51 @@ t_produce_no_ack(Config) when is_list(Config) ->
ReceiveFun(K1, V1),
ReceiveFun(K2, V2).

%% Test case: producing to a partition that is added to a topic after the
%% producers were started must work once the client has refreshed metadata.
%%
%% Init clause: stops a client still registered under the test's name (if
%% any), creates the topic with 3 partitions and 1 replica, starts the client
%% and its producers, and returns the client, topic and tester pid in Config.
t_producer_increased_partition({init, Config}) ->
Client = t_producer_increased_partition,
Topic = <<"brod_test_increased_partition">>,
case whereis(Client) of
?undef -> ok;
Pid_ -> brod:stop_client(Pid_)
end,
TesterPid = self(),
kafka_test_helper:create_topic(Topic, 3, 1),
ok = brod:start_client(?HOSTS, Client, client_config()),
ok = brod:start_producer(Client, Topic, []),
[{client, Client}, {topic, Topic}, {tester, TesterPid} | Config];
t_producer_increased_partition(Config) when is_list(Config) ->
%% NOTE(review): ?config/1 presumably reads the value from Config - confirm
%% against the macro definition in this suite.
Client = ?config(client),
Topic = ?config(topic),
%% Partition 4 does not exist yet: the topic was created with 3 partitions.
Partition = 4,
?assertEqual(
{error, {producer_not_found, Topic, Partition}},
brod:produce(Client, Topic, Partition, <<"k">>, <<"v">>)
),

%% Grow the topic to 5 partitions, then refresh the metadata through the
%% client; the refresh is what is expected to start the missing producers.
kafka_test_helper:increase_topic_partition(Topic, 5),
Ret = brod_client:get_metadata(Client, Topic),
?assertMatch({ok, _}, Ret),

%% Subscribe to the new partition so the produced message can be observed.
TesterPid = ?config(tester),
ok = brod:start_consumer(Client, Topic, []),
Subscriber = spawn_link(fun() -> subscriber_loop(TesterPid) end),
{ok, _ConsumerPid} = brod:subscribe(Client, Subscriber, Topic, Partition, []),

%% Producing to the new partition must now succeed and be acknowledged.
{Key, Value} = make_unique_kv(),
{ok, CallRef} = brod:produce(Client, Topic, Partition, Key, Value),
ok = brod:sync_produce_request(CallRef),
%% Expect the produced key/value to arrive within 5 seconds.
receive
{_, _, K, V} ->
?assertEqual(Key, K),
?assertEqual(Value, V)
after
5000 ->
ct:fail({?MODULE, ?LINE, timeout})
end,

%% Clean up: remove the topic and kill the subscriber without taking the
%% test process down with it (hence the unlink first).
kafka_test_helper:delete_topic(Topic),
unlink(Subscriber) andalso exit(Subscriber, kill).

t_produce_no_ack_offset({init, Config}) ->
t_produce_no_ack({init, Config});
t_produce_no_ack_offset(Config) when is_list(Config) ->
Expand Down
8 changes: 8 additions & 0 deletions test/kafka_test_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
, bootstrap_hosts/0
, kill_process/2
, kafka_version/0
, increase_topic_partition/2
, delete_topic/1
]).

-include("brod_test_macros.hrl").
Expand Down Expand Up @@ -125,6 +127,12 @@ create_topic(Name, NumPartitions, NumReplicas) ->
0 = exec_in_kafka_container(Create, [NumPartitions, NumReplicas, Name]),
wait_for_topic_by_describe(Name).

%% Raise the partition count of topic Name to NumPartitions by running
%% kafka-topics.sh --alter inside the Kafka container, then wait for the
%% topic via wait_for_topic_by_describe/1.
%% Crashes with badmatch when the command does not return 0.
increase_topic_partition(Name, NumPartitions) ->
  Format = lists:append(["/opt/kafka/bin/kafka-topics.sh ",
                         maybe_zookeeper(),
                         " --alter --partitions ~p --topic ~s"]),
  0 = exec_in_kafka_container(Format, [NumPartitions, Name]),
  wait_for_topic_by_describe(Name).

exec_in_kafka_container(FMT, Args) ->
CMD0 = lists:flatten(io_lib:format(FMT, Args)),
CMD = "docker exec kafka-1 bash -c '" ++ CMD0 ++ "'",
Expand Down