From a8ce2bba1026fb72121c7c763c054acbc8b7d186 Mon Sep 17 00:00:00 2001 From: jiangphcn Date: Wed, 6 Sep 2017 16:11:43 +0800 Subject: [PATCH] Rename geo index directories in place when database is deleted Bugzid: 86318 --- src/hastings_util.erl | 36 ++++++- src/hastings_vacuum.erl | 155 ++++++++++++++++++++++++------- test/hastings_delete_db_test.erl | 139 +++++++++++++++++++++++++++ test/hastings_test_util.erl | 53 +++++++++++ 4 files changed, 349 insertions(+), 34 deletions(-) create mode 100755 test/hastings_delete_db_test.erl create mode 100755 test/hastings_test_util.erl diff --git a/src/hastings_util.erl b/src/hastings_util.erl index 97c8188..d96e52c 100644 --- a/src/hastings_util.erl +++ b/src/hastings_util.erl @@ -14,7 +14,10 @@ -export([ - get_json_docs/2 + get_json_docs/2, + do_rename/1, + get_existing_index_dirs/2, + calculate_delete_directory/1 ]). @@ -42,3 +45,34 @@ callback(complete, Acc) -> {ok, lists:reverse(Acc)}; callback(timeout, _Acc) -> {error, timeout}. + + +do_rename(IdxDir) -> + IdxName = filename:basename(IdxDir), + DbDir = filename:dirname(IdxDir), + DeleteDir = calculate_delete_directory(DbDir), + RenamePath = filename:join([DeleteDir, IdxName]), + filelib:ensure_dir(RenamePath), + Now = calendar:local_time(), + case file:rename(IdxDir, RenamePath) of + ok -> file:change_time(DeleteDir, Now); + Else -> Else + end. + + +get_existing_index_dirs(BaseDir, ShardDbName) -> + Pattern0 = filename:join([BaseDir, ShardDbName, "*"]), + Pattern = binary_to_list(iolist_to_binary(Pattern0)), + DirListStrs = filelib:wildcard(Pattern), + [iolist_to_binary(DL) || DL <- DirListStrs]. + + +calculate_delete_directory(DbNameDir) -> + {{Y, Mon, D}, {H, Min, S}} = calendar:universal_time(), + Suffix = lists:flatten( + io_lib:format(".~w~2.10.0B~2.10.0B." + ++ "~2.10.0B~2.10.0B~2.10.0B.deleted" + ++ filename:extension(binary_to_list(DbNameDir)), + [Y, Mon, D, H, Min, S]) + ), + binary_to_list(filename:rootname(DbNameDir)) ++ Suffix. 
diff --git a/src/hastings_vacuum.erl b/src/hastings_vacuum.erl index 0a63a44..63fc4b1 100644 --- a/src/hastings_vacuum.erl +++ b/src/hastings_vacuum.erl @@ -34,13 +34,18 @@ -export([ handle_db_event/3, - clean_db/1 + clean_db/1, + clean_db/2, + clean_shard_db/1, + clean_shard_db/2 ]). -record(st, { - queue = queue:new(), - cleaner + selective_clean = queue:new(), + full_clean = queue:new(), + selective_cleaner, + full_cleaner }). @@ -50,7 +55,7 @@ start_link() -> cleanup(DbName) -> lists:foreach(fun(Node) -> - rexi:cast(Node, {gen_server, cast, [?MODULE, {cleanup, DbName}]}) + rexi:cast(Node, {gen_server, cast, [?MODULE, {cleanup, DbName, []}]}) end, mem3:nodes()). @@ -67,22 +72,38 @@ handle_call(Msg, _From, St) -> {stop, {bad_call, Msg}, {bad_call, Msg}, St}. -handle_cast({cleanup, DbName0}, St) -> - DbName = mem3:dbname(DbName0), - case queue:member(DbName, St#st.queue) of +handle_cast({cleanup, ShardDbName, Options}, St) -> + DbName = mem3:dbname(ShardDbName), + Context = couch_util:get_value(context, Options, compaction), + case Context =:= delete of true -> - {noreply, St}; + case queue:member(ShardDbName, St#st.full_clean) of + true -> + {noreply, St}; + false -> + NewQ = queue:in(ShardDbName, St#st.full_clean), + maybe_start_full_cleaner(St#st{full_clean = NewQ}) + end; false -> - NewQ = queue:in(DbName, St#st.queue), - maybe_start_cleaner(St#st{queue = NewQ}) - end; + case queue:member(DbName, St#st.selective_clean) of + true -> + {noreply, St}; + false -> + NewQ = queue:in(DbName, St#st.selective_clean), + maybe_start_selective_cleaner( + St#st{selective_clean = NewQ}) + end + end; handle_cast(Msg, St) -> {stop, {bad_cast, Msg}, St}. 
-handle_info({'DOWN', _, _, Pid, _}, #st{cleaner = Pid} = St) -> - maybe_start_cleaner(St#st{cleaner = undefined}); +handle_info({'DOWN', _, _, Pid, _}, #st{selective_cleaner = Pid} = St) -> + maybe_start_selective_cleaner(St#st{selective_cleaner = undefined}); + +handle_info({'DOWN', _, _, Pid, _}, #st{full_cleaner = Pid} = St) -> + maybe_start_full_cleaner(St#st{full_cleaner = undefined}); handle_info(Msg, St) -> {stop, {bad_info, Msg}, St}. @@ -93,38 +114,79 @@ code_change(_OldVsn, St, _Extra) -> handle_db_event(DbName, created, _St) -> - gen_server:cast(?MODULE, {cleanup, DbName}), + gen_server:cast(?MODULE, {cleanup, DbName, []}), {ok, nil}; handle_db_event(DbName, deleted, _St) -> - gen_server:cast(?MODULE, {cleanup, DbName}), + gen_server:cast(?MODULE, {cleanup, DbName, [{context, delete}]}), {ok, nil}; handle_db_event(_DbName, _Event, _St) -> {ok, nil}. -maybe_start_cleaner(#st{cleaner=undefined}=St) -> - case queue:is_empty(St#st.queue) of +maybe_start_selective_cleaner(#st{selective_cleaner=undefined}=St) -> + case queue:is_empty(St#st.selective_clean) of false -> - start_cleaner(St); + start_selective_cleaner(St); true -> {noreply, St} end; -maybe_start_cleaner(St) -> +maybe_start_selective_cleaner(St) -> {noreply, St}. -start_cleaner(St) -> - {{value, DbName}, NewQ} = queue:out(St#st.queue), - {Pid, _} = spawn_monitor(?MODULE, clean_db, [DbName]), - {noreply, St#st{queue = NewQ, cleaner = Pid}}. +maybe_start_full_cleaner(#st{full_cleaner=undefined}=St) -> + case queue:is_empty(St#st.full_clean) of + false -> + start_full_cleaner(St); + true -> + {noreply, St} + end; + +maybe_start_full_cleaner(St) -> + {noreply, St}. + + +start_selective_cleaner(St) -> + {{value, DbName}, NewQ} = queue:out(St#st.selective_clean), + {Pid, _} = spawn_monitor(?MODULE, clean_db, [DbName, []]), + {noreply, St#st{selective_clean = NewQ, selective_cleaner = Pid}}. 
+ + +start_full_cleaner(St) -> + {{value, ShardDbName}, NewQ} = queue:out(St#st.full_clean), + {Pid, _} = spawn_monitor( + ?MODULE, clean_shard_db, [ShardDbName, [{context, delete}]] + ), + {noreply, St#st{full_clean = NewQ, full_cleaner = Pid}}. clean_db(DbName) -> + clean_db(DbName, []). + +clean_db(DbName, _Options) -> + delete_inactive_indexes(DbName, get_active_sigs(DbName)). + + +clean_shard_db(ShardDbName) -> + clean_db(ShardDbName, []). + +clean_shard_db(ShardDbName, Options) -> + delete = couch_util:get_value(context, Options, delete), + DoRecovery = config:get_boolean("couchdb", + "enable_database_recovery", false), + case DoRecovery of + true -> + rename_all_indexes(ShardDbName); + false -> + delete_all_indexes(ShardDbName) + end. + + +get_active_sigs(DbName) -> {ok, JsonDDocs} = get_ddocs(DbName), DDocs = [couch_doc:from_json_obj(DD) || DD <- JsonDDocs], - ActiveSigs = lists:usort(lists:flatmap(fun active_sigs/1, DDocs)), - cleanup(DbName, ActiveSigs). + lists:usort(lists:flatmap(fun active_sigs/1, DDocs)). get_ddocs(DbName) -> @@ -168,15 +230,9 @@ active_sigs(#doc{body={Fields}}=Doc) -> end, IndexNames). -cleanup(DbName, ActiveSigs) -> +delete_inactive_indexes(DbName, ActiveSigs) -> BaseDir = config:get("couchdb", "geo_index_dir", "/srv/geo_index"), - - % Find the existing index directories on disk - DbNamePattern = <<DbName/binary, ".*">>, - Pattern0 = filename:join([BaseDir, "shards", "*", DbNamePattern, "*"]), - Pattern = binary_to_list(iolist_to_binary(Pattern0)), - DirListStrs = filelib:wildcard(Pattern), - DirList = [iolist_to_binary(DL) || DL <- DirListStrs], + DirList = hastings_util:get_existing_index_dirs(BaseDir, DbName), % Create the list of active index directories LocalShards = mem3:local_shards(DbName), @@ -200,3 +256,36 @@ cleanup(DbName, ActiveSigs) -> [{E, T}, Stack]) end end, DeadDirs). 
+ + +delete_all_indexes(ShardDbName) -> + BaseDir = config:get("couchdb", "geo_index_dir", "/srv/geo_index"), + DirList = hastings_util:get_existing_index_dirs(BaseDir, ShardDbName), + + lists:foreach(fun(IdxDir) -> + try + hastings_index:destroy(IdxDir), + file:del_dir(IdxDir) + catch E:T -> + Stack = erlang:get_stacktrace(), + couch_log:error( + "Failed to remove hastings index directory: ~p ~p", + [{E, T}, Stack]) + end + end, DirList). + + +rename_all_indexes(ShardDbName) -> + BaseDir = config:get("couchdb", "geo_index_dir", "/srv/geo_index"), + DirList = hastings_util:get_existing_index_dirs(BaseDir, ShardDbName), + + lists:foreach(fun(IdxDir) -> + try + hastings_util:do_rename(IdxDir) + catch E:T -> + Stack = erlang:get_stacktrace(), + couch_log:error( + "Failed to rename hastings index directory: ~p ~p", + [{E, T}, Stack]) + end + end, DirList). diff --git a/test/hastings_delete_db_test.erl b/test/hastings_delete_db_test.erl new file mode 100755 index 0000000..1f928f6 --- /dev/null +++ b/test/hastings_delete_db_test.erl @@ -0,0 +1,139 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(hastings_delete_db_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("hastings/src/hastings.hrl"). + + +-define(TIMEOUT, 10000). 
+ + +setup() -> + DbName = ?tempdb(), + ok = fabric:create_db(DbName, [?ADMIN_CTX]), + meck:new(hastings_vacuum, [passthrough]), + meck:expect(hastings_vacuum, clean_db, fun(A) -> + meck:passthrough([A]) + end), + DbName. + + +teardown(_DbName) -> + (catch meck:unload(hastings_vacuum)). + + +hastings_delete_db_test_() -> + { + "Hastings Delete Database Tests", + { + setup, + fun() -> test_util:start_couch([fabric, mem3, hastings]) end, + fun test_util:stop/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_delete_index_after_deleting_database/1, + fun should_rename_index_after_deleting_database/1 + ] + } + } + }. + + +should_delete_index_after_deleting_database(DbName) -> + ?_test(begin + Docs = hastings_test_util:make_docs(5), + {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]), + {ok, _} = fabric:update_doc( + DbName, + hastings_test_util:ddoc(geo), + [?ADMIN_CTX] + ), + + {ok, Hits} = run_hastings_search(DbName), + ?assertEqual(5, lists:flatlength(Hits)), + + config:set("couchdb", "enable_database_recovery", "false", false), + BaseDir = config:get("couchdb", "geo_index_dir", "/srv/geo_index"), + + [FirstShard | _RestShards] = mem3:shards(DbName), + [GeoIdxDir | _RestIdxDirs] = hastings_util:get_existing_index_dirs( + BaseDir, + FirstShard#shard.name + ), + GeoDirExistsBefore = filelib:is_dir(GeoIdxDir), + + fabric:delete_db(DbName, [?ADMIN_CTX]), + meck:wait(hastings_vacuum, clean_shard_db, '_', 5000), + + RenamedPath = hastings_util:calculate_delete_directory(GeoIdxDir), + GeoDirExistsAfter = filelib:is_dir(GeoIdxDir), + GeoRenamedDirExistsAfter = filelib:is_dir(RenamedPath), + + [ + ?assert(GeoDirExistsBefore), + ?assertNot(GeoDirExistsAfter), + ?assertNot(GeoRenamedDirExistsAfter) + ] + end). 
+ + +should_rename_index_after_deleting_database(DbName) -> + ?_test(begin + Docs = hastings_test_util:make_docs(5), + {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]), + {ok, _} = fabric:update_doc( + DbName, + hastings_test_util:ddoc(geo), + [?ADMIN_CTX] + ), + + {ok, Hits} = run_hastings_search(DbName), + ?assertEqual(5, lists:flatlength(Hits)), + + config:set("couchdb", "enable_database_recovery", "true", false), + BaseDir = config:get("couchdb", "geo_index_dir", "/srv/geo_index"), + + [FirstShard | _RestShards] = mem3:shards(DbName), + [GeoIdxDir | _RestIdxDirs] = hastings_util:get_existing_index_dirs( + BaseDir, + FirstShard#shard.name + ), + GeoDirExistsBefore = filelib:is_dir(GeoIdxDir), + + fabric:delete_db(DbName, [?ADMIN_CTX]), + meck:wait(hastings_vacuum, clean_shard_db, '_', 5000), + + RenamedPath = hastings_util:calculate_delete_directory( + filename:dirname(GeoIdxDir) + ), + GeoDirExistsAfter = filelib:is_dir(GeoIdxDir), + GeoRenamedDirExistsAfter = filelib:is_dir(RenamedPath), + + [ + ?assert(GeoDirExistsBefore), + ?assertNot(GeoDirExistsAfter), + ?assert(GeoRenamedDirExistsAfter) + ] + end). + + +run_hastings_search(DbName) -> + HQArgs = #h_args{geom = {bbox,[-180.0,-90.0,180.0,90.0]}}, + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/geodd">>, []), + hastings_fabric_search:go(DbName, DDoc, <<"geoidx">>, HQArgs). diff --git a/test/hastings_test_util.erl b/test/hastings_test_util.erl new file mode 100755 index 0000000..ecf846e --- /dev/null +++ b/test/hastings_test_util.erl @@ -0,0 +1,53 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(hastings_test_util). + +-compile(export_all). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). + + +save_docs(DbName, Docs) -> + {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). + + +make_docs(Count) -> + [doc(I) || I <- lists:seq(1, Count)]. + + +ddoc(geo) -> + couch_doc:from_json_obj({[ + {<<"_id">>, <<"_design/geodd">>}, + {<<"st_indexes">>, {[ + {<<"geoidx">>, {[ + {<<"index">>, << + "function(doc) {" + " if (doc.geometry && doc.geometry.coordinates)" + " {st_index(doc.geometry);}" + "}" + >>} + ]}} + ]}} + ]}). + + +doc(Id) -> + couch_doc:from_json_obj({[ + {<<"_id">>, list_to_binary("point" ++ integer_to_list(Id))}, + {<<"val">>, Id}, + {<<"geometry">>, {[ + {<<"type">>, <<"Point">>}, + {<<"coordinates">>, [Id, Id]} + ]}} + ]}).