diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 02281105..4bc8d3ef 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -89,8 +89,10 @@ def pipelined pipeline.execute end - def multi(watch: nil, &block) - ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block) + def multi(watch: nil) + transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, watch) + yield transaction + transaction.execute end def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 7fa37d15..543d1ffe 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -162,6 +162,11 @@ def find_node_key_by_key(key, seed: nil, primary: false) end end + def find_primary_node_by_slot(slot) + node_key = @node.find_node_key_of_primary(slot) + find_node(node_key) + end + def find_node_key(command, seed: nil) key = @command.extract_first_key(command) find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command)) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 7302a283..eca4342d 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -1,56 +1,176 @@ # frozen_string_literal: true require 'redis_client' +require 'redis_client/cluster/pipeline' +require 'redis_client/cluster/key_slot_converter' class RedisClient class Cluster class Transaction ConsistencyError = Class.new(::RedisClient::Error) - def initialize(router, command_builder) + def initialize(router, command_builder, watch) @router = router @command_builder = command_builder - @node_key = nil + @watch = watch + @retryable = true + @pipeline = ::RedisClient::Pipeline.new(@command_builder) + @buffer = [] + @node = nil end - def call(*command, **kwargs, &_) + def call(*command, **kwargs, &block) command = @command_builder.generate(command, kwargs) - ensure_node_key(command) + if prepare(command) + @pipeline.call_v(command, &block) + else + @buffer << -> { @pipeline.call_v(command, &block) } + end end - def call_v(command, &_) + def call_v(command, &block) command = @command_builder.generate(command) - ensure_node_key(command) + if prepare(command) + @pipeline.call_v(command, &block) + else + @buffer << -> { @pipeline.call_v(command, &block) } + end end - def call_once(*command, **kwargs, &_) + def call_once(*command, **kwargs, &block) + @retryable = false command = @command_builder.generate(command, kwargs) - ensure_node_key(command) + if prepare(command) + @pipeline.call_once_v(command, &block) + else + @buffer << -> { @pipeline.call_once_v(command, &block) } + end end - def call_once_v(command, &_) + def call_once_v(command, &block) + @retryable = false command = @command_builder.generate(command) - ensure_node_key(command) + if prepare(command) + @pipeline.call_once_v(command, &block) + else + @buffer << -> { @pipeline.call_once_v(command, &block) } + end end - def execute(watch: nil, &block) - yield self - raise ArgumentError, 'empty transaction' if @node_key.nil? + def execute + @buffer.each(&:call) - node = @router.find_node(@node_key) - @router.try_delegate(node, :multi, watch: watch, &block) + raise ArgumentError, 'empty transaction' if @pipeline._empty? + raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil? + raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch? + + settle end private - def ensure_node_key(command) + def watch? + !@watch.nil? && !@watch.empty? + end + + def safe_watch? + return true unless watch? + return false if @node.nil? + + slots = @watch.map do |k| + return false if k.nil? || k.empty? + + ::RedisClient::Cluster::KeySlotConverter.convert(k) + end + + return false if slots.uniq.size != 1 + + @router.find_primary_node_by_slot(slots.first) == @node + end + + def prepare(command) + return true unless @node.nil? + node_key = @router.find_primary_node_key(command) - raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil? + return false if node_key.nil? + + @node = @router.find_node(node_key) + @pipeline.call('WATCH', *@watch) if watch? + @pipeline.call('MULTI') + @buffer.each(&:call) + @buffer.clear + true + end + + def settle + @pipeline.call('EXEC') + @pipeline.call('UNWATCH') if watch? + send_transaction(@node, redirect: true) + end - @node_key ||= node_key - raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key + def send_transaction(client, redirect:) + case client + when ::RedisClient then send_pipeline(client, redirect: redirect) + when ::RedisClient::Pooled then client.with { |c| send_pipeline(c, redirect: redirect) } + else raise NotImplementedError, "#{client.class.name}#multi for cluster client" + end + end + + def send_pipeline(client, redirect:) + results = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection| + commands = @pipeline._commands + client.middlewares.call_pipelined(commands, client.config) do + connection.call_pipelined(commands, nil) + rescue ::RedisClient::CommandError => e + return handle_command_error!(commands, e) if redirect + + raise + end + end + + @pipeline._coerce!(results) + results[watch? ? -2 : -1] + end + + def handle_command_error!(commands, err) + if err.message.start_with?('CROSSSLOT') + raise ConsistencyError, "#{err.message}: #{err.command}" + elsif err.message.start_with?('MOVED', 'ASK') + ensure_the_same_node!(commands) + handle_redirection(err) + else + raise err + end + end + + def ensure_the_same_node!(commands) + commands.each do |command| + node_key = @router.find_primary_node_key(command) + next if node_key.nil? + + node = @router.find_node(node_key) + next if @node == node + + raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}" + end + end + + def handle_redirection(err) + if err.message.start_with?('MOVED') + node = @router.assign_redirection_node(err.message) + send_transaction(node, redirect: false) + elsif err.message.start_with?('ASK') + node = @router.assign_asking_node(err.message) + try_asking(node) ? send_transaction(node, redirect: false) : err + else + raise err + end + end - nil + def try_asking(node) + node.call('ASKING') == 'OK' + rescue StandardError + false end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 433a9af6..31e4ccf8 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -207,7 +207,7 @@ def test_transaction_with_empty_block assert_raises(LocalJumpError) { @client.multi } end - def test_transaction_with_keyless_commands + def test_transaction_with_only_keyless_commands assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.multi do |t| t.call('ECHO', 'foo') @@ -234,7 +234,7 @@ def test_transaction_without_hashtag end end - assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.multi do |t| t.call('MSET', 'key1', '1', 'key2', '2') t.call('MSET', 'key1', '1', 'key3', '3') @@ -247,6 +247,54 @@ def test_transaction_without_hashtag end end + def test_transaction_with_watch + @client.call('MSET', '{key}1', '0', '{key}2', '0') + + got = @client.multi(watch: %w[{key}1 {key}2]) do |tx| + tx.call('ECHO', 'START') + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + tx.call('ECHO', 'FINISH') + end + + assert_equal(%w[START OK OK FINISH], got) + assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) + end + + def test_transaction_with_unsafe_watch + @client.call('MSET', '{key}1', '0', '{key}2', '0') + + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.multi(watch: %w[key1 key2]) do |tx| + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + end + end + + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.multi(watch: %w[{hey}1 {hey}2]) do |tx| + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + end + end + + assert_equal(%w[0 0], @client.call('MGET', '{key}1', '{key}2')) + end + + def test_transaction_with_meaningless_watch + @client.call('MSET', '{key}1', '0', '{key}2', '0') + + got = @client.multi(watch: %w[{key}3 {key}4]) do |tx| + tx.call('ECHO', 'START') + tx.call('SET', '{key}1', '1') + tx.call('SET', '{key}2', '2') + tx.call('ECHO', 'FINISH') + end + + assert_equal(%w[START OK OK FINISH], got) + assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2')) + end + def test_pubsub_without_subscription pubsub = @client.pubsub assert_nil(pubsub.next_event(0.01)) diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 813ba4df..412ba891 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -59,6 +59,20 @@ def test_the_state_of_cluster_resharding_with_pipelining end end + def test_the_state_of_cluster_resharding_with_transaction + do_resharding_test do |keys| + @client.multi do |tx| + keys.each { |key| tx.call('SET', key, key) } + end + + keys.each do |key| + want = key + got = @client.call('GET', key) + assert_equal(want, got, "Case: GET: #{key}") + end + end + end + def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection # This test is excercising a very delicate race condition; i think the use of @client to set # the keys in do_resharding_test is actually causing the race condition not to happen, so this @@ -128,12 +142,14 @@ def new_test_client class ScaleReadRandom < TestingWrapper include Mixin + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding keys = nil do_resharding_test { |ks| keys = ks } keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding_with_pipelining keys = nil do_resharding_test { |ks| keys = ks } @@ -141,6 +157,14 @@ def test_the_state_of_cluster_resharding_with_pipelining keys.each_with_index { |key, i| assert_equal(key, values[i], "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 + def test_the_state_of_cluster_resharding_with_transaction + keys = nil + do_resharding_test { |ks| keys = ks } + @client.multi { |tx| keys.each { |key| tx.call('SET', key, key) } } + keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } + end + private def new_test_client @@ -157,12 +181,14 @@ def new_test_client class ScaleReadRandomWithPrimary < TestingWrapper include Mixin + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding keys = nil do_resharding_test { |ks| keys = ks } keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding_with_pipelining keys = nil do_resharding_test { |ks| keys = ks } @@ -170,6 +196,14 @@ def test_the_state_of_cluster_resharding_with_pipelining keys.each_with_index { |key, i| assert_equal(key, values[i], "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 + def test_the_state_of_cluster_resharding_with_transaction + keys = nil + do_resharding_test { |ks| keys = ks } + @client.multi { |tx| keys.each { |key| tx.call('SET', key, key) } } + keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } + end + private def new_test_client @@ -186,12 +220,14 @@ def new_test_client class ScaleReadLatency < TestingWrapper include Mixin + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding keys = nil do_resharding_test { |ks| keys = ks } keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 def test_the_state_of_cluster_resharding_with_pipelining keys = nil do_resharding_test { |ks| keys = ks } @@ -199,6 +235,14 @@ def test_the_state_of_cluster_resharding_with_pipelining keys.each_with_index { |key, i| assert_equal(key, values[i], "Case: GET: #{key}") } end + # FIXME: https://github.com/redis/redis/issues/11312 + def test_the_state_of_cluster_resharding_with_transaction + keys = nil + do_resharding_test { |ks| keys = ks } + @client.multi { |tx| keys.each { |key| tx.call('SET', key, key) } } + keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") } + end + private def new_test_client