diff --git a/.gitignore b/.gitignore index f29db7c..b47eb5f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .bundle *.gem +*.json Gemfile.lock diff --git a/Gemfile b/Gemfile index 75902e5..8060a60 100644 --- a/Gemfile +++ b/Gemfile @@ -15,3 +15,4 @@ gem 'rubocop-minitest', require: false gem 'rubocop-performance', require: false gem 'rubocop-rake', require: false gem 'stackprof', platform: :mri +gem 'vernier', platform: :mri if RUBY_ENGINE == 'ruby' && Integer(RUBY_VERSION.split('.').first) > 2 diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index d31de89..dc049ef 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -62,30 +62,37 @@ def blocking_call_v(timeout, command, &block) end def scan(*args, **kwargs, &block) - raise ArgumentError, 'block required' unless block + return to_enum(__callee__, *args, **kwargs) unless block_given? + command = @command_builder.generate(['SCAN', ZERO_CURSOR_FOR_SCAN] + args, kwargs) seed = Random.new_seed - cursor = ZERO_CURSOR_FOR_SCAN loop do - cursor, keys = router.scan('SCAN', cursor, *args, seed: seed, **kwargs) + cursor, keys = router.scan(command, seed: seed) + command[1] = cursor keys.each(&block) break if cursor == ZERO_CURSOR_FOR_SCAN end end def sscan(key, *args, **kwargs, &block) - node = router.assign_node(['SSCAN', key]) - router.try_delegate(node, :sscan, key, *args, **kwargs, &block) + return to_enum(__callee__, key, *args, **kwargs) unless block_given? + + command = @command_builder.generate(['SSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) + router.scan_single_key(command, arity: 1, &block) end def hscan(key, *args, **kwargs, &block) - node = router.assign_node(['HSCAN', key]) - router.try_delegate(node, :hscan, key, *args, **kwargs, &block) + return to_enum(__callee__, key, *args, **kwargs) unless block_given? + + command = @command_builder.generate(['HSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) + router.scan_single_key(command, arity: 2, &block) end def zscan(key, *args, **kwargs, &block) - node = router.assign_node(['ZSCAN', key]) - router.try_delegate(node, :zscan, key, *args, **kwargs, &block) + return to_enum(__callee__, key, *args, **kwargs) unless block_given? + + command = @command_builder.generate(['ZSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) + router.scan_single_key(command, arity: 2, &block) end def pipelined(exception: true) diff --git a/lib/redis_client/cluster/noop_command_builder.rb b/lib/redis_client/cluster/noop_command_builder.rb new file mode 100644 index 0000000..803c5c7 --- /dev/null +++ b/lib/redis_client/cluster/noop_command_builder.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class RedisClient + class Cluster + module NoopCommandBuilder + module_function + + def generate(args, _kwargs = nil) + args + end + end + end +end diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb index ebe9c6e..42ba0a4 100644 --- a/lib/redis_client/cluster/pipeline.rb +++ b/lib/redis_client/cluster/pipeline.rb @@ -2,6 +2,7 @@ require 'redis_client' require 'redis_client/cluster/errors' +require 'redis_client/cluster/noop_command_builder' require 'redis_client/connection_mixin' require 'redis_client/middlewares' require 'redis_client/pooled' @@ -229,7 +230,7 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met def append_pipeline(node_key) @pipelines ||= {} - @pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(@command_builder) + @pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(::RedisClient::Cluster::NoopCommandBuilder) @pipelines[node_key].add_outer_index(@size) @size += 1 @pipelines[node_key] diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index dad104c..0cd9076 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -104,12 +104,6 @@ def try_send(node, method, command, args, retry_count: 3, &block) end end - def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) - handle_redirection(node, nil, retry_count: retry_count) do |on_node| - on_node.public_send(method, *args, **kwargs, &block) - end - end - def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity yield node rescue ::RedisClient::CircuitBreaker::OpenCircuitError @@ -153,9 +147,7 @@ def handle_redirection(node, command, retry_count:) # rubocop:disable Metrics/Ab raise end - def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize - command = @command_builder.generate(command, kwargs) - + def scan(command, seed: nil) # rubocop:disable Metrics/AbcSize command[1] = ZERO_CURSOR_FOR_SCAN if command.size == 1 input_cursor = Integer(command[1]) @@ -180,6 +172,16 @@ def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize raise end + def scan_single_key(command, arity:, &block) + node = assign_node(command) + loop do + cursor, values = handle_redirection(node, nil, retry_count: 3) { |n| n.call_v(command) } + command[2] = cursor + arity < 2 ? values.each(&block) : values.each_slice(arity, &block) + break if cursor == ZERO_CURSOR_FOR_SCAN + end + end + def assign_node(command) handle_node_reload_error do node_key = find_node_key(command) diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index c35b52c..d3c5f06 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -2,6 +2,7 @@ require 'redis_client' require 'redis_client/cluster/errors' +require 'redis_client/cluster/noop_command_builder' require 'redis_client/cluster/pipeline' class RedisClient @@ -18,7 +19,7 @@ def initialize(router, command_builder, node: nil, slot: nil, asking: false) @router = router @command_builder = command_builder @retryable = true - @pipeline = ::RedisClient::Pipeline.new(@command_builder) + @pipeline = ::RedisClient::Pipeline.new(::RedisClient::Cluster::NoopCommandBuilder) @pending_commands = [] @node = node prepare_tx unless @node.nil? diff --git a/lib/redis_client/cluster_config.rb b/lib/redis_client/cluster_config.rb index 18cde27..6b0504e 100644 --- a/lib/redis_client/cluster_config.rb +++ b/lib/redis_client/cluster_config.rb @@ -5,6 +5,7 @@ require 'redis_client/cluster' require 'redis_client/cluster/errors' require 'redis_client/cluster/node_key' +require 'redis_client/cluster/noop_command_builder' require 'redis_client/command_builder' class RedisClient @@ -64,7 +65,7 @@ def initialize( # rubocop:disable Metrics/ParameterLists end def inspect - "#<#{self.class.name} #{startup_nodes.values}>" + "#<#{self.class.name} #{startup_nodes.values.map { |v| v.reject { |k| k == :command_builder } }}>" end def read_timeout @@ -187,6 +188,7 @@ def build_startup_nodes(configs) def augment_client_config(config) config = @client_config.merge(config) config = config.merge(host: @fixed_hostname) unless @fixed_hostname.empty? + config[:command_builder] = ::RedisClient::Cluster::NoopCommandBuilder # prevent twice call config end end diff --git a/test/prof_stack2.rb b/test/prof_stack2.rb new file mode 100644 index 0000000..d768dd3 --- /dev/null +++ b/test/prof_stack2.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +require 'vernier' +require 'redis_cluster_client' +require 'testing_constants' + +module ProfStack2 + SIZE = 40 + ATTEMPTS = 1000 + + module_function + + def run + client = make_client + prepare(client) + + case mode = ENV.fetch('PROFILE_MODE', :single).to_sym + when :single + execute(client, mode) do |client| + ATTEMPTS.times { |i| client.call('GET', "key#{i}") } + end + when :transaction + execute(client, mode) do |client| + ATTEMPTS.times do |i| + client.multi do |tx| + SIZE.times do |j| + n = SIZE * i + j + tx.call('SET', "{group:#{i}}:key:#{n}", n) + end + end + end + end + when :pipeline + execute(client, mode) do |client| + ATTEMPTS.times do |i| + client.pipelined do |pi| + SIZE.times do |j| + n = SIZE * i + j + pi.call('GET', "key#{n}") + end + end + end + end + end + end + + def execute(client, mode) + Vernier.profile(out: "vernier_#{mode}.json") do + yield(client) + end + end + + def make_client + ::RedisClient.cluster( + nodes: TEST_NODE_URIS, + replica: true, + replica_affinity: :random, + fixed_hostname: TEST_FIXED_HOSTNAME, + # concurrency: { model: :on_demand, size: 6 }, + # concurrency: { model: :pooled, size: 6 }, + concurrency: { model: :none }, + **TEST_GENERIC_OPTIONS + ).new_client + end + + def prepare(client) + ATTEMPTS.times do |i| + client.pipelined do |pi| + SIZE.times do |j| + n = SIZE * i + j + pi.call('SET', "key#{n}", "val#{n}") + end + end + end + end +end + +ProfStack2.run diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index d20f629..aaf3c15 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -92,8 +92,6 @@ def test_blocking_call end def test_scan - assert_raises(ArgumentError) { @client.scan } - 10.times { |i| @client.call('SET', "key#{i}", i) } wait_for_replication want = (0..9).map { |i| "key#{i}" } @@ -117,9 +115,9 @@ def test_hscan 10.times do |i| 10.times { |j| @client.call('HSET', "key#{i}", "field#{j}", j) } wait_for_replication - want = (0..9).map { |j| "field#{j}" } + want = (0..9).map { |j| ["field#{j}", j.to_s] } got = [] - @client.hscan("key#{i}", 'COUNT', '5') { |field| got << field } + @client.hscan("key#{i}", 'COUNT', '5') { |pair| got << pair } assert_equal(want, got.sort) end end @@ -128,9 +126,9 @@ def test_zscan 10.times do |i| 10.times { |j| @client.call('ZADD', "key#{i}", j, "member#{j}") } wait_for_replication - want = (0..9).map { |j| "member#{j}" } + want = (0..9).map { |j| ["member#{j}", j.to_s] } got = [] - @client.zscan("key#{i}", 'COUNT', '5') { |member| got << member } + @client.zscan("key#{i}", 'COUNT', '5') { |pair| got << pair } assert_equal(want, got.sort) end end diff --git a/test/redis_client/test_cluster_config.rb b/test/redis_client/test_cluster_config.rb index 01d784a..8ecd23a 100644 --- a/test/redis_client/test_cluster_config.rb +++ b/test/redis_client/test_cluster_config.rb @@ -38,38 +38,38 @@ def test_startup_nodes { config: ::RedisClient::ClusterConfig.new, want: { - '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379 } + '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder } } }, { config: ::RedisClient::ClusterConfig.new(replica: true), want: { - '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379 } + '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder } } }, { config: ::RedisClient::ClusterConfig.new(fixed_hostname: 'endpoint.example.com'), want: { - '127.0.0.1:6379' => { host: 'endpoint.example.com', port: 6379 } + '127.0.0.1:6379' => { host: 'endpoint.example.com', port: 6379, command_builder: ::RedisClient::Cluster::NoopCommandBuilder } } }, { config: ::RedisClient::ClusterConfig.new(timeout: 1), want: { - '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, timeout: 1 } + '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, timeout: 1, command_builder: ::RedisClient::Cluster::NoopCommandBuilder } } }, { config: ::RedisClient::ClusterConfig.new(nodes: ['redis://1.2.3.4:1234', 'rediss://5.6.7.8:5678']), want: { - '1.2.3.4:1234' => { host: '1.2.3.4', port: 1234 }, - '5.6.7.8:5678' => { host: '5.6.7.8', port: 5678, ssl: true } + '1.2.3.4:1234' => { host: '1.2.3.4', port: 1234, command_builder: ::RedisClient::Cluster::NoopCommandBuilder }, + '5.6.7.8:5678' => { host: '5.6.7.8', port: 5678, ssl: true, command_builder: ::RedisClient::Cluster::NoopCommandBuilder } } }, { config: ::RedisClient::ClusterConfig.new(custom: { foo: 'bar' }), want: { - '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, custom: { foo: 'bar' } } + '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, custom: { foo: 'bar' }, command_builder: ::RedisClient::Cluster::NoopCommandBuilder } } } ].each_with_index do |c, idx| @@ -182,13 +182,15 @@ def test_client_config_for_node nodes: ['redis://username:password@1.2.3.4:1234', 'rediss://5.6.7.8:5678'], custom: { foo: 'bar' } ) - assert_equal({ - host: '9.9.9.9', - port: 9999, - username: 'username', - password: 'password', - custom: { foo: 'bar' } - }, config.client_config_for_node('9.9.9.9:9999')) + want = { + host: '9.9.9.9', + port: 9999, + username: 'username', + password: 'password', + custom: { foo: 'bar' }, + command_builder: ::RedisClient::Cluster::NoopCommandBuilder + } + assert_equal(want, config.client_config_for_node('9.9.9.9:9999')) end def test_client_config_id