Skip to content

Commit

Permalink
fix: ensure recoverability from cluster down for pubsub and transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Sep 26, 2024
1 parent b0fbffa commit 5bac9a8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
53 changes: 33 additions & 20 deletions lib/redis_client/cluster/pub_sub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/Cycloma

case event = @queue.pop(true)
when ::RedisClient::CommandError
if event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
@router.renew_cluster_state
break start_over
end
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')

raise event
@router.renew_cluster_state
break start_over
when ::RedisClient::ConnectionError
@router.renew_cluster_state
break start_over
when StandardError then raise event
when Array then break event
end
Expand All @@ -114,25 +115,20 @@ def _call(command)
end
end

def call_to_single_state(command, retry_count: 1)
def call_to_single_state(command)
node_key = @router.find_node_key(command)
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
@state_dict[node_key].call(command)
rescue ::RedisClient::ConnectionError
@state_dict[node_key].close
@state_dict.delete(node_key)
@router.renew_cluster_state
retry_count -= 1
retry_count >= 0 ? retry : raise

handle_connection_error(node_key) do
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
@state_dict[node_key].call(command)
end
end

def call_to_all_states(command)
@state_dict.each do |node_key, state|
state.call(command)
rescue ::RedisClient::ConnectionError
@state_dict[node_key].close
@state_dict.delete(node_key)
@router.renew_cluster_state
handle_connection_error(node_key, ignore: true) do
state.call(command)
end
end
end

Expand All @@ -152,10 +148,27 @@ def calc_max_duration(timeout)
timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000
end

def handle_connection_error(node_key, ignore: false)
yield
rescue ::RedisClient::ConnectionError
@state_dict[node_key].close
@state_dict.delete(node_key)
@router.renew_cluster_state
raise unless ignore
end

def start_over
@state_dict.each_value(&:close)
@state_dict.clear
@commands.each { |command| _call(command) }
@commands.each do |command|
loop do
_call(command)
break
rescue ::RedisClient::ConnectionError
sleep 1.0
end
end

nil
end
end
Expand Down
7 changes: 6 additions & 1 deletion lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,12 @@ def assign_node(command)
def find_node_key_by_key(key, seed: nil, primary: false)
if key && !key.empty?
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot)
node_key = primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot)
if node_key.nil?
renew_cluster_state
raise ::RedisClient::Cluster::NodeMightBeDown
end
node_key
else
primary ? @node.any_primary_node_key(seed: seed) : @node.any_replica_node_key(seed: seed)
end
Expand Down

0 comments on commit 5bac9a8

Please sign in to comment.