Skip to content

Commit

Permalink
Merge pull request #38 from emqx/copy-of-branch-0.7.x
Browse files Browse the repository at this point in the history
Merge branch 0.7.x to master
  • Loading branch information
terry-xiaoyu authored Feb 6, 2025
2 parents 0c729d5 + 958c4f5 commit 5d5912a
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 23 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ jobs:
run: rebar3 eunit -v -c
- name: cover report
run: rebar3 cover -v
- uses: actions/upload-artifact@v3
- name: upload artifact
uses: actions/upload-artifact@v4
with:
name: cover
name: cover-rebar3_${{ matrix.erlang.rebar3 }}-otp_${{ matrix.erlang.otp }}
path: _build/test/cover
- name: teardown redis cluster
if: always()
Expand Down
8 changes: 8 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,11 @@
{report_type_mismatch, true},
{pretty_print, erl_pp},
{preprocess, true}]}.
{profiles, [
{test, [
{deps, [
{meck, "0.9.2"}
]},
{erl_opts, [nowarn_export_all]}
]}
]}.
29 changes: 16 additions & 13 deletions src/eredis_cluster_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ get_slot_samples(Name) ->

-spec reload_slots_map(State::#state{}) -> NewState::#state{}.
reload_slots_map(State = #state{pool_name = PoolName}) ->
NewState = case get_cluster_slots(State#state.init_nodes, State, 0) of
{error, _Reason} ->
NewState = case get_cluster_slots(State#state.init_nodes, State, []) of
{error, Reason} ->
logger:error("Failed to get cluster slots: ~p", [Reason]),
State;
[] -> State#state{version = State#state.version + 1};
[] ->
State#state{version = State#state.version + 1};
ClusterSlots ->
[close_connection(SlotsMap)
|| SlotsMap <- tuple_to_list(State#state.slots_maps)],
Expand All @@ -138,15 +140,15 @@ reload_slots_map(State = #state{pool_name = PoolName}) ->
set_state(PoolName, NewState),
NewState.

get_cluster_slots([], State, FailAcc) ->
case erlang:length(State#state.init_nodes) =:= FailAcc of
get_cluster_slots([], State, ErrAcc) ->
case erlang:length(State#state.init_nodes) =:= erlang:length(ErrAcc) of
true ->
{error, <<"ERR all nodes are down">>};
{error, {<<"ERR all nodes are down">>, ErrAcc}};
false ->
[]
end;

get_cluster_slots([Node|T], State, FailAcc) ->
get_cluster_slots([Node|T], State, ErrAcc) ->
case safe_eredis_start_link(Node, State) of
{ok,Connection} ->
case eredis:q(Connection, ["CLUSTER", "SLOTS"]) of
Expand All @@ -157,12 +159,13 @@ get_cluster_slots([Node|T], State, FailAcc) ->
{ok, ClusterInfo} ->
eredis:stop(Connection),
ClusterInfo;
_ ->
Err ->
logger:error("Failed to get cluster slots from redis node ~p: ~p", [Node, Err]),
eredis:stop(Connection),
get_cluster_slots(T, State, FailAcc+1)
end;
_ ->
get_cluster_slots(T, State, FailAcc+1)
get_cluster_slots(T, State, [{Node, cluster_slots_cmd, Err} | ErrAcc])
end;
Err ->
get_cluster_slots(T, State, [{Node, eredis_start_link, Err} | ErrAcc])
end.

-spec get_cluster_slots_from_single_node(#node{}) ->
Expand Down Expand Up @@ -271,7 +274,7 @@ connect_(PoolName, Opts) ->
init([PoolName, Opts]) ->
process_flag(trap_exit, true),
case connect_(PoolName, Opts) of
#state{init_nodes = []} ->
#state{slots = undefined} ->
{stop, <<"ERR unable to connect to any nodes">>};
State ->
true = gproc:reg({n, l, name(PoolName)}, ignored),
Expand Down
52 changes: 44 additions & 8 deletions test/eredis_cluster_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,33 @@
-export([log/2]).

-define(POOL, ?MODULE).
-define(SERVERS, "127.0.0.1:30001,127.0.0.1:30002").
-record(state, {
init_nodes,
slots,
slots_maps,
version,
pool_name,
database,
username,
password,
pool_options
}).

-define(AUTH_PASS_ONLY_PASSWORD, "passw0rd").
-define(AUTH_USER_PASS_USERNAME, "test_user").
-define(AUTH_USER_PASS_PASSWORD, "test_passwd").

pool_opts(password_only) ->
[
{servers, [
{"127.0.0.1", 30001},
{"127.0.0.1", 30002}
]},
{servers, format_redis_servers(os:getenv("REDIS_NODE_LIST", ?SERVERS))},
{pool_size, 5},
{password, ?AUTH_PASS_ONLY_PASSWORD},
{pool_type, round_robin}
];
pool_opts(username_password) ->
[
{servers, [
{"127.0.0.1", 30001},
{"127.0.0.1", 30002}
]},
{servers, format_redis_servers(os:getenv("REDIS_NODE_LIST", ?SERVERS))},
{pool_size, 5},
{username, ?AUTH_USER_PASS_USERNAME},
{password, ?AUTH_USER_PASS_PASSWORD},
Expand Down Expand Up @@ -224,6 +230,27 @@ basic_test_cases(AuthMethod) ->
}
].

rainy_day_test_() ->
{setup, fun setup_username_password/0, fun cleanup/1,
[
{"get and set after redis is recovered from node failure",
fun() ->
meck:new(eredis, [passthrough]),
?assertEqual({ok, <<"OK">>}, eredis_cluster:q(?POOL, ["SET", "key", "value"])),
?assertEqual({ok, <<"value">>}, eredis_cluster:q(?POOL, ["GET","key"])),
#state{version = Vsn1} = eredis_cluster_monitor:get_state(?POOL),
meck:expect(eredis, q, fun(_, _) -> {error, no_connection} end),
meck:expect(eredis, q, fun(_, _, _) -> {error, no_connection} end),
?assertEqual({error, no_connection}, eredis_cluster:q(?POOL, ["GET","key"])),
meck:unload(),
?assertEqual({ok, <<"value">>}, eredis_cluster:q(?POOL, ["GET","key"])),
#state{version = Vsn2, init_nodes = InitNodes} = eredis_cluster_monitor:get_state(?POOL),
?assertEqual(Vsn1, Vsn2),
?assert(length(InitNodes) > 0)
end}
]
}.

censor_test_() ->
[
{
Expand Down Expand Up @@ -308,3 +335,12 @@ log(#{msg := Msg}, #{relay_to := Pid}) ->
Pid ! {log, Msg};
log(_, _) ->
ok.

format_redis_servers(Servers) ->
[format_server(Server) || Server <- string:tokens(Servers, ",")].

format_server(Servers) ->
case string:tokens(Servers, ":") of
[Domain] -> {Domain, 6379};
[Domain, Port] -> {Domain, list_to_integer(Port)}
end.

0 comments on commit 5d5912a

Please sign in to comment.