diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb index 456ec493..6cef810e 100644 --- a/lib/redis_client/cluster/pipeline.rb +++ b/lib/redis_client/cluster/pipeline.rb @@ -55,15 +55,18 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco results = Array.new(commands.size) @pending_reads += size write_multi(commands) - redirection_indices = nil first_exception = nil + size.times do |index| timeout = timeouts && timeouts[index] - result = read(timeout) + result = read(connection_timeout(timeout)) @pending_reads -= 1 + if result.is_a?(::RedisClient::Error) result._set_command(commands[index]) + result._set_config(config) + if result.is_a?(::RedisClient::CommandError) && result.message.start_with?('MOVED', 'ASK') redirection_indices ||= [] redirection_indices << index @@ -71,6 +74,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco first_exception ||= result end end + results[index] = result end diff --git a/test/command_capture_middleware.rb b/test/command_capture_middleware.rb deleted file mode 100644 index 1bd9417b..00000000 --- a/test/command_capture_middleware.rb +++ /dev/null @@ -1,63 +0,0 @@ -# frozen_string_literal: true - -module CommandCaptureMiddleware - CapturedCommand = Struct.new(:server_url, :command, :pipelined, keyword_init: true) do - def inspect - "#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >" - end - end - - # The CommandBuffer is what should be set as the :captured_commands custom option. - # It needs to be threadsafe, because redis-cluster-client performs some redis operations on - # multiple nodes in parallel, and in e.g. jruby it's not safe to concurrently manipulate the same array. - class CommandBuffer - def initialize - @array = [] - @mutex = Mutex.new - end - - def to_a - @mutex.synchronize do - @array.dup - end - end - - def <<(command) - @mutex.synchronize do - @array << command - end - end - - def clear - @mutex.synchronize do - @array.clear - end - end - end - - def call(command, redis_config) - redis_config.custom[:captured_commands] << CapturedCommand.new( - server_url: CommandCaptureMiddleware.normalize_captured_url(redis_config.server_url), - command: command, - pipelined: false - ) - super - end - - def call_pipelined(commands, redis_config) - commands.map do |command| - redis_config.custom[:captured_commands] << CapturedCommand.new( - server_url: CommandCaptureMiddleware.normalize_captured_url(redis_config.server_url), - command: command, - pipelined: true - ) - end - super - end - - def self.normalize_captured_url(url) - URI.parse(url).tap do |u| - u.path = '' - end.to_s - end -end diff --git a/test/middlewares/command_capture.rb b/test/middlewares/command_capture.rb new file mode 100644 index 00000000..5bd4b67e --- /dev/null +++ b/test/middlewares/command_capture.rb @@ -0,0 +1,75 @@ +# frozen_string_literal: true + +module Middlewares + module CommandCapture + CapturedCommand = Struct.new('CapturedCommand', :server_url, :command, :pipelined, keyword_init: true) do + def inspect + "#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >" + end + end + + # The CommandBuffer is what should be set as the :captured_commands custom option. + # It needs to be threadsafe, because redis-cluster-client performs some redis operations on + # multiple nodes in parallel, and in e.g. jruby it's not safe to concurrently manipulate the same array. + class CommandBuffer + def initialize + @array = [] + @mutex = Mutex.new + end + + def to_a + @mutex.synchronize do + @array.dup + end + end + + def <<(command) + @mutex.synchronize do + @array << command + end + end + + def count(*cmd) + @mutex.synchronize do + next 0 if @array.empty? + + @array.count do |e| + cmd.size.times.all? { |i| cmd[i].downcase == e.command[i]&.downcase } + end + end + end + + def clear + @mutex.synchronize do + @array.clear + end + end + end + + def call(command, redis_config) + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: ::Middlewares::CommandCapture.normalize_captured_url(redis_config.server_url), + command: command, + pipelined: false + ) + super + end + + def call_pipelined(commands, redis_config) + commands.map do |command| + redis_config.custom[:captured_commands] << CapturedCommand.new( + server_url: ::Middlewares::CommandCapture.normalize_captured_url(redis_config.server_url), + command: command, + pipelined: true + ) + end + super + end + + def self.normalize_captured_url(url) + URI.parse(url).tap do |u| + u.path = '' + end.to_s + end + end +end diff --git a/test/middlewares/redirection_count.rb b/test/middlewares/redirection_count.rb new file mode 100644 index 00000000..98126faf --- /dev/null +++ b/test/middlewares/redirection_count.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +module Middlewares + module RedirectionCount + class Counter + Result = Struct.new('RedirectionCountResult', :moved, :ask, keyword_init: true) + + def initialize + @moved = 0 + @ask = 0 + @mutex = Mutex.new + end + + def moved + @mutex.synchronize { @moved += 1 } + end + + def ask + @mutex.synchronize { @ask += 1 } + end + + def get + @mutex.synchronize { Result.new(moved: @moved, ask: @ask) } + end + + def zero? + @mutex.synchronize { @moved == 0 && @ask == 0 } + end + + def clear + @mutex.synchronize do + @moved = 0 + @ask = 0 + end + end + end + + def call(cmd, cfg) + super + rescue ::RedisClient::CommandError => e + if e.message.start_with?('MOVED') + cfg.custom.fetch(:redirection_count).moved + elsif e.message.start_with?('ASK') + cfg.custom.fetch(:redirection_count).ask + end + + raise + end + + def call_pipelined(cmd, cfg) + super + rescue ::RedisClient::CommandError => e + if e.message.start_with?('MOVED') + cfg.custom.fetch(:redirection_count).moved + elsif e.message.start_with?('ASK') + cfg.custom.fetch(:redirection_count).ask + end + + raise + end + end +end diff --git a/test/middlewares/redirection_emulation.rb b/test/middlewares/redirection_emulation.rb new file mode 100644 index 00000000..3965a71a --- /dev/null +++ b/test/middlewares/redirection_emulation.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module Middlewares + module RedirectionEmulation + Setting = Struct.new( + 'RedirectionEmulationMiddlewareSetting', + :slot, :to, :command, keyword_init: true + ) + + def call(cmd, cfg) + s = cfg.custom.fetch(:redirect) + raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command + + super + end + + def call_pipelined(cmd, cfg) + s = cfg.custom.fetch(:redirect) + raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command + + super + end + end +end diff --git a/test/redirection_emulation_middleware.rb b/test/redirection_emulation_middleware.rb deleted file mode 100644 index 7e1091fa..00000000 --- a/test/redirection_emulation_middleware.rb +++ /dev/null @@ -1,22 +0,0 @@ -# frozen_string_literal: true - -module RedirectionEmulationMiddleware - Setting = Struct.new( - 'RedirectionEmulationMiddlewareSetting', - :slot, :to, :command, keyword_init: true - ) - - def call(cmd, cfg) - s = cfg.custom.fetch(:redirect) - raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command - - super - end - - def call_pipelined(cmd, cfg) - s = cfg.custom.fetch(:redirect) - raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command - - super - end -end diff --git a/test/redis_client/cluster/test_node.rb b/test/redis_client/cluster/test_node.rb index 1f1c42d7..b0dfb00d 100644 --- a/test/redis_client/cluster/test_node.rb +++ b/test/redis_client/cluster/test_node.rb @@ -36,11 +36,11 @@ def teardown @test_nodes&.each { |n| n&.each(&:close) } end - def make_node(capture_buffer: CommandCaptureMiddleware::CommandBuffer.new, pool: nil, **kwargs) + def make_node(capture_buffer: ::Middlewares::CommandCapture::CommandBuffer.new, pool: nil, **kwargs) config = ::RedisClient::ClusterConfig.new(**{ nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, - middlewares: [CommandCaptureMiddleware], + middlewares: [::Middlewares::CommandCapture], custom: { captured_commands: capture_buffer }, **TEST_GENERIC_OPTIONS }.merge(kwargs)) @@ -596,7 +596,7 @@ def test_try_map end def test_reload - capture_buffer = CommandCaptureMiddleware::CommandBuffer.new + capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new test_node = make_node(replica: true, capture_buffer: capture_buffer) capture_buffer.clear @@ -618,7 +618,7 @@ def test_reload def test_reload_with_original_config bootstrap_node = TEST_NODE_URIS.first - capture_buffer = CommandCaptureMiddleware::CommandBuffer.new + capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new test_node = make_node( nodes: [bootstrap_node], replica: true, @@ -640,7 +640,7 @@ def test_reload_with_original_config end def test_reload_with_overriden_sample_size - capture_buffer = CommandCaptureMiddleware::CommandBuffer.new + capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new test_node = make_node(replica: true, capture_buffer: capture_buffer, max_startup_sample: 1) capture_buffer.clear @@ -661,7 +661,7 @@ def test_reload_with_overriden_sample_size end def test_reload_concurrently - capture_buffer = CommandCaptureMiddleware::CommandBuffer.new + capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new test_node = make_node(replica: true, pool: { size: 2 }, capture_buffer: capture_buffer) # Simulate refetch_node_info_list taking a long time diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index aee1d6a2..b5d39022 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -6,17 +6,20 @@ class RedisClient class TestCluster module Mixin def setup - @captured_commands = CommandCaptureMiddleware::CommandBuffer.new + @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new + @redirection_count = ::Middlewares::RedirectionCount::Counter.new @client = new_test_client @client.call('FLUSHDB') wait_for_replication @captured_commands.clear + @redirection_count.clear end def teardown @client&.call('FLUSHDB') wait_for_replication @client&.close + flunk(@redirection_count.get) unless @redirection_count.zero? end def test_config @@ -414,7 +417,7 @@ def test_transaction_does_not_unwatch_on_connection_error end def test_transaction_does_not_retry_without_rewatching - client2 = new_test_client + client2 = new_test_client(middlewares: nil) @client.call('SET', 'key', 'original_value') @@ -442,7 +445,7 @@ def test_transaction_does_not_retry_without_rewatching end def test_transaction_with_watch_retries_block - client2 = new_test_client + client2 = new_test_client(middlewares: nil) call_count = 0 @client.call('SET', 'key', 'original_value') @@ -523,7 +526,7 @@ def test_transaction_in_race_condition @client.call('MSET', '{key}1', '1', '{key}2', '2') another = Fiber.new do - cli = new_test_client + cli = new_test_client(middlewares: nil) cli.call('MSET', '{key}1', '3', '{key}2', '4') cli.close Fiber.yield @@ -847,10 +850,10 @@ def test_only_reshards_own_errors client2 = new_test_client( middlewares: [ ::RedisClient::Cluster::ErrorIdentification::Middleware, - RedirectionEmulationMiddleware + ::Middlewares::RedirectionEmulation ], custom: { - redirect: RedirectionEmulationMiddleware::Setting.new( + redirect: ::Middlewares::RedirectionEmulation::Setting.new( slot: slot, to: broken_primary_key, command: %w[SET testkey client2] ) } @@ -907,7 +910,7 @@ def collect_messages(pubsub, size:, max_attempts: 30, timeout: 1.0) end def publish_messages - client = new_test_client + client = new_test_client(middlewares: nil) yield client client.close end @@ -921,7 +924,11 @@ def hiredis_used? class PrimaryOnly < TestingWrapper include Mixin - def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts) + def new_test_client( + custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + **opts + ) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, @@ -938,7 +945,11 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar class ScaleReadRandom < TestingWrapper include Mixin - def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts) + def new_test_client( + custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + **opts + ) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, @@ -957,7 +968,11 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar class ScaleReadRandomWithPrimary < TestingWrapper include Mixin - def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts) + def new_test_client( + custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + **opts + ) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, @@ -976,7 +991,11 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar class ScaleReadLatency < TestingWrapper include Mixin - def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts) + def new_test_client( + custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + **opts + ) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, @@ -995,7 +1014,11 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar class Pooled < TestingWrapper include Mixin - def new_test_client(custom: { captured_commands: @captured_commands }, middlewares: [CommandCaptureMiddleware], **opts) + def new_test_client( + custom: { captured_commands: @captured_commands, redirection_count: @redirection_count }, + middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount], + **opts + ) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, diff --git a/test/test_against_cluster_broken.rb b/test/test_against_cluster_broken.rb index 30917f36..62ca402e 100644 --- a/test/test_against_cluster_broken.rb +++ b/test/test_against_cluster_broken.rb @@ -6,10 +6,13 @@ class TestAgainstClusterBroken < TestingWrapper WAIT_SEC = 3 def setup + @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new @client = ::RedisClient.cluster( nodes: TEST_NODE_URIS, replica: true, fixed_hostname: TEST_FIXED_HOSTNAME, + custom: { captured_commands: @captured_commands }, + middlewares: [::Middlewares::CommandCapture], **TEST_GENERIC_OPTIONS ).new_client @client.call('echo', 'init') @@ -18,6 +21,7 @@ def setup replica_size: TEST_REPLICA_SIZE, **TEST_GENERIC_OPTIONS.merge(timeout: 30.0) ) + @captured_commands.clear end def teardown @@ -28,11 +32,13 @@ def teardown def test_a_replica_is_down sacrifice = @controller.select_sacrifice_of_replica do_test_a_node_is_down(sacrifice, number_of_keys: 10) + refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) end def test_a_primary_is_down sacrifice = @controller.select_sacrifice_of_primary do_test_a_node_is_down(sacrifice, number_of_keys: 10) + refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) end private diff --git a/test/test_against_cluster_scale.rb b/test/test_against_cluster_scale.rb index b2c1d7ea..7bc39806 100644 --- a/test/test_against_cluster_scale.rb +++ b/test/test_against_cluster_scale.rb @@ -10,13 +10,17 @@ def self.test_order end def setup + @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new @client = ::RedisClient.cluster( nodes: TEST_NODE_URIS, replica: true, fixed_hostname: TEST_FIXED_HOSTNAME, + custom: { captured_commands: @captured_commands }, + middlewares: [::Middlewares::CommandCapture], **TEST_GENERIC_OPTIONS ).new_client @client.call('echo', 'init') + @captured_commands.clear end def teardown @@ -42,6 +46,7 @@ def test_01_scale_out .instance_variable_get(:@clients) .size assert_equal(want, got, 'Case: number of nodes') + refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) end def test_02_scale_in @@ -64,6 +69,7 @@ def test_02_scale_in .instance_variable_get(:@clients) .size assert_equal(want, got, 'Case: number of nodes') + refute(@captured_commands.count('cluster', 'nodes').zero?, @captured_commands.to_a.map(&:command)) end private diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index f0769e7a..fc1bcf5c 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -4,6 +4,7 @@ module TestAgainstClusterState SLOT_SIZE = 16_384 + PATTERN = ENV.fetch('TEST_CLASS_PATTERN', '') module Mixin def setup @@ -13,8 +14,10 @@ def setup **TEST_GENERIC_OPTIONS.merge(timeout: 30.0) ) @controller.rebuild + @redirection_count = ::Middlewares::RedirectionCount::Counter.new @client = new_test_client @client.call('echo', 'init') + @redirection_count.clear end def teardown @@ -34,6 +37,7 @@ def test_the_state_of_cluster_failover wait_for_replication 1000.times { |i| assert_equal(i.to_s, @client.call('GET', "key#{i}")) } assert_equal('ok', fetch_cluster_info('cluster_state')) + refute(@redirection_count.zero?, @redirection_count.get) end def test_the_state_of_cluster_resharding @@ -44,6 +48,8 @@ def test_the_state_of_cluster_resharding assert_equal(want, got, "Case: GET: #{key}") end end + + refute(@redirection_count.zero?, @redirection_count.get) end def test_the_state_of_cluster_resharding_with_pipelining @@ -58,6 +64,11 @@ def test_the_state_of_cluster_resharding_with_pipelining assert_equal(want, got, "Case: GET: #{key}") end end + + # Since redirections are handled by #call_pipelined_aware_of_redirection, + # we can't trace them in pipelining processes. + # + # refute(@redirection_count.zero?, @redirection_count.get) end def test_the_state_of_cluster_resharding_with_transaction @@ -80,6 +91,7 @@ def test_the_state_of_cluster_resharding_with_transaction end assert_equal(1, call_cnt) + refute(@redirection_count.zero?, @redirection_count.get) end def test_the_state_of_cluster_resharding_with_transaction_and_watch @@ -102,10 +114,11 @@ def test_the_state_of_cluster_resharding_with_transaction_and_watch end assert_equal(1, call_cnt) + refute(@redirection_count.zero?, @redirection_count.get) end def test_the_state_of_cluster_resharding_with_reexecuted_watch - client2 = new_test_client + client2 = new_test_client(middlewares: nil) call_cnt = 0 @client.call('SET', 'watch_key', 'original_value') @@ -129,6 +142,7 @@ def test_the_state_of_cluster_resharding_with_reexecuted_watch assert_equal(2, call_cnt) # The second call succeeded assert_equal('@client_value_2', @client.call('GET', 'watch_key')) + refute(@redirection_count.zero?, @redirection_count.get) ensure client2&.close end @@ -189,19 +203,24 @@ def do_resharding_test(number_of_keys: 1000) end end - PATTERN = ENV.fetch('TEST_CLASS_PATTERN', '') - if PATTERN == 'PrimaryOnly' || PATTERN.empty? class PrimaryOnly < TestingWrapper include Mixin private - def new_test_client + def new_test_client( + custom: { redirection_count: @redirection_count }, + middlewares: [::Middlewares::RedirectionCount], + **opts + ) ::RedisClient.cluster( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS + middlewares: middlewares, + custom: custom, + **TEST_GENERIC_OPTIONS, + **opts ).new_client end end @@ -213,11 +232,18 @@ class Pooled < TestingWrapper private - def new_test_client + def new_test_client( + custom: { redirection_count: @redirection_count }, + middlewares: [::Middlewares::RedirectionCount], + **opts + ) ::RedisClient.cluster( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS + middlewares: middlewares, + custom: custom, + **TEST_GENERIC_OPTIONS, + **opts ).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2) end end @@ -229,13 +255,20 @@ class ScaleReadRandom < TestingWrapper private - def new_test_client + def new_test_client( + custom: { redirection_count: @redirection_count }, + middlewares: [::Middlewares::RedirectionCount], + **opts + ) ::RedisClient.cluster( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :random, fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS + middlewares: middlewares, + custom: custom, + **TEST_GENERIC_OPTIONS, + **opts ).new_client end end @@ -247,13 +280,20 @@ class ScaleReadRandomWithPrimary < TestingWrapper private - def new_test_client + def new_test_client( + custom: { redirection_count: @redirection_count }, + middlewares: [::Middlewares::RedirectionCount], + **opts + ) ::RedisClient.cluster( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :random_with_primary, fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS + middlewares: middlewares, + custom: custom, + **TEST_GENERIC_OPTIONS, + **opts ).new_client end end @@ -265,13 +305,20 @@ class ScaleReadLatency < TestingWrapper private - def new_test_client + def new_test_client( + custom: { redirection_count: @redirection_count }, + middlewares: [::Middlewares::RedirectionCount], + **opts + ) ::RedisClient.cluster( nodes: TEST_NODE_URIS, replica: true, replica_affinity: :latency, fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS + middlewares: middlewares, + custom: custom, + **TEST_GENERIC_OPTIONS, + **opts ).new_client end end diff --git a/test/testing_helper.rb b/test/testing_helper.rb index 7ba01cd4..d297571f 100644 --- a/test/testing_helper.rb +++ b/test/testing_helper.rb @@ -6,8 +6,9 @@ require 'redis-cluster-client' require 'testing_constants' require 'cluster_controller' -require 'command_capture_middleware' -require 'redirection_emulation_middleware' +require 'middlewares/command_capture' +require 'middlewares/redirection_emulation' +require 'middlewares/redirection_count' case ENV.fetch('REDIS_CONNECTION_DRIVER', 'ruby') when 'hiredis' then require 'hiredis-client'