3 changes: 3 additions & 0 deletions rel/overlay/etc/default.ini
@@ -1195,6 +1195,9 @@ url = {{nouveau_url}}
 ;[couch_quickjs_scanner_plugin.skip_{dbs,ddoc,docs}]
 
 [couch_auto_purge_plugin]
+; The fewest id/rev pairs the plugin will attempt to purge in
+; one request, except at the end of a database scan.
+;min_batch_size = 250
 ; The most id/rev pairs the plugin will attempt to purge in
 ; one request.
 ;max_batch_size = 500
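
Together, the two settings bound each purge request: the plugin queues id/rev pairs until at least min_batch_size are pending, and a single request never carries more than max_batch_size pairs. A minimal sketch of enabling the plugin with tuned batch sizes (the section and key names are the ones this patch and its tests use; the values are illustrative):

    [couch_scanner_plugins]
    couch_auto_purge_plugin = true

    [couch_auto_purge_plugin]
    min_batch_size = 100
    max_batch_size = 1000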
71 changes: 53 additions & 18 deletions src/couch/src/couch_auto_purge_plugin.erl
@@ -64,7 +64,7 @@ db_opened(#{} = St, Db) ->
     {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.
 
 db_closing(#{} = St, Db) ->
-    #{count := Count} = St,
+    #{count := Count} = flush_queue(St, Db),
     ?INFO("purged ~B deleted documents from ~s", [Count, couch_db:name(Db)], meta(St)),
     {ok, St}.
 
@@ -73,47 +73,72 @@ doc_fdi(#{} = St, #full_doc_info{deleted = true} = FDI, Db) ->
     ?assert(
         FDI#full_doc_info.update_seq =< EndSeq, "FDI update_seq should not be greater than end seq"
     ),
-    {ok, purge(St, FDI, Db)};
+    {ok, enqueue(St, FDI, Db)};
 doc_fdi(#{} = St, #full_doc_info{}, _Db) ->
     {ok, St}.
 
-purge(#{} = St, #full_doc_info{} = FDI, Db) ->
+enqueue(#{} = St, FDI, Db) ->
     {Id, Revs} = fdi_to_idrevs(FDI),
-    MaxBatchSize = config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500),
-    purge(St, Id, Revs, MaxBatchSize, Db).
+    enqueue(St, Id, Revs, Db).
 
-purge(#{} = St, Id, Revs, MaxBatchSize, Db) when length(Revs) =< MaxBatchSize ->
+enqueue(#{queue := Queue} = St0, Id, Revs, Db) ->
+    CurrentQueueSize = queue_size(Queue),
+    NewQueueSize = CurrentQueueSize + length(Revs),
+    MinBatchSize = min_batch_size(),
+    MaxBatchSize = max_batch_size(),
+    if
+        NewQueueSize > MaxBatchSize ->
+            {RevBatch, RevRest} = lists:split(MaxBatchSize - CurrentQueueSize, Revs),
+            St1 = flush_queue(St0#{queue => [{Id, RevBatch} | Queue]}, Db),
+            enqueue(St1, Id, RevRest, Db);
+        NewQueueSize >= MinBatchSize ->
+            flush_queue(St0#{queue => [{Id, Revs} | Queue]}, Db);
+        NewQueueSize < MinBatchSize ->
+            St0#{queue => [{Id, Revs} | Queue]}
+    end.
+
+flush_queue(#{queue := []} = St, _Db) ->
+    St;
+flush_queue(#{queue := IdRevs} = St, Db) ->
     DbName = mem3:dbname(couch_db:name(Db)),
-    PurgeFun = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
+    PurgeFun = fun() -> fabric:purge_docs(DbName, IdRevs, [?ADMIN_CTX]) end,
     Timeout = fabric_util:request_timeout(),
     try fabric_util:isolate(PurgeFun, Timeout) of
         {Health, Results} when Health == ok; Health == accepted ->
+            ?DEBUG(
+                "flushed batch of ~B idrevs from ~s",
+                [queue_size(IdRevs), couch_db:name(Db)],
+                meta(St)
+            ),
             #{count := Count, limiter := Limiter0} = St,
             {Wait, Limiter1} = couch_scanner_rate_limiter:update(
                 Limiter0, doc_write, length(Results)
             ),
             timer:sleep(Wait),
-            St#{count => Count + length(Results), limiter => Limiter1};
+            St#{
+                count => Count + length(Results),
+                limiter => Limiter1,
+                queue => []
+            };
         Else ->
             ?WARN(
-                "Failed to purge deleted documents in ~s/~s for reason ~p",
-                [DbName, Id, Else],
+                "Failed to purge deleted documents in ~s for reason ~p",
+                [DbName, Else],
                 meta(St)
             ),
             St
     catch
         Class:Reason ->
             ?WARN(
-                "Failed to purge deleted documents in ~s/~s for reason ~p:~p",
-                [DbName, Id, Class, Reason],
+                "Failed to purge deleted documents in ~s for reason ~p:~p",
+                [DbName, Class, Reason],
                 meta(St)
             ),
             St
-    end;
-purge(#{} = St0, Id, Revs, MaxBatchSize, Db) ->
-    {RevBatch, RevRest} = lists:split(MaxBatchSize, Revs),
-    St1 = purge(St0, Id, RevBatch, MaxBatchSize, Db),
-    purge(St1, Id, RevRest, MaxBatchSize, Db).
+    end.
+
+queue_size(Queue) when is_list(Queue) ->
+    lists:sum([length(Revs) || {_Id, Revs} <- Queue]).
 
 fdi_to_idrevs(#full_doc_info{} = FDI) ->
     Revs = [
@@ -123,7 +148,11 @@ fdi_to_idrevs(#full_doc_info{} = FDI) ->
     {FDI#full_doc_info.id, Revs}.
 
 init_config(ScanId) ->
-    #{sid => ScanId, limiter => couch_scanner_rate_limiter:get()}.
+    #{
+        sid => ScanId,
+        limiter => couch_scanner_rate_limiter:get(),
+        queue => []
+    }.
 
 meta(#{sid := ScanId}) ->
     #{sid => ScanId}.
@@ -174,3 +203,9 @@ parse_ttl([$- | TTL]) ->
     -(parse_ttl(TTL));
 parse_ttl(TTL) ->
     couch_scanner_util:parse_non_weekday_period(TTL).
+
+min_batch_size() ->
+    erlang:max(1, config:get_integer(atom_to_list(?MODULE), "min_batch_size", 250)).
+
+max_batch_size() ->
+    erlang:max(min_batch_size(), config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500)).
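
The change above replaces per-document purge calls with a queue carried in the scanner state: doc_fdi/3 enqueues id/rev pairs, enqueue/4 flushes once the queue reaches min_batch_size and splits a document's revs so that no single request exceeds max_batch_size, and db_closing/2 (first hunk) flushes whatever remains when a shard scan ends, even a batch below min_batch_size. The helpers at the end clamp the configuration so min_batch_size is at least 1 and max_batch_size is never below min_batch_size. A standalone sketch of the resulting batch boundaries, not part of the patch, where RevCounts lists the number of deleted revs per scanned document in scan order:

    %% Mirrors the batching in couch_auto_purge_plugin:enqueue/4; illustrative only.
    -module(auto_purge_batching_sketch).
    -export([batches/3]).

    %% batches(RevCounts, Min, Max) -> sizes of the purge requests the scan
    %% would issue, including the final flush from db_closing/2.
    batches(RevCounts, Min, Max) when Min >= 1, Max >= Min ->
        {Queued, Flushed} = lists:foldl(
            fun(Revs, {Q, Acc}) -> enqueue(Revs, Q, Acc, Min, Max) end,
            {0, []},
            RevCounts
        ),
        %% db_closing/2 flushes any remainder, even below Min.
        case Queued of
            0 -> lists:reverse(Flushed);
            N -> lists:reverse([N | Flushed])
        end.

    %% One clause per branch of the if expression in enqueue/4.
    enqueue(Revs, Q, Acc, Min, Max) when Q + Revs > Max ->
        Take = Max - Q,
        enqueue(Revs - Take, 0, [Max | Acc], Min, Max);
    enqueue(Revs, Q, Acc, Min, _Max) when Q + Revs >= Min ->
        {0, [Q + Revs | Acc]};
    enqueue(Revs, Q, Acc, _Min, _Max) ->
        {Q + Revs, Acc}.

With min_batch_size = 1 and max_batch_size = 5, a single document carrying 10 deleted revs yields batches of [5,5], and 11 revs yields [5,5,1]. Note that the scanner visits each shard separately (hence mem3:dbname/1 to recover the clustered database name), so queues do not carry over between shards and the exact number of flushes also depends on how documents distribute across shards.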
96 changes: 95 additions & 1 deletion src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
@@ -25,7 +25,11 @@ couch_quickjs_scanner_plugin_test_() ->
         [
             ?TDEF_FE(t_no_auto_purge_by_default, 10),
             ?TDEF_FE(t_auto_purge_after_config_ttl, 10),
-            ?TDEF_FE(t_auto_purge_after_db_ttl, 10)
+            ?TDEF_FE(t_auto_purge_after_db_ttl, 10),
+            ?TDEF_FE(t_min_batch_size_1, 10),
+            ?TDEF_FE(t_min_batch_size_2, 10),
+            ?TDEF_FE(t_max_batch_size_1, 10),
+            ?TDEF_FE(t_max_batch_size_2, 10)
         ]
     }.
 
@@ -83,6 +87,92 @@ t_auto_purge_after_db_ttl({_, DbName}) ->
     ?assertEqual(0, doc_del_count(DbName)),
     ok.
 
+t_min_batch_size_1({_, DbName}) ->
+    meck:new(fabric, [passthrough]),
+    config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 5),
+    ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
+    [
+        add_doc(DbName, <<"doc", (integer_to_binary(I))/binary>>, #{<<"_deleted">> => true})
+     || I <- lists:seq(1, 10)
+    ],
+    ?assertEqual(10, doc_del_count(DbName)),
+    meck:reset(couch_scanner_server),
+    meck:reset(?PLUGIN),
+    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
+    wait_exit(10000),
+    ?assertEqual(2, meck:num_calls(fabric, purge_docs, '_')),
+    ?assertEqual(0, doc_del_count(DbName)),
+    ok.
+
+t_min_batch_size_2({_, DbName}) ->
+    meck:new(fabric, [passthrough]),
+    config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 5),
+    ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
+    [
+        add_doc(DbName, <<"doc", (integer_to_binary(I))/binary>>, #{<<"_deleted">> => true})
+     || I <- lists:seq(1, 11)
+    ],
+    ?assertEqual(11, doc_del_count(DbName)),
+    meck:reset(couch_scanner_server),
+    meck:reset(?PLUGIN),
+    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
+    wait_exit(10000),
+    ?assertEqual(4, meck:num_calls(fabric, purge_docs, '_')),
+    ?assertEqual(0, doc_del_count(DbName)),
+    ok.
+
+t_max_batch_size_1({_, DbName}) ->
+    meck:new(fabric, [passthrough]),
+    config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 1),
+    config:set_integer(atom_to_list(?PLUGIN), "max_batch_size", 5),
+    ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
+    [
+        add_replicated_doc(
+            DbName,
+            <<"doc">>,
+            #{
+                <<"_rev">> => <<"1-", (couch_uuids:random())/binary>>,
+                <<"foo">> => I,
+                <<"_deleted">> => true
+            }
+        )
+     || I <- lists:seq(1, 10)
+    ],
+    ?assertEqual(1, doc_del_count(DbName)),
+    meck:reset(couch_scanner_server),
+    meck:reset(?PLUGIN),
+    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
+    wait_exit(10000),
+    ?assertEqual(2, meck:num_calls(fabric, purge_docs, '_')),
+    ?assertEqual(0, doc_del_count(DbName)),
+    ok.
+
+t_max_batch_size_2({_, DbName}) ->
+    meck:new(fabric, [passthrough]),
+    config:set_integer(atom_to_list(?PLUGIN), "min_batch_size", 1),
+    config:set_integer(atom_to_list(?PLUGIN), "max_batch_size", 5),
+    ok = fabric:set_auto_purge_props(DbName, [{<<"deleted_document_ttl">>, "-3_hour"}]),
+    [
+        add_replicated_doc(
+            DbName,
+            <<"doc">>,
+            #{
+                <<"_rev">> => <<"1-", (couch_uuids:random())/binary>>,
+                <<"foo">> => I,
+                <<"_deleted">> => true
+            }
+        )
+     || I <- lists:seq(1, 11)
+    ],
+    ?assertEqual(1, doc_del_count(DbName)),
+    meck:reset(couch_scanner_server),
+    meck:reset(?PLUGIN),
+    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
+    wait_exit(10000),
+    ?assertEqual(3, meck:num_calls(fabric, purge_docs, '_')),
+    ?assertEqual(0, doc_del_count(DbName)),
+    ok.
+
 reset_stats() ->
     Counters = [
         [couchdb, query_server, process_error_exits],
@@ -106,6 +196,10 @@ add_doc(DbName, DocId, Body) ->
     {ok, _} = fabric:update_doc(DbName, mkdoc(DocId, Body), [?ADMIN_CTX]),
     ok.
 
+add_replicated_doc(DbName, DocId, Body) ->
+    {ok, _} = fabric:update_doc(DbName, mkdoc(DocId, Body), [?ADMIN_CTX, ?REPLICATED_CHANGES]),
+    ok.
+
 mkdoc(Id, #{} = Body) ->
     Body1 = Body#{<<"_id">> => Id},
     jiffy:decode(jiffy:encode(Body1)).
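
The max_batch_size tests drive many deleted revisions into a single document: add_replicated_doc/3 passes ?REPLICATED_CHANGES so each update is stored as a new conflicting revision of <<"doc">> rather than requiring the current rev. Tracing the sketch module from earlier against those tests (illustrative shell session, not part of the patch):

    1> auto_purge_batching_sketch:batches([10], 1, 5).
    [5,5]
    2> auto_purge_batching_sketch:batches([11], 1, 5).
    [5,5,1]

One request per flushed batch gives the two and three fabric:purge_docs calls asserted in t_max_batch_size_1 and t_max_batch_size_2.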