Skip to content

Commit

Permalink
test: verify occurrence of redirections just in case (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Sep 22, 2024
1 parent 8cbcaf4 commit a1ca9b3
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 120 deletions.
8 changes: 6 additions & 2 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,26 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
results = Array.new(commands.size)
@pending_reads += size
write_multi(commands)

redirection_indices = nil
first_exception = nil

size.times do |index|
timeout = timeouts && timeouts[index]
result = read(timeout)
result = read(connection_timeout(timeout))
@pending_reads -= 1

if result.is_a?(::RedisClient::Error)
result._set_command(commands[index])
result._set_config(config)

if result.is_a?(::RedisClient::CommandError) && result.message.start_with?('MOVED', 'ASK')
redirection_indices ||= []
redirection_indices << index
elsif exception
first_exception ||= result
end
end

results[index] = result
end

Expand Down
63 changes: 0 additions & 63 deletions test/command_capture_middleware.rb

This file was deleted.

75 changes: 75 additions & 0 deletions test/middlewares/command_capture.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# frozen_string_literal: true

module Middlewares
module CommandCapture
CapturedCommand = Struct.new('CapturedCommand', :server_url, :command, :pipelined, keyword_init: true) do
def inspect
"#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >"
end
end

# The CommandBuffer is what should be set as the :captured_commands custom option.
# It needs to be threadsafe, because redis-cluster-client performs some redis operations on
# multiple nodes in parallel, and in e.g. jruby it's not safe to concurrently manipulate the same array.
class CommandBuffer
def initialize
@array = []
@mutex = Mutex.new
end

def to_a
@mutex.synchronize do
@array.dup
end
end

def <<(command)
@mutex.synchronize do
@array << command
end
end

def count(*cmd)
@mutex.synchronize do
next 0 if @array.empty?

@array.count do |e|
cmd.size.times.all? { |i| cmd[i].downcase == e.command[i]&.downcase }
end
end
end

def clear
@mutex.synchronize do
@array.clear
end
end
end

def call(command, redis_config)
redis_config.custom[:captured_commands] << CapturedCommand.new(
server_url: ::Middlewares::CommandCapture.normalize_captured_url(redis_config.server_url),
command: command,
pipelined: false
)
super
end

def call_pipelined(commands, redis_config)
commands.map do |command|
redis_config.custom[:captured_commands] << CapturedCommand.new(
server_url: ::Middlewares::CommandCapture.normalize_captured_url(redis_config.server_url),
command: command,
pipelined: true
)
end
super
end

def self.normalize_captured_url(url)
URI.parse(url).tap do |u|
u.path = ''
end.to_s
end
end
end
62 changes: 62 additions & 0 deletions test/middlewares/redirection_count.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

module Middlewares
module RedirectionCount
class Counter
Result = Struct.new('RedirectionCountResult', :moved, :ask, keyword_init: true)

def initialize
@moved = 0
@ask = 0
@mutex = Mutex.new
end

def moved
@mutex.synchronize { @moved += 1 }
end

def ask
@mutex.synchronize { @ask += 1 }
end

def get
@mutex.synchronize { Result.new(moved: @moved, ask: @ask) }
end

def zero?
@mutex.synchronize { @moved == 0 && @ask == 0 }
end

def clear
@mutex.synchronize do
@moved = 0
@ask = 0
end
end
end

def call(cmd, cfg)
super
rescue ::RedisClient::CommandError => e
if e.message.start_with?('MOVED')
cfg.custom.fetch(:redirection_count).moved
elsif e.message.start_with?('ASK')
cfg.custom.fetch(:redirection_count).ask
end

raise
end

def call_pipelined(cmd, cfg)
super
rescue ::RedisClient::CommandError => e
if e.message.start_with?('MOVED')
cfg.custom.fetch(:redirection_count).moved
elsif e.message.start_with?('ASK')
cfg.custom.fetch(:redirection_count).ask
end

raise
end
end
end
24 changes: 24 additions & 0 deletions test/middlewares/redirection_emulation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Middlewares
module RedirectionEmulation
Setting = Struct.new(
'RedirectionEmulationMiddlewareSetting',
:slot, :to, :command, keyword_init: true
)

def call(cmd, cfg)
s = cfg.custom.fetch(:redirect)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
end

def call_pipelined(cmd, cfg)
s = cfg.custom.fetch(:redirect)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
end
end
end
22 changes: 0 additions & 22 deletions test/redirection_emulation_middleware.rb

This file was deleted.

12 changes: 6 additions & 6 deletions test/redis_client/cluster/test_node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def teardown
@test_nodes&.each { |n| n&.each(&:close) }
end

def make_node(capture_buffer: CommandCaptureMiddleware::CommandBuffer.new, pool: nil, **kwargs)
def make_node(capture_buffer: ::Middlewares::CommandCapture::CommandBuffer.new, pool: nil, **kwargs)
config = ::RedisClient::ClusterConfig.new(**{
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
middlewares: [CommandCaptureMiddleware],
middlewares: [::Middlewares::CommandCapture],
custom: { captured_commands: capture_buffer },
**TEST_GENERIC_OPTIONS
}.merge(kwargs))
Expand Down Expand Up @@ -596,7 +596,7 @@ def test_try_map
end

def test_reload
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
test_node = make_node(replica: true, capture_buffer: capture_buffer)

capture_buffer.clear
Expand All @@ -618,7 +618,7 @@ def test_reload

def test_reload_with_original_config
bootstrap_node = TEST_NODE_URIS.first
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
test_node = make_node(
nodes: [bootstrap_node],
replica: true,
Expand All @@ -640,7 +640,7 @@ def test_reload_with_original_config
end

def test_reload_with_overriden_sample_size
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
test_node = make_node(replica: true, capture_buffer: capture_buffer, max_startup_sample: 1)

capture_buffer.clear
Expand All @@ -661,7 +661,7 @@ def test_reload_with_overriden_sample_size
end

def test_reload_concurrently
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
test_node = make_node(replica: true, pool: { size: 2 }, capture_buffer: capture_buffer)

# Simulate refetch_node_info_list taking a long time
Expand Down
Loading

0 comments on commit a1ca9b3

Please sign in to comment.