Assign a new node after calling update_cluster_info!
If we catch a connection error and refresh the cluster topology, we need
to re-calculate what node to send the command to in the router; the node
we're using might not even be a valid node any longer.
KJTsanaktsidis committed Apr 26, 2024
1 parent b272f19 commit 4eba87e
Showing 4 changed files with 98 additions and 10 deletions.
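
At its core, the fix makes the retry path pick a node again after the topology has been reloaded, instead of reusing the node it started with. A minimal sketch of that pattern, assuming a router object that responds to find_node, find_node_key and update_cluster_info! roughly like the methods touched below; this is an illustration only, not the gem's actual implementation:

require 'redis_client'

# Hypothetical helper sketching the pattern this commit introduces.
def call_with_refreshed_topology(router, command, retries: 3)
  node = router.find_node(router.find_node_key(command))
  begin
    yield node
  rescue ::RedisClient::ConnectionError
    router.update_cluster_info! # the node we were using may have been failed over or removed
    raise if retries <= 0

    # Re-resolve which node owns the command's key slot under the new topology.
    node = router.find_node(router.find_node_key(command))
    retries -= 1
    retry
  end
end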
21 changes: 15 additions & 6 deletions lib/redis_client/cluster/optimistic_locking.rb
@@ -15,10 +15,7 @@ def watch(keys)
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

- # We have not yet selected a node for this transaction, initially, which means we can handle
- # redirections freely initially (i.e. for the first WATCH call)
- node = @router.find_primary_node_by_slot(slot)
- handle_redirection(node, retry_count: 1) do |nd|
+ handle_redirection(slot, retry_count: 1) do |nd|
nd.with do |c|
c.ensure_connected_cluster_scoped(retryable: false) do
c.call('ASKING') if @asking
@@ -39,10 +36,22 @@ def watch(keys)

private

- def handle_redirection(node, retry_count: 1, &blk)
- @router.handle_redirection(node, retry_count: retry_count) do |nd|
+ def handle_redirection(slot, retry_count: 1, &blk)
+ # We have not yet selected a node for this transaction, which means we can still handle
+ # redirections freely (i.e. for the first WATCH call)
+ node = @router.find_primary_node_by_slot(slot)
+ times_block_executed = 0
+ @router.handle_redirection(node, nil, retry_count: retry_count) do |nd|
+ times_block_executed += 1
handle_asking_once(nd, &blk)
end
rescue ::RedisClient::ConnectionError
# Deduct the number of retries that happened _inside_ router#handle_redirection from our remaining
# _external_ retries. Always deduct at least one in case handle_redirection raises without trying the block.
retry_count -= [times_block_executed, 1].max
raise if retry_count < 0

retry
end

def handle_asking_once(node)
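The rescue clause above deducts the attempts the router already made internally from the transaction layer's own budget, so stacking the two layers does not multiply the number of retries. A rough, self-contained sketch of that accounting, using hypothetical names rather than the gem's classes:

# Sketch of the retry-budget accounting used above.
def with_retry_budget(budget)
  attempts = 0
  yield -> { attempts += 1 } # the inner layer reports each attempt it makes
rescue StandardError
  budget -= [attempts, 1].max # charge at least one attempt, even if none ran
  raise if budget.negative?

  retry
end

# Usage: the inner block makes two attempts of its own before failing each time,
# so a budget of three is exhausted on the second outer pass.
begin
  with_retry_budget(3) do |count_attempt|
    2.times { count_attempt.call }
    raise 'node unreachable'
  end
rescue RuntimeError => e
  puts "gave up: #{e.message}"
end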
20 changes: 16 additions & 4 deletions lib/redis_client/cluster/router.rb
@@ -79,7 +79,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi

# @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
def try_send(node, method, command, args, retry_count: 3, &block)
- handle_redirection(node, retry_count: retry_count) do |on_node|
+ handle_redirection(node, command, retry_count: retry_count) do |on_node|
if args.empty?
# prevent memory allocation for variable-length args
on_node.public_send(method, command, &block)
@@ -90,12 +90,12 @@ def try_send(node, method, command, args, retry_count: 3, &block)
end

def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
- handle_redirection(node, retry_count: retry_count) do |on_node|
+ handle_redirection(node, nil, retry_count: retry_count) do |on_node|
on_node.public_send(method, *args, **kwargs, &block)
end
end

- def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
+ def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
yield node
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
@@ -124,7 +124,19 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me

update_cluster_info!

- raise if retry_count <= 0
+ # If command is nil, we don't have a way to know which node this command should be sent to
+ # under the new topology. It might be e.g. part of a transaction which needs to be retried at a
+ # higher level.
+ raise if retry_count <= 0 || command.nil?

# Find the node to use for this command - if this fails for some reason, though, re-raise
# the original connection error.
node = begin
find_node(find_node_key(command), retry_count: 0)
rescue StandardError
nil
end
raise unless node

retry_count -= 1
retry
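Re-resolving the node starts from the command's key slot: the command's key is hashed with CRC16 (honouring hash tags) and reduced modulo 16384, and that slot number indexes the slot-to-node table rebuilt by update_cluster_info!. A simplified, standalone sketch of Redis cluster key slotting, not the gem's internal code:

# Simplified sketch of Redis cluster key slotting (CRC16-XMODEM mod 16384).
def key_slot(key)
  # Hash tags: if the key contains a non-empty {...} section, only that part is hashed.
  if (tag = key[/\{(.+?)\}/, 1])
    key = tag
  end

  crc = 0
  key.each_byte do |byte|
    crc ^= (byte << 8)
    8.times do
      crc = if (crc & 0x8000).zero?
              (crc << 1) & 0xFFFF
            else
              ((crc << 1) ^ 0x1021) & 0xFFFF
            end
    end
  end
  crc % 16_384
end

puts key_slot('user:{42}:profile') # should match: redis-cli CLUSTER KEYSLOT 'user:{42}:profile'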
2 changes: 2 additions & 0 deletions test/cluster_controller.rb
@@ -52,6 +52,8 @@ def initialize(node_addrs,
@debug = ENV.fetch('DEBUG', '0')
end

attr_reader :clients

def wait_for_cluster_to_be_ready
print_debug('wait for nodes to be recognized...')
wait_meeting(@clients, max_attempts: @max_attempts)
65 changes: 65 additions & 0 deletions test/test_against_cluster_broken.rb
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require 'testing_helper'
require 'securerandom'

class TestAgainstClusterBroken < TestingWrapper
WAIT_SEC = 3
@@ -34,8 +35,55 @@ def test_a_primary_is_down
do_test_a_node_is_down(sacrifice, number_of_keys: 10)
end

def test_reloading_on_connection_error
sacrifice = @controller.select_sacrifice_of_primary
# Find a key which lives on the sacrifice node
test_key = generate_key_for_node(sacrifice)
@client.call('SET', test_key, 'foobar1')

# Shut the node down.
kill_a_node_and_wait_for_failover(sacrifice)

# When we next write to the key, the client will attempt to connect to the broken node and
# thus trigger a reload of the cluster topology.
assert_equal 'OK', @client.call('SET', test_key, 'foobar2')
end

def test_transaction_retry_on_connection_error
sacrifice = @controller.select_sacrifice_of_primary
# Find a key which lives on the sacrifice node
test_key = generate_key_for_node(sacrifice)
@client.call('SET', test_key, 'foobar1')

call_count = 0
# Begin a transaction, but shut the node down after the WATCH is issued
res = @client.multi(watch: [test_key]) do |tx|
kill_a_node_and_wait_for_failover(sacrifice) if call_count == 0
call_count += 1
tx.call('SET', test_key, 'foobar2')
end

# The transaction should have retried once and successfully completed
# the second time.
assert_equal ['OK'], res
assert_equal 'foobar2', @client.call('GET', test_key)
assert_equal 2, call_count
end

private

def generate_key_for_node(conn)
# Figure out a slot on the sacrifice node, and a key in that slot.
conn_id = conn.call('CLUSTER', 'MYID')
conn_slots = conn.call('CLUSTER', 'SLOTS')
.select { |res| res[2][2] == conn_id }
.flat_map { |res| (res[0]..res[1]).to_a }
loop do
test_key = SecureRandom.hex
return test_key if conn_slots.include?(conn.call('CLUSTER', 'KEYSLOT', test_key))
end
end

def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@@ -78,6 +126,23 @@ def kill_a_node(sacrifice, kill_attempts:)
assert_raises(::RedisClient::ConnectionError) { sacrifice.call('PING') }
end

def kill_a_node_and_wait_for_failover(sacrifice)
other_client = @controller.clients.reject { _1 == sacrifice }.first
sacrifice_id = sacrifice.call('CLUSTER', 'MYID')
kill_a_node(sacrifice, kill_attempts: 10)
failover_checks = 0
loop do
raise 'Timed out waiting for failover in kill_a_node_and_wait_for_failover' if failover_checks > 30

# Wait for the sacrifice node to not be a primary according to CLUSTER SLOTS.
cluster_slots = other_client.call('CLUSTER', 'SLOTS')
break unless cluster_slots.any? { _1[2][2] == sacrifice_id }

sleep 1
failover_checks += 1
end
end

def wait_for_cluster_to_be_ready(wait_attempts:)
loop do
break if wait_attempts <= 0 || @client.call('PING') == 'PONG'
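Both test helpers above lean on the shape of the CLUSTER SLOTS reply: each entry is [start_slot, end_slot, master, replica, ...] and each node entry is [ip, port, node_id, ...], which is why res[2][2] (and _1[2][2]) picks out the primary's node id. Roughly, assuming a plain redis-client connection to any cluster node (host and port are placeholders):

require 'redis_client'

# Illustrative only: walk the CLUSTER SLOTS reply the same way the test helpers do.
client = RedisClient.config(host: '127.0.0.1', port: 6379).new_client
client.call('CLUSTER', 'SLOTS').each do |start_slot, end_slot, master, *_replicas|
  ip, port, node_id = master # each node entry is [ip, port, node-id, ...]
  puts "#{node_id} (#{ip}:#{port}) serves slots #{start_slot}..#{end_slot}"
end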
