From 8d0249b22355e3c5bdaa4576633cf9cb7d325d2f Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Sat, 28 Sep 2024 16:26:27 +0900 Subject: [PATCH] fix --- test/test_against_cluster_down.rb | 87 ++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 30 deletions(-) diff --git a/test/test_against_cluster_down.rb b/test/test_against_cluster_down.rb index a1bb0f14..e02e8690 100644 --- a/test/test_against_cluster_down.rb +++ b/test/test_against_cluster_down.rb @@ -11,10 +11,8 @@ def setup @clients = Array.new(5) { build_client } @threads = [] @controller = nil - @cluster_down_error_count = 0 - @last_pubsub_message = nil - @down_counter_lock = Mutex.new - @pubsub_message_lock = Mutex.new + @cluster_down_counter = Counter.new + @pubsub_recorder = Recorder.new @captured_commands.clear @redirect_count.clear end @@ -25,7 +23,7 @@ def teardown @clients&.each(&:close) print "#{@redirect_count.get}, "\ "ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')}, "\ - "ClusterDownError: #{@cluster_down_error_count} = " + "ClusterDownError: #{@cluster_down_counter.get} = " end def test_recoverability_from_cluster_down @@ -41,9 +39,9 @@ def test_recoverability_from_cluster_down @controller = build_controller @controller.wait_for_cluster_to_be_ready - wait_for_threads_to_be_stable + wait_for_jobs_to_be_stable - refute(refer_down_count.zero?, 'Case: cluster down count') + refute(@cluster_down_counter.get.zero?, 'Case: cluster down count') refute(@captured_commands.count('cluster', 'nodes').zero?, 'Case: cluster nodes calls') client = build_client @@ -52,14 +50,14 @@ def test_recoverability_from_cluster_down single_value1 = client.call('get', 'single', &:to_i) pipeline_value1 = client.call('get', 'pipeline', &:to_i) transaction_value1 = client.call('get', 'transaction', &:to_i) - pubsub_message1 = refer_pubsub_message.to_i + pubsub_message1 = @pubsub_recorder.get.to_i - sleep WAIT_SEC * 30 + wait_until_jobs_to_progress_enough single_value2 = client.call('get', 'single', &:to_i) pipeline_value2 = client.call('get', 'pipeline', &:to_i) transaction_value2 = client.call('get', 'transaction', &:to_i) - pubsub_message2 = refer_pubsub_message.to_i + pubsub_message2 = @pubsub_recorder.get.to_i assert(single_value1 < single_value2, "Single: #{single_value1} < #{single_value2}") assert(pipeline_value1 < pipeline_value2, "Pipeline: #{pipeline_value1} < #{pipeline_value2}") @@ -165,7 +163,7 @@ def spawn_subscriber(cli) handle_errors do event = ps.next_event(0.01) case event&.first - when 'smessage' then update_pubsub_message(event[2]) + when 'smessage' then @pubsub_recorder.set(event[2]) end end ensure @@ -180,43 +178,72 @@ def spawn_subscriber(cli) def handle_errors yield rescue ::RedisClient::ConnectionError, ::RedisClient::Cluster::InitialSetupError, ::RedisClient::Cluster::NodeMightBeDown - increment_down_count + @cluster_down_counter.increment rescue ::RedisClient::CommandError => e raise unless e.message.start_with?('CLUSTERDOWN') - increment_down_count + @cluster_down_counter.increment rescue ::RedisClient::Cluster::ErrorCollection => e raise unless e.errors.values.all? do |err| err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError) end - increment_down_count + @cluster_down_counter.increment end - def increment_down_count - @down_counter_lock.synchronize { @cluster_down_error_count += 1 } - end + def wait_for_jobs_to_be_stable(attempts: 30) + now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond) - def update_pubsub_message(message) - @pubsub_message_lock.synchronize { @last_pubsub_message = message } + loop do + raise MaxRetryExceeded if attempts <= 0 + + attempts -= 1 + before = @cluster_down_counter.get + wait_until_jobs_to_progress_enough + after = @cluster_down_counter.get + break if before == after && @pubsub_recorder.updated?(now) + end end - def refer_down_count - @down_counter_lock.synchronize { @cluster_down_error_count } + def wait_until_jobs_to_progress_enough + sleep WAIT_SEC * (@threads.size * 2) end - def refer_pubsub_message - @pubsub_message_lock.synchronize { @last_pubsub_message } + class Counter + def initialize + @count = 0 + @mutex = Mutex.new + end + + def increment + @mutex.synchronize { @count += 1 } + end + + def get + @mutex.synchronize { @count } + end end - def wait_for_threads_to_be_stable(attempts: 30) - loop do - raise MaxRetryExceeded if attempts <= 0 + class Recorder + def initialize + @last_value = nil + @updated_at = nil + @mutex = Mutex.new + end - attempts -= 1 - before = refer_down_count - sleep WAIT_SEC * (@threads.size * 2) - break if before == refer_down_count + def set(value) + @mutex.synchronize do + @last_value = value + @updated_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond) + end + end + + def get + @mutex.synchronize { @last_value } + end + + def updated?(microsecond) + microsecond < @updated_at end end end