From 3b933292204ac5a874a10b13284504dfcf3b60cf Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Fri, 16 Feb 2024 17:38:15 +0900 Subject: [PATCH 1/6] fix: don't call the block twice for the transaction --- lib/redis_client/cluster.rb | 6 +- lib/redis_client/cluster/transaction.rb | 99 +++++++++++++++++++------ test/redis_client/test_cluster.rb | 2 +- 3 files changed, 83 insertions(+), 24 deletions(-) diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 02281105..4bc8d3ef 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -89,8 +89,10 @@ def pipelined pipeline.execute end - def multi(watch: nil, &block) - ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block) + def multi(watch: nil) + transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, watch) + yield transaction + transaction.execute end def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 7302a283..e7333402 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -1,56 +1,113 @@ # frozen_string_literal: true require 'redis_client' +require 'redis_client/cluster/pipeline' class RedisClient class Cluster class Transaction ConsistencyError = Class.new(::RedisClient::Error) - def initialize(router, command_builder) + def initialize(router, command_builder, watch) @router = router @command_builder = command_builder - @node_key = nil + @watch = watch + @pipeline = ::RedisClient::Pipeline.new(@command_builder) + @node = nil + @retryable = true end - def call(*command, **kwargs, &_) + def call(*command, **kwargs, &block) command = @command_builder.generate(command, kwargs) - ensure_node_key(command) + prepare_once(command) + @pipeline.call_v(command, &block) end - def call_v(command, &_) + def call_v(command, &block) command = @command_builder.generate(command) - ensure_node_key(command) + prepare_once(command) + @pipeline.call_v(command, &block) end - def call_once(*command, **kwargs, &_) + def call_once(*command, **kwargs, &block) + @retryable = false command = @command_builder.generate(command, kwargs) - ensure_node_key(command) + prepare_once(command) + @pipeline.call_once_v(command, &block) end - def call_once_v(command, &_) + def call_once_v(command, &block) + @retryable = false command = @command_builder.generate(command) - ensure_node_key(command) + prepare_once(command) + @pipeline.call_once_v(command, &block) end - def execute(watch: nil, &block) - yield self - raise ArgumentError, 'empty transaction' if @node_key.nil? - - node = @router.find_node(@node_key) - @router.try_delegate(node, :multi, watch: watch, &block) + def execute + case @node + when ::RedisClient then send_pipeline(@node, @pipeline, @watch, @retryable) + when ::RedisClient::Pooled then @node.with { |node| send_pipeline(node, @pipeline, @watch, @retryable) } + when nil then raise ArgumentError, 'empty transaction' + else raise NotImplementedError, "#{client.class.name}#multi for cluster client" + end end private - def ensure_node_key(command) + def prepare_once(command) + return unless @node.nil? + node_key = @router.find_primary_node_key(command) - raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil? + raise ConsistencyError, "cloud not find the node: #{command.join(' ')}" if node_key.nil? + + @node = @router.find_node(node_key) + @pipeline.call('WATCH', *@watch) unless @watch.nil? || @watch.empty? + @pipeline.call('MULTI') + end + + def send_pipeline(client, pipeline, watch, retryable) + pipeline.call('EXEC') + pipeline.call('UNWATCH', *watch) unless watch.nil? || watch.empty? + + results = client.ensure_connected_cluster_scoped(retryable: retryable) do |connection| + commands = pipeline._commands + client.middlewares.call_pipelined(commands, client.config) do + connection.call_pipelined(commands, nil) + rescue ::RedisClient::CommandError => e + handle_command_error!(commands, e) + end + end + + pipeline._coerce!(results) + idx = watch.nil? || watch.empty? ? -1 : -2 + results[idx] + end + + def handle_command_error!(commands, err) + if err.message.start_with?('CROSSSLOT') + raise ConsistencyError, err.message + elsif err.message.start_with?('MOVED', 'ASK') + handle_redirection(commands, err) + else + raise err + end + end + + def handle_redirection(commands, err) + ensure_the_same_node!(commands) + + # TODO: fix the handling + raise ConsistencyError, err.message + end - @node_key ||= node_key - raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key + def ensure_the_same_node!(commands) + commands.each do |command| + node_key = @router.find_primary_node_key(command) + next if node_key.nil? - nil + node = @router.find_node(node_key) + raise ConsistencyError, 'the transaction should be executed to the same slot in the same node' if @node != node + end end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 433a9af6..c511c132 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -234,7 +234,7 @@ def test_transaction_without_hashtag end end - assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.multi do |t| t.call('MSET', 'key1', '1', 'key2', '2') t.call('MSET', 'key1', '1', 'key3', '3') From eb53ecda58576440a8e26b337662f378ccd07a9c Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Fri, 16 Feb 2024 22:50:31 +0900 Subject: [PATCH 2/6] fix --- lib/redis_client/cluster/transaction.rb | 64 +++++++++++++++++-------- test/redis_client/test_cluster.rb | 14 ++++++ test/test_against_cluster_state.rb | 14 ++++++ 3 files changed, 71 insertions(+), 21 deletions(-) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index e7333402..938e859a 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -45,8 +45,8 @@ def call_once_v(command, &block) def execute case @node - when ::RedisClient then send_pipeline(@node, @pipeline, @watch, @retryable) - when ::RedisClient::Pooled then @node.with { |node| send_pipeline(node, @pipeline, @watch, @retryable) } + when ::RedisClient then settle(@node) + when ::RedisClient::Pooled then @node.with { |node| settle(node) } when nil then raise ArgumentError, 'empty transaction' else raise NotImplementedError, "#{client.class.name}#multi for cluster client" end @@ -61,54 +61,76 @@ def prepare_once(command) raise ConsistencyError, "cloud not find the node: #{command.join(' ')}" if node_key.nil? @node = @router.find_node(node_key) - @pipeline.call('WATCH', *@watch) unless @watch.nil? || @watch.empty? + @pipeline.call('WATCH', *@watch) if watch? @pipeline.call('MULTI') end - def send_pipeline(client, pipeline, watch, retryable) - pipeline.call('EXEC') - pipeline.call('UNWATCH', *watch) unless watch.nil? || watch.empty? + def settle(client) + @pipeline.call('EXEC') + @pipeline.call('UNWATCH') if watch? + send_pipeline(client) + end - results = client.ensure_connected_cluster_scoped(retryable: retryable) do |connection| - commands = pipeline._commands + def send_pipeline(client, redirect: true) + results = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection| + commands = @pipeline._commands client.middlewares.call_pipelined(commands, client.config) do connection.call_pipelined(commands, nil) rescue ::RedisClient::CommandError => e - handle_command_error!(commands, e) + return handle_command_error!(commands, e) if redirect + + raise end end - pipeline._coerce!(results) - idx = watch.nil? || watch.empty? ? -1 : -2 - results[idx] + @pipeline._coerce!(results) + results[watch? ? -2 : -1] end def handle_command_error!(commands, err) if err.message.start_with?('CROSSSLOT') raise ConsistencyError, err.message elsif err.message.start_with?('MOVED', 'ASK') - handle_redirection(commands, err) + ensure_the_same_node!(commands) + handle_redirection(err) else raise err end end - def handle_redirection(commands, err) - ensure_the_same_node!(commands) - - # TODO: fix the handling - raise ConsistencyError, err.message - end - def ensure_the_same_node!(commands) commands.each do |command| node_key = @router.find_primary_node_key(command) next if node_key.nil? node = @router.find_node(node_key) - raise ConsistencyError, 'the transaction should be executed to the same slot in the same node' if @node != node + next if @node == node + + raise ConsistencyError, 'the transaction should be executed to the same slot in the same node' end end + + def handle_redirection(err) + if err.message.start_with?('MOVED') + node = @router.assign_redirection_node(err.message) + send_pipeline(node, redirect: false) + elsif err.message.start_with?('ASK') + node = @router.assign_asking_node(err.message) + try_asking(node) ? send_pipeline(node, redirect: false) : err + else + raise err + end + end + + def try_asking(node) + node.call('ASKING') == 'OK' + rescue StandardError + false + end + + def watch? + !@watch.nil? && !@watch.empty? + end end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index c511c132..3ccf5d0c 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -247,6 +247,20 @@ def test_transaction_without_hashtag end end + def test_transaction_with_watch + got = @client.multi(watch: %w[{key}1 {key}2]) do |tx| + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + end + + assert_equal(%w[OK OK], got) + assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) + end + + def test_transaction_against_optimistic_locking + skip("TODO: It's hard to hook in the middle of the transaction.") + end + def test_pubsub_without_subscription pubsub = @client.pubsub assert_nil(pubsub.next_event(0.01)) diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 813ba4df..363bd53c 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -59,6 +59,20 @@ def test_the_state_of_cluster_resharding_with_pipelining end end + def test_the_state_of_cluster_resharding_with_transaction + do_resharding_test do |keys| + @client.multi do |tx| + keys.each { |key| tx.call('SET', key, key) } + end + + keys.each do |key| + want = key + got = @client.call('GET', key) + assert_equal(want, got, "Case: GET: #{key}") + end + end + end + def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection # This test is excercising a very delicate race condition; i think the use of @client to set # the keys in do_resharding_test is actually causing the race condition not to happen, so this From d5a4e28cfa8bdf9d669d3cb8f47f7e80d04551a1 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sat, 17 Feb 2024 08:45:49 +0900 Subject: [PATCH 3/6] fix --- lib/redis_client/cluster/transaction.rb | 58 +++++++++++++++++-------- test/redis_client/test_cluster.rb | 21 ++++++++- 2 files changed, 58 insertions(+), 21 deletions(-) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 938e859a..3ed3f939 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -12,62 +12,82 @@ def initialize(router, command_builder, watch) @router = router @command_builder = command_builder @watch = watch + @retryable = true @pipeline = ::RedisClient::Pipeline.new(@command_builder) + @buffer = [] @node = nil - @retryable = true end def call(*command, **kwargs, &block) command = @command_builder.generate(command, kwargs) - prepare_once(command) - @pipeline.call_v(command, &block) + if prepare(command) + @pipeline.call_v(command, &block) + else + @buffer << -> { @pipeline.call_v(command, &block) } + end end def call_v(command, &block) command = @command_builder.generate(command) - prepare_once(command) - @pipeline.call_v(command, &block) + if prepare(command) + @pipeline.call_v(command, &block) + else + @buffer << -> { @pipeline.call_v(command, &block) } + end end def call_once(*command, **kwargs, &block) @retryable = false command = @command_builder.generate(command, kwargs) - prepare_once(command) - @pipeline.call_once_v(command, &block) + if prepare(command) + @pipeline.call_once_v(command, &block) + else + @buffer << -> { @pipeline.call_once_v(command, &block) } + end end def call_once_v(command, &block) @retryable = false command = @command_builder.generate(command) - prepare_once(command) - @pipeline.call_once_v(command, &block) + if prepare(command) + @pipeline.call_once_v(command, &block) + else + @buffer << -> { @pipeline.call_once_v(command, &block) } + end end def execute + @buffer.each(&:call) + + raise ArgumentError, 'empty transaction' if @pipeline._empty? + raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil? + case @node when ::RedisClient then settle(@node) - when ::RedisClient::Pooled then @node.with { |node| settle(node) } - when nil then raise ArgumentError, 'empty transaction' + when ::RedisClient::Pooled then @node.with { |nd| settle(nd) } else raise NotImplementedError, "#{client.class.name}#multi for cluster client" end end private - def prepare_once(command) - return unless @node.nil? + def prepare(command) + return true unless @node.nil? node_key = @router.find_primary_node_key(command) - raise ConsistencyError, "cloud not find the node: #{command.join(' ')}" if node_key.nil? + return false if node_key.nil? @node = @router.find_node(node_key) - @pipeline.call('WATCH', *@watch) if watch? + @node.call('WATCH', *@watch) if watch? @pipeline.call('MULTI') + @buffer.each(&:call) + @buffer.clear + true end def settle(client) @pipeline.call('EXEC') - @pipeline.call('UNWATCH') if watch? + @node.call('UNWATCH') if watch? send_pipeline(client) end @@ -84,12 +104,12 @@ def send_pipeline(client, redirect: true) end @pipeline._coerce!(results) - results[watch? ? -2 : -1] + results[-1] end def handle_command_error!(commands, err) if err.message.start_with?('CROSSSLOT') - raise ConsistencyError, err.message + raise ConsistencyError, "#{err.message}: #{err.command}" elsif err.message.start_with?('MOVED', 'ASK') ensure_the_same_node!(commands) handle_redirection(err) @@ -106,7 +126,7 @@ def ensure_the_same_node!(commands) node = @router.find_node(node_key) next if @node == node - raise ConsistencyError, 'the transaction should be executed to the same slot in the same node' + raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}" end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 3ccf5d0c..9e1becdd 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -207,7 +207,7 @@ def test_transaction_with_empty_block assert_raises(LocalJumpError) { @client.multi } end - def test_transaction_with_keyless_commands + def test_transaction_with_only_keyless_commands assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.multi do |t| t.call('ECHO', 'foo') @@ -249,14 +249,31 @@ def test_transaction_without_hashtag def test_transaction_with_watch got = @client.multi(watch: %w[{key}1 {key}2]) do |tx| + tx.call('ECHO', 'START') tx.call('SET', '{key}1', '1') tx.call('SET', '{key}2', '2') + tx.call('ECHO', 'FINISH') end - assert_equal(%w[OK OK], got) + assert_equal(%w[START OK OK FINISH], got) assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) end + def test_transaction_with_bad_watch + @client.call('MSET', '{key}1', '0', '{key}2', '0') + + assert_raises(::RedisClient::CommandError) do + @client.multi(watch: %w[key1 key2]) do |tx| + tx.call('ECHO', 'START') + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + tx.call('ECHO', 'FINISH') + end + end + + assert_equal(%w[0 0], @client.call('MGET', '{key}1', '{key}2')) + end + def test_transaction_against_optimistic_locking skip("TODO: It's hard to hook in the middle of the transaction.") end From fd21b34c969545f8159229cc4450c9c581be9b37 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sat, 17 Feb 2024 11:49:04 +0900 Subject: [PATCH 4/6] fix --- lib/redis_client/cluster/router.rb | 5 ++++ lib/redis_client/cluster/transaction.rb | 33 +++++++++++++++++++------ test/redis_client/test_cluster.rb | 20 ++++++++++++--- 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 7fa37d15..543d1ffe 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -162,6 +162,11 @@ def find_node_key_by_key(key, seed: nil, primary: false) end end + def find_primary_node_by_slot(slot) + node_key = @node.find_node_key_of_primary(slot) + find_node(node_key) + end + def find_node_key(command, seed: nil) key = @command.extract_first_key(command) find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command)) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 3ed3f939..8f55371a 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -2,6 +2,7 @@ require 'redis_client' require 'redis_client/cluster/pipeline' +require 'redis_client/cluster/key_slot_converter' class RedisClient class Cluster @@ -56,11 +57,12 @@ def call_once_v(command, &block) end end - def execute + def execute # rubocop:disable Metrics/AbcSize @buffer.each(&:call) raise ArgumentError, 'empty transaction' if @pipeline._empty? raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil? + raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch? case @node when ::RedisClient then settle(@node) @@ -71,6 +73,25 @@ def execute private + def watch? + !@watch.nil? && !@watch.empty? + end + + def safe_watch? + return true unless watch? + return false if @node.nil? + + slots = @watch.map do |k| + return false if k.nil? || k.empty? + + ::RedisClient::Cluster::KeySlotConverter.convert(k) + end + + return false if slots.uniq.size != 1 + + @router.find_primary_node_by_slot(slots.first) == @node + end + def prepare(command) return true unless @node.nil? @@ -78,7 +99,7 @@ def prepare(command) return false if node_key.nil? @node = @router.find_node(node_key) - @node.call('WATCH', *@watch) if watch? + @pipeline.call('WATCH', *@watch) if watch? @pipeline.call('MULTI') @buffer.each(&:call) @buffer.clear @@ -87,7 +108,7 @@ def prepare(command) def settle(client) @pipeline.call('EXEC') - @node.call('UNWATCH') if watch? + @pipeline.call('UNWATCH') if watch? send_pipeline(client) end @@ -104,7 +125,7 @@ def send_pipeline(client, redirect: true) end @pipeline._coerce!(results) - results[-1] + results[watch? ? -2 : -1] end def handle_command_error!(commands, err) @@ -147,10 +168,6 @@ def try_asking(node) rescue StandardError false end - - def watch? - !@watch.nil? && !@watch.empty? - end end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 9e1becdd..987ac525 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -248,6 +248,8 @@ def test_transaction_without_hashtag end def test_transaction_with_watch + @client.call('MSET', '{key}1', '0', '{key}2', '0') + got = @client.multi(watch: %w[{key}1 {key}2]) do |tx| tx.call('ECHO', 'START') tx.call('SET', '{key}1', '1') @@ -259,10 +261,10 @@ def test_transaction_with_watch assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) end - def test_transaction_with_bad_watch + def test_transaction_with_unsafe_watch @client.call('MSET', '{key}1', '0', '{key}2', '0') - assert_raises(::RedisClient::CommandError) do + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.multi(watch: %w[key1 key2]) do |tx| tx.call('ECHO', 'START') tx.call('SET', '{key}1', '1') @@ -274,8 +276,18 @@ def test_transaction_with_bad_watch assert_equal(%w[0 0], @client.call('MGET', '{key}1', '{key}2')) end - def test_transaction_against_optimistic_locking - skip("TODO: It's hard to hook in the middle of the transaction.") + def test_transaction_with_meaningless_watch + @client.call('MSET', '{key}1', '0', '{key}2', '0') + + got = @client.multi(watch: %w[{key}3 {key}4]) do |tx| + tx.call('ECHO', 'START') + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + tx.call('ECHO', 'FINISH') + end + + assert_equal(%w[START OK OK FINISH], got) + assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) end def test_pubsub_without_subscription From 4f78219e81164e14f97cea84845dcc6001cd5f9c Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sat, 17 Feb 2024 12:00:43 +0900 Subject: [PATCH 5/6] fix --- test/test_against_cluster_state.rb | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 363bd53c..412ba891 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -142,12 +142,14 @@ def new_test_client class ScaleReadRandom < TestingWrapper include Mixin + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding keys = nil do_resharding_test { |ks| keys = ks } keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding_with_pipelining keys = nil do_resharding_test { |ks| keys = ks } @@ -155,6 +157,14 @@ def test_the_state_of_cluster_resharding_with_pipelining keys.each_with_index { |key, i| assert_equal(key, values[i], "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 + def test_the_state_of_cluster_resharding_with_transaction + keys = nil + do_resharding_test { |ks| keys = ks } + @client.multi { |tx| keys.each { |key| tx.call('SET', key, key) } } + keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } + end + private def new_test_client @@ -171,12 +181,14 @@ def new_test_client class ScaleReadRandomWithPrimary < TestingWrapper include Mixin + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding keys = nil do_resharding_test { |ks| keys = ks } keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding_with_pipelining keys = nil do_resharding_test { |ks| keys = ks } @@ -184,6 +196,14 @@ def test_the_state_of_cluster_resharding_with_pipelining keys.each_with_index { |key, i| assert_equal(key, values[i], "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 + def test_the_state_of_cluster_resharding_with_transaction + keys = nil + do_resharding_test { |ks| keys = ks } + @client.multi { |tx| keys.each { |key| tx.call('SET', key, key) } } + keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } + end + private def new_test_client @@ -200,12 +220,14 @@ def new_test_client class ScaleReadLatency < TestingWrapper include Mixin + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding keys = nil do_resharding_test { |ks| keys = ks } keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding_with_pipelining keys = nil do_resharding_test { |ks| keys = ks } @@ -213,6 +235,14 @@ def test_the_state_of_cluster_resharding_with_pipelining keys.each_with_index { |key, i| assert_equal(key, values[i], "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 + def test_the_state_of_cluster_resharding_with_transaction + keys = nil + do_resharding_test { |ks| keys = ks } + @client.multi { |tx| keys.each { |key| tx.call('SET', key, key) } } + keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } + end + private def new_test_client From 35c4e05367eca5980041fb34ad9d1f7a1d38020c Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sat, 17 Feb 2024 13:23:52 +0900 Subject: [PATCH 6/6] fix --- lib/redis_client/cluster/transaction.rb | 26 ++++++++++++++----------- test/redis_client/test_cluster.rb | 9 +++++++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 8f55371a..eca4342d 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -57,18 +57,14 @@ def call_once_v(command, &block) end end - def execute # rubocop:disable Metrics/AbcSize + def execute @buffer.each(&:call) raise ArgumentError, 'empty transaction' if @pipeline._empty? raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil? raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch? - case @node - when ::RedisClient then settle(@node) - when ::RedisClient::Pooled then @node.with { |nd| settle(nd) } - else raise NotImplementedError, "#{client.class.name}#multi for cluster client" - end + settle end private @@ -106,13 +102,21 @@ def prepare(command) true end - def settle(client) + def settle @pipeline.call('EXEC') @pipeline.call('UNWATCH') if watch? - send_pipeline(client) + send_transaction(@node, redirect: true) + end + + def send_transaction(client, redirect:) + case client + when ::RedisClient then send_pipeline(client, redirect: redirect) + when ::RedisClient::Pooled then client.with { |c| send_pipeline(c, redirect: redirect) } + else raise NotImplementedError, "#{client.class.name}#multi for cluster client" + end end - def send_pipeline(client, redirect: true) + def send_pipeline(client, redirect:) results = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection| commands = @pipeline._commands client.middlewares.call_pipelined(commands, client.config) do @@ -154,10 +158,10 @@ def ensure_the_same_node!(commands) def handle_redirection(err) if err.message.start_with?('MOVED') node = @router.assign_redirection_node(err.message) - send_pipeline(node, redirect: false) + send_transaction(node, redirect: false) elsif err.message.start_with?('ASK') node = @router.assign_asking_node(err.message) - try_asking(node) ? send_pipeline(node, redirect: false) : err + try_asking(node) ? send_transaction(node, redirect: false) : err else raise err end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 987ac525..31e4ccf8 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -266,10 +266,15 @@ def test_transaction_with_unsafe_watch assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.multi(watch: %w[key1 key2]) do |tx| - tx.call('ECHO', 'START') tx.call('SET', '{key}1', '1') tx.call('SET', '{key}2', '2') - tx.call('ECHO', 'FINISH') + end + end + + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.multi(watch: %w[{hey}1 {hey}2]) do |tx| + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') end end