Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign a new node after calling update_cluster_info! #355

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ def watch(keys) # rubocop:disable Metrics/AbcSize
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

# We have not yet selected a node for this transaction, initially, which means we can handle
# redirections freely initially (i.e. for the first WATCH call)
node = @router.find_primary_node_by_slot(slot)
handle_redirection(node, retry_count: 1) do |nd|
handle_redirection(slot, retry_count: 1) do |nd|
nd.with do |c|
c.ensure_connected_cluster_scoped(retryable: false) do
c.call('ASKING') if @asking
Expand All @@ -45,10 +42,22 @@ def watch(keys) # rubocop:disable Metrics/AbcSize

private

def handle_redirection(node, retry_count: 1, &blk)
@router.handle_redirection(node, retry_count: retry_count) do |nd|
# Runs +blk+ against the primary owning +slot+, retrying on connection errors.
#
# @param slot [Integer] the hash slot this transaction is pinned to
# @param retry_count [Integer] number of _external_ retries allowed after
#   connection failures (on top of the router's internal redirection retries)
# @yield [nd] the node the block should issue commands against
# @raise [::RedisClient::ConnectionError] once the retry budget is exhausted
def handle_redirection(slot, retry_count: 1, &blk)
  # We have not yet selected a node for this transaction, initially, which means we can handle
  # redirections freely initially (i.e. for the first WATCH call). Re-resolving the node here
  # (rather than taking it as an argument) means each retry picks up fresh topology.
  node = @router.find_primary_node_by_slot(slot)
  times_block_executed = 0
  @router.handle_redirection(node, nil, retry_count: retry_count) do |nd|
    times_block_executed += 1
    handle_asking_once(nd, &blk)
  end
rescue ::RedisClient::ConnectionError
  # Deduct the number of retries that happened _inside_ router#handle_redirection from our remaining
  # _external_ retries. Always deduct at least one in case handle_redirection raises without trying the block.
  # BUGFIX: was `[times_block_executed, 1].min`, which deducts at most one (and zero when the
  # block never ran) — the opposite of the documented intent. `.max` deducts every internal
  # attempt, with a floor of one, so the retry budget cannot be silently exceeded.
  retry_count -= [times_block_executed, 1].max
  raise if retry_count < 0

  retry
end

def handle_asking_once(node)
Expand Down
17 changes: 14 additions & 3 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi

# @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
def try_send(node, method, command, args, retry_count: 3, &block)
handle_redirection(node, retry_count: retry_count) do |on_node|
handle_redirection(node, command, retry_count: retry_count) do |on_node|
if args.empty?
# prevent memory allocation for variable-length args
on_node.public_send(method, command, &block)
Expand All @@ -101,12 +101,12 @@ def try_send(node, method, command, args, retry_count: 3, &block)
end

def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
handle_redirection(node, retry_count: retry_count) do |on_node|
handle_redirection(node, nil, retry_count: retry_count) do |on_node|
on_node.public_send(method, *args, **kwargs, &block)
end
end

def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
yield node
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
Expand Down Expand Up @@ -134,6 +134,17 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me

retry_count -= 1
renew_cluster_state

if retry_count >= 0
# Find the node to use for this command - if this fails for some reason, though, re-use
# the old node.
begin
node = find_node(find_node_key(command)) if command
rescue StandardError # rubocop:disable Lint/SuppressedException
end
retry
end

retry if retry_count >= 0
raise
end
Expand Down
2 changes: 2 additions & 0 deletions test/cluster_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def initialize(node_addrs,
@debug = ENV.fetch('DEBUG', '0')
end

attr_reader :clients

def wait_for_cluster_to_be_ready(skip_clients: [])
print_debug('wait for nodes to be recognized...')
wait_meeting(@clients, max_attempts: @max_attempts)
Expand Down
65 changes: 65 additions & 0 deletions test/test_against_cluster_broken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'logger'
require 'json'
require 'testing_helper'
require 'securerandom'

class TestAgainstClusterBroken < TestingWrapper
WAIT_SEC = 0.1
Expand Down Expand Up @@ -54,6 +55,41 @@ def test_client_patience
do_assertions(offset: 3)
end

def test_reloading_on_connection_error
  victim = @controller.select_sacrifice_of_primary
  # Pick a key whose slot is owned by the node we are about to kill.
  test_key = generate_key_for_node(victim)
  @clients[0].call('SET', test_key, 'foobar1')

  # Take the owning primary down and wait until a replica is promoted.
  kill_a_node_and_wait_for_failover(victim)

  # The next command for this key routes to the dead node first; the resulting
  # connection error must trigger a topology reload so the retry succeeds.
  assert_equal 'OK', @clients[0].call('SET', test_key, 'foobar2')
end

def test_transaction_retry_on_connection_error
  victim = @controller.select_sacrifice_of_primary
  # Pick a key whose slot is owned by the node we are about to kill.
  test_key = generate_key_for_node(victim)
  @clients[0].call('SET', test_key, 'foobar1')

  attempts = 0
  # Open a WATCH-guarded transaction, then take the watched node down before
  # the first attempt at queueing commands, forcing a transaction retry.
  res = @clients[0].multi(watch: [test_key]) do |tx|
    kill_a_node_and_wait_for_failover(victim) if attempts.zero?
    attempts += 1
    tx.call('SET', test_key, 'foobar2')
  end

  # The first attempt hits the dead node; the second attempt should complete.
  assert_equal ['OK'], res
  assert_equal 'foobar2', @clients[0].call('GET', test_key)
  assert_equal 2, attempts
end

private

def prepare_test_data
Expand Down Expand Up @@ -129,6 +165,18 @@ def do_assertions(offset:)
end
end

# Returns a random key that hashes into a slot owned by the given node.
# Works by collecting the node's owned slot ranges from CLUSTER SLOTS, then
# generating random keys until CLUSTER KEYSLOT maps one into an owned slot.
def generate_key_for_node(conn)
  node_id = conn.call('CLUSTER', 'MYID')
  owned_slots = conn.call('CLUSTER', 'SLOTS')
                    .select { |range| range[2][2] == node_id }
                    .flat_map { |range| (range[0]..range[1]).to_a }

  candidate = SecureRandom.hex
  candidate = SecureRandom.hex until owned_slots.include?(conn.call('CLUSTER', 'KEYSLOT', candidate))
  candidate
end

def wait_for_replication(client)
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
Expand Down Expand Up @@ -211,6 +259,23 @@ def retryable(attempts: MAX_ATTEMPTS, wait_sec: WAIT_SEC)
end
end

# Kills the given node, then blocks until the cluster no longer lists it as a
# slot owner (i.e. a replica has been promoted). Raises after ~31 polls.
def kill_a_node_and_wait_for_failover(sacrifice)
  # Any surviving node can be asked about the cluster topology.
  observer = @controller.clients.find { |cli| cli != sacrifice }
  sacrifice_id = sacrifice.call('CLUSTER', 'MYID')
  kill_a_node(sacrifice)

  31.times do
    # Wait for the sacrifice node to not be a primary according to CLUSTER SLOTS.
    slots = observer.call('CLUSTER', 'SLOTS')
    return unless slots.any? { |range| range[2][2] == sacrifice_id }

    sleep 1
  end
  raise 'Timed out waiting for failover in kill_a_node_and_wait_for_failover'
end

def build_client(
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
Expand Down