Skip to content

Commit

Permalink
perf: lessen reload frequency to mitigate load of servers (#377)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Sep 23, 2024
1 parent a1ca9b3 commit 4ee1116
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 68 deletions.
19 changes: 17 additions & 2 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Node
ROLE_FLAGS = %w[master slave].freeze
EMPTY_ARRAY = [].freeze
EMPTY_HASH = {}.freeze
STATE_REFRESH_INTERVAL = (3..10).freeze

private_constant :USE_CHAR_ARRAY_SLOT, :SLOT_SIZE, :MIN_SLOT, :MAX_SLOT,
:DEAD_FLAGS, :ROLE_FLAGS, :EMPTY_ARRAY, :EMPTY_HASH
Expand Down Expand Up @@ -103,6 +104,8 @@ def initialize(concurrent_worker, config:, pool: nil, **kwargs)
@config = config
@mutex = Mutex.new
@last_reloaded_at = nil
@reload_times = 0
@random = Random.new
end

def inspect
Expand Down Expand Up @@ -419,15 +422,27 @@ def with_reload_lock
# performed the reload.
# Probably in the future we should add a circuit breaker to #reload itself, and stop trying if the cluster is
# obviously not working.
wait_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
wait_start = obtain_current_time
@mutex.synchronize do
return if @last_reloaded_at && @last_reloaded_at > wait_start

if @last_reloaded_at && @reload_times > 1
# Mitigate load of servers by naive logic. Don't sleep with exponential backoff.
now = obtain_current_time
elapsed = @last_reloaded_at + @random.rand(STATE_REFRESH_INTERVAL) * 1_000_000
return if now < elapsed
end

r = yield
@last_reloaded_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@last_reloaded_at = obtain_current_time
@reload_times += 1
r
end
end

def obtain_current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
end
end
end
end
25 changes: 10 additions & 15 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,32 +107,29 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
rescue ::RedisClient::CommandError => e
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)

retry_count -= 1
if e.message.start_with?('MOVED')
node = assign_redirection_node(e.message)
retry_count -= 1
retry if retry_count >= 0
elsif e.message.start_with?('ASK')
node = assign_asking_node(e.message)
retry_count -= 1
if retry_count >= 0
node.call('ASKING')
retry
end
elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
@node.reload!
retry_count -= 1
retry if retry_count >= 0
end

raise
rescue ::RedisClient::ConnectionError => e
raise unless ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(e, node)

@node.reload!

raise if retry_count <= 0

retry_count -= 1
retry
@node.reload!
retry if retry_count >= 0
raise
end

def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
Expand Down Expand Up @@ -200,13 +197,13 @@ def find_slot_by_key(key)
::RedisClient::Cluster::KeySlotConverter.convert(key)
end

def find_node(node_key, retry_count: 3)
def find_node(node_key, retry_count: 1)
@node.find_by(node_key)
rescue ::RedisClient::Cluster::Node::ReloadNeeded
raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0

@node.reload!
retry_count -= 1
@node.reload!
retry
end

Expand Down Expand Up @@ -236,17 +233,15 @@ def close

private

def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize
def send_wait_command(method, command, args, retry_count: 1, &block) # rubocop:disable Metrics/AbcSize
@node.call_primaries(method, command, args).select { |r| r.is_a?(Integer) }.sum.then(&TSF.call(block))
rescue ::RedisClient::Cluster::ErrorCollection => e
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)
raise if retry_count <= 0
raise if e.errors.values.none? do |err|
err.message.include?('WAIT cannot be used with replica instances')
end
raise if e.errors.values.none? { |err| err.message.include?('WAIT cannot be used with replica instances') }

@node.reload!
retry_count -= 1
@node.reload!
retry
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# frozen_string_literal: true

module Middlewares
module RedirectionCount
module RedirectCount
class Counter
Result = Struct.new('RedirectionCountResult', :moved, :ask, keyword_init: true)
Result = Struct.new('RedirectCountResult', :moved, :ask, keyword_init: true)

def initialize
@moved = 0
Expand Down Expand Up @@ -39,9 +39,9 @@ def call(cmd, cfg)
super
rescue ::RedisClient::CommandError => e
if e.message.start_with?('MOVED')
cfg.custom.fetch(:redirection_count).moved
cfg.custom.fetch(:redirect_count).moved
elsif e.message.start_with?('ASK')
cfg.custom.fetch(:redirection_count).ask
cfg.custom.fetch(:redirect_count).ask
end

raise
Expand All @@ -51,9 +51,9 @@ def call_pipelined(cmd, cfg)
super
rescue ::RedisClient::CommandError => e
if e.message.start_with?('MOVED')
cfg.custom.fetch(:redirection_count).moved
cfg.custom.fetch(:redirect_count).moved
elsif e.message.start_with?('ASK')
cfg.custom.fetch(:redirection_count).ask
cfg.custom.fetch(:redirect_count).ask
end

raise
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
# frozen_string_literal: true

module Middlewares
module RedirectionEmulation
module RedirectFake
Setting = Struct.new(
'RedirectionEmulationMiddlewareSetting',
'RedirectFakeSetting',
:slot, :to, :command, keyword_init: true
)

def call(cmd, cfg)
s = cfg.custom.fetch(:redirect)
s = cfg.custom.fetch(:redirect_fake)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
end

def call_pipelined(cmd, cfg)
s = cfg.custom.fetch(:redirect)
s = cfg.custom.fetch(:redirect_fake)
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command

super
Expand Down
30 changes: 15 additions & 15 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ class TestCluster
module Mixin
def setup
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
@redirection_count = ::Middlewares::RedirectionCount::Counter.new
@redirect_count = ::Middlewares::RedirectCount::Counter.new
@client = new_test_client
@client.call('FLUSHDB')
wait_for_replication
@captured_commands.clear
@redirection_count.clear
@redirect_count.clear
end

def teardown
@client&.call('FLUSHDB')
wait_for_replication
@client&.close
flunk(@redirection_count.get) unless @redirection_count.zero?
flunk(@redirect_count.get) unless @redirect_count.zero?
end

def test_config
Expand Down Expand Up @@ -850,10 +850,10 @@ def test_only_reshards_own_errors
client2 = new_test_client(
middlewares: [
::RedisClient::Cluster::ErrorIdentification::Middleware,
::Middlewares::RedirectionEmulation
::Middlewares::RedirectFake
],
custom: {
redirect: ::Middlewares::RedirectionEmulation::Setting.new(
redirect_fake: ::Middlewares::RedirectFake::Setting.new(
slot: slot, to: broken_primary_key, command: %w[SET testkey client2]
)
}
Expand Down Expand Up @@ -925,8 +925,8 @@ class PrimaryOnly < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -946,8 +946,8 @@ class ScaleReadRandom < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -969,8 +969,8 @@ class ScaleReadRandomWithPrimary < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -992,8 +992,8 @@ class ScaleReadLatency < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand All @@ -1015,8 +1015,8 @@ class Pooled < TestingWrapper
include Mixin

def new_test_client(
custom: { captured_commands: @captured_commands, redirection_count: @redirection_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectionCount],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**opts
)
config = ::RedisClient::ClusterConfig.new(
Expand Down
7 changes: 5 additions & 2 deletions test/test_against_cluster_broken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ class TestAgainstClusterBroken < TestingWrapper

def setup
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
@redirect_count = ::Middlewares::RedirectCount::Counter.new
@client = ::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
fixed_hostname: TEST_FIXED_HOSTNAME,
custom: { captured_commands: @captured_commands },
middlewares: [::Middlewares::CommandCapture],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**TEST_GENERIC_OPTIONS
).new_client
@client.call('echo', 'init')
Expand All @@ -22,11 +23,13 @@ def setup
**TEST_GENERIC_OPTIONS.merge(timeout: 30.0)
)
@captured_commands.clear
@redirect_count.clear
end

def teardown
@client&.close
@controller&.close
print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = "
end

def test_a_replica_is_down
Expand Down
7 changes: 5 additions & 2 deletions test/test_against_cluster_scale.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@ def self.test_order

def setup
@captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
@redirect_count = ::Middlewares::RedirectCount::Counter.new
@client = ::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
fixed_hostname: TEST_FIXED_HOSTNAME,
custom: { captured_commands: @captured_commands },
middlewares: [::Middlewares::CommandCapture],
custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
**TEST_GENERIC_OPTIONS
).new_client
@client.call('echo', 'init')
@captured_commands.clear
@redirect_count.clear
end

def teardown
@client&.close
@controller&.close
print "#{@redirect_count.get}, ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')} = "
end

def test_01_scale_out
Expand Down
Loading

0 comments on commit 4ee1116

Please sign in to comment.