Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: command builder shouldn't be called multiple times #406

Merged
merged 5 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.bundle
*.gem
*.json
Gemfile.lock
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ gem 'rubocop-minitest', require: false
gem 'rubocop-performance', require: false
gem 'rubocop-rake', require: false
gem 'stackprof', platform: :mri
gem 'vernier', platform: :mri if RUBY_ENGINE == 'ruby' && Integer(RUBY_VERSION.split('.').first) > 2
25 changes: 16 additions & 9 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,37 @@ def blocking_call_v(timeout, command, &block)
end

def scan(*args, **kwargs, &block)
raise ArgumentError, 'block required' unless block
return to_enum(__callee__, *args, **kwargs) unless block_given?

command = @command_builder.generate(['SCAN', ZERO_CURSOR_FOR_SCAN] + args, kwargs)
seed = Random.new_seed
cursor = ZERO_CURSOR_FOR_SCAN
loop do
cursor, keys = router.scan('SCAN', cursor, *args, seed: seed, **kwargs)
cursor, keys = router.scan(command, seed: seed)
command[1] = cursor
keys.each(&block)
break if cursor == ZERO_CURSOR_FOR_SCAN
end
end

def sscan(key, *args, **kwargs, &block)
node = router.assign_node(['SSCAN', key])
router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['SSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 1, &block)
end

def hscan(key, *args, **kwargs, &block)
node = router.assign_node(['HSCAN', key])
router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['HSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 2, &block)
end

def zscan(key, *args, **kwargs, &block)
node = router.assign_node(['ZSCAN', key])
router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['ZSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 2, &block)
end

def pipelined(exception: true)
Expand Down
13 changes: 13 additions & 0 deletions lib/redis_client/cluster/noop_command_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

class RedisClient
class Cluster
module NoopCommandBuilder
module_function

def generate(args, _kwargs = nil)
args
end
end
end
end
3 changes: 2 additions & 1 deletion lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/connection_mixin'
require 'redis_client/middlewares'
require 'redis_client/pooled'
Expand Down Expand Up @@ -229,7 +230,7 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met

def append_pipeline(node_key)
@pipelines ||= {}
@pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(@command_builder)
@pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(::RedisClient::Cluster::NoopCommandBuilder)
@pipelines[node_key].add_outer_index(@size)
@size += 1
@pipelines[node_key]
Expand Down
20 changes: 11 additions & 9 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,6 @@ def try_send(node, method, command, args, retry_count: 3, &block)
end
end

def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block)
handle_redirection(node, nil, retry_count: retry_count) do |on_node|
on_node.public_send(method, *args, **kwargs, &block)
end
end

def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
yield node
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
Expand Down Expand Up @@ -153,9 +147,7 @@ def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/Ab
raise
end

def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
command = @command_builder.generate(command, kwargs)

def scan(command, seed: nil) # rubocop:disable Metrics/AbcSize
command[1] = ZERO_CURSOR_FOR_SCAN if command.size == 1
input_cursor = Integer(command[1])

Expand All @@ -180,6 +172,16 @@ def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
raise
end

def scan_single_key(command, arity:, &block)
node = assign_node(command)
loop do
cursor, values = handle_redirection(node, nil, retry_count: 3) { |n| n.call_v(command) }
command[2] = cursor
arity < 2 ? values.each(&block) : values.each_slice(arity, &block)
break if cursor == ZERO_CURSOR_FOR_SCAN
end
end

def assign_node(command)
handle_node_reload_error do
node_key = find_node_key(command)
Expand Down
3 changes: 2 additions & 1 deletion lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/cluster/pipeline'

class RedisClient
Expand All @@ -18,7 +19,7 @@ def initialize(router, command_builder, node: nil, slot: nil, asking: false)
@router = router
@command_builder = command_builder
@retryable = true
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
@pipeline = ::RedisClient::Pipeline.new(::RedisClient::Cluster::NoopCommandBuilder)
@pending_commands = []
@node = node
prepare_tx unless @node.nil?
Expand Down
4 changes: 3 additions & 1 deletion lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'redis_client/cluster'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/node_key'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/command_builder'

class RedisClient
Expand Down Expand Up @@ -64,7 +65,7 @@ def initialize( # rubocop:disable Metrics/ParameterLists
end

def inspect
"#<#{self.class.name} #{startup_nodes.values}>"
"#<#{self.class.name} #{startup_nodes.values.map { |v| v.reject { |k| k == :command_builder } }}>"
end

def read_timeout
Expand Down Expand Up @@ -187,6 +188,7 @@ def build_startup_nodes(configs)
def augment_client_config(config)
config = @client_config.merge(config)
config = config.merge(host: @fixed_hostname) unless @fixed_hostname.empty?
config[:command_builder] = ::RedisClient::Cluster::NoopCommandBuilder # prevent twice call
config
end
end
Expand Down
78 changes: 78 additions & 0 deletions test/prof_stack2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

require 'vernier'
require 'redis_cluster_client'
require 'testing_constants'

module ProfStack2
SIZE = 40
ATTEMPTS = 1000

module_function

def run
client = make_client
prepare(client)

case mode = ENV.fetch('PROFILE_MODE', :single).to_sym
when :single
execute(client, mode) do |client|
ATTEMPTS.times { |i| client.call('GET', "key#{i}") }
end
when :transaction
execute(client, mode) do |client|
ATTEMPTS.times do |i|
client.multi do |tx|
SIZE.times do |j|
n = SIZE * i + j
tx.call('SET', "{group:#{i}}:key:#{n}", n)
end
end
end
end
when :pipeline
execute(client, mode) do |client|
ATTEMPTS.times do |i|
client.pipelined do |pi|
SIZE.times do |j|
n = SIZE * i + j
pi.call('GET', "key#{n}")
end
end
end
end
end
end

def execute(client, mode)
Vernier.profile(out: "vernier_#{mode}.json") do
yield(client)
end
end

def make_client
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :random,
fixed_hostname: TEST_FIXED_HOSTNAME,
# concurrency: { model: :on_demand, size: 6 },
# concurrency: { model: :pooled, size: 6 },
concurrency: { model: :none },
**TEST_GENERIC_OPTIONS
).new_client
end

def prepare(client)
ATTEMPTS.times do |i|
client.pipelined do |pi|
SIZE.times do |j|
n = SIZE * i + j
pi.call('SET', "key#{n}", "val#{n}")
end
end
end
end
end

ProfStack2.run
10 changes: 4 additions & 6 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ def test_blocking_call
end

def test_scan
assert_raises(ArgumentError) { @client.scan }

10.times { |i| @client.call('SET', "key#{i}", i) }
wait_for_replication
want = (0..9).map { |i| "key#{i}" }
Expand All @@ -117,9 +115,9 @@ def test_hscan
10.times do |i|
10.times { |j| @client.call('HSET', "key#{i}", "field#{j}", j) }
wait_for_replication
want = (0..9).map { |j| "field#{j}" }
want = (0..9).map { |j| ["field#{j}", j.to_s] }
got = []
@client.hscan("key#{i}", 'COUNT', '5') { |field| got << field }
@client.hscan("key#{i}", 'COUNT', '5') { |pair| got << pair }
assert_equal(want, got.sort)
end
end
Expand All @@ -128,9 +126,9 @@ def test_zscan
10.times do |i|
10.times { |j| @client.call('ZADD', "key#{i}", j, "member#{j}") }
wait_for_replication
want = (0..9).map { |j| "member#{j}" }
want = (0..9).map { |j| ["member#{j}", j.to_s] }
got = []
@client.zscan("key#{i}", 'COUNT', '5') { |member| got << member }
@client.zscan("key#{i}", 'COUNT', '5') { |pair| got << pair }
assert_equal(want, got.sort)
end
end
Expand Down
30 changes: 16 additions & 14 deletions test/redis_client/test_cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,38 @@ def test_startup_nodes
{
config: ::RedisClient::ClusterConfig.new,
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379 }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(replica: true),
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379 }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(fixed_hostname: 'endpoint.example.com'),
want: {
'127.0.0.1:6379' => { host: 'endpoint.example.com', port: 6379 }
'127.0.0.1:6379' => { host: 'endpoint.example.com', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(timeout: 1),
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, timeout: 1 }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, timeout: 1, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(nodes: ['redis://1.2.3.4:1234', 'rediss://5.6.7.8:5678']),
want: {
'1.2.3.4:1234' => { host: '1.2.3.4', port: 1234 },
'5.6.7.8:5678' => { host: '5.6.7.8', port: 5678, ssl: true }
'1.2.3.4:1234' => { host: '1.2.3.4', port: 1234, command_builder: ::RedisClient::Cluster::NoopCommandBuilder },
'5.6.7.8:5678' => { host: '5.6.7.8', port: 5678, ssl: true, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
},
{
config: ::RedisClient::ClusterConfig.new(custom: { foo: 'bar' }),
want: {
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, custom: { foo: 'bar' } }
'127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, custom: { foo: 'bar' }, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }
}
}
].each_with_index do |c, idx|
Expand Down Expand Up @@ -182,13 +182,15 @@ def test_client_config_for_node
nodes: ['redis://username:password@1.2.3.4:1234', 'rediss://5.6.7.8:5678'],
custom: { foo: 'bar' }
)
assert_equal({
host: '9.9.9.9',
port: 9999,
username: 'username',
password: 'password',
custom: { foo: 'bar' }
}, config.client_config_for_node('9.9.9.9:9999'))
want = {
host: '9.9.9.9',
port: 9999,
username: 'username',
password: 'password',
custom: { foo: 'bar' },
command_builder: ::RedisClient::Cluster::NoopCommandBuilder
}
assert_equal(want, config.client_config_for_node('9.9.9.9:9999'))
end

def test_client_config_id
Expand Down