diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index d7fbc32d..72368464 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -98,9 +98,9 @@ def multi(watch: nil) return transaction.execute end - ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot| + ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot, asking| transaction = ::RedisClient::Cluster::Transaction.new( - @router, @command_builder, node: c, slot: slot + @router, @command_builder, node: c, slot: slot, asking: asking ) yield transaction transaction.execute diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb index 8db9a9dd..8a7651ea 100644 --- a/lib/redis_client/cluster/optimistic_locking.rb +++ b/lib/redis_client/cluster/optimistic_locking.rb @@ -8,26 +8,56 @@ class Cluster class OptimisticLocking def initialize(router) @router = router + @asking = false end def watch(keys) slot = find_slot(keys) raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil? + # We have not yet selected a node for this transaction, which means we can handle + # redirections freely at this point (i.e. for the first WATCH call) node = @router.find_primary_node_by_slot(slot) - @router.handle_redirection(node, retry_count: 1) do |nd| + handle_redirection(node, retry_count: 1) do |nd| nd.with do |c| - c.call('WATCH', *keys) - yield(c, slot) - rescue StandardError - c.call('UNWATCH') - raise + c.ensure_connected_cluster_scoped(retryable: false) do + c.call('ASKING') if @asking + c.call('WATCH', *keys) + begin + yield(c, slot, @asking) + rescue ::RedisClient::ConnectionError + # No need to unwatch on a connection error.
+ raise + rescue StandardError + c.call('UNWATCH') + raise + end + end end end end private + def handle_redirection(node, retry_count: 1, &blk) + @router.handle_redirection(node, retry_count: retry_count) do |nd| + handle_asking_once(nd, &blk) + end + end + + def handle_asking_once(node) + yield node + rescue ::RedisClient::CommandError => e + raise unless ErrorIdentification.client_owns_error?(e, node) + raise unless e.message.start_with?('ASK') + + node = @router.assign_asking_node(e.message) + @asking = true + yield node + ensure + @asking = false + end + def find_slot(keys) return if keys.empty? return if keys.any? { |k| k.nil? || k.empty? } diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 767cecbf..02adb6f1 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -9,7 +9,7 @@ class Transaction ConsistencyError = Class.new(::RedisClient::Error) MAX_REDIRECTION = 2 - def initialize(router, command_builder, node: nil, slot: nil) + def initialize(router, command_builder, node: nil, slot: nil, asking: false) @router = router @command_builder = command_builder @retryable = true @@ -18,6 +18,7 @@ def initialize(router, command_builder, node: nil, slot: nil) @node = node prepare_tx unless @node.nil? @watching_slot = slot + @asking = asking end def call(*command, **kwargs, &block) @@ -93,7 +94,11 @@ def prepare_tx def settle @pipeline.call('EXEC') - send_transaction(@node, redirect: MAX_REDIRECTION) + # If we needed ASKING on the watch, we need ASKING on the multi as well. + @node.call('ASKING') if @asking + # Don't handle redirections at this level if we're in a watch (the watcher handles redirections + # at the whole-transaction level.) + send_transaction(@node, redirect: !!@watching_slot ? 
0 : MAX_REDIRECTION) end def send_transaction(client, redirect:) @@ -110,7 +115,8 @@ def send_pipeline(client, redirect:) client.middlewares.call_pipelined(commands, client.config) do connection.call_pipelined(commands, nil) rescue ::RedisClient::CommandError => e - return handle_command_error!(commands, e, redirect: redirect) unless redirect.zero? + ensure_the_same_slot!(commands) + return handle_command_error!(e, redirect: redirect) unless redirect.zero? raise end @@ -139,15 +145,13 @@ def coerce_results!(results, offset: 1) results end - def handle_command_error!(commands, err, redirect:) # rubocop:disable Metrics/AbcSize + def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize if err.message.start_with?('CROSSSLOT') raise ConsistencyError, "#{err.message}: #{err.command}" elsif err.message.start_with?('MOVED') - ensure_the_same_slot!(commands) node = @router.assign_redirection_node(err.message) send_transaction(node, redirect: redirect - 1) elsif err.message.start_with?('ASK') - ensure_the_same_slot!(commands) node = @router.assign_asking_node(err.message) try_asking(node) ? 
send_transaction(node, redirect: redirect - 1) : err else diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index ec2944b6..0b8cead7 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -295,6 +295,100 @@ def test_transaction_with_meaningless_watch assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) end + def test_transaction_does_not_pointlessly_unwatch_on_success + @client.call('MSET', '{key}1', '0', '{key}2', '0') + + @captured_commands.clear + @client.multi(watch: %w[{key}1 {key}2]) do |tx| + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + end + assert_equal(%w[WATCH MULTI SET SET EXEC], @captured_commands.to_a.map(&:command).map(&:first)) + assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) + end + + def test_transaction_unwatches_on_error + test_error = Class.new(StandardError) + + @captured_commands.clear + assert_raises(test_error) do + @client.multi(watch: %w[{key}1 {key}2]) do + raise test_error, 'error!' 
+ end + end + + assert_equal(%w[WATCH UNWATCH], @captured_commands.to_a.map(&:command).map(&:first)) + end + + def test_transaction_does_not_unwatch_on_connection_error + @captured_commands.clear + assert_raises(RedisClient::ConnectionError) do + @client.multi(watch: %w[{key}1 {key}2]) do |tx| + tx.call('SET', '{key}1', 'x') + tx.call('QUIT') + end + end + command_list = @captured_commands.to_a.map(&:command).map(&:first) + assert_includes(command_list, 'WATCH') + refute_includes(command_list, 'UNWATCH') + end + + def test_transaction_does_not_retry_without_rewatching + client2 = new_test_client + + @client.call('SET', 'key', 'original_value') + + assert_raises(RedisClient::ConnectionError) do + @client.multi(watch: %w[key]) do |tx| + # Simulate all the connections closing behind the router's back + # by closing each of the node's clients directly (no QUIT is sent here); the next command + # on this connection is then expected to raise a RedisClient::ConnectionError + node = @client.instance_variable_get(:@router).instance_variable_get(:@node) + node.clients.each do |conn| + conn.with(&:close) + end + + # Now the second client sets the value, which should make this watch invalid + client2.call('SET', 'key', 'client2_value') + + tx.call('SET', 'key', '@client_value') + # Committing this transaction will fail, not silently reconnect (without the watch!) + end + end + + # The transaction did not commit.
+ assert_equal('client2_value', @client.call('GET', 'key')) + end + + def test_transaction_with_watch_retries_block + client2 = new_test_client + call_count = 0 + + @client.call('SET', 'key', 'original_value') + + @client.multi(watch: %w[key]) do |tx| + if call_count == 0 + # Simulate all the connections closing behind the router's back + # by closing each of the node's clients directly (no QUIT is sent here); the first attempt + # is then expected to hit a RedisClient::ConnectionError and the block to be run again + node = @client.instance_variable_get(:@router).instance_variable_get(:@node) + node.clients.each do |conn| + conn.with(&:close) + end + + # Now the second client sets the value, which should make this watch invalid + client2.call('SET', 'key', 'client2_value') + end + call_count += 1 + + tx.call('SET', 'key', "@client_value_#{call_count}") + end + + # The transaction did commit (but it was the second time) + assert_equal('@client_value_2', @client.call('GET', 'key')) + assert_equal(2, call_count) + end + def test_transaction_with_error @client.call('SET', 'key1', 'x') diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 1cd4753d..27ec09bc 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -103,6 +103,35 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch assert_equal(1, call_cnt) end + def test_the_state_of_cluster_resharding_with_reexecuted_watch + client2 = new_test_client + call_cnt = 0 + + @client.call('SET', 'watch_key', 'original_value') + @client.multi(watch: %w[watch_key]) do |tx| + # Use client2 to change the value of watch_key, which would cause this transaction to fail + if call_cnt == 0 + client2.call('SET', 'watch_key', 'client2_value') + + # Now perform (and _finish_) a reshard, which should make this transaction receive a MOVED + # redirection when it goes to commit.
That should result in the entire block being retried + slot = ::RedisClient::Cluster::KeySlotConverter.convert('watch_key') + src, dest = @controller.select_resharding_target(slot) + @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + end + call_cnt += 1 + + tx.call('SET', 'watch_key', "@client_value_#{call_cnt}") + end + # It should have retried the entire transaction block. + assert_equal(2, call_cnt) + # The second call succeeded + assert_equal('@client_value_2', @client.call('GET', 'watch_key')) + ensure + client2&.close + end + def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection # This test is excercising a very delicate race condition; i think the use of @client to set # the keys in do_resharding_test is actually causing the race condition not to happen, so this