Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions src/dreyfus/src/clouseau_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

-include("dreyfus.hrl").

-export([index_key_from_pid/1, index_key_from_path/2, get_index_pid_from_key/1]).
-export([open_index/3]).
-export([await/2, commit/2, get_update_seq/1, info/1, search/2]).
-export([group1/7, group2/2]).
Expand Down Expand Up @@ -47,7 +48,21 @@

-type analyzer_fields() :: [{field_name(), analyzer_name() | [analyzer_name()]}].

-type indexer_pid() :: pid().
-type indexer_key() :: {path, string_as_binary(_), pid()} | {pid, pid()}.

-spec index_key_from_pid(pid()) -> indexer_key().
index_key_from_pid(Pid) ->
{pid, Pid}.

-spec index_key_from_path(string_as_binary(_), pid()) -> indexer_key().
index_key_from_path(Path, Pid) ->
{path, Path, Pid}.

-spec get_index_pid_from_key(indexer_key()) -> pid().
get_index_pid_from_key({path, _, Pid}) ->
Pid;
get_index_pid_from_key({pid, Pid}) ->
Pid.

%% Example of the message
%% {[
Expand All @@ -69,7 +84,7 @@
-define(SEARCH_SERVICE_TIMEOUT, 2000).

-spec open_index(Peer :: pid(), Path :: shard(), Analyzer :: analyzer()) ->
{ok, indexer_pid()} | error().
{ok, indexer_key()} | error().
open_index(Peer, Path, Analyzer) ->
rpc({main, clouseau()}, {open, Peer, Path, Analyzer}).

Expand All @@ -86,14 +101,14 @@ get_root_dir() ->
rpc({main, clouseau()}, {get_root_dir}).

%% not used ???
-spec await(Ref :: indexer_pid(), MinSeq :: commit_seq()) ->
-spec await(Ref :: indexer_key(), MinSeq :: commit_seq()) ->
ok | error().

await(Ref, MinSeq) ->
rpc(Ref, {await, MinSeq}).

%% deprecated
-spec commit(Ref :: indexer_pid(), NewCommitSeq :: commit_seq()) ->
-spec commit(Ref :: indexer_key(), NewCommitSeq :: commit_seq()) ->
ok | error().

commit(Ref, NewCommitSeq) ->
Expand All @@ -107,25 +122,25 @@ commit(Ref, NewCommitSeq) ->
| {committed_seq, committed_seq()}
| {purge_seq, purge_seq()}.

-spec info(Ref :: indexer_pid()) ->
-spec info(Ref :: indexer_key()) ->
{ok, [info_result_item()]} | error().

info(Ref) ->
rpc(Ref, info).

-spec get_update_seq(Ref :: indexer_pid()) ->
-spec get_update_seq(Ref :: indexer_key()) ->
{ok, update_seq()} | error().

get_update_seq(Ref) ->
rpc(Ref, get_update_seq).

-spec set_purge_seq(Ref :: indexer_pid(), Seq :: purge_seq()) ->
-spec set_purge_seq(Ref :: indexer_key(), Seq :: purge_seq()) ->
ok | error().

set_purge_seq(Ref, Seq) ->
rpc(Ref, {set_purge_seq, Seq}).

-spec get_purge_seq(Ref :: indexer_pid()) ->
-spec get_purge_seq(Ref :: indexer_key()) ->
{ok, purge_seq()} | error().

get_purge_seq(Ref) ->
Expand Down Expand Up @@ -157,11 +172,11 @@ get_purge_seq(Ref) ->
| {highlight_size, pos_integer()}
| {legacy, boolean()}.

-spec search(Ref :: indexer_pid(), Args :: [search_arg()]) ->
-spec search(Key :: indexer_key(), Args :: [search_arg()]) ->
{ok, #top_docs{}} | error().

search(Ref, Args) ->
case rpc(Ref, {search, Args}) of
search(Key, Args) ->
case rpc(Key, {search, Args}) of
{ok, Response} when is_list(Response) ->
{ok, #top_docs{
update_seq = couch_util:get_value(update_seq, Response),
Expand All @@ -181,7 +196,7 @@ search(Ref, Args) ->
| [field_name()].

-spec group1(
Ref :: indexer_pid(),
Key :: indexer_key(),
Query :: query(),
GroupBy :: field_name(),
Refresh :: boolean(),
Expand All @@ -190,8 +205,8 @@ search(Ref, Args) ->
Limit :: limit()
) -> {ok, [{field_name(), sort_values()}]} | error().

group1(Ref, Query, GroupBy, Refresh, Sort, Offset, Limit) ->
rpc(Ref, {group1, Query, GroupBy, Refresh, Sort, Offset, Limit}).
group1(Key, Query, GroupBy, Refresh, Sort, Offset, Limit) ->
rpc(Key, {group1, Query, GroupBy, Refresh, Sort, Offset, Limit}).

-type group_name() :: string_as_binary(_) | null.
-type sort_values() :: [string_as_binary(_) | null].
Expand All @@ -213,17 +228,17 @@ group1(Ref, Query, GroupBy, Refresh, Sort, Offset, Limit) ->
| {highlight_size, pos_integer()}.

-type grouped_results() :: [{field_name(), TotalHits :: non_neg_integer(), [#hit{}]}].
-spec group2(Ref :: indexer_pid(), Args :: [query_arg()]) ->
-spec group2(Key :: indexer_key(), Args :: [query_arg()]) ->
{ok, {TotalHits :: non_neg_integer(), TotalGroupedHits :: non_neg_integer(), grouped_results()}}.

group2(Ref, Args) ->
rpc(Ref, {group2, Args}).
group2(Key, Args) ->
rpc(Key, {group2, Args}).

-spec delete(Ref :: indexer_pid(), Id :: docid()) ->
-spec delete(Key :: indexer_key(), Id :: docid()) ->
ok.

delete(Ref, Id) ->
rpc(Ref, {delete, couch_util:to_binary(Id)}).
delete(Key, Id) ->
rpc(Key, {delete, couch_util:to_binary(Id)}).

-type docid() :: string_as_binary(_).

Expand All @@ -242,11 +257,11 @@ delete(Ref, Id) ->
-type yes_or_no() :: string_as_binary(yes) | string_as_binary(no).

-spec update(
Ref :: indexer_pid(), Id :: docid(), Fields :: [{field_name(), field_value(), [field_option()]}]
Key :: indexer_key(), Id :: docid(), Fields :: [{field_name(), field_value(), [field_option()]}]
) ->
ok.
update(Ref, Id, Fields) ->
rpc(Ref, {update, Id, Fields}).
update(Key, Id, Fields) ->
rpc(Key, {update, Id, Fields}).

-spec cleanup(DbName :: string_as_binary(_)) -> ok.
cleanup(DbName) ->
Expand Down Expand Up @@ -337,7 +352,13 @@ connected() ->
end
end.

rpc(Ref, Msg) ->
rpc(Key, Message) ->
{Ref, Msg} =
case Key of
{pid, Pid} -> {Pid, Message};
{path, Path} -> {main, {forward, Path, Message}};
Other -> Other
end,
ioq:call_search(Ref, Msg, erlang:get(io_priority)).

clouseau() ->
Expand Down
Loading