diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 77b424911a6..1f5755de09e 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -17,6 +17,14 @@ -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). +-record(acc, { + waiting_count, + doc_count, + w, + grouped_docs, + reply +}). + go(_, [], _) -> {ok, []}; go(DbName, AllDocs0, Opts) -> @@ -35,7 +43,13 @@ go(DbName, AllDocs0, Opts) -> {Workers, _} = lists:unzip(GroupedDocs), RexiMon = fabric_util:create_monitors(Workers), W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))), - Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, dict:new()}, + Acc0 = #acc{ + waiting_count = length(Workers), + doc_count = length(AllDocs), + w = list_to_integer(W), + grouped_docs = GroupedDocs, + reply = dict:new() + }, Timeout = fabric_util:request_timeout(), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of {ok, {Health, Results}} when @@ -43,7 +57,7 @@ go(DbName, AllDocs0, Opts) -> -> ensure_all_responses(Health, AllDocs, Results); {timeout, Acc} -> - {_, _, W1, GroupedDocs1, DocReplDict} = Acc, + #acc{w = W1, grouped_docs = GroupedDocs1, reply = DocReplDict} = Acc, {DefunctWorkers, _} = lists:unzip(GroupedDocs1), fabric_util:log_timeout(DefunctWorkers, "update_docs"), {Health, _, Resp} = dict:fold( @@ -58,28 +72,34 @@ go(DbName, AllDocs0, Opts) -> rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, Acc0) -> - {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0, +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) -> + #acc{grouped_docs = GroupedDocs} = Acc0, NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef], - skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message({rexi_EXIT, _}, Worker, Acc0) -> - {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, + skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs}); +handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) -> + #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0, NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), - skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message({error, all_dbs_active}, Worker, Acc0) -> + skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs}); +handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) -> % treat it like rexi_EXIT, the hope at least one copy will return successfully - {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, + #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0, NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), - skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message(internal_server_error, Worker, Acc0) -> + skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs}); +handle_message(internal_server_error, Worker, #acc{} = Acc0) -> % happens when we fail to load validation functions in an RPC worker - {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, + #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0, NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), - skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); -handle_message(attachment_chunk_received, _Worker, Acc0) -> + skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs}); +handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) -> {ok, Acc0}; -handle_message({ok, Replies}, Worker, Acc0) -> - {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, +handle_message({ok, Replies}, Worker, #acc{} = Acc0) -> + #acc{ + waiting_count = WaitingCount, + doc_count = DocCount, + w = W, + grouped_docs = GroupedDocs, + reply = DocReplyDict0 + } = Acc0, {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs), DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), case {WaitingCount, dict:size(DocReplyDict)} of @@ -95,17 +115,23 @@ handle_message({ok, Replies}, Worker, Acc0) -> % we've got at least one reply for each document, let's take a look case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of continue -> - {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}; + {ok, Acc0#acc{ + waiting_count = WaitingCount - 1, + grouped_docs = NewGrpDocs, + reply = DocReplyDict + }}; {stop, W, FinalReplies} -> {stop, {ok, FinalReplies}} end; _ -> - {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}} + {ok, Acc0#acc{ + waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict + }} end; handle_message({missing_stub, Stub}, _, _) -> throw({missing_stub, Stub}); -handle_message({not_found, no_db_file} = X, Worker, Acc0) -> - {_, _, _, GroupedDocs, _} = Acc0, +handle_message({not_found, no_db_file} = X, Worker, #acc{} = Acc0) -> + #acc{grouped_docs = GroupedDocs} = Acc0, Docs = couch_util:get_value(Worker, GroupedDocs), handle_message({ok, [X || _D <- Docs]}, Worker, Acc0); handle_message({bad_request, Msg}, _, _) -> @@ -300,10 +326,10 @@ append_update_replies([Doc | Rest], [], Dict0) -> append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) -> append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). -skip_message({0, _, W, _, DocReplyDict}) -> +skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) -> {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict), {stop, {Health, Reply}}; -skip_message(Acc0) -> +skip_message(#acc{} = Acc0) -> {ok, Acc0}. validate_atomic_update(_, _, false) -> @@ -372,9 +398,15 @@ doc_update1() -> GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), % test for W = 2 - AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, Dict}, + AccW2 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = Dict + }, - {ok, {WaitingCountW2_1, _, _, _, _} = AccW2_1} = + {ok, #acc{waiting_count = WaitingCountW2_1} = AccW2_1} = handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW2), ?assertEqual(WaitingCountW2_1, 2), {stop, FinalReplyW2} = @@ -382,13 +414,19 @@ doc_update1() -> ?assertEqual({ok, [{Doc1, {ok, Doc1}}]}, FinalReplyW2), % test for W = 3 - AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs, Dict}, + AccW3 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 3, + grouped_docs = GroupedDocs, + reply = Dict + }, - {ok, {WaitingCountW3_1, _, _, _, _} = AccW3_1} = + {ok, #acc{waiting_count = WaitingCountW3_1} = AccW3_1} = handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW3), ?assertEqual(WaitingCountW3_1, 2), - {ok, {WaitingCountW3_2, _, _, _, _} = AccW3_2} = + {ok, #acc{waiting_count = WaitingCountW3_2} = AccW3_2} = handle_message({ok, [{ok, Doc1}]}, lists:nth(2, Shards), AccW3_1), ?assertEqual(WaitingCountW3_2, 1), @@ -402,7 +440,13 @@ doc_update1() -> GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>, Shards2, Docs), AccW4 = - {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict}, + #acc{ + waiting_count = length(Shards2), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs2, + reply = Dict + }, Bool = case handle_message({ok, [{ok, Doc1}]}, hd(Shards2), AccW4) of {stop, _Reply} -> @@ -418,7 +462,13 @@ doc_update1() -> SA2 = #shard{node = a, range = 2}, SB2 = #shard{node = b, range = 2}, GroupedDocs3 = [{SA1, [Doc1]}, {SB1, [Doc1]}, {SA2, [Doc2]}, {SB2, [Doc2]}], - StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2}, + StW5_0 = #acc{ + waiting_count = length(GroupedDocs3), + doc_count = length(Docs2), + w = 2, + grouped_docs = GroupedDocs3, + reply = Dict2 + }, {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0), {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1), {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2), @@ -435,19 +485,19 @@ doc_update2() -> Shards = mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1), ?assertEqual(WaitingCount2, 1), @@ -466,19 +516,19 @@ doc_update3() -> Shards = mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1), ?assertEqual(WaitingCount2, 1), @@ -494,19 +544,19 @@ handle_all_dbs_active() -> Shards = mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message({error, all_dbs_active}, lists:nth(2, Shards), Acc1), ?assertEqual(WaitingCount2, 1), @@ -522,19 +572,19 @@ handle_two_all_dbs_actives() -> Shards = mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message({error, all_dbs_active}, lists:nth(2, Shards), Acc1), ?assertEqual(WaitingCount2, 1), @@ -554,19 +604,19 @@ one_forbid() -> mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, noreply]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message( {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(2, Shards), Acc1 ), @@ -591,19 +641,19 @@ two_forbid() -> mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, noreply]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message( {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(2, Shards), Acc1 ), @@ -633,19 +683,19 @@ extend_tree_forbid() -> mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message( {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, lists:nth(2, Shards), Acc1 ), @@ -664,19 +714,19 @@ other_errors_one_forbid() -> mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message({ok, [{ok, Doc1}, {Doc2, {error, <<"foo">>}}]}, hd(Shards), Acc0), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message({ok, [{ok, Doc1}, {Doc2, {error, <<"bar">>}}]}, lists:nth(2, Shards), Acc1), ?assertEqual(WaitingCount2, 1), @@ -694,21 +744,21 @@ one_error_two_forbid() -> mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message( {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, hd(Shards), Acc0 ), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message({ok, [{ok, Doc1}, {Doc2, {error, <<"foo">>}}]}, lists:nth(2, Shards), Acc1), ?assertEqual(WaitingCount2, 1), @@ -728,21 +778,21 @@ one_success_two_forbid() -> mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc0 = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc0 = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, - {ok, {WaitingCount1, _, _, _, _} = Acc1} = + {ok, #acc{waiting_count = WaitingCount1} = Acc1} = handle_message( {ok, [{ok, Doc1}, {Doc2, {forbidden, <<"not allowed">>}}]}, hd(Shards), Acc0 ), ?assertEqual(WaitingCount1, 2), - {ok, {WaitingCount2, _, _, _, _} = Acc2} = + {ok, #acc{waiting_count = WaitingCount2} = Acc2} = handle_message({ok, [{ok, Doc1}, {Doc2, {ok, Doc2}}]}, lists:nth(2, Shards), Acc1), ?assertEqual(WaitingCount2, 1), @@ -760,12 +810,12 @@ worker_before_doc_update_forbidden() -> Shards = mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), - Acc = { - length(Shards), - length(Docs), - list_to_integer("2"), - GroupedDocs, - dict:from_list([{Doc, []} || Doc <- Docs]) + Acc = #acc{ + waiting_count = length(Shards), + doc_count = length(Docs), + w = 2, + grouped_docs = GroupedDocs, + reply = dict:from_list([{Doc, []} || Doc <- Docs]) }, ?assertThrow({forbidden, <<"msg">>}, handle_message({forbidden, <<"msg">>}, hd(Shards), Acc)).