Skip to content

Commit

Permalink
perf: command builder shouldn't be called multiple times (#406)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Nov 17, 2024
1 parent ff2969d commit 5817a0b
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.bundle
*.gem
*.json
Gemfile.lock
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ gem 'rubocop-minitest', require: false
gem 'rubocop-performance', require: false
gem 'rubocop-rake', require: false
gem 'stackprof', platform: :mri
gem 'vernier', platform: :mri if RUBY_ENGINE == 'ruby' && Integer(RUBY_VERSION.split('.').first) > 2
25 changes: 16 additions & 9 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,37 @@ def blocking_call_v(timeout, command, &block)
end

def scan(*args, **kwargs, &block)
raise ArgumentError, 'block required' unless block
return to_enum(__callee__, *args, **kwargs) unless block_given?

command = @command_builder.generate(['SCAN', ZERO_CURSOR_FOR_SCAN] + args, kwargs)
seed = Random.new_seed
cursor = ZERO_CURSOR_FOR_SCAN
loop do
cursor, keys = router.scan('SCAN', cursor, *args, seed: seed, **kwargs)
cursor, keys = router.scan(command, seed: seed)
command[1] = cursor
keys.each(&block)
break if cursor == ZERO_CURSOR_FOR_SCAN
end
end

def sscan(key, *args, **kwargs, &block)
node = router.assign_node(['SSCAN', key])
router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['SSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 1, &block)
end

def hscan(key, *args, **kwargs, &block)
node = router.assign_node(['HSCAN', key])
router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['HSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 2, &block)
end

def zscan(key, *args, **kwargs, &block)
node = router.assign_node(['ZSCAN', key])
router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['ZSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 2, &block)
end

def pipelined(exception: true)
Expand Down
13 changes: 13 additions & 0 deletions lib/redis_client/cluster/noop_command_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

class RedisClient
class Cluster
module NoopCommandBuilder
module_function

def generate(args, _kwargs = nil)
args
end
end
end
end
3 changes: 2 additions & 1 deletion lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/connection_mixin'
require 'redis_client/middlewares'
require 'redis_client/pooled'
Expand Down Expand Up @@ -229,7 +230,7 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met

def append_pipeline(node_key)
@pipelines ||= {}
@pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(@command_builder)
@pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(::RedisClient::Cluster::NoopCommandBuilder)
@pipelines[node_key].add_outer_index(@size)
@size += 1
@pipelines[node_key]
Expand Down
20 changes: 11 additions & 9 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,6 @@ def try_send(node, method, command, args, retry_count: 3, &block)
end
end

def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
handle_redirection(node, nil, retry_count: retry_count) do |on_node|
on_node.public_send(method, *args, **kwargs, &block)
end
end

def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
yield node
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
Expand Down Expand Up @@ -153,9 +147,7 @@ def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/Ab
raise
end

def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
command = @command_builder.generate(command, kwargs)

def scan(command, seed: nil) # rubocop:disable Metrics/AbcSize
command[1] = ZERO_CURSOR_FOR_SCAN if command.size == 1
input_cursor = Integer(command[1])

Expand All @@ -180,6 +172,16 @@ def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
raise
end

def scan_single_key(command, arity:, &block)
node = assign_node(command)
loop do
cursor, values = handle_redirection(node, nil, retry_count: 3) { |n| n.call_v(command) }
command[2] = cursor
arity < 2 ? values.each(&block) : values.each_slice(arity, &block)
break if cursor == ZERO_CURSOR_FOR_SCAN
end
end

def assign_node(command)
handle_node_reload_error do
node_key = find_node_key(command)
Expand Down
3 changes: 2 additions & 1 deletion lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/cluster/pipeline'

class RedisClient
Expand All @@ -18,7 +19,7 @@ def initialize(router, command_builder, node: nil, slot: nil, asking: false)
@router = router
@command_builder = command_builder
@retryable = true
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
@pipeline = ::RedisClient::Pipeline.new(::RedisClient::Cluster::NoopCommandBuilder)
@pending_commands = []
@node = node
prepare_tx unless @node.nil?
Expand Down
4 changes: 3 additions & 1 deletion lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'redis_client/cluster'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/node_key'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/command_builder'

class RedisClient
Expand Down Expand Up @@ -64,7 +65,7 @@ def initialize( # rubocop:disable Metrics/ParameterLists
end

def inspect
"#<#{self.class.name} #{startup_nodes.values}>"
"#<#{self.class.name} #{startup_nodes.values.map { |v| v.reject { |k| k == :command_builder } }}>"
end

def read_timeout
Expand Down Expand Up @@ -187,6 +188,7 @@ def build_startup_nodes(configs)
def augment_client_config(config)
config = @client_config.merge(config)
config = config.merge(host: @fixed_hostname) unless @fixed_hostname.empty?
config[:command_builder] = ::RedisClient::Cluster::NoopCommandBuilder # prevent twice call
config
end
end
Expand Down
78 changes: 78 additions & 0 deletions test/prof_stack2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

require 'vernier'
require 'redis_cluster_client'
require 'testing_constants'

module ProfStack2
SIZE = 40
ATTEMPTS = 1000

module_function

def run
client = make_client
prepare(client)

case mode = ENV.fetch('PROFILE_MODE', :single).to_sym
when :single
execute(client, mode) do |client|
ATTEMPTS.times { |i| client.call('GET', "key#{i}") }
end
when :transaction
execute(client, mode) do |client|
ATTEMPTS.times do |i|
client.multi do |tx|
SIZE.times do |j|
n = SIZE * i + j
tx.call('SET', "{group:#{i}}:key:#{n}", n)
end
end
end
end
when :pipeline
execute(client, mode) do |client|
ATTEMPTS.times do |i|
client.pipelined do |pi|
SIZE.times do |j|
n = SIZE * i + j
pi.call('GET', "key#{n}")
end
end
end
end
end
end

def execute(client, mode)
Vernier.profile(out: "vernier_#{mode}.json") do
yield(client)
end
end

def make_client
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :random,
fixed_hostname: TEST_FIXED_HOSTNAME,
# concurrency: { model: :on_demand, size: 6 },
# concurrency: { model: :pooled, size: 6 },
concurrency: { model: :none },
**TEST_GENERIC_OPTIONS
).new_client
end

def prepare(client)
ATTEMPTS.times do |i|
client.pipelined do |pi|
SIZE.times do |j|
n = SIZE * i + j
pi.call('SET', "key#{n}", "val#{n}")
end
end
end
end
end

ProfStack2.run
10 changes: 4 additions & 6 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ def test_blocking_call
end

def test_scan
assert_raises(ArgumentError) { @client.scan }

10.times { |i| @client.call('SET', "key#{i}", i) }
wait_for_replication
want = (0..9).map { |i| "key#{i}" }
Expand All @@ -117,9 +115,9 @@ def test_hscan
10.times do |i|
10.times { |j| @client.call('HSET', "key#{i}", "field#{j}", j) }
wait_for_replication
want = (0..9).map { |j| "field#{j}" }
want = (0..9).map { |j| ["field#{j}", j.to_s] }
got = []
@client.hscan("key#{i}", 'COUNT', '5') { |field| got << field }
@client.hscan("key#{i}", 'COUNT', '5') { |pair| got << pair }
assert_equal(want, got.sort)
end
end
Expand All @@ -128,9 +126,9 @@ def test_zscan
10.times do |i|
10.times { |j| @client.call('ZADD', "key#{i}", j, "member#{j}") }
wait_for_replication
want = (0..9).map { |j| "member#{j}" }
want = (0..9).map { |j| ["member#{j}", j.to_s] }
got = []
@client.zscan("key#{i}", 'COUNT', '5') { |member| got << member }
@client.zscan("key#{i}", 'COUNT', '5') { |pair| got << pair }
assert_equal(want, got.sort)
end
end
Expand Down
30 changes: 16 additions & 14 deletions test/redis_client/test_cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,38 @@ def test_startup_nodes
{
config: ::RedisClient::ClusterConfig.new,
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379 }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(replica: true),
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379 }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(fixed_hostname: 'endpoint.example.com'),
want: {
'127.0.0.1:6379' => { host: 'endpoint.example.com', port: 6379 }
'127.0.0.1:6379' => { host: 'endpoint.example.com', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(timeout: 1),
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, timeout: 1 }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, timeout: 1, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(nodes: ['redis://1.2.3.4:1234', 'rediss://5.6.7.8:5678']),
want: {
'1.2.3.4:1234' => { host: '1.2.3.4', port: 1234 },
'5.6.7.8:5678' => { host: '5.6.7.8', port: 5678, ssl: true }
'1.2.3.4:1234' => { host: '1.2.3.4', port: 1234, command_builder: ::RedisClient::Cluster::NoopCommandBuilder },
'5.6.7.8:5678' => { host: '5.6.7.8', port: 5678, ssl: true, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(custom: { foo: 'bar' }),
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, custom: { foo: 'bar' } }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, custom: { foo: 'bar' }, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
}
].each_with_index do |c, idx|
Expand Down Expand Up @@ -182,13 +182,15 @@ def test_client_config_for_node
nodes: ['redis://username:password@1.2.3.4:1234', 'rediss://5.6.7.8:5678'],
custom: { foo: 'bar' }
)
assert_equal({
host: '9.9.9.9',
port: 9999,
username: 'username',
password: 'password',
custom: { foo: 'bar' }
}, config.client_config_for_node('9.9.9.9:9999'))
want = {
host: '9.9.9.9',
port: 9999,
username: 'username',
password: 'password',
custom: { foo: 'bar' },
command_builder: ::RedisClient::Cluster::NoopCommandBuilder
}
assert_equal(want, config.client_config_for_node('9.9.9.9:9999'))
end

def test_client_config_id
Expand Down

0 comments on commit 5817a0b

Please sign in to comment.