Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Sep 28, 2024
1 parent f30391c commit 8d0249b
Showing 1 changed file with 57 additions and 30 deletions.
87 changes: 57 additions & 30 deletions test/test_against_cluster_down.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ def setup
@clients = Array.new(5) { build_client }
@threads = []
@controller = nil
@cluster_down_error_count = 0
@last_pubsub_message = nil
@down_counter_lock = Mutex.new
@pubsub_message_lock = Mutex.new
@cluster_down_counter = Counter.new
@pubsub_recorder = Recorder.new
@captured_commands.clear
@redirect_count.clear
end
Expand All @@ -25,7 +23,7 @@ def teardown
@clients&.each(&:close)
print "#{@redirect_count.get}, "\
"ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')}, "\
"ClusterDownError: #{@cluster_down_error_count} = "
"ClusterDownError: #{@cluster_down_counter.get} = "
end

def test_recoverability_from_cluster_down
Expand All @@ -41,9 +39,9 @@ def test_recoverability_from_cluster_down
@controller = build_controller
@controller.wait_for_cluster_to_be_ready

wait_for_threads_to_be_stable
wait_for_jobs_to_be_stable

refute(refer_down_count.zero?, 'Case: cluster down count')
refute(@cluster_down_counter.get.zero?, 'Case: cluster down count')
refute(@captured_commands.count('cluster', 'nodes').zero?, 'Case: cluster nodes calls')

client = build_client
Expand All @@ -52,14 +50,14 @@ def test_recoverability_from_cluster_down
single_value1 = client.call('get', 'single', &:to_i)
pipeline_value1 = client.call('get', 'pipeline', &:to_i)
transaction_value1 = client.call('get', 'transaction', &:to_i)
pubsub_message1 = refer_pubsub_message.to_i
pubsub_message1 = @pubsub_recorder.get.to_i

sleep WAIT_SEC * 30
wait_until_jobs_to_progress_enough

single_value2 = client.call('get', 'single', &:to_i)
pipeline_value2 = client.call('get', 'pipeline', &:to_i)
transaction_value2 = client.call('get', 'transaction', &:to_i)
pubsub_message2 = refer_pubsub_message.to_i
pubsub_message2 = @pubsub_recorder.get.to_i

assert(single_value1 < single_value2, "Single: #{single_value1} < #{single_value2}")
assert(pipeline_value1 < pipeline_value2, "Pipeline: #{pipeline_value1} < #{pipeline_value2}")
Expand Down Expand Up @@ -165,7 +163,7 @@ def spawn_subscriber(cli)
handle_errors do
event = ps.next_event(0.01)
case event&.first
when 'smessage' then update_pubsub_message(event[2])
when 'smessage' then @pubsub_recorder.set(event[2])
end
end
ensure
Expand All @@ -180,43 +178,72 @@ def spawn_subscriber(cli)
# Runs the given block and records cluster-down style failures instead of letting them propagate.
#
# Connection errors, initial setup errors and node-might-be-down errors are always recorded.
# A command error is recorded only when its message starts with 'CLUSTERDOWN'. An error
# collection is recorded only when every collected error is either a 'CLUSTERDOWN' message or
# a connection error. Any other command error or error collection is re-raised to the caller.
def handle_errors
yield
rescue ::RedisClient::ConnectionError, ::RedisClient::Cluster::InitialSetupError, ::RedisClient::Cluster::NodeMightBeDown
increment_down_count
@cluster_down_counter.increment
rescue ::RedisClient::CommandError => e
# Only CLUSTERDOWN replies are tolerated here; every other command error is re-raised.
raise unless e.message.start_with?('CLUSTERDOWN')

increment_down_count
@cluster_down_counter.increment
rescue ::RedisClient::Cluster::ErrorCollection => e
# Tolerate the collection only if all of its members are cluster-down symptoms.
raise unless e.errors.values.all? do |err|
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
end

increment_down_count
@cluster_down_counter.increment
end

def increment_down_count
@down_counter_lock.synchronize { @cluster_down_error_count += 1 }
end
def wait_for_jobs_to_be_stable(attempts: 30)
now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)

def update_pubsub_message(message)
@pubsub_message_lock.synchronize { @last_pubsub_message = message }
loop do
raise MaxRetryExceeded if attempts <= 0

attempts -= 1
before = @cluster_down_counter.get
wait_until_jobs_to_progress_enough
after = @cluster_down_counter.get
break if before == after && @pubsub_recorder.updated?(now)
end
end

def refer_down_count
@down_counter_lock.synchronize { @cluster_down_error_count }
# Pauses the caller for WAIT_SEC multiplied by twice the number of entries in @threads.
def wait_until_jobs_to_progress_enough
  scale = @threads.size * 2
  sleep(WAIT_SEC * scale)
end

def refer_pubsub_message
@pubsub_message_lock.synchronize { @last_pubsub_message }
# Integer tally whose reads and writes are serialized through a mutex.
class Counter
  def initialize
    @lock = Mutex.new
    @total = 0
  end

  # Adds one to the tally and returns the new total.
  def increment
    @lock.synchronize do
      @total += 1
    end
  end

  # Returns the current total.
  def get
    @lock.synchronize do
      @total
    end
  end
end

def wait_for_threads_to_be_stable(attempts: 30)
loop do
raise MaxRetryExceeded if attempts <= 0
class Recorder
# Starts with no recorded value and no timestamp; the mutex guards both fields.
def initialize
  @mutex = Mutex.new
  @last_value = nil
  @updated_at = nil
end

attempts -= 1
before = refer_down_count
sleep WAIT_SEC * (@threads.size * 2)
break if before == refer_down_count
# Stores +value+ as the latest value and stamps it with the current monotonic
# clock reading in microseconds. Both writes happen under the mutex.
#
# @param value [Object] the value to remember
# @return [Integer] the timestamp that was recorded
def set(value)
  @mutex.synchronize do
    @last_value = value
    stamp = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
    @updated_at = stamp
  end
end

# Returns the most recently stored value (nil if none), read under the mutex.
def get
  @mutex.synchronize do
    @last_value
  end
end

# Reports whether a value was recorded after the given timestamp.
#
# @param microsecond [Integer] reference time, in the same unit that #set records
# @return [Boolean] true only if a value has been set and its timestamp is later
#   than +microsecond+; false when nothing has been recorded yet
def updated?(microsecond)
  # Read under the same lock #set writes under, so the timestamp is never observed mid-update.
  @mutex.synchronize do
    # Before the first #set the timestamp is nil; comparing an Integer with nil would
    # raise ArgumentError, so report "not updated" instead.
    !@updated_at.nil? && microsecond < @updated_at
  end
end
end
end

0 comments on commit 8d0249b

Please sign in to comment.