Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions src/syn_backbone.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
%% includes
-include("syn.hrl").

-include_lib("kernel/include/logger.hrl").

%% ===================================================================
%% API
%% ===================================================================
Expand Down Expand Up @@ -110,15 +112,30 @@ init([]) ->
{stop, Reason :: term(), Reply :: term(), State :: map()} |
{stop, Reason :: term(), State :: map()}.
handle_call({create_tables_for_scope, Scope}, _From, State) ->
error_logger:info_msg("SYN[~s] Creating tables for scope <~s>", [node(), Scope]),
ensure_table_existence(set, syn_registry_by_name, Scope),
ensure_table_existence(bag, syn_registry_by_pid, Scope),
ensure_table_existence(ordered_set, syn_pg_by_name, Scope),
ensure_table_existence(ordered_set, syn_pg_by_pid, Scope),
?LOG_INFO(#{
action => tables_created,
scope => Scope
},
#{
report_cb => fun syn_logger:scope/1,
domain => [syn, backbone]
}),
{reply, ok, State};

handle_call(Request, From, State) ->
error_logger:warning_msg("SYN[~s] Received from ~p an unknown call message: ~p", [node(), From, Request]),
?LOG_WARNING(#{
kind => call,
from => From,
msg => Request
},
#{
report_cb => fun syn_logger:unknown_message/1,
domain => [syn, backbone]
}),
{reply, undefined, State}.

%% ----------------------------------------------------------------------------------------------------------
Expand All @@ -129,7 +146,14 @@ handle_call(Request, From, State) ->
{noreply, State :: map(), Timeout :: non_neg_integer()} |
{stop, Reason :: term(), State :: map()}.
handle_cast(Msg, State) ->
error_logger:warning_msg("SYN[~s] Received an unknown cast message: ~p", [node(), Msg]),
?LOG_WARNING(#{
kind => cast,
msg => Msg
},
#{
report_cb => fun syn_logger:unknown_message/1,
domain => [syn, backbone]
}),
{noreply, State}.

%% ----------------------------------------------------------------------------------------------------------
Expand All @@ -140,15 +164,28 @@ handle_cast(Msg, State) ->
{noreply, State :: map(), Timeout :: non_neg_integer()} |
{stop, Reason :: term(), State :: map()}.
handle_info(Info, State) ->
error_logger:warning_msg("SYN[~s] Received an unknown info message: ~p", [node(), Info]),
?LOG_WARNING(#{
kind => info,
msg => Info
},
#{
report_cb => fun syn_logger:unknown_message/1,
domain => [syn, backbone]
}),
{noreply, State}.

%% ----------------------------------------------------------------------------------------------------------
%% Terminate
%% ----------------------------------------------------------------------------------------------------------
-spec terminate(Reason :: term(), State :: map()) -> terminated.
terminate(Reason, _State) ->
error_logger:info_msg("SYN[~s] Terminating with reason: ~p", [node(), Reason]),
?LOG_INFO(#{
msg => {terminate, Reason}
},
#{
report_cb => fun syn_logger:terminate/1,
scope => [syn, backbone]
}),
%% return
terminated.

Expand Down
33 changes: 24 additions & 9 deletions src/syn_event_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@

-module(syn_event_handler).

-include_lib("kernel/include/logger.hrl").

%% API
-export([ensure_event_handler_loaded/0]).
-export([call_event_handler/2]).
Expand Down Expand Up @@ -230,10 +232,16 @@ call_event_handler(CallbackMethod, Args) ->
true ->
try apply(CustomEventHandler, CallbackMethod, Args)
catch Class:Reason:Stacktrace ->
error_logger:error_msg(
"SYN[~s] Error ~p:~p in custom handler ~p: ~p",
[node(), Class, Reason, CallbackMethod, Stacktrace]
)
?LOG_ERROR(#{
class => Class,
reason => Reason,
mfa => {CustomEventHandler, CallbackMethod, Args},
stacktrace => Stacktrace
},
#{
report_cb => fun syn_logger:callback_error/1,
domain => [syn, event_handler]
})
end;

_ ->
Expand All @@ -255,11 +263,18 @@ do_resolve_registry_conflict(Scope, Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Ti
PidToKeep when is_pid(PidToKeep) -> {PidToKeep, false};
_ -> {undefined, false}

catch Class:Reason ->
error_logger:error_msg(
"SYN[~s] Error ~p in custom handler resolve_registry_conflict: ~p",
[node(), Class, Reason]
),
catch Class:Reason:Stacktrace ->
?LOG_ERROR(#{
class => Class,
reason => Reason,
mfa => {CustomEventHandler,
resolve_registry_conflict, [Scope, Name, {Pid1, Meta1, Time1}, {Pid2, Meta2, Time2}]},
stacktrace => Stacktrace
},
#{
report_cb => fun syn_logger:callback_error/1,
domain => [syn, event_handler]
}),
{undefined, false}
end;

Expand Down
61 changes: 47 additions & 14 deletions src/syn_gen_scope.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
%% includes
-include("syn.hrl").

-include_lib("kernel/include/logger.hrl").

%% callbacks
-callback init(#state{}) ->
{ok, HandlerState :: term()}.
Expand Down Expand Up @@ -205,9 +207,13 @@ handle_info({'3.0', discover, RemoteScopePid}, #state{
nodes_map = NodesMap
} = State) ->
RemoteScopeNode = node(RemoteScopePid),
error_logger:info_msg("SYN[~s|~s<~s>] Received DISCOVER request from node ~s",
[node(), HandlerLogName, Scope, RemoteScopeNode]
),
?LOG_INFO(#{msg => discover, from => RemoteScopeNode},
#{
handler_name => HandlerLogName,
scope => Scope,
report_cb => fun syn_logger:?MODULE/1,
domain => [syn, gen_scope]
}),
%% send local data to remote
{ok, LocalData} = Handler:get_local_data(State),
send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State),
Expand All @@ -230,9 +236,13 @@ handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{
scope = Scope
} = State) ->
RemoteScopeNode = node(RemoteScopePid),
error_logger:info_msg("SYN[~s|~s<~s>] Received ACK SYNC (~w entries) from node ~s",
[node(), HandlerLogName, Scope, length(Data), RemoteScopeNode]
),
?LOG_INFO(#{msg => {ack_sync, Data}, from => RemoteScopeNode},
#{
handler_name => HandlerLogName,
scope => Scope,
report_cb => fun syn_logger:?MODULE/1,
domain => [syn, gen_scope]
}),
%% save remote data
Handler:save_remote_data(Data, State),
%% is this a new node?
Expand Down Expand Up @@ -261,9 +271,16 @@ handle_info({'DOWN', MRef, process, Pid, Reason}, #state{
RemoteNode = node(Pid),
case maps:take(RemoteNode, NodesMap) of
{Pid, NodesMap1} ->
error_logger:info_msg("SYN[~s|~s<~s>] Scope Process is DOWN on node ~s: ~p",
[node(), HandlerLogName, Scope, RemoteNode, Reason]
),
?LOG_INFO(#{
msg => {down, Reason},
from => RemoteNode
},
#{
handler_name => HandlerLogName,
scope => Scope,
report_cb => fun syn_logger:?MODULE/1,
domain => [syn, gen_scope]
}),
Handler:purge_local_data_for_node(RemoteNode, State),
{noreply, State#state{nodes_map = NodesMap1}};

Expand All @@ -280,9 +297,13 @@ handle_info({nodeup, RemoteNode}, #state{
handler_log_name = HandlerLogName,
scope = Scope
} = State) ->
error_logger:info_msg("SYN[~s|~s<~s>] Node ~s has joined the cluster, sending discover message",
[node(), HandlerLogName, Scope, RemoteNode]
),
?LOG_INFO(#{msg => nodeup, from => RemoteNode},
#{
handler_name => HandlerLogName,
scope => Scope,
report_cb => fun syn_logger:?MODULE/1,
domain => [syn, gen_scope]
}),
send_to_node(RemoteNode, {'3.0', discover, self()}, State),
{noreply, State};

Expand All @@ -301,7 +322,13 @@ handle_continue(after_init, #state{
scope = Scope,
process_name = ProcessName
} = State) ->
error_logger:info_msg("SYN[~s|~s<~s>] Discovering the cluster", [node(), HandlerLogName, Scope]),
?LOG_INFO(#{msg => after_init},
#{
handler_name => HandlerLogName,
scope => Scope,
report_cb => fun syn_logger:?MODULE/1,
domain => [syn, gen_scope]
}),
%% broadcasting is done in the scope process to avoid issues with ordering guarantees
lists:foreach(fun(RemoteNode) ->
{ProcessName, RemoteNode} ! {'3.0', discover, self()}
Expand All @@ -313,7 +340,13 @@ handle_continue(after_init, #state{
%% ----------------------------------------------------------------------------------------------------------
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), #state{}) -> any().
terminate(Reason, #state{handler_log_name = HandlerLogName, scope = Scope}) ->
error_logger:info_msg("SYN[~s|~s<~s>] Terminating with reason: ~p", [node(), HandlerLogName, Scope, Reason]).
?LOG_INFO(#{msg => {terminate, Reason}},
#{
handler_name => HandlerLogName,
scope => Scope,
report_cb => fun syn_logger:terminate/1,
domain => [syn, gen_scope]
}).

%% ----------------------------------------------------------------------------------------------------------
%% Convert process state when code is changed.
Expand Down
54 changes: 54 additions & 0 deletions src/syn_logger.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
-module(syn_logger).

-export([syn_gen_scope/1,
terminate/1,
callback_error/1,
unknown_message/1,
scope/1,
conflict/1]).

syn_gen_scope(#{msg := discover, from := From}) ->
{"Received DISCOVER request from node ~s", [From]};
syn_gen_scope(#{msg := {ack_sync, Data}, from := From}) ->
{"Received ACK SYNC (~w entries) from node ~s", [length(Data), From]};
syn_gen_scope(#{msg := {down, Reason}, from := From}) ->
{"Scope Process is DOWN on node node ~s: ~p", [From, Reason]};
syn_gen_scope(#{msg := nodeup, from := From}) ->
{"Node ~s has joined the cluster, sending discover message", [From]};
syn_gen_scope(#{msg := after_init}) ->
{"Discover the cluster", []}.

terminate(#{msg := {terminate, Reason}}) ->
{"Terminating with reason: ~p", [Reason]}.

callback_error(#{class := Class,
reason := Reason,
mfa := {_, Func, _},
stacktrace := Stacktrace}) ->
{"Error ~p:~p in custom handler ~p: ~p", [Class, Reason, Func, Stacktrace]};
callback_error(#{class := Class,
reason := Reason,
fa := {Func, _},
stacktrace := Stacktrace}) ->
{"Error ~p:~p in custom handler ~p: ~p", [Class, Reason, Func, Stacktrace]}.

unknown_message(#{kind := down, pid := Pid, reason := Reason}) ->
{"Received a DOWN message from and unknown process ~p with reason: ~p",
[Pid, Reason]};
unknown_message(#{kind := call, from := From, msg := Msg}) ->
{"Received from ~p an unknown call message: ~p", [From, Msg]};
unknown_message(#{kind := Kind, msg := Msg}) ->
{"Received an unknown ~p message: ~p", [Kind, Msg]}.

scope(#{action := tables_created, new := Scope}) ->
{"Created tabled for scope <~s>", [Scope]};
scope(#{action := added, new := Scope}) ->
{"Added node to scope <~s>", [Scope]}.

conflict(#{name := Name, remote := Remote, local := Local, keep := {none, Pid}}) ->
{"Registry CONFLICT for name ~p: ~p vs ~p -> none chosen: ~p",
[Name, Remote, Local, Pid]};
conflict(#{name := Name, remote := Remote, local := Local, keep := Keep} = Msg) ->
#{Keep := {Pid, _}} = Msg,
{"Registry CONFLICT for name ~p: ~p vs ~p -> keeping ~s: ~p",
[Name, Remote, Local, Keep, Pid]}.
53 changes: 41 additions & 12 deletions src/syn_pg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
%% includes
-include("syn.hrl").

-include_lib("kernel/include/logger.hrl").

%% ===================================================================
%% API
%% ===================================================================
Expand Down Expand Up @@ -364,10 +366,16 @@ handle_call({'3.0', join_or_update_on_node, RequesterNode, GroupName, Pid, MetaO
do_join_on_node(GroupName, Pid, TableMeta, Meta, MRef, normal, RequesterNode, on_group_process_updated, State)

catch Class:Reason:Stacktrace ->
error_logger:error_msg(
"SYN[~s] Error ~p:~p in pg update function: ~p",
[node(), Class, Reason, Stacktrace]
),
?LOG_ERROR(#{
class => Class,
reason => Reason,
fa => {update, [TableMeta]},
stacktrace => Stacktrace
},
#{
report_cb => fun syn_logger:callback_error/1,
domain => [syn, pg]
}),
{reply, {raise, Class, Reason, Stacktrace}, State}
end;

Expand Down Expand Up @@ -407,9 +415,16 @@ handle_call({'3.0', leave_on_node, RequesterNode, GroupName, Pid}, _From, #state
end;

handle_call(Request, From, #state{scope = Scope} = State) ->
error_logger:warning_msg("SYN[~s|~s<~s>] Received from ~p an unknown call message: ~p",
[node(), ?MODULE_LOG_NAME, Scope, From, Request]
),
?LOG_WARNING(#{
kind => call,
from => From,
msg => Request
},
#{
scope => Scope,
report_cb => fun syn_logger:unknown_message/1,
domain => [syn, pg]
}),
{reply, undefined, State}.

%% ----------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -456,10 +471,16 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
} = State) ->
case find_pg_entries_by_pid(Pid, TableByPid) of
[] ->
error_logger:warning_msg(
"SYN[~s|~s<~s>] Received a DOWN message from an unknown process ~p with reason: ~p",
[node(), ?MODULE_LOG_NAME, Scope, Pid, Reason]
);
?LOG_WARNING(#{
kind => down,
pid => Pid,
reason => Reason
},
#{
scope => Scope,
report_cb => fun syn_logger:unknown_message/1,
domain => [syn, pg]
});

Entries ->
lists:foreach(fun({{_Pid, GroupName}, Meta, _, _, _}) ->
Expand All @@ -475,7 +496,15 @@ handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{
{noreply, State};

handle_info(Info, #state{scope = Scope} = State) ->
error_logger:warning_msg("SYN[~s|~s<~s>] Received an unknown info message: ~p", [node(), ?MODULE_LOG_NAME, Scope, Info]),
?LOG_WARNING(#{
kind => info,
msg => Info
},
#{
scope => Scope,
report_cb => fun syn_logger:unknown_message/1,
domain => [syn, pg]
}),
{noreply, State}.

%% ----------------------------------------------------------------------------------------------------------
Expand Down
Loading