Skip to content

Commit 784574b

Browse files
authored
Merge pull request #5711 from apache/auto-purge-batching
Introduce a minimum batch size for auto purge
2 parents 0bedebf + e1313df commit 784574b

File tree

3 files changed

+151
-19
lines changed

3 files changed

+151
-19
lines changed

rel/overlay/etc/default.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,6 +1195,9 @@ url = {{nouveau_url}}
11951195
;[couch_quickjs_scanner_plugin.skip_{dbs,ddoc,docs}]
11961196

11971197
[couch_auto_purge_plugin]
1198+
; The fewest id/rev pairs the plugin will attempt to purge in
1199+
; one request, except at the end of a database scan.
1200+
;min_batch_size = 250
11981201
; The most id/rev pairs the plugin will attempt to purge in
11991202
; one request.
12001203
;max_batch_size = 500

src/couch/src/couch_auto_purge_plugin.erl

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ db_opened(#{} = St, Db) ->
6464
{0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
6565

6666
db_closing(#{} = St, Db) ->
67-
#{count := Count} = St,
67+
#{count := Count} = flush_queue(St, Db),
6868
?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], meta(St)),
6969
{ok, St}.
7070

@@ -73,47 +73,72 @@ doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
7373
?assert(
7474
FDI#full_doc_info.update_seq =< EndSeq, "FDI update_seq should not be greater than end seq"
7575
),
76-
{ok, purge(St, FDI, Db)};
76+
{ok, enqueue(St, FDI, Db)};
7777
doc_fdi(#{} = St, #full_doc_info{}, _Db) ->
7878
{ok, St}.
7979

80-
purge(#{} = St, #full_doc_info{} = FDI, Db) ->
80+
enqueue(#{} = St, FDI, Db) ->
8181
{Id, Revs} = fdi_to_idrevs(FDI),
82-
MaxBatchSize = config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500),
83-
purge(St, Id, Revs, MaxBatchSize, Db).
82+
enqueue(St, Id, Revs, Db).
8483

85-
purge(#{} = St, Id, Revs, MaxBatchSize, Db) when length(Revs) =< MaxBatchSize ->
84+
enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
85+
CurrentQueueSize = queue_size(Queue),
86+
NewQueueSize = CurrentQueueSize + length(Revs),
87+
MinBatchSize = min_batch_size(),
88+
MaxBatchSize = max_batch_size(),
89+
if
90+
NewQueueSize > MaxBatchSize ->
91+
{RevBatch, RevRest} = lists:split(MaxBatchSize - CurrentQueueSize, Revs),
92+
St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
93+
enqueue(St1, Id, RevRest, Db);
94+
NewQueueSize >= MinBatchSize ->
95+
flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
96+
NewQueueSize < MinBatchSize ->
97+
St0#{queue => [{Id, Revs} | Queue]}
98+
end.
99+
100+
flush_queue(#{queue := []} = St, _Db) ->
101+
St;
102+
flush_queue(#{queue := IdRevs} = St, Db) ->
86103
DbName = mem3:dbname(couch_db:name(Db)),
87-
PurgeFun = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
104+
PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
88105
Timeout = fabric_util:request_timeout(),
89106
try fabric_util:isolate(PurgeFun, Timeout) of
90107
{Health, Results} when Health == ok; Health == accepted ->
108+
?DEBUG(
109+
"flushed batch of ~B idrevs from ~s",
110+
[queue_size(IdRevs), couch_db:name(Db)],
111+
meta(St)
112+
),
91113
#{count := Count, limiter := Limiter0} = St,
92114
{Wait, Limiter1} = couch_scanner_rate_limiter:update(
93115
Limiter0, doc_write, length(Results)
94116
),
95117
timer:sleep(Wait),
96-
St#{count => Count + length(Results), limiter => Limiter1};
118+
St#{
119+
count => Count + length(Results),
120+
limiter => Limiter1,
121+
queue => []
122+
};
97123
Else ->
98124
?WARN(
99-
"Failed to purge deleted documents in ~s/~s for reason ~p",
100-
[DbName, Id, Else],
125+
"Failed to purge deleted documents in ~s for reason ~p",
126+
[DbName, Else],
101127
meta(St)
102128
),
103129
St
104130
catch
105131
Class:Reason ->
106132
?WARN(
107-
"Failed to purge deleted documents in ~s/~s for reason ~p:~p",
108-
[DbName, Id, Class, Reason],
133+
"Failed to purge deleted documents in ~s for reason ~p:~p",
134+
[DbName, Class, Reason],
109135
meta(St)
110136
),
111137
St
112-
end;
113-
purge(#{} = St0, Id, Revs, MaxBatchSize, Db) ->
114-
{RevBatch, RevRest} = lists:split(MaxBatchSize, Revs),
115-
St1 = purge(St0, Id, RevBatch, MaxBatchSize, Db),
116-
purge(St1, Id, RevRest, MaxBatchSize, Db).
138+
end.
139+
140+
queue_size(Queue) when is_list(Queue) ->
141+
lists:sum([length(Revs) || {_Id, Revs} <- Queue]).
117142

118143
fdi_to_idrevs(#full_doc_info{} = FDI) ->
119144
Revs = [
@@ -123,7 +148,11 @@ fdi_to_idrevs(#full_doc_info{} = FDI) ->
123148
{FDI#full_doc_info.id, Revs}.
124149

125150
init_config(ScanId) ->
126-
#{sid => ScanId, limiter => couch_scanner_rate_limiter:get()}.
151+
#{
152+
sid => ScanId,
153+
limiter => couch_scanner_rate_limiter:get(),
154+
queue => []
155+
}.
127156

128157
meta(#{sid := ScanId}) ->
129158
#{sid => ScanId}.
@@ -174,3 +203,9 @@ parse_ttl([$- | TTL]) ->
174203
-(parse_ttl(TTL));
175204
parse_ttl(TTL) ->
176205
couch_scanner_util:parse_non_weekday_period(TTL).
206+
207+
min_batch_size() ->
208+
erlang:max(1, config:get_integer(atom_to_list(?MODULE), "min_batch_size", 250)).
209+
210+
max_batch_size() ->
211+
erlang:max(min_batch_size(), config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500)).

src/couch/test/eunit/couch_auto_purge_plugin_tests.erl

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ couch_quickjs_scanner_plugin_test_() ->
2525
[
2626
?TDEF_FE(t_no_auto_purge_by_default, 10),
2727
?TDEF_FE(t_auto_purge_after_config_ttl, 10),
28-
?TDEF_FE(t_auto_purge_after_db_ttl, 10)
28+
?TDEF_FE(t_auto_purge_after_db_ttl, 10),
29+
?TDEF_FE(t_min_batch_size_1, 10),
30+
?TDEF_FE(t_min_batch_size_2, 10),
31+
?TDEF_FE(t_max_batch_size_1, 10),
32+
?TDEF_FE(t_max_batch_size_2, 10)
2933
]
3034
}.
3135

@@ -83,6 +87,92 @@ t_auto_purge_after_db_ttl({_, DbName}) ->
8387
?assertEqual(0, doc_del_count(DbName)),
8488
ok.
8589

90+
t_min_batch_size_1({_, DbName}) ->
91+
meck:new(fabric, [passthrough]),
92+
config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 5),
93+
ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
94+
[
95+
add_doc(DbName, <<"doc", (integer_to_binary(I))/binary>>, #{<<"_deleted">> => true})
96+
|| I <- lists:seq(1, 10)
97+
],
98+
?assertEqual(10, doc_del_count(DbName)),
99+
meck:reset(couch_scanner_server),
100+
meck:reset(?PLUGIN),
101+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
102+
wait_exit(10000),
103+
?assertEqual(2, meck:num_calls(fabric, purge_docs, '_')),
104+
?assertEqual(0, doc_del_count(DbName)),
105+
ok.
106+
107+
t_min_batch_size_2({_, DbName}) ->
108+
meck:new(fabric, [passthrough]),
109+
config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 5),
110+
ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
111+
[
112+
add_doc(DbName, <<"doc", (integer_to_binary(I))/binary>>, #{<<"_deleted">> => true})
113+
|| I <- lists:seq(1, 11)
114+
],
115+
?assertEqual(11, doc_del_count(DbName)),
116+
meck:reset(couch_scanner_server),
117+
meck:reset(?PLUGIN),
118+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
119+
wait_exit(10000),
120+
?assertEqual(4, meck:num_calls(fabric, purge_docs, '_')),
121+
?assertEqual(0, doc_del_count(DbName)),
122+
ok.
123+
124+
t_max_batch_size_1({_, DbName}) ->
125+
meck:new(fabric, [passthrough]),
126+
config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 1),
127+
config:set_integer(atom_to_list(?PLUGIN), "max_batch_size", 5),
128+
ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
129+
[
130+
add_replicated_doc(
131+
DbName,
132+
<<"doc">>,
133+
#{
134+
<<"_rev">> => <<"1-", (couch_uuids:random())/binary>>,
135+
<<"foo">> => I,
136+
<<"_deleted">> => true
137+
}
138+
)
139+
|| I <- lists:seq(1, 10)
140+
],
141+
?assertEqual(1, doc_del_count(DbName)),
142+
meck:reset(couch_scanner_server),
143+
meck:reset(?PLUGIN),
144+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
145+
wait_exit(10000),
146+
?assertEqual(2, meck:num_calls(fabric, purge_docs, '_')),
147+
?assertEqual(0, doc_del_count(DbName)),
148+
ok.
149+
150+
t_max_batch_size_2({_, DbName}) ->
151+
meck:new(fabric, [passthrough]),
152+
config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 1),
153+
config:set_integer(atom_to_list(?PLUGIN), "max_batch_size", 5),
154+
ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
155+
[
156+
add_replicated_doc(
157+
DbName,
158+
<<"doc">>,
159+
#{
160+
<<"_rev">> => <<"1-", (couch_uuids:random())/binary>>,
161+
<<"foo">> => I,
162+
<<"_deleted">> => true
163+
}
164+
)
165+
|| I <- lists:seq(1, 11)
166+
],
167+
?assertEqual(1, doc_del_count(DbName)),
168+
meck:reset(couch_scanner_server),
169+
meck:reset(?PLUGIN),
170+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
171+
wait_exit(10000),
172+
?assertEqual(3, meck:num_calls(fabric, purge_docs, '_')),
173+
?assertEqual(0, doc_del_count(DbName)),
174+
ok.
175+
86176
reset_stats() ->
87177
Counters = [
88178
[couchdb, query_server, process_error_exits],
@@ -106,6 +196,10 @@ add_doc(DbName, DocId, Body) ->
106196
{ok, _} = fabric:update_doc(DbName, mkdoc(DocId, Body), [?ADMIN_CTX]),
107197
ok.
108198

199+
add_replicated_doc(DbName, DocId, Body) ->
200+
{ok, _} = fabric:update_doc(DbName, mkdoc(DocId, Body), [?ADMIN_CTX, ?REPLICATED_CHANGES]),
201+
ok.
202+
109203
mkdoc(Id, #{} = Body) ->
110204
Body1 = Body#{<<"_id">> => Id},
111205
jiffy:decode(jiffy:encode(Body1)).

0 commit comments

Comments
 (0)