Skip to content
41 changes: 41 additions & 0 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
%% Topic APIs
-export([ create_topics/3
, create_topics/4
, create_partitions/3
, create_partitions/4
, delete_topics/3
, delete_topics/4
]).
Expand Down Expand Up @@ -190,6 +192,7 @@
-type endpoint() :: {hostname(), portnum()}.
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type topic_partition_config() :: kpro:struct().
-type partition() :: kpro:partition().
-type topic_partition() :: {topic(), partition()}.
-type offset() :: kpro:offset(). %% Physical offset (an integer)
Expand Down Expand Up @@ -1049,6 +1052,44 @@ create_topics(Hosts, TopicConfigs, RequestConfigs) ->
create_topics(Hosts, TopicConfigs, RequestConfigs, Options) ->
brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs, Options).

%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs).

%% @doc Create partitions(s) in kafka.
%% <ul>
%% <li>`topic'
%% The topic name.
%% </li>
%%
%% <li>`new_partitions'
%% The `count` of how many partitions will exist for the topic. current + desired
%% The number of `assignment` should be equal to the number of new partitions.
%% Each list for assignment specify the prefferred broker ids to assign
%% </li>
%%
%% Example:
%% ```
%% > TopicPartitionConfigs = [
%% #{
%% topic: <<"my_topic">>,
%% new_partitions: #{
%% count: 6,
%% assignment: [[1,2], [2,3], [3,1]]
%% }
%% }
%% ].
%% > brod:create_partitions([{"localhost", 9092}], TopicPartitionConfigs, #{timeout => 1000}, []).
%% ok
%% '''
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()},
conn_config()) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options) ->
brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options).

%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
ok | {error, any()}.
Expand Down
43 changes: 43 additions & 0 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

%% Test export
-export([ lookup_partitions_count_cache/2
, sync_topics_metadata/2
]).

-export([ code_change/3
Expand All @@ -72,6 +73,7 @@
-define(DEFAULT_RECONNECT_COOL_DOWN_SECONDS, 1).
-define(DEFAULT_GET_METADATA_TIMEOUT_SECONDS, 5).
-define(DEFAULT_UNKNOWN_TOPIC_CACHE_TTL, 120000).
-define(DEFAULT_SYNC_METADATA_INTERVAL_SECONDS, 60).

%% ClientId as ets table name.
-define(ETS(ClientId), ClientId).
Expand Down Expand Up @@ -130,6 +132,7 @@
, consumers_sup :: ?undef | pid()
, config :: ?undef | config()
, workers_tab :: ?undef | ets:tab()
, sync_ref :: ?undef | reference()
}).

-type state() :: #state{}.
Expand Down Expand Up @@ -345,10 +348,12 @@ init({BootstrapEndpoints, ClientId, Config}) ->
Tab = ets:new(?ETS(ClientId),
[named_table, protected, {read_concurrency, true}]),
self() ! init,
SyncRef = maybe_start_metadata_sync(Config),
{ok, #state{ client_id = ClientId
, bootstrap_endpoints = BootstrapEndpoints
, config = Config
, workers_tab = Tab
, sync_ref = SyncRef
}}.

%% @private
Expand All @@ -362,6 +367,11 @@ handle_info(init, State0) ->
, consumers_sup = ConsumersSupPid
},
{noreply, State};
handle_info(sync, #state{config = Config, producers_sup = Sup} = State) ->
Children = brod_supervisor3:which_children(Sup),
NewState = sync_topics_metadata(Children, State),
Ref = erlang:send_after(sync_interval(Config), self(), sync),
{noreply, NewState#state{ sync_ref = Ref }};
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
, producers_sup = Pid
} = State) ->
Expand Down Expand Up @@ -695,6 +705,11 @@ timeout(Config) ->
?DEFAULT_GET_METADATA_TIMEOUT_SECONDS),
timer:seconds(T).

sync_interval(Config) ->
T = config(sync_metadata_interval_seconds, Config,
?DEFAULT_SYNC_METADATA_INTERVAL_SECONDS),
timer:seconds(T).

config(Key, Config, Default) ->
proplists:get_value(Key, Config, Default).

Expand All @@ -719,6 +734,34 @@ ensure_binary(ClientId) when is_atom(ClientId) ->
ensure_binary(ClientId) when is_binary(ClientId) ->
ClientId.

maybe_start_metadata_sync(Config) ->
case config(sync_metadata, Config, false) of
true ->
erlang:send_after(sync_interval(Config), self(), sync);
false ->
undefined
end.

sync_topics_metadata(Children, State) ->
lists:foldl(fun({Topic, _, _, _}, StateAcc) -> sync_topic_metadata(Topic, StateAcc) end, State, Children).

sync_topic_metadata(Topic, State) ->
case do_get_metadata(Topic, State) of
{{ok, #{ topics := [ #{ partitions := Partitions } ] }}, NewState} ->
lists:foreach(fun (Partition) -> sync_topic_partition(Partition, Topic, State) end, Partitions),
NewState;
{{error, _}, NewState} ->
NewState
end.

sync_topic_partition(#{ partition_index := Partition }, Topic, #state{ producers_sup = Sup, client_id = Client, config = Config}) ->
case brod_producers_sup:find_producer(Sup, Topic, Partition) of
{ok, _} ->
ok;
{error, {producer_not_found, _, _}} ->
brod_producer:start_link(Client, Topic, Partition, Config)
end.

-spec maybe_connect(state(), endpoint()) ->
{Result, state()} when Result :: {ok, pid()} | {error, any()}.
maybe_connect(#state{} = State, Endpoint) ->
Expand Down
33 changes: 17 additions & 16 deletions src/brod_kafka_apis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,23 @@ lookup_vsn_range(Conn, API) ->
%% Do not change range without verification.
supported_versions(API) ->
case API of
produce -> {0, 7};
fetch -> {0, 10};
list_offsets -> {0, 2};
metadata -> {0, 2};
offset_commit -> {2, 2};
offset_fetch -> {1, 2};
find_coordinator -> {0, 0};
join_group -> {0, 1};
heartbeat -> {0, 0};
leave_group -> {0, 0};
sync_group -> {0, 0};
describe_groups -> {0, 0};
list_groups -> {0, 0};
create_topics -> {0, 0};
delete_topics -> {0, 0};
_ -> erlang:error({unsupported_api, API})
produce -> {0, 7};
fetch -> {0, 10};
list_offsets -> {0, 2};
metadata -> {0, 2};
offset_commit -> {2, 2};
offset_fetch -> {1, 2};
find_coordinator -> {0, 0};
join_group -> {0, 1};
heartbeat -> {0, 0};
leave_group -> {0, 0};
sync_group -> {0, 0};
describe_groups -> {0, 0};
list_groups -> {0, 0};
create_topics -> {0, 0};
create_partitions -> {0, 1};
delete_topics -> {0, 0};
_ -> erlang:error({unsupported_api, API})
end.

monitor_connection(Conn) ->
Expand Down
12 changes: 12 additions & 0 deletions src/brod_kafka_request.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-module(brod_kafka_request).

-export([ create_topics/3
, create_partitions/3
, delete_topics/3
, fetch/8
, list_groups/1
Expand All @@ -36,6 +37,7 @@
-type vsn() :: brod_kafka_apis:vsn().
-type topic() :: brod:topic().
-type topic_config() :: kpro:struct().
-type topic_partition_config() :: kpro:struct().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type conn() :: kpro:connection().
Expand Down Expand Up @@ -65,6 +67,16 @@ create_topics(Connection, TopicConfigs, RequestConfigs)
create_topics(Vsn, TopicConfigs, RequestConfigs) ->
kpro_req_lib:create_topics(Vsn, TopicConfigs, RequestConfigs).

%% @doc Make a create_partitions request.
-spec create_partitions(vsn() | conn(), [topic_partition_config()], #{timeout => kpro:int32(),
validate_only => boolean()}) -> kpro:req().
create_partitions(Connection, TopicPartitionConfigs, RequestConfigs)
when is_pid(Connection) ->
Vsn = brod_kafka_apis:pick_version(Connection, create_partitions),
create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs);
create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs) ->
kpro_req_lib:create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs).

%% @doc Make a delete_topics request.
-spec delete_topics(vsn() | conn(), [topic()], pos_integer()) -> kpro:req().
delete_topics(Connection, Topics, Timeout) when is_pid(Connection) ->
Expand Down
24 changes: 24 additions & 0 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
, describe_groups/3
, create_topics/3
, create_topics/4
, create_partitions/3
, create_partitions/4
, delete_topics/3
, delete_topics/4
, epoch_ms/0
Expand Down Expand Up @@ -73,6 +75,7 @@
-type conn_config() :: brod:conn_config().
-type topic() :: brod:topic().
-type topic_config() :: kpro:struct().
-type topic_partition_config() :: kpro:struct().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type endpoint() :: brod:endpoint().
Expand Down Expand Up @@ -103,6 +106,27 @@ create_topics(Hosts, TopicConfigs, RequestConfigs, ConnCfg) ->
Pid, TopicConfigs, RequestConfigs),
request_sync(Pid, Request)
end).

%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, _ConnCfg = []).

%% @doc Try to connect to the controller node using the given
%% connection options and create the given partitions with configs
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()},
conn_config()) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, ConnCfg) ->
KproOpts = kpro_connection_options(ConnCfg),
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
fun(Pid) ->
Request = brod_kafka_request:create_partitions(
Pid, TopicPartitionConfigs, RequestConfigs),
request_sync(Pid, Request)
end).

%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
ok | {error, any()}.
Expand Down
17 changes: 15 additions & 2 deletions test/brod_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
]).

%% Test cases
-export([ t_create_delete_topics/1
-export([ t_create_update_delete_topics/1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add another case which starts producers for a topic, then create new partitions, then assert that partition producer is started automatically?

, t_delete_topics_not_found/1
]).

Expand Down Expand Up @@ -59,7 +59,7 @@ all() -> [F || {F, _A} <- module_info(exports),

%%%_* Test functions ===========================================================

t_create_delete_topics(Config) when is_list(Config) ->
t_create_update_delete_topics(Config) when is_list(Config) ->
Topic = iolist_to_binary(["test-topic-", integer_to_list(erlang:system_time())]),
TopicConfig = [
#{
Expand All @@ -70,9 +70,22 @@ t_create_delete_topics(Config) when is_list(Config) ->
name => Topic
}
],
TopicPartitionConfig = [
#{
topic => Topic,
new_partitions => #{
count => 2,
assignment => [[0]]
}
}
],
try
?assertEqual(ok,
brod:create_topics(?HOSTS, TopicConfig, #{timeout => ?TIMEOUT},
#{connect_timeout => ?TIMEOUT})),

?assertEqual(ok,
brod:create_partitions(?HOSTS, TopicPartitionConfig, #{timeout => ?TIMEOUT},
#{connect_timeout => ?TIMEOUT}))
after
?assertEqual(ok, brod:delete_topics(?HOSTS, [Topic], ?TIMEOUT,
Expand Down
28 changes: 26 additions & 2 deletions test/brod_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
-export([ auth/6 ]).

%% Test cases
-export([ t_skip_unreachable_endpoint/1
-export([ t_optional_metadata_syncing/1
, t_skip_unreachable_endpoint/1
, t_no_reachable_endpoint/1
, t_call_bad_client_id/1
, t_metadata_connection_restart/1
Expand Down Expand Up @@ -69,6 +70,19 @@
end
end()).


-record(state,
{ client_id :: pid()
, bootstrap_endpoints :: [brod:endpoint()]
, meta_conn :: kpro:connection()
, payload_conns = [] :: list()
, producers_sup :: pid()
, consumers_sup :: pid()
, config :: list()
, workers_tab :: ets:tab()
, sync_ref :: reference()
}).

%%%_* ct callbacks =============================================================

suite() -> [{timetrap, {seconds, 30}}].
Expand Down Expand Up @@ -115,6 +129,17 @@ all() ->

%%%_* Test functions ===========================================================

t_optional_metadata_syncing(Config) when is_list(Config) ->
Client0 = has_sync,
Config0 = [{sync_metadata, true}],
{ok, #state{ sync_ref = HasRef }} = brod_client:init({?HOSTS, Client0, Config0}),
?assertNot(HasRef =:= undefined),

Config1 = [{sync_metadata, false}],
Client1 = no_sync,
{ok, #state{ sync_ref = NoRef }} = brod_client:init({?HOSTS, Client1, Config1}),
?assertMatch(undefined, NoRef).

t_get_partitions_count_safe(Config) when is_list(Config) ->
Client = ?FUNCTION_NAME,
ok = start_client(?HOSTS, Client),
Expand All @@ -141,7 +166,6 @@ t_get_partitions_count_configure_cache_ttl(Config) when is_list(Config) ->
?assertMatch(false, Res3),
ok = brod:stop_client(Client).


t_skip_unreachable_endpoint(Config) when is_list(Config) ->
Client = t_skip_unreachable_endpoint,
ok = start_client([{"localhost", 8192} | ?HOSTS], Client),
Expand Down
Loading