2222 doc_count ,
2323 w ,
2424 grouped_docs ,
25- reply
25+ reply ,
26+ update_options ,
27+ leaders = [],
28+ started = []
2629}).
2730
2831go (_ , [], _ ) ->
@@ -33,25 +36,25 @@ go(DbName, AllDocs0, Opts) ->
3336 validate_atomic_update (DbName , AllDocs , lists :member (all_or_nothing , Opts )),
3437 Options = lists :delete (all_or_nothing , Opts ),
3538 GroupedDocs = lists :map (
36- fun ({# shard {name = Name , node = Node } = Shard , Docs }) ->
37- Docs1 = untag_docs (Docs ),
38- Ref = rexi :cast (Node , {fabric_rpc , update_docs , [Name , Docs1 , Options ]}),
39- {Shard # shard {ref = Ref }, Docs }
39+ fun ({# shard {} = Shard , Docs }) ->
40+ {Shard # shard {ref = make_ref ()}, Docs }
4041 end ,
4142 group_docs_by_shard (DbName , AllDocs )
4243 ),
4344 {Workers , _ } = lists :unzip (GroupedDocs ),
4445 RexiMon = fabric_util :create_monitors (Workers ),
4546 W = couch_util :get_value (w , Options , integer_to_list (mem3 :quorum (DbName ))),
4647 Acc0 = # acc {
48+ update_options = Options ,
4749 waiting_count = length (Workers ),
4850 doc_count = length (AllDocs ),
4951 w = list_to_integer (W ),
5052 grouped_docs = GroupedDocs ,
5153 reply = dict :new ()
5254 },
5355 Timeout = fabric_util :request_timeout (),
54- try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc0 , infinity , Timeout ) of
56+ Acc1 = start_leaders (Acc0 ),
57+ try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc1 , infinity , Timeout ) of
5558 {ok , {Health , Results }} when
5659 Health =:= ok ; Health =:= accepted ; Health =:= error
5760 ->
@@ -72,61 +75,78 @@ go(DbName, AllDocs0, Opts) ->
7275 rexi_monitor :stop (RexiMon )
7376 end .
7477
75- handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, _Worker , # acc {} = Acc0 ) ->
78+ handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, Worker , # acc {} = Acc0 ) ->
7679 # acc {grouped_docs = GroupedDocs } = Acc0 ,
7780 NewGrpDocs = [X || {# shard {node = N }, _ } = X <- GroupedDocs , N =/= NodeRef ],
78- skip_message (Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs });
81+ Acc1 = Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs },
82+ Acc2 = start_followers (Worker , Acc1 ),
83+ skip_message (Acc2 );
7984handle_message ({rexi_EXIT , _ }, Worker , # acc {} = Acc0 ) ->
8085 # acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
8186 NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
82- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
87+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
88+ Acc2 = start_followers (Worker , Acc1 ),
89+ skip_message (Acc2 );
8390handle_message ({error , all_dbs_active }, Worker , # acc {} = Acc0 ) ->
8491 % treat it like rexi_EXIT, the hope at least one copy will return successfully
8592 # acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
8693 NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
87- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
94+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
95+ Acc2 = start_followers (Worker , Acc1 ),
96+ skip_message (Acc2 );
8897handle_message (internal_server_error , Worker , # acc {} = Acc0 ) ->
8998 % happens when we fail to load validation functions in an RPC worker
9099 # acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
91100 NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
92- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
101+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
102+ Acc2 = start_followers (Worker , Acc1 ),
103+ skip_message (Acc2 );
93104handle_message (attachment_chunk_received , _Worker , # acc {} = Acc0 ) ->
94105 {ok , Acc0 };
95106handle_message ({ok , Replies }, Worker , # acc {} = Acc0 ) ->
96107 # acc {
97- waiting_count = WaitingCount ,
98108 doc_count = DocCount ,
99109 w = W ,
100110 grouped_docs = GroupedDocs ,
101111 reply = DocReplyDict0
102112 } = Acc0 ,
103- {value , {_ , Docs }, NewGrpDocs } = lists :keytake (Worker , 1 , GroupedDocs ),
104- DocReplyDict = append_update_replies (Docs , Replies , DocReplyDict0 ),
105- case {WaitingCount , dict :size (DocReplyDict )} of
106- {1 , _ } ->
107- % last message has arrived, we need to conclude things
108- {Health , W , Reply } = dict :fold (
109- fun force_reply /3 ,
110- {ok , W , []},
111- DocReplyDict
112- ),
113- {stop , {Health , Reply }};
114- {_ , DocCount } ->
115- % we've got at least one reply for each document, let's take a look
116- case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
117- continue ->
118- {ok , Acc0 # acc {
119- waiting_count = WaitingCount - 1 ,
120- grouped_docs = NewGrpDocs ,
121- reply = DocReplyDict
122- }};
123- {stop , W , FinalReplies } ->
124- {stop , {ok , FinalReplies }}
125- end ;
126- _ ->
127- {ok , Acc0 # acc {
128- waiting_count = WaitingCount - 1 , grouped_docs = NewGrpDocs , reply = DocReplyDict
129- }}
113+ {value , {_ , Docs }, NewGrpDocs0 } = lists :keytake (Worker , 1 , GroupedDocs ),
114+ IsLeader = lists :member (Worker # shard .ref , Acc0 # acc .leaders ),
115+ DocReplyDict = append_update_replies (Docs , Replies , W , IsLeader , DocReplyDict0 ),
116+ Acc1 = Acc0 # acc {grouped_docs = NewGrpDocs0 , reply = DocReplyDict },
117+ Acc2 = remove_conflicts (Docs , Replies , Acc1 ),
118+ NewGrpDocs = Acc2 # acc .grouped_docs ,
119+ case skip_message (Acc2 ) of
120+ {stop , Msg } ->
121+ {stop , Msg };
122+ {ok , Acc3 } ->
123+ Acc4 = start_followers (Worker , Acc3 ),
124+ case {Acc4 # acc .waiting_count , dict :size (DocReplyDict )} of
125+ {1 , _ } ->
126+ % last message has arrived, we need to conclude things
127+ {Health , W , Reply } = dict :fold (
128+ fun force_reply /3 ,
129+ {ok , W , []},
130+ DocReplyDict
131+ ),
132+ {stop , {Health , Reply }};
133+ {_ , DocCount } ->
134+ % we've got at least one reply for each document, let's take a look
135+ case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
136+ continue ->
137+ {ok , Acc4 # acc {
138+ waiting_count = Acc4 # acc .waiting_count - 1 ,
139+ grouped_docs = NewGrpDocs
140+ }};
141+ {stop , W , FinalReplies } ->
142+ {stop , {ok , FinalReplies }}
143+ end ;
144+ _ ->
145+ {ok , Acc4 # acc {
146+ waiting_count = Acc4 # acc .waiting_count - 1 ,
147+ grouped_docs = NewGrpDocs
148+ }}
149+ end
130150 end ;
131151handle_message ({missing_stub , Stub }, _ , _ ) ->
132152 throw ({missing_stub , Stub });
@@ -318,13 +338,91 @@ group_docs_by_shard(DbName, Docs) ->
318338 )
319339 ).
320340
321- append_update_replies ([], [], DocReplyDict ) ->
341+ % % use 'lowest' node that hosts this shard range as leader
342+ is_leader (Worker , Workers ) ->
343+ Worker # shard .node ==
344+ lists :min ([W # shard .node || W <- Workers , W # shard .range == Worker # shard .range ]).
345+
346+ start_leaders (# acc {} = Acc0 ) ->
347+ # acc {grouped_docs = GroupedDocs } = Acc0 ,
348+ {Workers , _ } = lists :unzip (GroupedDocs ),
349+ LeaderRefs = lists :foldl (
350+ fun ({Worker , Docs }, RefAcc ) ->
351+ case is_leader (Worker , Workers ) of
352+ true ->
353+ start_worker (Worker , Docs , Acc0 ),
354+ [Worker # shard .ref | RefAcc ];
355+ false ->
356+ RefAcc
357+ end
358+ end ,
359+ [],
360+ GroupedDocs
361+ ),
362+ Acc0 # acc {leaders = LeaderRefs , started = LeaderRefs }.
363+
364+ start_followers (# shard {} = Leader , # acc {} = Acc0 ) ->
365+ Followers = [
366+ {Worker , Docs }
367+ || {Worker , Docs } <- Acc0 # acc .grouped_docs ,
368+ Worker # shard .range == Leader # shard .range ,
369+ not lists :member (Worker # shard .ref , Acc0 # acc .started )
370+ ],
371+ lists :foreach (
372+ fun ({Worker , Docs }) ->
373+ start_worker (Worker , Docs , Acc0 )
374+ end ,
375+ Followers
376+ ),
377+ Started = [Ref || {# shard {ref = Ref }, _Docs } <- Followers ],
378+ Acc0 # acc {started = lists :append ([Started , Acc0 # acc .started ])}.
379+
380+ start_worker (# shard {ref = Ref } = Worker , Docs , # acc {} = Acc0 ) when is_reference (Ref ) ->
381+ # shard {name = Name , node = Node } = Worker ,
382+ # acc {update_options = UpdateOptions } = Acc0 ,
383+ rexi :cast_ref (Ref , Node , {fabric_rpc , update_docs , [Name , untag_docs (Docs ), UpdateOptions ]}),
384+ ok ;
385+ start_worker (# shard {ref = undefined }, _Docs , # acc {}) ->
386+ % for unit tests below.
387+ ok .
388+
389+ append_update_replies ([], [], _W , _IsLeader , DocReplyDict ) ->
322390 DocReplyDict ;
323- append_update_replies ([Doc | Rest ], [], Dict0 ) ->
391+ append_update_replies ([Doc | Rest ], [], W , IsLeader , Dict0 ) ->
324392 % icky, if replicated_changes only errors show up in result
325- append_update_replies (Rest , [], dict :append (Doc , noreply , Dict0 ));
326- append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], Dict0 ) ->
327- append_update_replies (Rest1 , Rest2 , dict :append (Doc , Reply , Dict0 )).
393+ append_update_replies (Rest , [], W , IsLeader , dict :append (Doc , noreply , Dict0 ));
394+ append_update_replies ([Doc | Rest1 ], [conflict | Rest2 ], W , true , Dict0 ) ->
395+ % % fake conflict replies from followers as we won't ask them
396+ append_update_replies (
397+ Rest1 , Rest2 , W , true , dict :append_list (Doc , lists :duplicate (W , conflict ), Dict0 )
398+ );
399+ append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], W , IsLeader , Dict0 ) ->
400+ append_update_replies (Rest1 , Rest2 , W , IsLeader , dict :append (Doc , Reply , Dict0 )).
401+
402+ % % leader found a conflict, remove that doc from the other (follower) workers,
403+ % % removing the worker entirely if no docs remain.
404+ remove_conflicts ([], [], # acc {} = Acc0 ) ->
405+ Acc0 ;
406+ remove_conflicts ([Doc | DocRest ], [conflict | ReplyRest ], # acc {} = Acc0 ) ->
407+ # acc {grouped_docs = GroupedDocs0 } = Acc0 ,
408+ GroupedDocs1 = lists :foldl (
409+ fun ({Worker , Docs }, FoldAcc ) ->
410+ case lists :delete (Doc , Docs ) of
411+ [] ->
412+ FoldAcc # acc {waiting_count = FoldAcc # acc .waiting_count - 1 };
413+ Rest ->
414+ [{Worker , Rest } | FoldAcc ]
415+ end
416+ end ,
417+ [],
418+ GroupedDocs0
419+ ),
420+ Acc1 = Acc0 # acc {grouped_docs = GroupedDocs1 },
421+ remove_conflicts (DocRest , ReplyRest , Acc1 );
422+ remove_conflicts ([_Doc | DocRest ], [_Reply | ReplyRest ], # acc {} = Acc0 ) ->
423+ remove_conflicts (DocRest , ReplyRest , Acc0 );
424+ remove_conflicts ([_Doc | DocRest ], [], # acc {} = Acc0 ) ->
425+ remove_conflicts (DocRest , [], Acc0 ).
328426
329427skip_message (# acc {waiting_count = 0 , w = W , reply = DocReplyDict }) ->
330428 {Health , W , Reply } = dict :fold (fun force_reply /3 , {ok , W , []}, DocReplyDict ),
0 commit comments