Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -592,26 +592,35 @@ last_written(#?MODULE{last_written_index_term = LWTI}) ->
%% forces the last index and last written index back to a prior index
-spec set_last_index(ra_index(), state()) ->
{ok, state()} | {not_found, state()}.
set_last_index(Idx, #?MODULE{cfg = Cfg,
last_written_index_term = {LWIdx0, _}} = State0) ->
set_last_index(Idx, State0) ->
case fetch_term(Idx, State0) of
{undefined, State} ->
{not_found, State};
{Term, State1} ->
LWIdx = min(Idx, LWIdx0),
{LWTerm, State2} = fetch_term(LWIdx, State1),
%% this should always be found but still assert just in case
%% _if_ this ends up as a genuine reversal next time we try
%% to write to the mem table it will detect this and open
%% a new one
true = LWTerm =/= undefined,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx),
{ok, State2#?MODULE{last_index = Idx,
last_term = Term,
last_written_index_term = {LWIdx, LWTerm}}}
case snapshot_index_term(State) of
{Idx, SnapTerm} ->
set_last_index0(Idx, SnapTerm, State);
_ ->
{not_found, State}
end;
{Term, State} ->
set_last_index0(Idx, Term, State)
end.

set_last_index0(Idx, Term,
#?MODULE{cfg = Cfg,
last_written_index_term = {LWIdx0, _}} = State0) ->
LWIdx = min(Idx, LWIdx0),
{LWTerm, State1} = fetch_term(LWIdx, State0),
%% this should always be found but still assert just in case
%% _if_ this ends up as a genuine reversal next time we try
%% to write to the mem table it will detect this and open
%% a new one
true = LWTerm =/= undefined,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx),
{ok, State1#?MODULE{last_index = Idx,
last_term = Term,
last_written_index_term = {LWIdx, LWTerm}}}.

-spec handle_event(event_body(), state()) ->
{state(), [effect()]}.
handle_event({written, _Term, {FromIdx, _ToIdx}},
Expand Down
23 changes: 17 additions & 6 deletions test/ra_log_memory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,28 @@ last_index_term(#state{last_index = LastIdx,

-spec set_last_index(ra_index(), ra_log_memory_state()) ->
{ok, ra_log_memory_state()} | {not_found, ra_log_memory_state()}.
set_last_index(Idx, #state{last_written = {LWIdx, _}} = State0) ->
set_last_index(Idx, State0) ->
case fetch_term(Idx, State0) of
{undefined, State} ->
{not_found, State};
{Term, State1} when Idx < LWIdx ->
case snapshot_index_term(State) of
{Idx, SnapTerm} ->
set_last_index0(Idx, SnapTerm, State);
_ ->
{not_found, State}
end;
{Term, State} ->
set_last_index0(Idx, Term, State)
end.

set_last_index0(Idx, Term, #state{last_written = {LWIdx, _}} = State0) ->
case Idx < LWIdx of
true ->
%% need to revert last_written too
State = State1#state{last_index = Idx,
State = State0#state{last_index = Idx,
last_written = {Idx, Term}},
{ok, State};
{_, State1} ->
State = State1#state{last_index = Idx},
false ->
State = State0#state{last_index = Idx},
{ok, State}
end.

Expand Down
57 changes: 57 additions & 0 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ all() ->
election_timeout,
follower_aer_diverged,
follower_aer_term_mismatch,
follower_aer_term_mismatch_at_snapshot,
follower_aer_term_mismatch_snapshot,
follower_handles_append_entries_rpc,
candidate_handles_append_entries_rpc,
Expand Down Expand Up @@ -710,6 +711,62 @@ follower_aer_term_mismatch(_Config) ->
last_term = 3}, Reply),
ok.

follower_aer_term_mismatch_at_snapshot(_Config) ->
%% case where the last correct entry in the log is the snapshot
State0 = (base_state(3, ?FUNCTION_NAME))#{last_applied => 3,
commit_index => 3
},
Log0 = maps:get(log, State0),
Meta = #{index => 3,
term => 5,
cluster => #{},
machine_version => 1},
Data = <<"hi3">>,
{Log,_} = ra_log_memory:install_snapshot({3, 5}, {Meta, Data}, Log0),
State = maps:put(log, Log, State0),

%% append entries from the current leader in the current term
AER = #append_entries_rpc{term = 5,
leader_id = ?N1,
prev_log_index = 3,
prev_log_term = 5, % same log term
leader_commit = 3,
entries = [
{4, 5, usr(<<"hi4">>)},
{5, 5, usr(<<"hi4">>)},
{6, 5, usr(<<"hi4">>)}
]},
{follower, State1, _} = ra_server:handle_follower(AER, State),
?assertMatch(#{last_applied := 3,
commit_index := 3}, State1),
{follower, State2,
[{cast, _, {_, #append_entries_reply{term = 5,
success = true,
next_index = 7}}}]}
= ra_server:handle_follower(written_evt(5, {4, 6}), State1),

%% a new leader deposes the old one and the uncommitted entries must be
%% truncated down to the snapshot index
AER1 = #append_entries_rpc{term = 6, % higher term
leader_id = ?N2,
prev_log_index = 3,
prev_log_term = 5, % same log term
leader_commit = 3,
entries = []},

% term mismatch scenario follower has index 3 but for different term
% rewinds back to last_applied + 1 as next index and enters await condition
{follower, State3,
[{cast, _, {_, #append_entries_reply{term = 6,
success = true,
next_index = 4,
last_index = 3,
last_term = 5}}} | _]}
= ra_server:handle_follower(AER1, State2),
?assertMatch(#{last_applied := 3,
commit_index := 3}, State3),
ok.

follower_aer_term_mismatch_snapshot(_Config) ->
%% case when we have to revert all the way back to a snapshot
State0 = (base_state(3, ?FUNCTION_NAME))#{last_applied => 3,
Expand Down