@@ -64,7 +64,7 @@ db_opened(#{} = St, Db) ->
6464 {0 , ChangeOpts , St #{count => 0 , end_seq => EndSeq }}.
6565
%% Scanner callback invoked when the scan of `Db' is finished.
%%
%% Flushes whatever is still queued for this database, logs the number of
%% purge results accumulated in `count', and returns the state produced by
%% the flush (updated rate limiter and counters).
%%
%% The returned state always carries an empty queue: queued `{Id, Revs}'
%% entries belong to `Db' only and must never be carried over into the
%% state used for another database, even if the final flush failed.
db_closing(#{} = St0, Db) ->
    #{count := Count} = St1 = flush_queue(St0, Db),
    ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], meta(St1)),
    {ok, St1#{queue => []}}.
7070
@@ -73,47 +73,67 @@ doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
7373 ? assert (
7474 FDI # full_doc_info .update_seq =< EndSeq , " FDI update_seq should not be greater than end seq"
7575 ),
76- {ok , purge (St , FDI , Db )};
76+ {ok , enqueue (St , FDI , Db )};
7777doc_fdi (#{} = St , # full_doc_info {}, _Db ) ->
7878 {ok , St }.
7979
%% Queue all revisions of the deleted document described by `FDI' for
%% purging. Returns the updated plugin state.
enqueue(#{} = St, FDI, Db) ->
    {Id, Revs} = fdi_to_idrevs(FDI),
    enqueue(St, Id, Revs, Db).

%% Add `Revs' of document `Id' to the purge queue and flush when a batch is
%% ready. Batch sizes are counted in revisions (see queue_size/1):
%%
%%   * if adding `Revs' would exceed max_batch_size(), only as many
%%     revisions as still fit are queued, the queue is flushed, and the
%%     remaining revisions are enqueued recursively;
%%   * otherwise, if the queue reaches min_batch_size(), everything is
%%     queued and flushed;
%%   * otherwise the revisions are only queued.
%%
%% Returns the updated plugin state.
enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
    CurrentQueueSize = queue_size(Queue),
    NewQueueSize = CurrentQueueSize + length(Revs),
    MinBatchSize = min_batch_size(),
    MaxBatchSize = max_batch_size(),
    %% Number of revisions that still fit into the current batch.
    Room = MaxBatchSize - CurrentQueueSize,

    if
        NewQueueSize > MaxBatchSize, Room > 0 ->
            %% Fill the batch up to exactly MaxBatchSize. Room is smaller
            %% than length(Revs) here, so the split cannot fail and the
            %% remainder is strictly shorter than Revs.
            {RevBatch, RevRest} = lists:split(Room, Revs),
            St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
            enqueue(St1, Id, RevRest, Db);
        NewQueueSize >= MinBatchSize ->
            %% Also reached when the queue is already at or above the
            %% maximum (Room =< 0), e.g. because the configured limit was
            %% lowered: queue everything and flush once instead of
            %% splitting at a non-positive index.
            flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
        true ->
            St0#{queue => [{Id, Revs} | Queue]}
    end.
100+
%% Purge every queued `{Id, Revs}' entry from the clustered database that
%% `Db' belongs to, and return the updated plugin state.
%%
%% An empty queue is a no-op, so no purge request is issued for nothing
%% (db_closing/2 flushes unconditionally).
%%
%% On success (`ok' or `accepted') the number of results is added to
%% `count', the rate limiter is updated with the same number and the process
%% sleeps for the wait time the limiter returns.
%%
%% On failure a warning is logged and the batch is dropped: the queue is
%% always empty in the returned state. This keeps batches bounded by
%% max_batch_size() and prevents entries of this database from lingering in
%% the state. NOTE(review): presumably documents that were not purged are
%% picked up again by a later scan -- confirm.
flush_queue(#{queue := []} = St, _Db) ->
    St;
flush_queue(#{queue := IdRevs} = St, Db) ->
    DbName = mem3:dbname(couch_db:name(Db)),
    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
    Timeout = fabric_util:request_timeout(),
    try fabric_util:isolate(PurgeFun, Timeout) of
        {Health, Results} when Health == ok; Health == accepted ->
            ?DEBUG(
                "flushed batch of ~B idrevs from ~s",
                [queue_size(IdRevs), couch_db:name(Db)],
                meta(St)
            ),
            #{count := Count, limiter := Limiter0} = St,
            {Wait, Limiter1} = couch_scanner_rate_limiter:update(
                Limiter0, doc_write, length(Results)
            ),
            timer:sleep(Wait),
            St#{
                count => Count + length(Results),
                limiter => Limiter1,
                queue => []
            };
        Else ->
            ?WARN(
                "Failed to purge deleted documents in ~s for reason ~p",
                [DbName, Else],
                meta(St)
            ),
            St#{queue => []}
    catch
        Class:Reason ->
            ?WARN(
                "Failed to purge deleted documents in ~s for reason ~p:~p",
                [DbName, Class, Reason],
                meta(St)
            ),
            St#{queue => []}
    end.
134+
%% Total number of revisions held in a queue of `{Id, Revs}' pairs.
%% Entries that are not two-element tuples are ignored.
queue_size(Queue) when is_list(Queue) ->
    RevCounts = [length(RevList) || {_DocId, RevList} <- Queue],
    lists:foldl(fun erlang:'+'/2, 0, RevCounts).
117137
118138fdi_to_idrevs (# full_doc_info {} = FDI ) ->
119139 Revs = [
@@ -123,7 +143,11 @@ fdi_to_idrevs(#full_doc_info{} = FDI) ->
123143 {FDI # full_doc_info .id , Revs }.
124144
%% Build the initial plugin state for the scan identified by `ScanId':
%% the scan id, a rate limiter obtained from couch_scanner_rate_limiter,
%% and an empty purge queue.
init_config(ScanId) ->
    Limiter = couch_scanner_rate_limiter:get(),
    EmptyQueue = [],
    #{sid => ScanId, limiter => Limiter, queue => EmptyQueue}.
127151
%% Logging metadata for the current scan: a map holding only the scan id
%% taken from the plugin state. The state must contain the `sid' key.
meta(#{sid := Sid}) ->
    #{sid => Sid}.
@@ -174,3 +198,9 @@ parse_ttl([$- | TTL]) ->
174198 - (parse_ttl (TTL ));
175199parse_ttl (TTL ) ->
176200 couch_scanner_util :parse_non_weekday_period (TTL ).
201+
%% Minimum number of queued revisions that triggers a flush. Read from the
%% "min_batch_size" setting in this module's config section (default 100)
%% and never less than 1.
min_batch_size() ->
    Section = atom_to_list(?MODULE),
    Configured = config:get_integer(Section, "min_batch_size", 100),
    erlang:max(1, Configured).
204+
%% Maximum number of revisions sent in one purge batch. Read from the
%% "max_batch_size" setting in this module's config section (default 500)
%% and never less than min_batch_size().
max_batch_size() ->
    Section = atom_to_list(?MODULE),
    Configured = config:get_integer(Section, "max_batch_size", 500),
    erlang:max(min_batch_size(), Configured).
0 commit comments