Skip to content

Commit

Permalink
Fix two separate but related problems with watch retry handling (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Mar 8, 2024
2 parents 3230791 + 99de3d3 commit 7eff2f2
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 14 deletions.
4 changes: 2 additions & 2 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ def multi(watch: nil)
return transaction.execute
end

::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking|
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: c, slot: slot
@router, @command_builder, node: c, slot: slot, asking: asking
)
yield transaction
transaction.execute
Expand Down
42 changes: 36 additions & 6 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,56 @@ class Cluster
class OptimisticLocking
def initialize(router)
@router = router
@asking = false
end

def watch(keys)
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

# We have not yet selected a node for this transaction, which means we can handle
# redirections freely at this point (i.e. for the first WATCH call)
node = @router.find_primary_node_by_slot(slot)
@router.handle_redirection(node, retry_count: 1) do |nd|
handle_redirection(node, retry_count: 1) do |nd|
nd.with do |c|
c.call('WATCH', *keys)
yield(c, slot)
rescue StandardError
c.call('UNWATCH')
raise
c.ensure_connected_cluster_scoped(retryable: false) do
c.call('ASKING') if @asking
c.call('WATCH', *keys)
begin
yield(c, slot, @asking)
rescue ::RedisClient::ConnectionError
# No need to unwatch on a connection error.
raise
rescue StandardError
c.call('UNWATCH')
raise
end
end
end
end
end

private

def handle_redirection(node, retry_count: 1, &blk)
@router.handle_redirection(node, retry_count: retry_count) do |nd|
handle_asking_once(nd, &blk)
end
end

def handle_asking_once(node)
yield node
rescue ::RedisClient::CommandError => e
raise unless ErrorIdentification.client_owns_error?(e, node)
raise unless e.message.start_with?('ASK')

node = @router.assign_asking_node(e.message)
@asking = true
yield node
ensure
@asking = false
end

def find_slot(keys)
return if keys.empty?
return if keys.any? { |k| k.nil? || k.empty? }
Expand Down
16 changes: 10 additions & 6 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Transaction
ConsistencyError = Class.new(::RedisClient::Error)
MAX_REDIRECTION = 2

def initialize(router, command_builder, node: nil, slot: nil)
def initialize(router, command_builder, node: nil, slot: nil, asking: false)
@router = router
@command_builder = command_builder
@retryable = true
Expand All @@ -18,6 +18,7 @@ def initialize(router, command_builder, node: nil, slot: nil)
@node = node
prepare_tx unless @node.nil?
@watching_slot = slot
@asking = asking
end

def call(*command, **kwargs, &block)
Expand Down Expand Up @@ -93,7 +94,11 @@ def prepare_tx

def settle
@pipeline.call('EXEC')
send_transaction(@node, redirect: MAX_REDIRECTION)
# If we needed ASKING on the watch, we need ASKING on the multi as well.
@node.call('ASKING') if @asking
# Don't handle redirections at this level if we're in a watch (the watcher handles redirections
# at the whole-transaction level.)
send_transaction(@node, redirect: !!@watching_slot ? 0 : MAX_REDIRECTION)
end

def send_transaction(client, redirect:)
Expand All @@ -110,7 +115,8 @@ def send_pipeline(client, redirect:)
client.middlewares.call_pipelined(commands, client.config) do
connection.call_pipelined(commands, nil)
rescue ::RedisClient::CommandError => e
return handle_command_error!(commands, e, redirect: redirect) unless redirect.zero?
ensure_the_same_slot!(commands)
return handle_command_error!(e, redirect: redirect) unless redirect.zero?

raise
end
Expand Down Expand Up @@ -139,15 +145,13 @@ def coerce_results!(results, offset: 1)
results
end

def handle_command_error!(commands, err, redirect:) # rubocop:disable Metrics/AbcSize
def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
elsif err.message.start_with?('MOVED')
ensure_the_same_slot!(commands)
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: redirect - 1)
elsif err.message.start_with?('ASK')
ensure_the_same_slot!(commands)
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
else
Expand Down
94 changes: 94 additions & 0 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,100 @@ def test_transaction_with_meaningless_watch
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_does_not_pointlessly_unwatch_on_success
@client.call('MSET', '{key}1', '0', '{key}2', '0')

@captured_commands.clear
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
end
assert_equal(%w[WATCH MULTI SET SET EXEC], @captured_commands.to_a.map(&:command).map(&:first))
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_unwatches_on_error
test_error = Class.new(StandardError)

@captured_commands.clear
assert_raises(test_error) do
@client.multi(watch: %w[{key}1 {key}2]) do
raise test_error, 'error!'
end
end

assert_equal(%w[WATCH UNWATCH], @captured_commands.to_a.map(&:command).map(&:first))
end

def test_transaction_does_not_unwatch_on_connection_error
@captured_commands.clear
assert_raises(RedisClient::ConnectionError) do
@client.multi(watch: %w[{key}1 {key}2]) do |tx|
tx.call('SET', '{key}1', 'x')
tx.call('QUIT')
end
end
command_list = @captured_commands.to_a.map(&:command).map(&:first)
assert_includes(command_list, 'WATCH')
refute_includes(command_list, 'UNWATCH')
end

def test_transaction_does_not_retry_without_rewatching
client2 = new_test_client

@client.call('SET', 'key', 'original_value')

assert_raises(RedisClient::ConnectionError) do
@client.multi(watch: %w[key]) do |tx|
# Simulate all the connections closing behind the router's back
# Sending QUIT to redis makes the server side close the connection (and the client
# side thus get a RedisClient::ConnectionError)
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
node.clients.each do |conn|
conn.with(&:close)
end

# Now the second client sets the value, which should make this watch invalid
client2.call('SET', 'key', 'client2_value')

tx.call('SET', 'key', '@client_value')
# Committing this transaction will fail, not silently reconnect (without the watch!)
end
end

# The transaction did not commit.
assert_equal('client2_value', @client.call('GET', 'key'))
end

def test_transaction_with_watch_retries_block
client2 = new_test_client
call_count = 0

@client.call('SET', 'key', 'original_value')

@client.multi(watch: %w[key]) do |tx|
if call_count == 0
# Simulate all the connections closing behind the router's back
# Sending QUIT to redis makes the server side close the connection (and the client
# side thus get a RedisClient::ConnectionError)
node = @client.instance_variable_get(:@router).instance_variable_get(:@node)
node.clients.each do |conn|
conn.with(&:close)
end

# Now the second client sets the value, which should make this watch invalid
client2.call('SET', 'key', 'client2_value')
end
call_count += 1

tx.call('SET', 'key', "@client_value_#{call_count}")
end

# The transaction did commit (but it was the second time)
assert_equal('@client_value_2', @client.call('GET', 'key'))
assert_equal(2, call_count)
end

def test_transaction_with_error
@client.call('SET', 'key1', 'x')

Expand Down
29 changes: 29 additions & 0 deletions test/test_against_cluster_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,35 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch
assert_equal(1, call_cnt)
end

def test_the_state_of_cluster_resharding_with_reexecuted_watch
client2 = new_test_client
call_cnt = 0

@client.call('SET', 'watch_key', 'original_value')
@client.multi(watch: %w[watch_key]) do |tx|
# Use client2 to change the value of watch_key, which would cause this transaction to fail
if call_cnt == 0
client2.call('SET', 'watch_key', 'client2_value')

# Now perform (and _finish_) a reshard, which should make this transaction receive a MOVED
# redirection when it goes to commit. That should result in the entire block being retried
slot = ::RedisClient::Cluster::KeySlotConverter.convert('watch_key')
src, dest = @controller.select_resharding_target(slot)
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
@controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
end
call_cnt += 1

tx.call('SET', 'watch_key', "@client_value_#{call_cnt}")
end
# It should have retried the entire transaction block.
assert_equal(2, call_cnt)
# The second call succeeded
assert_equal('@client_value_2', @client.call('GET', 'watch_key'))
ensure
client2&.close
end

def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
# This test is exercising a very delicate race condition; I think the use of @client to set
# the keys in do_resharding_test is actually causing the race condition not to happen, so this
Expand Down

0 comments on commit 7eff2f2

Please sign in to comment.