Skip to content

Commit

Permalink
test: add a case for the recoverability from cluster down
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Sep 28, 2024
1 parent b5b3e08 commit a8d450f
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
- {ruby: 'jruby'}
- {ruby: 'truffleruby'}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {task: test_cluster_down}
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
- {redis: '7.0', ruby: '3.1'}
Expand Down
2 changes: 1 addition & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require 'bundler/gem_helper'
RuboCop::RakeTask.new
Bundler::GemHelper.install_tasks

SLUGGISH_TEST_TYPES = %w[broken scale state].freeze
SLUGGISH_TEST_TYPES = %w[down broken scale state].freeze

task default: :test

Expand Down
4 changes: 3 additions & 1 deletion bin/singlepiptx
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ module SinglePipTxDebug
def spawn_transaction(cli)
Thread.new(cli) do |r|
role = 'Transaction'
i = 0

loop do
handle_errors(role) do
reply = r.multi do |tx|
reply = r.multi(watch: i.odd? ? %w[transaction] : nil) do |tx|
i += 1
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
Expand Down
200 changes: 200 additions & 0 deletions test/test_against_cluster_down.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# frozen_string_literal: true

require 'testing_helper'

class TestAgainstClusterDown < TestingWrapper
WAIT_SEC = 1

# Prepares the per-test state: middleware sinks, five cluster clients, and the
# counters/locks that the background worker threads update.
def setup
  # Sinks handed to every client through the `custom:` option in build_client,
  # so they have to exist before the first client is built.
  @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
  @redirect_count = ::Middlewares::RedirectCount::Counter.new
  @clients = 5.times.map { build_client }

  # Filled in by the test body; teardown stops/closes whatever is present.
  @threads = []
  @controller = nil

  # Values written from worker threads, each guarded by its own mutex below.
  @cluster_down_error_count = 0
  @last_pubsub_message = nil

  @down_counter_lock = Mutex.new
  @pubsub_message_lock = Mutex.new
end

# Releases everything the test created and prints a one-line summary of the
# redirect count, the number of captured CLUSTER NODES calls and the number of
# tolerated cluster-down errors.
def teardown
  @controller&.close
  @threads&.each(&:kill) # Thread#kill is the same method as Thread#exit
  @clients&.each(&:close)

  redirects = @redirect_count.get
  nodes_calls = @captured_commands.count('cluster', 'nodes')
  print "#{redirects}, "\
        "ClusterNodesCall: #{nodes_calls}, "\
        "ClusterDownError: #{@cluster_down_error_count} = "
end

# Starts five background workloads (single command, pipeline, transaction,
# subscriber, publisher), recreates the cluster with docker compose and then
# asserts that every workload is making progress again: each observed value
# must have grown between two samples taken WAIT_SEC * 3 seconds apart.
def test_recoverability_from_cluster_down
  # One pre-built client per workload, paired by position.
  spawners = %i[spawn_single spawn_pipeline spawn_transaction spawn_subscriber spawn_publisher]
  spawners.each_with_index { |spawner, idx| @threads << send(spawner, @clients[idx]) }

  # Take the cluster down, prune docker resources including volumes, start it again.
  system('docker compose --progress quiet down', exception: true)
  system('docker system prune --force --volumes', exception: true, out: File::NULL)
  system('docker compose --progress quiet up --detach', exception: true)
  @controller = build_controller
  @controller.wait_for_cluster_to_be_ready

  # A fresh client used only for reading the counters; registered in @clients
  # so that teardown closes it too.
  probe = build_client
  @clients << probe

  # Reads the current progress of every workload, in a fixed order.
  snapshot = lambda do
    {
      'Single' => probe.call('get', 'single', &:to_i),
      'Pipeline' => probe.call('get', 'pipeline', &:to_i),
      'Transaction' => probe.call('get', 'transaction', &:to_i),
      'PubSub' => refer_pubsub_message.to_i
    }
  end

  earlier = snapshot.call
  sleep WAIT_SEC * 3
  later = snapshot.call

  earlier.each do |label, before|
    after = later.fetch(label)
    assert(before < after, "#{label}: #{before} < #{after}")
  end
end

private

# Builds a cluster client wired to the command-capture and redirect-count
# middlewares, sharing this test's sinks through the `custom:` option.
#
# @return the object returned by `new_client` on the cluster configuration
def build_client
  # TEST_GENERIC_OPTIONS is splatted last, exactly as in a direct call, so any
  # key it defines takes precedence over the ones listed before it.
  options = {
    nodes: TEST_NODE_URIS,
    connect_with_original_config: true,
    fixed_hostname: TEST_FIXED_HOSTNAME,
    custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
    middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
    **TEST_GENERIC_OPTIONS
  }

  ::RedisClient.cluster(**options).new_client
end

# Builds the ClusterController used to wait for the recreated cluster.
# The generic test options are reused with the timeout overridden to 30.0.
def build_controller
  controller_options = TEST_GENERIC_OPTIONS.merge(timeout: 30.0)

  ClusterController.new(TEST_NODE_URIS, replica_size: TEST_REPLICA_SIZE, **controller_options)
end

# Starts a worker that keeps incrementing the `single` key with plain commands.
#
# @param cli client used by the worker thread
# @return [Thread] the never-ending worker; the caller is expected to stop it
def spawn_single(cli)
  Thread.new(cli) do |conn|
    loop do
      # Two INCRs per round; errors accepted by handle_errors are only counted.
      handle_errors { 2.times { conn.call('incr', 'single') } }
    ensure
      # Pause between rounds whether or not the round succeeded.
      sleep WAIT_SEC
    end
  end
end

# Starts a worker that keeps incrementing the `pipeline` key through a pipeline.
#
# @param cli client used by the worker thread
# @return [Thread] the never-ending worker; the caller is expected to stop it
def spawn_pipeline(cli)
  Thread.new(cli) do |conn|
    loop do
      handle_errors do
        # Two INCRs queued in a single pipeline per round.
        conn.pipelined { |pipe| 2.times { pipe.call('incr', 'pipeline') } }
      end
    ensure
      # Pause between rounds whether or not the round succeeded.
      sleep WAIT_SEC
    end
  end
end

# Starts a worker that keeps incrementing the `transaction` key inside MULTI
# blocks, alternating between transactions with and without a watched key.
#
# @param cli client used by the worker thread
# @return [Thread] the never-ending worker; the caller is expected to stop it
def spawn_transaction(cli)
  Thread.new(cli) do |conn|
    rounds = 0

    loop do
      handle_errors do
        # Watch the key only when the number of blocks entered so far is odd.
        watched = rounds.odd? ? %w[transaction] : nil

        conn.multi(watch: watched) do |tx|
          # Counted only once the transaction block is actually entered.
          rounds += 1
          2.times { tx.call('incr', 'transaction') }
        end
      end
    ensure
      # Pause between rounds whether or not the round succeeded.
      sleep WAIT_SEC
    end
  end
end

# Starts a worker that keeps publishing an increasing integer to the sharded
# channel `chan` with SPUBLISH.
#
# @param cli client used by the worker thread
# @return [Thread] the never-ending worker; the caller is expected to stop it
def spawn_publisher(cli)
  Thread.new(cli) do |conn|
    sequence = 0

    loop do
      handle_errors do
        conn.call('spublish', 'chan', sequence)
        # Advance only after a successful publish, so a failed round retries
        # the same number.
        sequence = sequence.succ
      end
    ensure
      # Pause between rounds whether or not the round succeeded.
      sleep WAIT_SEC
    end
  end
end

# Starts a worker that subscribes to the sharded channel `chan` and records the
# payload of every `smessage` event through update_pubsub_message.
#
# The pubsub handle is closed in an `ensure` clause: the worker is stopped with
# Thread#exit, which unwinds through `ensure` but never enters `rescue`, so a
# rescue-only cleanup would leave the subscription open.
#
# @param cli client used by the worker thread
# @return [Thread] the never-ending worker; the caller is expected to stop it
def spawn_subscriber(cli)
  Thread.new(cli) do |r|
    ps = nil

    # Keep trying until the subscription is established.
    loop do
      ps = r.pubsub
      ps.call('ssubscribe', 'chan')
      break
    rescue StandardError
      ps&.close
      # Forget the closed handle so it is not closed again if the next
      # attempt fails before a new one is assigned.
      ps = nil
    ensure
      sleep WAIT_SEC
    end

    # Poll for events; errors accepted by handle_errors are only counted.
    loop do
      handle_errors do
        event = ps.next_event(0.01)
        case event&.first
        when 'smessage' then update_pubsub_message(event[2])
        end
      end
    ensure
      sleep WAIT_SEC
    end
  ensure
    # Runs on errors escaping the loops and on Thread#exit alike.
    ps&.close
  end
end

# Runs the given block and absorbs the failures that are expected while the
# cluster is unavailable, counting each one through increment_down_count.
# Anything else is re-raised to the caller.
#
# Absorbed:
# * connection errors, initial setup errors and "node might be down" errors
# * command errors whose message starts with CLUSTERDOWN
# * error collections in which every member is either of the two kinds above
#
# @yield the operation to guard
# @return the block's value, or the value of increment_down_count when an
#   error was absorbed
def handle_errors
  yield
rescue ::RedisClient::ConnectionError,
       ::RedisClient::Cluster::InitialSetupError,
       ::RedisClient::Cluster::NodeMightBeDown
  increment_down_count
rescue ::RedisClient::CommandError => e
  raise unless e.message.start_with?('CLUSTERDOWN')

  increment_down_count
rescue ::RedisClient::Cluster::ErrorCollection => e
  only_down_errors = e.errors.values.all? do |cause|
    cause.message.start_with?('CLUSTERDOWN') || cause.is_a?(::RedisClient::ConnectionError)
  end
  raise unless only_down_errors

  increment_down_count
end

# Adds one to the cluster-down error counter while holding its mutex.
#
# @return [Integer] the counter value after the increment
def increment_down_count
  @down_counter_lock.synchronize do
    @cluster_down_error_count = @cluster_down_error_count.succ
  end
end

# Stores the most recently received pubsub payload while holding its mutex.
#
# @param message the payload to remember
# @return the stored payload
def update_pubsub_message(message)
  @pubsub_message_lock.synchronize do
    @last_pubsub_message = message
  end
end

# Reads the most recently stored pubsub payload while holding its mutex.
#
# @return the last stored payload, or nil when nothing has been stored yet
def refer_pubsub_message
  @pubsub_message_lock.synchronize do
    @last_pubsub_message
  end
end
end

0 comments on commit a8d450f

Please sign in to comment.