Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions src/dreyfus/src/dreyfus_index_updater.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ update(IndexPid, Index) ->
erlang:put(io_priority, {search, DbName, IndexName}),
{ok, Db} = couch_db:open_int(DbName, []),
try
CheckpointPSeq = get_local_doc_purge_seq(Db, Index),
{ok, ClouseauPSeq} = clouseau_rpc:get_purge_seq(IndexPid),
IdxPurgeSeq = get_index_purge_seq(Db, CheckpointPSeq, ClouseauPSeq, DDocId, IndexName),
DbPurgeSeq = couch_db:get_purge_seq(Db),
TotalPurgeChanges = DbPurgeSeq - IdxPurgeSeq,
TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq),
TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid),
TotalChanges = TotalUpdateChanges + TotalPurgeChanges,

couch_task_status:add_task([
Expand All @@ -49,7 +53,7 @@ update(IndexPid, Index) ->

%ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...]
%The Rev is the final Rev, not purged Rev.
{ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index),
{ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index, IdxPurgeSeq, ClouseauPSeq),
%% compute on all docs modified since we last computed.

NewCurSeq = couch_db:get_update_seq(Db),
Expand Down Expand Up @@ -87,8 +91,7 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = A
{ok, setelement(1, Acc, I + 1)}
end.

purge_index(Db, IndexPid, Index) ->
{ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
purge_index(Db, IndexPid, Index, IdxPurgeSeq, OldClouseauPSeq) ->
Proc = get_os_process(Index#index.def_lang),
try
true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
Expand All @@ -113,18 +116,19 @@ purge_index(Db, IndexPid, Index) ->
end,
{ok, ExcludeList} = couch_db:fold_purge_infos(Db, IdxPurgeSeq, FoldFun, []),
NewPurgeSeq = couch_db:get_purge_seq(Db),
ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq),
case NewPurgeSeq > OldClouseauPSeq of
true ->
ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq);
false ->
% Save an rpc call if we aren't advancing the purge sequence
ok
end,
update_local_doc(Db, Index, NewPurgeSeq),
{ok, ExcludeList}
after
ret_os_process(Proc)
end.

count_pending_purged_docs_since(Db, IndexPid) ->
DbPurgeSeq = couch_db:get_purge_seq(Db),
{ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
DbPurgeSeq - IdxPurgeSeq.

update_or_delete_index(IndexPid, Db, DI, Proc) ->
#doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI,
case Del of
Expand Down Expand Up @@ -152,6 +156,38 @@ update_local_doc(Db, Index, PurgeSeq) ->
DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index),
couch_db:update_doc(Db, DocContent, []).

get_local_doc_purge_seq(Db, Index) ->
DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig),
% We're implicitly asserting this purge checkpoint doc exists. It is
% created either on open or during compaction in on_compact handler.
{ok, #doc{body = {[_ | _] = Props}}} = couch_db:open_doc(Db, DocId),
couch_util:get_value(<<"purge_seq">>, Props).

get_index_purge_seq(Db, CheckpointPSeq, ClouseauPSeq, DDocId, IndexName) when
is_integer(CheckpointPSeq), is_integer(ClouseauPSeq), CheckpointPSeq >= 0, ClouseauPSeq >= 0
->
if
CheckpointPSeq == ClouseauPSeq ->
% The default state is they both match
CheckpointPSeq;
CheckpointPSeq > ClouseauPSeq ->
% Somehow index fell behind. We should reset the index but don't really
% have a facility for it, so log an error instead. We still can only fold
% purges start from checkpoint sequence and higher (and not lower).
DbName = couch_db:name(Db),
Msg = "~p : index pseq:~p is behind the checkpoint pseq:~p db:~p ddoc:~p index:~p",
couch_log:error(Msg, [?MODULE, ClouseauPSeq, CheckpointPSeq, DbName, DDocId, IndexName]),
CheckpointPSeq;
CheckpointPSeq < ClouseauPSeq ->
% Somehow the checkpoint fell behind. Perhaps someone manually
% manipulated checkpoint docs, or the the system crashed right
% after the set_purge_seq was called but before the checkpoint doc
% was written. Choose to reprocess the changes from the
% checkpointed sequence, it may add extra work, but should still be
% correct.
CheckpointPSeq
end.

update_task(NumChanges) ->
[Changes, Total] = couch_task_status:get([changes_done, total_changes]),
Changes2 = Changes + NumChanges,
Expand Down
Loading