Skip to content

Commit fd8e486

Browse files
committed
Stop search indexer when it is deleted.
Allow dreyfus to listen to ddoc_updated events and check if the index is deleted as a result of update to it's design document. If the index is deleted then it will stop the index if waitlist is empty, if the waitlist is not empty then it will wait for a minute before shutting down. BugzId:85718
1 parent d838881 commit fd8e486

File tree

4 files changed

+441
-9
lines changed

4 files changed

+441
-9
lines changed

src/dreyfus_index.erl

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,18 @@ handle_call(info, _From, State) -> % obsolete
157157
Reply = info_int(State#state.index_pid),
158158
{reply, Reply, State}.
159159

160+
handle_cast({ddoc_updated, DDocResult},
161+
#state{
162+
index=#index{sig=Sig, dbname=DbName}=Index
163+
}=State) ->
164+
case check_if_index_is_deleted(DbName, DDocResult, Sig) of
165+
true ->
166+
couch_log:notice("Index ~s will be closed as it's not active or valid anymore", [index_name(Index)]),
167+
{stop, {shutdown, ddoc_updated}, State};
168+
false ->
169+
{noreply, State}
170+
end;
171+
160172
handle_cast(_Msg, State) ->
161173
{noreply, State}.
162174

@@ -214,13 +226,65 @@ handle_info({'DOWN',_,_,Pid,Reason}, #state{
214226
[gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList],
215227
{stop, normal, State}.
216228

217-
terminate(_Reason, _State) ->
218-
ok.
229+
terminate(Reason, State) ->
230+
case Reason of
231+
{shutdown, ddoc_updated} ->
232+
send_all(State#state.waiting_list, ddoc_updated),
233+
ok;
234+
_ ->
235+
ok
236+
end.
219237

220238
code_change(_OldVsn, State, _Extra) ->
221239
{ok, State}.
222240

223241
% private functions.
242+
send_all(Waiters, Reply) ->
243+
[gen_server:reply(From, Reply) || {From, _} <- Waiters].
244+
245+
check_if_index_is_deleted(DbName, DDocResult, CurrentSig) ->
246+
case DDocResult of
247+
{not_found, deleted} ->
248+
IndicesWithSig = indices_with_sig_from_other_ddocs(DbName, CurrentSig);
249+
{ok, DDoc} ->
250+
IndicesWithSig = indices_with_sig(DbName, DDoc, CurrentSig)
251+
end,
252+
253+
case IndicesWithSig of
254+
[] ->
255+
true;
256+
[_H | _T] ->
257+
false
258+
end.
259+
260+
indices_with_sig(DbName, Doc, Sig) ->
261+
case indices_matching_sig({Doc, Sig}) of
262+
[] ->
263+
% Index is deleted from this design doc, check if it's referred from any other design docs.
264+
indices_with_sig_from_other_ddocs(DbName, Sig);
265+
Indices ->
266+
Indices
267+
end.
268+
269+
indices_with_sig_from_other_ddocs(DbName, CurrentSig) ->
270+
{ok, DesignDocs} = fabric:design_docs(mem3:dbname(DbName)),
271+
ActiveSigs = lists:usort(lists:flatmap(
272+
fun indices_matching_sig/1,
273+
[{couch_doc:from_json_obj(DD), CurrentSig} || DD <- DesignDocs])),
274+
ActiveSigs.
275+
276+
indices_matching_sig({#doc{body={Fields}}=Doc, CurrentSig}) ->
277+
{RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
278+
{IndexNames, _} = lists:unzip(RawIndexes),
279+
lists:filter(fun check_if_index_matches_sig/1, [{Doc, IndexName, CurrentSig} || IndexName <- IndexNames]).
280+
281+
check_if_index_matches_sig({Doc, IndexName, Sig}) ->
282+
case design_doc_to_index(Doc, IndexName) of
283+
{ok, #index{sig=Sig_New}} when Sig_New =/= Sig ->
284+
false;
285+
{ok, #index{sig=Sig_New}} when Sig_New =:= Sig ->
286+
true
287+
end.
224288

225289
open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
226290
Path = <<DbName/binary,"/",Sig/binary>>,

src/dreyfus_index_manager.erl

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
-define(BY_SIG, dreyfus_by_sig).
2323
-define(BY_PID, dreyfus_by_pid).
24+
-define(BY_DB, dreyfus_by_db).
2425

2526
% public api.
2627
-export([start_link/0, get_index/2, get_disk_size/2]).
@@ -44,8 +45,9 @@ get_disk_size(DbName, Index) ->
4445
% gen_server functions.
4546

4647
init([]) ->
47-
ets:new(?BY_SIG, [set, private, named_table]),
48+
ets:new(?BY_SIG, [set, protected, named_table]),
4849
ets:new(?BY_PID, [set, private, named_table]),
50+
ets:new(?BY_DB, [bag, protected, named_table]),
4951
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
5052
process_flag(trap_exit, true),
5153
{ok, nil}.
@@ -69,12 +71,12 @@ handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) ->
6971
Reply = clouseau_rpc:disk_size(Path),
7072
{reply, Reply, State};
7173

72-
handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) ->
74+
handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) ->
7375
link(NewPid),
7476
[{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}),
7577
[gen_server:reply(From, {ok, NewPid}) || From <- WaitList],
7678
ets:delete(?BY_PID, OpenerPid),
77-
add_to_ets(NewPid, DbName, Sig),
79+
add_to_ets(NewPid, DbName, DDocId, Sig),
7880
{reply, ok, State};
7981

8082
handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) ->
@@ -121,25 +123,49 @@ handle_db_event(DbName, created, _St) ->
121123
handle_db_event(DbName, deleted, _St) ->
122124
gen_server:cast(?MODULE, {cleanup, DbName}),
123125
{ok, nil};
126+
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, _St) ->
127+
DDocResult = couch_util:with_db(DbName, fun(Db) ->
128+
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
129+
end),
130+
couch_log:info("Received ddoc_updated event for ~s",[DDocId]),
131+
DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
132+
lists:foreach(fun(DbShard) ->
133+
lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
134+
case ets:lookup(?BY_SIG, {DbShard, Sig}) of
135+
[{_, IndexPid}] ->
136+
gen_server:cast(IndexPid, {ddoc_updated, DDocResult});
137+
[] ->
138+
ok
139+
end
140+
end, ets:match_object(?BY_DB, {DbShard, {DDocId, '_'}}))
141+
end, DbShards),
142+
{ok, nil};
124143
handle_db_event(_DbName, _Event, _St) ->
125144
{ok, nil}.
126145

127-
new_index(DbName, #index{sig=Sig}=Index) ->
146+
new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) ->
128147
case (catch dreyfus_index:start_link(DbName, Index)) of
129148
{ok, NewPid} ->
130-
Msg = {open_ok, DbName, Sig, NewPid},
149+
Msg = {open_ok, DbName, DDocId, Sig, NewPid},
131150
ok = gen_server:call(?MODULE, Msg, infinity),
132151
unlink(NewPid);
133152
Error ->
134153
Msg = {open_error, DbName, Sig, Error},
135154
ok = gen_server:call(?MODULE, Msg, infinity)
136155
end.
137156

138-
add_to_ets(Pid, DbName, Sig) ->
157+
add_to_ets(Pid, DbName, DDocId, Sig) ->
139158
true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
140-
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}).
159+
true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
160+
true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
141161

142162
delete_from_ets(Pid, DbName, Sig) ->
163+
case ets:match_object(?BY_DB, {DbName, {'_', Sig}}) of
164+
[{DbName, {DDocId, Sig}}] ->
165+
true = ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}});
166+
_Else ->
167+
true
168+
end,
143169
true = ets:delete(?BY_PID, Pid),
144170
true = ets:delete(?BY_SIG, {DbName, Sig}).
145171

test/.DS_Store

6 KB
Binary file not shown.

0 commit comments

Comments
 (0)