Skip to content

Commit

Permalink
perf: lessen memory consumption for emulated mget command
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed May 2, 2024
1 parent d6fc855 commit 76e3f63
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ jobs:
- single
- excessive_pipelining
- pipelining_in_moderation
- original_mget
- emulated_mget
env:
REDIS_VERSION: '7.2'
DOCKER_COMPOSE_FILE: 'compose.yaml'
Expand Down
25 changes: 20 additions & 5 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,31 @@ def send_watch_command(command)
end
end

def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
if command.size < 3 || !::RedisClient::Cluster::KeySlotConverter.extract_hash_tag(command[1]).empty? # rubocop:disable Style/IfUnlessModifier
def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
key_step = @command.determine_key_step(cmd)
if command.size <= key_step + 1 || !::RedisClient::Cluster::KeySlotConverter.extract_hash_tag(command[1]).empty? # rubocop:disable Style/IfUnlessModifier
return try_send(assign_node(command), method, command, args, &block)
end

single_key_cmd = MULTIPLE_KEYS_COMMAND_TO_PIPELINE[cmd]
key_step = @command.determine_key_step(cmd)
seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
pipeline = ::RedisClient::Cluster::Pipeline.new(self, @command_builder, @concurrent_worker, exception: true, seed: seed)
command[1..].each_slice(key_step) { |*v| pipeline.call(single_key_cmd, *v) }

# This implementation is prioritized to lessen memory consumption rather than readability.
single_key_cmd = MULTIPLE_KEYS_COMMAND_TO_PIPELINE[cmd]
single_command = Array.new(key_step + 1)
single_command[0] = single_key_cmd
if key_step == 1
command[1..].each do |key|
single_command[1] = key
pipeline.call_v(single_command)
end
else
command[1..].each_slice(key_step) do |v|
key_step.times { |i| single_command[i + 1] = v[i] }
pipeline.call_v(single_command)
end
end

replies = pipeline.execute
result = case cmd
when 'mset' then replies.first
Expand Down
58 changes: 58 additions & 0 deletions test/ips_mget.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

require 'benchmark/ips'
require 'redis_cluster_client'
require 'testing_constants'

module IpsMget
module_function

ATTEMPTS = 40

def run
cli = make_client
prepare(cli)
print_letter('MGET')
bench('MGET', cli)
end

def make_client
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :random,
fixed_hostname: TEST_FIXED_HOSTNAME,
concurrency: { model: :on_demand },
**TEST_GENERIC_OPTIONS
).new_client
end

def print_letter(title)
print "################################################################################\n"
print "# #{title}\n"
print "################################################################################\n"
print "\n"
end

def prepare(client)
ATTEMPTS.times do |i|
client.call('SET', "{key}#{i}", "val#{i}")
client.call('SET', "key#{i}", "val#{i}")
end
end

def bench(cmd, client)
original = [cmd] + Array.new(ATTEMPTS) { |i| "{key}#{i}" }
emulated = [cmd] + Array.new(ATTEMPTS) { |i| "key#{i}" }

Benchmark.ips do |x|
x.time = 5
x.warmup = 1
x.report("#{cmd}: original") { client.call_v(original) }
x.report("#{cmd}: emulated") { client.call_v(emulated) }
x.compare!
end
end
end

IpsMget.run
10 changes: 10 additions & 0 deletions test/prof_mem.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module ProfMem
ATTEMPT_COUNT = 1000
MAX_PIPELINE_SIZE = 100
SLICED_NUMBERS = (1..ATTEMPT_COUNT).each_slice(MAX_PIPELINE_SIZE).freeze
ORIGINAL_MGET = (%w[MGET] + Array.new(40) { |i| "{key}#{i}" }).freeze
EMULATED_MGET = (%w[MGET] + Array.new(40) { |i| "key#{i}" }).freeze
CLI_TYPES = %w[primary_only scale_read_random scale_read_latency pooled].freeze
MODES = {
single: lambda do |client_builder_method|
Expand Down Expand Up @@ -38,6 +40,14 @@ module ProfMem
numbers.each { |i| pi.call('GET', i) }
end
end
end,
original_mget: lambda do |client_builder_method|
cli = send(client_builder_method)
ATTEMPT_COUNT.times { cli.call_v(ORIGINAL_MGET) }
end,
emulated_mget: lambda do |client_builder_method|
cli = send(client_builder_method)
ATTEMPT_COUNT.times { cli.call_v(EMULATED_MGET) }
end
}.freeze

Expand Down
7 changes: 7 additions & 0 deletions test/prof_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
module ProfStack
SIZE = 40
ATTEMPTS = 1000
ORIGINAL_MGET = (%w[MGET] + Array.new(SIZE) { |i| "{key}#{i}" }).freeze
EMULATED_MGET = (%w[MGET] + Array.new(SIZE) { |i| "key#{i}" }).freeze

module_function

Expand Down Expand Up @@ -36,6 +38,7 @@ def prepare(client)
SIZE.times do |j|
n = SIZE * i + j
pi.call('SET', "key#{n}", "val#{n}")
pi.call('SET', "{key}#{n}", "val#{n}")
end
end
end
Expand All @@ -58,6 +61,10 @@ def execute(client, mode)
end
end
end
when :original_mget
ATTEMPTS.times { client.call_v(ORIGINAL_MGET) }
when :emulated_mget
ATTEMPTS.times { client.call_v(EMULATED_MGET) }
else raise ArgumentError, mode
end
end
Expand Down

0 comments on commit 76e3f63

Please sign in to comment.