
Commit

chore: patch some minor changes (#395)
supercaracal authored Oct 8, 2024
1 parent ef518f6 commit b9b3b71
Showing 7 changed files with 144 additions and 119 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/test.yaml
@@ -32,18 +32,18 @@ jobs:
- {redis: '7.2', ruby: '3.3', driver: 'hiredis'}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
- {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'}
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_down}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
- {redis: '7.0', ruby: '3.1'}
- {redis: '6.2', ruby: '3.0'}
- {redis: '5.0', ruby: '2.7'}
- {task: test_cluster_down}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {ruby: 'jruby'}
- {ruby: 'truffleruby'}
- {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'}
66 changes: 36 additions & 30 deletions bin/pubsub
@@ -5,57 +5,57 @@ require 'bundler/setup'
require 'redis_cluster_client'

module PubSubDebug
WAIT_SEC = 2.0

module_function

def spawn_publisher(cli, chan)
Thread.new(cli, chan) do |r, c|
role = ' Publisher'
def spawn_publisher(client, channel)
Thread.new(client, channel) do |cli, chan|
role = 'Publisher'
i = 0

loop do
handle_errors(role) do
msg = format('%05d', i)
r.call('spublish', c, msg)
log "#{role}: sent: #{msg}"
cli.call('spublish', chan, i)
log(role, :spublish, chan, i)
i += 1
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end

def spawn_subscriber(cli, chan) # rubocop:disable Metrics/AbcSize
Thread.new(cli, chan) do |r, c|
def spawn_subscriber(client, channel) # rubocop:disable Metrics/AbcSize
Thread.new(client, channel) do |cli, chan|
role = 'Subscriber'
ps = nil

loop do
ps = r.pubsub
ps.call('ssubscribe', c)
log "#{role}: done: subscription started to #{c}"
ps = cli.pubsub
ps.call('ssubscribe', chan)
break
rescue StandardError => e
log "#{role}: init: #{e.class}: #{e.message}"
log(role, :init, e.class, e.message)
ps&.close
ensure
sleep 1.0
sleep WAIT_SEC
end

loop do
handle_errors('Subscriber') do
e = ps.next_event(0.01)
log "#{role}: recv: #{e.nil? ? 'nil' : e}"
ps.call('ssubscribe', c) if !e.nil? && e.first == 'sunsubscribe'
event = ps.next_event(WAIT_SEC)
log(role, *event) unless event.nil?
case event&.first
when 'sunsubscribe' then ps.call('ssubscribe', chan)
end
end
ensure
sleep 1.0
end
rescue StandardError, SignalException => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
ps&.close
raise
end
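For reference, the reworked subscriber thread above keeps polling the sharded pub/sub session and resubscribes whenever it sees an sunsubscribe event (which the server emits, for example, after a slot migration or failover). A minimal standalone sketch of that loop, assuming a cluster reachable at redis://127.0.0.1:6379 and an illustrative channel name:

require 'redis_cluster_client'

client = RedisClient.cluster(nodes: %w[redis://127.0.0.1:6379]).new_client
pubsub = client.pubsub
pubsub.call('ssubscribe', 'chan1')

loop do
  event = pubsub.next_event(2.0) # returns nil if nothing arrives within the timeout
  next if event.nil?

  kind, channel = event
  puts event.inspect
  pubsub.call('ssubscribe', channel) if kind == 'sunsubscribe' # keep the subscription alive
end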
@@ -64,23 +64,26 @@ module PubSubDebug
def handle_errors(role)
yield
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
log "#{role}: recv: #{e.class}"
log(role, e.class)
rescue RedisClient::CommandError => e
log "#{role}: recv: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise unless e.message.start_with?('CLUSTERDOWN')
rescue StandardError => e
log "#{role}: recv: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise
end

def log(msg)
print "#{msg}\n"
def log(*texts)
return if texts.nil? || texts.empty?

message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ')
print "#{message}\n"
end
end

clients = Array.new(2) { RedisClient.cluster(connect_with_original_config: true).new_client }
nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze
clients = Array.new(6) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze
threads = []
channel = 'chan1'

Signal.trap(:INT) do
threads.each(&:exit)
@@ -89,6 +92,9 @@ Signal.trap(:INT) do
exit 0
end

threads << PubSubDebug.spawn_subscriber(clients[0], channel)
threads << PubSubDebug.spawn_publisher(clients[1], channel)
%w[chan1 chan2 chan3].each_with_index do |channel, i|
threads << PubSubDebug.spawn_subscriber(clients[i], channel)
threads << PubSubDebug.spawn_publisher(clients[i + 3], channel)
end

threads.each(&:join)
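The rewritten log helper above right-aligns every field to a fixed width before joining them with ': ', so output from concurrently running threads lines up in columns. An equivalent standalone restatement (the 15-character width and the sample arguments are just illustrative):

def log(*texts)
  return if texts.empty?

  # prepending ' ' * [15 - text.to_s.size, 0].max is the same as rjust(15)
  message = texts.map { |text| text.to_s.rjust(15) }.join(': ')
  print "#{message}\n"
end

log('Publisher', :spublish, 'chan1', 7) # each field padded on the left to 15 characters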
86 changes: 50 additions & 36 deletions bin/singlepiptx
@@ -5,96 +5,101 @@ require 'bundler/setup'
require 'redis_cluster_client'

module SinglePipTxDebug
WAIT_SEC = 2.0

module_function

def spawn_single(cli)
Thread.new(cli) do |r|
role = ' Single'
def spawn_single(client, key)
Thread.new(client, key) do |cli, k|
role = 'Single'

loop do
handle_errors(role) do
reply = r.call('incr', 'single')
log "#{role}: #{reply}"
reply = cli.call('incr', k)
log(role, k, reply)
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end

def spawn_pipeline(cli)
Thread.new(cli) do |r|
role = ' Pipeline'
def spawn_pipeline(client, key)
Thread.new(client, key) do |cli, k|
role = 'Pipeline'

loop do
handle_errors(role) do
reply = r.pipelined do |pi|
pi.call('incr', 'pipeline')
pi.call('incr', 'pipeline')
reply = cli.pipelined do |pi|
pi.call('incr', k)
pi.call('incr', k)
end

log "#{role}: #{reply}"
log(role, k, reply.last)
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end
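The pipeline thread above queues two INCR commands and flushes them in a single round trip per iteration. A minimal sketch of the call shape, assuming an already connected client and an illustrative key name:

replies = client.pipelined do |pipeline|
  pipeline.call('incr', 'pipeline1')
  pipeline.call('incr', 'pipeline1')
end
# replies is an array with one entry per queued command, in order, e.g. [1, 2]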

def spawn_transaction(cli)
Thread.new(cli) do |r|
def spawn_transaction(client, key)
Thread.new(client, key) do |cli, k|
role = 'Transaction'
i = 0

loop do
handle_errors(role) do
reply = r.multi(watch: i.odd? ? %w[transaction] : nil) do |tx|
i += 1
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
reply = cli.multi(watch: i.odd? ? [k] : nil) do |tx|
tx.call('incr', k)
tx.call('incr', k)
end

log "#{role}: #{reply}"
log(role, k, reply.last)
i += 1
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end

def handle_errors(role) # rubocop:disable Metrics/AbcSize
yield
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
log "#{role}: #{e.class}"
log(role, e.class)
rescue RedisClient::CommandError => e
log "#{role}: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise unless e.message.start_with?('CLUSTERDOWN')
rescue RedisClient::Cluster::ErrorCollection => e
log "#{role}: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise unless e.errors.values.all? do |err|
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
end
rescue StandardError => e
log "#{role}: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise
end

def log(msg)
print "#{msg}\n"
def log(*texts)
return if texts.nil? || texts.empty?

message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ')
print "#{message}\n"
end
end

clients = Array.new(3) { RedisClient.cluster(connect_with_original_config: true).new_client }
nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze
clients = Array.new(9) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze
threads = []

Signal.trap(:INT) do
@@ -104,7 +109,16 @@ Signal.trap(:INT) do
exit 0
end

threads << SinglePipTxDebug.spawn_single(clients[0])
threads << SinglePipTxDebug.spawn_pipeline(clients[1])
threads << SinglePipTxDebug.spawn_transaction(clients[2])
%w[single1 single3 single4].each_with_index do |key, i|
threads << SinglePipTxDebug.spawn_single(clients[i], key)
end

%w[pipeline1 pipeline2 pipeline4].each_with_index do |key, i|
threads << SinglePipTxDebug.spawn_pipeline(clients[i + 3], key)
end

%w[transaction1 transaction3 transaction4].each_with_index do |key, i|
threads << SinglePipTxDebug.spawn_transaction(clients[i + 6], key)
end

threads.each(&:join)
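The transaction thread alternates between plain MULTI/EXEC rounds and WATCH-guarded ones (watch: i.odd? ? [k] : nil). A minimal sketch of the optimistic-locking variant, assuming a cluster reachable at redis://127.0.0.1:6379 and an illustrative key; note that all keys touched inside one transaction must hash to the same slot:

require 'redis_cluster_client'

client = RedisClient.cluster(nodes: %w[redis://127.0.0.1:6379]).new_client

# WATCH the key first; if another client modifies it before EXEC,
# the transaction is discarded (surfacing here as a nil reply).
reply = client.multi(watch: %w[transaction1]) do |tx|
  tx.call('incr', 'transaction1')
  tx.call('incr', 'transaction1')
end

puts reply.nil? ? 'aborted by a concurrent write' : "replies: #{reply}"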
6 changes: 3 additions & 3 deletions lib/redis_client/cluster.rb
@@ -15,10 +15,10 @@ class Cluster

attr_reader :config

def initialize(config, pool: nil, concurrency: nil, **kwargs)
@config = config
def initialize(config = nil, pool: nil, concurrency: nil, **kwargs)
@config = config.nil? ? ClusterConfig.new(**kwargs) : config
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
@command_builder = config.command_builder
@command_builder = @config.command_builder

@pool = pool
@kwargs = kwargs
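With config now defaulting to nil, the constructor builds its own ClusterConfig from the remaining keyword arguments when no config object is given. A hedged sketch of the difference this makes (the node URL is an assumption for a locally running cluster):

# Passing a prebuilt config, as before:
config = RedisClient::ClusterConfig.new(nodes: %w[redis://127.0.0.1:6379])
cluster = RedisClient::Cluster.new(config)

# After this change, keyword arguments alone are enough:
cluster = RedisClient::Cluster.new(nodes: %w[redis://127.0.0.1:6379])
cluster.call('ping') # => "PONG"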
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/node.rb
@@ -309,7 +309,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M
work_group.push(i, raw_client) do |client|
regular_timeout = client.read_timeout
client.read_timeout = @config.slow_command_timeout > 0.0 ? @config.slow_command_timeout : regular_timeout
reply = client.call('CLUSTER', 'NODES')
reply = client.call_once('CLUSTER', 'NODES')
client.read_timeout = regular_timeout
parse_cluster_node_reply(reply)
rescue StandardError => e
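The switch from call to call_once above changes the retry behavior: in redis-client, call may transparently re-send a command after a reconnect (subject to reconnect_attempts), while call_once issues it at most once, presumably so the CLUSTER NODES probe is not retried on top of the slow-command timeout handling. A brief illustration with an assumed standalone connection:

client = RedisClient.config(host: '127.0.0.1', port: 6379, reconnect_attempts: 3).new_client

client.call('CLUSTER', 'NODES')      # may be re-sent after an automatic reconnect
client.call_once('CLUSTER', 'NODES') # sent a single time; connection errors propagate immediately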
5 changes: 3 additions & 2 deletions test/redis_client/test_cluster.rb
@@ -892,8 +892,9 @@ def wait_for_replication
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
swap_timeout(@client, timeout: 0.1) do |client|
client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
rescue RedisClient::ConnectionError
# ignore
rescue RedisClient::Cluster::ErrorCollection => e
# FIXME: flaky in jruby on #test_pubsub_with_wrong_command
raise unless e.errors.values.all? { |err| err.is_a?(::RedisClient::ConnectionError) }
end
end

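The test helper now rescues RedisClient::Cluster::ErrorCollection, which aggregates per-node failures, and re-raises unless every underlying error is a connection error. A small sketch of inspecting such a collection (the node-key format shown is an assumption):

begin
  client.blocking_call(1.0, 'WAIT', 1, 1000)
rescue RedisClient::Cluster::ErrorCollection => e
  # e.errors maps a node key (e.g. "127.0.0.1:6379") to the error raised on that node
  e.errors.each { |node_key, err| puts "#{node_key}: #{err.class}: #{err.message}" }
  raise unless e.errors.values.all? { |err| err.is_a?(RedisClient::ConnectionError) }
end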