From 954668291e665a59e264f906355fa86f064abf37 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 20 Feb 2024 18:21:45 +1100 Subject: [PATCH 1/2] Improve handling of retries around #watch * It's important that all of a transaction actually happens on the same connection, with no transparent reconnection allowed inside ::RedisClient. So, we wrap a watch transaction in ensure_connected_cluster_scoped(retryable: false). * We don't need to call UNWATCH on connection errors (since the connection state is already broken). redis-rb and RedisClient don't do this either. --- .../cluster/optimistic_locking.rb | 19 +++- test/redis_client/test_cluster.rb | 94 +++++++++++++++++++ 2 files changed, 108 insertions(+), 5 deletions(-) diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index 8db9a9dd..dae098bd 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -14,14 +14,23 @@ def watch(keys) slot = find_slot(keys) raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil? + # We have not yet selected a node for this transaction, initially, which means we can handle + # redirections freely initially (i.e. for the first WATCH call) node = @router.find_primary_node_by_slot(slot) @router.handle_redirection(node, retry_count: 1) do |nd| nd.with do |c| - c.call('WATCH', *keys) - yield(c, slot) - rescue StandardError - c.call('UNWATCH') - raise + c.ensure_connected_cluster_scoped(retryable: false) do + c.call('WATCH', *keys) + begin + yield(c, slot) + rescue ::RedisClient::ConnectionError + # No need to unwatch on a connection error. 
+ raise + rescue StandardError + c.call('UNWATCH') + raise + end + end end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index ec2944b6..0b8cead7 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -295,6 +295,100 @@ def test_transaction_with_meaningless_watch assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) end + def test_transaction_does_not_pointlessly_unwatch_on_success + @client.call('MSET', '{key}1', '0', '{key}2', '0') + + @captured_commands.clear + @client.multi(watch: %w[{key}1 {key}2]) do |tx| + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + end + assert_equal(%w[WATCH MULTI SET SET EXEC], @captured_commands.to_a.map(&:command).map(&:first)) + assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) + end + + def test_transaction_unwatches_on_error + test_error = Class.new(StandardError) + + @captured_commands.clear + assert_raises(test_error) do + @client.multi(watch: %w[{key}1 {key}2]) do + raise test_error, 'error!' 
+ end + end + + assert_equal(%w[WATCH UNWATCH], @captured_commands.to_a.map(&:command).map(&:first)) + end + + def test_transaction_does_not_unwatch_on_connection_error + @captured_commands.clear + assert_raises(RedisClient::ConnectionError) do + @client.multi(watch: %w[{key}1 {key}2]) do |tx| + tx.call('SET', '{key}1', 'x') + tx.call('QUIT') + end + end + command_list = @captured_commands.to_a.map(&:command).map(&:first) + assert_includes(command_list, 'WATCH') + refute_includes(command_list, 'UNWATCH') + end + + def test_transaction_does_not_retry_without_rewatching + client2 = new_test_client + + @client.call('SET', 'key', 'original_value') + + assert_raises(RedisClient::ConnectionError) do + @client.multi(watch: %w[key]) do |tx| + # Simulate all the connections closing behind the router's back + # Sending QUIT to redis makes the server side close the connection (and the client + # side thus get a RedisClient::ConnectionError) + node = @client.instance_variable_get(:@router).instance_variable_get(:@node) + node.clients.each do |conn| + conn.with(&:close) + end + + # Now the second client sets the value, which should make this watch invalid + client2.call('SET', 'key', 'client2_value') + + tx.call('SET', 'key', '@client_value') + # Committing this transaction will fail, not silently reconnect (without the watch!) + end + end + + # The transaction did not commit. 
+ assert_equal('client2_value', @client.call('GET', 'key')) + end + + def test_transaction_with_watch_retries_block + client2 = new_test_client + call_count = 0 + + @client.call('SET', 'key', 'original_value') + + @client.multi(watch: %w[key]) do |tx| + if call_count == 0 + # Simulate all the connections closing behind the router's back + # Sending QUIT to redis makes the server side close the connection (and the client + # side thus get a RedisClient::ConnectionError) + node = @client.instance_variable_get(:@router).instance_variable_get(:@node) + node.clients.each do |conn| + conn.with(&:close) + end + + # Now the second client sets the value, which should make this watch invalid + client2.call('SET', 'key', 'client2_value') + end + call_count += 1 + + tx.call('SET', 'key', "@client_value_#{call_count}") + end + + # The transaction did commit (but it was the second time) + assert_equal('@client_value_2', @client.call('GET', 'key')) + assert_equal(2, call_count) + end + def test_transaction_with_error @client.call('SET', 'key1', 'x') From 99de3d3b06deeb0a598b2d8a3308b89bf55d8401 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 20 Feb 2024 19:09:46 +1100 Subject: [PATCH 2/2] Don't attempt redirection on the actual MULTI for WATCH...MULTI If we have a WATCH, then the MULTI _must_ exec on exactly that node; it should not be allowed for that to be redirected to a different node, because then the MULTI isn't on the same connection as the WATCH anymore! The first part of this patch fixes this by disabling redirection handling in transaction.rb if we are in a watch. I also added a test test_the_state_of_cluster_resharding_with_reexecuted_watch for this. However, the second part of this patch is a lot trickier... This change causes a different test, test_the_state_of_cluster_resharding_with_transaction_and_watch, to break. 
That test asserts that: - if we watch something where the slot is in the middle of migrating between nodes, - and all the keys we wanted to watch are on the new node, - then the client correctly retries the transaction with ASKING against the new node. Disabling redirection handling obviously makes this stop working, and it _is_ possible to handle this case correctly. We need to record whether or not we had to issue an ASKING on the WATCH for the transaction, and if so, pre-emptively issue an ASKING on the MULTI too. That's because this slot is not yet actually assigned to the node we're connected to (it's IMPORTING). This extra ASKING handling may well not be worth it, and I'm also OK with just failing WATCH/MULTI on slots which are currently migrating. That would imply: - Keeping test_the_state_of_cluster_resharding_with_reexecuted_watch - Deleting test_the_state_of_cluster_resharding_with_transaction_and_watch --- lib/redis_client/cluster.rb | 4 +-- .../cluster/optimistic_locking.rb | 25 ++++++++++++++-- lib/redis_client/cluster/transaction.rb | 16 ++++++---- test/test_against_cluster_state.rb | 29 +++++++++++++++++++ 4 files changed, 64 insertions(+), 10 deletions(-) diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index d7fbc32d..72368464 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -98,9 +98,9 @@ def multi(watch: nil) return transaction.execute end - ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot| + ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking| transaction = ::RedisClient::Cluster::Transaction.new( - @router, @command_builder, node: c, slot: slot + @router, @command_builder, node: c, slot: slot, asking: asking ) yield transaction transaction.execute diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index dae098bd..8a7651ea 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ 
b/lib/redis_client/cluster/optimistic_locking.rb @@ -8,6 +8,7 @@ class Cluster class OptimisticLocking def initialize(router) @router = router + @asking = false end def watch(keys) @@ -17,12 +18,13 @@ def watch(keys) # We have not yet selected a node for this transaction, initially, which means we can handle # redirections freely initially (i.e. for the first WATCH call) node = @router.find_primary_node_by_slot(slot) - @router.handle_redirection(node, retry_count: 1) do |nd| + handle_redirection(node, retry_count: 1) do |nd| nd.with do |c| c.ensure_connected_cluster_scoped(retryable: false) do + c.call('ASKING') if @asking c.call('WATCH', *keys) begin - yield(c, slot) + yield(c, slot, @asking) rescue ::RedisClient::ConnectionError # No need to unwatch on a connection error. raise @@ -37,6 +39,25 @@ def watch(keys) private + def handle_redirection(node, retry_count: 1, &blk) + @router.handle_redirection(node, retry_count: retry_count) do |nd| + handle_asking_once(nd, &blk) + end + end + + def handle_asking_once(node) + yield node + rescue ::RedisClient::CommandError => e + raise unless ErrorIdentification.client_owns_error?(e, node) + raise unless e.message.start_with?('ASK') + + node = @router.assign_asking_node(e.message) + @asking = true + yield node + ensure + @asking = false + end + def find_slot(keys) return if keys.empty? return if keys.any? { |k| k.nil? || k.empty? 
} diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 767cecbf..02adb6f1 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -9,7 +9,7 @@ class Transaction ConsistencyError = Class.new(::RedisClient::Error) MAX_REDIRECTION = 2 - def initialize(router, command_builder, node: nil, slot: nil) + def initialize(router, command_builder, node: nil, slot: nil, asking: false) @router = router @command_builder = command_builder @retryable = true @@ -18,6 +18,7 @@ def initialize(router, command_builder, node: nil, slot: nil) @node = node prepare_tx unless @node.nil? @watching_slot = slot + @asking = asking end def call(*command, **kwargs, &block) @@ -93,7 +94,11 @@ def prepare_tx def settle @pipeline.call('EXEC') - send_transaction(@node, redirect: MAX_REDIRECTION) + # If we needed ASKING on the watch, we need ASKING on the multi as well. + @node.call('ASKING') if @asking + # Don't handle redirections at this level if we're in a watch (the watcher handles redirections + # at the whole-transaction level.) + send_transaction(@node, redirect: !!@watching_slot ? 0 : MAX_REDIRECTION) end def send_transaction(client, redirect:) @@ -110,7 +115,8 @@ def send_pipeline(client, redirect:) client.middlewares.call_pipelined(commands, client.config) do connection.call_pipelined(commands, nil) rescue ::RedisClient::CommandError => e - return handle_command_error!(commands, e, redirect: redirect) unless redirect.zero? + ensure_the_same_slot!(commands) + return handle_command_error!(e, redirect: redirect) unless redirect.zero? 
raise end @@ -139,15 +145,13 @@ def coerce_results!(results, offset: 1) results end - def handle_command_error!(commands, err, redirect:) # rubocop:disable Metrics/AbcSize + def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize if err.message.start_with?('CROSSSLOT') raise ConsistencyError, "#{err.message}: #{err.command}" elsif err.message.start_with?('MOVED') - ensure_the_same_slot!(commands) node = @router.assign_redirection_node(err.message) send_transaction(node, redirect: redirect - 1) elsif err.message.start_with?('ASK') - ensure_the_same_slot!(commands) node = @router.assign_asking_node(err.message) try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err else diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 1cd4753d..27ec09bc 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -103,6 +103,35 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch assert_equal(1, call_cnt) end + def test_the_state_of_cluster_resharding_with_reexecuted_watch + client2 = new_test_client + call_cnt = 0 + + @client.call('SET', 'watch_key', 'original_value') + @client.multi(watch: %w[watch_key]) do |tx| + # Use client2 to change the value of watch_key, which would cause this transaction to fail + if call_cnt == 0 + client2.call('SET', 'watch_key', 'client2_value') + + # Now perform (and _finish_) a reshard, which should make this transaction receive a MOVED + # redirection when it goes to commit. 
That should result in the entire block being retried + slot = ::RedisClient::Cluster::KeySlotConverter.convert('watch_key') + src, dest = @controller.select_resharding_target(slot) + @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + end + call_cnt += 1 + + tx.call('SET', 'watch_key', "@client_value_#{call_cnt}") + end + # It should have retried the entire transaction block. + assert_equal(2, call_cnt) + # The second call succeeded + assert_equal('@client_value_2', @client.call('GET', 'watch_key')) + ensure + client2&.close + end + def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection # This test is excercising a very delicate race condition; i think the use of @client to set # the keys in do_resharding_test is actually causing the race condition not to happen, so this