Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7b6d913
Use read replicas for read operations
Mar 24, 2025
a133043
fix: compile error due to state
Mar 25, 2025
28c0261
fix: Export get_replicas_for_slot function and fix type specs
Mar 25, 2025
57a5d13
fix: get_all_pools to return both master and replicas
Mar 26, 2025
09f70e1
fix: fix get_all_pools again
Mar 26, 2025
cd5ba63
fix: Replace incorrect list:map in parse_cluster_slots
Mar 26, 2025
8dfc4a6
fix: Handle empty replica list correctly
Mar 27, 2025
0f7449d
fix: fix connect_ function
Mar 27, 2025
241a20b
fix: properly handle the SSL connection in the connect_ function
Mar 27, 2025
54f55d5
chore: update deps to be able to run independently
Mar 27, 2025
55ba8b9
fix: Add missing definitions
Mar 27, 2025
5529a13
fix: parsed_cluster_nodes and connect_ functions
Mar 27, 2025
9d090ee
fix: Remove duplicate def for record state
Mar 27, 2025
1a700fd
fix: handle no init nodes
Mar 27, 2025
a9a3e2e
feat: add lager config
Mar 27, 2025
f7adae4
feat: add more debug logging
Mar 27, 2025
66f6ef9
fix: update parse_cluster_nodes
Mar 27, 2025
48f3f43
fix: added error handling to cluster slots and cluster nodes commands
Mar 27, 2025
4c4e427
fix: fix port handling in parse_cluster_nodes
Mar 27, 2025
d5587cf
fix: properly close the connection after cluster info
Mar 27, 2025
46de06d
fix: fix variables
Mar 27, 2025
141e488
fix: fix variables again
Mar 27, 2025
1484952
fix: fix variable Error in try catch
Mar 27, 2025
d522654
fix: fix variable Error in try catch again
Mar 27, 2025
1f7f5b4
fix: fix variable Error in try catch some more
Mar 27, 2025
1dcde3d
fix: do not QUIT a closed connection
Mar 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions config/sys.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[{lager, [
{handlers, [
{lager_console_backend, [
{level, debug},
{formatter, lager_default_formatter},
{formatter_config, [time, " [", severity, "] ", message, "\n"]}
]}
]}
]},
{eredis_cluster, [
{clusters, [
{memorydb_for_group_db, [
{init_nodes, [{"clustercfg.xmpp-group-memorydb-env7.ekur4t.memorydb.us-east-1.amazonaws.com", 6379}]},
{pool_max_overflow, 0},
{pool_size, 50}
]}
]}
]}].
43 changes: 40 additions & 3 deletions include/eredis_cluster.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,41 @@

-type options() :: [{term(), term()}].

%% @doc Record for Redis cluster node information
-record(node, {
address :: string(),
port :: integer(),
options :: options() | undefined, % not used for init_nodes
pool :: atom() % not used for init_nodes
role :: master | replica,
options = [] :: options(),
pool :: atom() | undefined
}).

%% @doc Record for Redis cluster slot mapping
-record(slots_map, {
index :: integer(),
start_slot :: integer(),
end_slot :: integer(),
index :: integer(),
node :: #node{}
}).

%% @doc Record for Redis cluster information
-record(cluster_info, {
nodes :: [#node{}],
slots :: [#slots_map{}]
}).

%% @doc Record for Redis cluster monitor state
-record(state, {
slots_maps = {} :: tuple(),
slots_table :: ets:tid(),
pool_sup :: pid(),
version = 0 :: integer(),
cluster_info :: #cluster_info{},
options = [] :: options(),
init_nodes = [] :: [#node{}],
node_options = [] :: options()
}).

-define(default_cluster, eredis_cluster_default).
-define(redis_cluster_request_max_retries, 16).
-define(optimistic_locking_transaction_max_retries, 16).
Expand Down Expand Up @@ -114,3 +135,19 @@
16#af,16#9b,16#bf,16#ba,16#8f,16#d9,16#9f,16#f8,
16#6e,16#17,16#7e,16#36,16#4e,16#55,16#5e,16#74,
16#2e,16#93,16#3e,16#b2,16#0e,16#d1,16#1e,16#f0>>).

-type redis_command_type() :: read | write | admin.

%% List of read-only commands that can be routed to replicas
-define(READ_COMMANDS, [
"get", "mget", "exists", "type", "ttl", "pttl", "strlen",
"llen", "scard", "sismember", "srandmember", "zcard",
"zcount", "zlexcount", "zrange", "zrangebyscore", "zrank",
"zrevrange", "zrevrangebyscore", "zrevrank", "zscore",
"hget", "hgetall", "hexists", "hkeys", "hlen", "hmget",
"hvals", "lindex", "lrange", "llen", "randomkey", "keys",
"scan", "sscan", "hscan", "zscan"
]).

-type connection() :: pid().
-type cluster_info() :: #cluster_info{}.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
, {preprocess, true}
]}.

{deps, [{eredis, "1.7.0"},
{deps, [{eredis, {git, "git@github.com:tigertext/eredis", {branch, "xmpp_only"}}},
{poolboy, {git, "git@github.com:tigertext/poolboy.git", {branch, "branch_1.5.2_tag"}}},
lager,
{lager_logstash, {git, "git@github.com:tigertext/lager_logstash.git", {tag, "0.1.4"}}}
Expand Down
13 changes: 9 additions & 4 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{"1.1.0",
[{<<"eredis">>,{pkg,<<"eredis">>,<<"1.7.0">>},0},
{"1.2.0",
[{<<"eredis">>,
{git,"git@github.com:tigertext/eredis",
{ref,"9427287576762abd0e69837d201ed9088c32c4ce"}},
0},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.9.2">>},0},
{<<"lager_logstash">>,
Expand All @@ -12,7 +15,9 @@
0}]}.
[
{pkg_hash,[
{<<"eredis">>, <<"7A9B2B51ECDE73AF3399B344C6C7FA5141E6C77DF5FACE86E41E8365A1E5B08F">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"lager">>, <<"4CAB289120EB24964E3886BD22323CB5FEFE4510C076992A23AD18CF85413D8C">>}]}
{<<"lager">>, <<"4CAB289120EB24964E3886BD22323CB5FEFE4510C076992A23AD18CF85413D8C">>}]},
{pkg_hash_ext,[
{<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>},
{<<"lager">>, <<"7F904D9E87A8CB7E66156ED31768D1C8E26EBA1D54F4BC85B1AA4AC1F6340C28">>}]}
].
37 changes: 35 additions & 2 deletions src/eredis_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ split_by_pools(Cluster, Commands) ->
split_by_pools([Command | T], Index, CmdAcc, MapAcc, State) ->
Key = get_key_from_command(Command),
Slot = get_key_slot(Key),
{Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State),
{Pool, _Version} = get_pool_for_command(Command, Slot, State),
{NewAcc1, NewAcc2} =
case lists:keyfind(Pool, 1, CmdAcc) of
false ->
Expand Down Expand Up @@ -1214,7 +1214,7 @@ arg_after_keyword(Keyword, [Arg|Args]) ->
end.

memory_arg([Subcommand | Args]) when is_binary(Subcommand) ->
memory_arg([binary_to_list(Subcommand) | Args]);
memory_arg([binary_to_list(Subcommand)|Args]);
memory_arg([Subcommand | Args]) ->
case string:to_lower(Subcommand) of
"usage" -> nth_arg(1, Args);
Expand All @@ -1238,6 +1238,39 @@ resource_queue_redesign_log(Cluster, Command, Result) ->
ok
end.

%% @doc Determine if a command is a read operation
-spec is_read_command(Command :: redis_command()) -> boolean().
is_read_command([Cmd | _]) when is_list(Cmd) ->
CmdStr = string:lowercase(Cmd),
lists:member(CmdStr, ?READ_COMMANDS);
is_read_command(_) ->
false.

%% @doc Get appropriate pool for a command based on operation type
-spec get_pool_for_command(Command :: redis_command(), Slot :: integer(), State :: term()) ->
{PoolName :: atom(), Version :: integer()}.
get_pool_for_command(Command, Slot, State) ->
case is_read_command(Command) of
true ->
%% For read operations, try to get a replica pool
ReplicaPools = eredis_cluster_monitor:get_replicas_for_slot(Slot, State),
case ReplicaPools of
[] ->
%% If no replicas available, fall back to master
{Pool, Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State),
lager:debug("Read command ~p routed to master node ~p (no replicas available)", [Command, Pool]),
{Pool, Version};
[Pool | _] ->
lager:debug("Read command ~p routed to replica node ~p", [Command, Pool]),
{Pool, eredis_cluster_monitor:get_state_version(State)}
end;
false ->
%% For write operations, always use master
{Pool, Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State),
lager:debug("Write command ~p routed to master node ~p", [Command, Pool]),
{Pool, Version}
end.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

Expand Down
Loading