Skip to content

Commit 4abddf9

Browse files
committed
Stop search indexer when it is deleted.
Allow dreyfus to listen to ddoc_updated events and check if the index is deleted as a result of update to it's design document. If the index is deleted then it will stop the index if waitlist is empty, if the waitlist is not empty then it will wait for a minute before shutting down. BugzId:85718
1 parent d838881 commit 4abddf9

File tree

4 files changed

+363
-7
lines changed

4 files changed

+363
-7
lines changed

src/dreyfus_index.erl

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
waiting_list=[]
4141
}).
4242

43+
-define(INDEX_SHUTDOWN_DELAY, index_shutdown_delay()).
44+
4345
% exported for callback.
4446
-export([search_int/2, group1_int/2, group2_int/2, info_int/1]).
4547

@@ -157,9 +159,28 @@ handle_call(info, _From, State) -> % obsolete
157159
Reply = info_int(State#state.index_pid),
158160
{reply, Reply, State}.
159161

162+
handle_cast({ddoc_updated, DDocResult}, State) ->
163+
#state{index=#index{sig=CurrentSig}, waiting_list = WaitList} = State,
164+
case check_if_index_is_deleted(DDocResult, CurrentSig) of
165+
true ->
166+
case WaitList of
167+
[] ->
168+
self() ! index_deleted;
169+
[_Index] ->
170+
erlang:send_after(?INDEX_SHUTDOWN_DELAY, self(), index_deleted)
171+
end
172+
end,
173+
{noreply, State};
174+
160175
handle_cast(_Msg, State) ->
161176
{noreply, State}.
162177

178+
handle_info(index_deleted, State) ->
179+
#state{index=Index} = State,
180+
couch_log:notice("Shutting down index for ~s. It is either deleted or invalidated with the update to it's design document",
181+
[index_name(Index)]),
182+
{stop, normal, State};
183+
163184
handle_info({'EXIT', FromPid, {updated, NewSeq}},
164185
#state{
165186
index=Index0,
@@ -222,6 +243,35 @@ code_change(_OldVsn, State, _Extra) ->
222243

223244
% private functions.
224245

246+
get_index_with_sig(#doc{body={Fields}}=Doc, Sig) ->
247+
RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}),
248+
case RawIndexes of
249+
{IndexList} when is_list(IndexList) ->
250+
{IndexNames, _} = lists:unzip(IndexList),
251+
lists:flatmap(
252+
fun(IndexName) ->
253+
case design_doc_to_index(Doc, IndexName) of
254+
{ok, #index{sig=Sig_New}} when Sig_New =/= Sig ->
255+
[];
256+
{ok, #index{sig=Sig_New}=Index} when Sig_New =:= Sig ->
257+
[Index]
258+
end
259+
end, IndexNames)
260+
end.
261+
262+
check_if_index_is_deleted(DDocResult, CurrentSig) ->
263+
case DDocResult of
264+
{not_found, deleted} ->
265+
true;
266+
{ok, DDoc} ->
267+
case get_index_with_sig(DDoc, CurrentSig) of
268+
[] ->
269+
true;
270+
[_Index] ->
271+
false
272+
end
273+
end.
274+
225275
open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
226276
Path = <<DbName/binary,"/",Sig/binary>>,
227277
case clouseau_rpc:open_index(self(), Path, Analyzer) of
@@ -367,5 +417,8 @@ group2_int(Pid, QueryArgs0) ->
367417
clouseau_rpc:group2(Pid, Props)
368418
end.
369419

420+
index_shutdown_delay() ->
421+
config:get_integer("dreyfus", "index_shutdown_delay", 60000). % 1 minute
422+
370423
info_int(Pid) ->
371424
clouseau_rpc:info(Pid).

src/dreyfus_index_manager.erl

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
-define(BY_SIG, dreyfus_by_sig).
2323
-define(BY_PID, dreyfus_by_pid).
24+
-define(BY_DB, dreyfus_by_db).
2425

2526
% public api.
2627
-export([start_link/0, get_index/2, get_disk_size/2]).
@@ -44,8 +45,9 @@ get_disk_size(DbName, Index) ->
4445
% gen_server functions.
4546

4647
init([]) ->
47-
ets:new(?BY_SIG, [set, private, named_table]),
48+
ets:new(?BY_SIG, [set, protected, named_table]),
4849
ets:new(?BY_PID, [set, private, named_table]),
50+
ets:new(?BY_DB, [bag, protected, named_table]),
4951
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
5052
process_flag(trap_exit, true),
5153
{ok, nil}.
@@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) ->
6971
Reply = clouseau_rpc:disk_size(Path),
7072
{reply, Reply, State};
7173

72-
handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
74+
handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) ->
7375
link(NewPid),
7476
[{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
7577
[gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
7678
ets:delete(?BY_PID, OpenerPid),
77-
add_to_ets(NewPid, DbName, Sig),
79+
add_to_ets(NewPid, DbName, DDocId, Sig),
7880
{reply, ok, State};
7981

8082
handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
@@ -121,25 +123,49 @@ handle_db_event(DbName, created, _St) ->
121123
handle_db_event(DbName, deleted, _St) ->
122124
gen_server:cast(?MODULE, {cleanup, DbName}),
123125
{ok, nil};
126+
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) ->
127+
DDocResult = couch_util:with_db(DbName, fun(Db) ->
128+
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
129+
end),
130+
couch_log:info("Received ddoc_updated event for ~s",[DDocId]),
131+
DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
132+
lists:foreach(fun(DbShard) ->
133+
lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
134+
case ets:lookup(?BY_SIG, {DbShard, Sig}) of
135+
[{_, IndexPid}] ->
136+
gen_server:cast(IndexPid, {ddoc_updated, DDocResult});
137+
[] ->
138+
ok
139+
end
140+
end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}}))
141+
end, DbShards),
142+
{ok, nil};
124143
handle_db_event(_DbName, _Event, _St) ->
125144
{ok, nil}.
126145

127-
new_index(DbName, #index{sig=Sig}=Index) ->
146+
new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) ->
128147
case (catch dreyfus_index:start_link(DbName, Index)) of
129148
{ok, NewPid} ->
130-
Msg = {open_ok, DbName, Sig, NewPid},
149+
Msg = {open_ok, DbName, DDocId, Sig, NewPid},
131150
ok = gen_server:call(?MODULE, Msg, infinity),
132151
unlink(NewPid);
133152
Error ->
134153
Msg = {open_error, DbName, Sig, Error},
135154
ok = gen_server:call(?MODULE, Msg, infinity)
136155
end.
137156

138-
add_to_ets(Pid, DbName, Sig) ->
157+
add_to_ets(Pid, DbName, DDocId, Sig) ->
139158
true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
140-
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
159+
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
160+
true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
141161

142162
delete_from_ets(Pid, DbName, Sig) ->
163+
case ets:match_object(?BY_DB, {DbName, {'$1', Sig}}) of
164+
[{DbName, {DDocId, Sig}}] ->
165+
true = ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}});
166+
_Else ->
167+
true
168+
end,
143169
true = ets:delete(?BY_PID, Pid),
144170
true = ets:delete(?BY_SIG, {DbName, Sig}).
145171

test/.DS_Store

6 KB
Binary file not shown.

0 commit comments

Comments
 (0)