diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 0cc95131..733a1232 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -40,6 +40,7 @@ jobs:
           - {ruby: 'jruby'}
           - {ruby: 'truffleruby'}
           - {task: test_cluster_broken, restart: 'no', startup: '6'}
+          - {task: test_cluster_down}
           - {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
           - {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
           - {redis: '7.0', ruby: '3.1'}
diff --git a/Rakefile b/Rakefile
index 57fcef04..6e9c1a78 100644
--- a/Rakefile
+++ b/Rakefile
@@ -7,7 +7,7 @@ require 'bundler/gem_helper'
 RuboCop::RakeTask.new
 Bundler::GemHelper.install_tasks
 
-SLUGGISH_TEST_TYPES = %w[broken scale state].freeze
+SLUGGISH_TEST_TYPES = %w[down broken scale state].freeze
 
 task default: :test
 
diff --git a/bin/singlepiptx b/bin/singlepiptx
index 2c9a0756..15a64a99 100755
--- a/bin/singlepiptx
+++ b/bin/singlepiptx
@@ -50,10 +50,12 @@ module SinglePipTxDebug
 
   def spawn_transaction(cli)
     Thread.new(cli) do |r|
       role = 'Transaction'
+      i = 0
       loop do
         handle_errors(role) do
-          reply = r.multi do |tx|
+          reply = r.multi(watch: i.odd? ? %w[transaction] : nil) do |tx|
+            i += 1
             tx.call('incr', 'transaction')
             tx.call('incr', 'transaction')
             tx.call('incr', 'transaction')
diff --git a/test/test_against_cluster_down.rb b/test/test_against_cluster_down.rb
new file mode 100644
index 00000000..b7700785
--- /dev/null
+++ b/test/test_against_cluster_down.rb
@@ -0,0 +1,200 @@
+# frozen_string_literal: true
+
+require 'testing_helper'
+
+class TestAgainstClusterDown < TestingWrapper
+  WAIT_SEC = 1
+
+  def setup
+    @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new
+    @redirect_count = ::Middlewares::RedirectCount::Counter.new
+    @clients = Array.new(5) { build_client }
+    @threads = []
+    @controller = nil
+    @cluster_down_error_count = 0
+    @last_pubsub_message = nil
+    @down_counter_lock = Mutex.new
+    @pubsub_message_lock = Mutex.new
+  end
+
+  def teardown
+    @controller&.close
+    @threads&.each(&:exit)
+    @clients&.each(&:close)
+    print "#{@redirect_count.get}, "\
+          "ClusterNodesCall: #{@captured_commands.count('cluster', 'nodes')}, "\
+          "ClusterDownError: #{@cluster_down_error_count} = "
+  end
+
+  def test_recoverability_from_cluster_down
+    @threads << spawn_single(@clients[0])
+    @threads << spawn_pipeline(@clients[1])
+    @threads << spawn_transaction(@clients[2])
+    @threads << spawn_subscriber(@clients[3])
+    @threads << spawn_publisher(@clients[4])
+
+    system('docker compose --progress quiet down', exception: true)
+    system('docker system prune --force --volumes', exception: true, out: File::NULL)
+    system('docker compose --progress quiet up --detach', exception: true)
+    @controller = build_controller
+    @controller.wait_for_cluster_to_be_ready
+
+    client = build_client
+    @clients << client
+
+    single_value1 = client.call('get', 'single', &:to_i)
+    pipeline_value1 = client.call('get', 'pipeline', &:to_i)
+    transaction_value1 = client.call('get', 'transaction', &:to_i)
+    pubsub_message1 = refer_pubsub_message.to_i
+
+    sleep WAIT_SEC * 3
+
+    single_value2 = client.call('get', 'single', &:to_i)
+    pipeline_value2 = client.call('get', 'pipeline', &:to_i)
+    transaction_value2 = client.call('get', 'transaction', &:to_i)
+    pubsub_message2 = refer_pubsub_message.to_i
+
+    assert(single_value1 < single_value2, "Single: #{single_value1} < #{single_value2}")
+    assert(pipeline_value1 < pipeline_value2, "Pipeline: #{pipeline_value1} < #{pipeline_value2}")
+    assert(transaction_value1 < transaction_value2,
+           "Transaction: #{transaction_value1} < #{transaction_value2}")
+    assert(pubsub_message1 < pubsub_message2, "PubSub: #{pubsub_message1} < #{pubsub_message2}")
+  end
+
+  private
+
+  def build_client
+    ::RedisClient.cluster(
+      nodes: TEST_NODE_URIS,
+      connect_with_original_config: true,
+      fixed_hostname: TEST_FIXED_HOSTNAME,
+      custom: { captured_commands: @captured_commands, redirect_count: @redirect_count },
+      middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount],
+      **TEST_GENERIC_OPTIONS
+    ).new_client
+  end
+
+  def build_controller
+    ClusterController.new(
+      TEST_NODE_URIS,
+      replica_size: TEST_REPLICA_SIZE,
+      **TEST_GENERIC_OPTIONS.merge(timeout: 30.0)
+    )
+  end
+
+  def spawn_single(cli)
+    Thread.new(cli) do |r|
+      loop do
+        handle_errors do
+          r.call('incr', 'single')
+          r.call('incr', 'single')
+        end
+      ensure
+        sleep WAIT_SEC
+      end
+    end
+  end
+
+  def spawn_pipeline(cli)
+    Thread.new(cli) do |r|
+      loop do
+        handle_errors do
+          r.pipelined do |pi|
+            pi.call('incr', 'pipeline')
+            pi.call('incr', 'pipeline')
+          end
+        end
+      ensure
+        sleep WAIT_SEC
+      end
+    end
+  end
+
+  def spawn_transaction(cli)
+    Thread.new(cli) do |r|
+      i = 0
+      loop do
+        handle_errors do
+          r.multi(watch: i.odd? ? %w[transaction] : nil) do |tx|
+            i += 1
+            tx.call('incr', 'transaction')
+            tx.call('incr', 'transaction')
+          end
+        end
+      ensure
+        sleep WAIT_SEC
+      end
+    end
+  end
+
+  def spawn_publisher(cli)
+    Thread.new(cli) do |r|
+      i = 0
+      loop do
+        handle_errors do
+          r.call('spublish', 'chan', i)
+          i += 1
+        end
+      ensure
+        sleep WAIT_SEC
+      end
+    end
+  end
+
+  def spawn_subscriber(cli)
+    Thread.new(cli) do |r|
+      ps = nil
+
+      loop do
+        ps = r.pubsub
+        ps.call('ssubscribe', 'chan')
+        break
+      rescue StandardError
+        ps&.close
+      ensure
+        sleep WAIT_SEC
+      end
+
+      loop do
+        handle_errors do
+          event = ps.next_event(0.01)
+          case event&.first
+          when 'smessage' then update_pubsub_message(event[2])
+          end
+        end
+      ensure
+        sleep WAIT_SEC
+      end
+    rescue StandardError, SignalException
+      ps&.close
+      raise
+    end
+  end
+
+  def handle_errors
+    yield
+  rescue ::RedisClient::ConnectionError, ::RedisClient::Cluster::InitialSetupError, ::RedisClient::Cluster::NodeMightBeDown
+    increment_down_count
+  rescue ::RedisClient::CommandError => e
+    raise unless e.message.start_with?('CLUSTERDOWN')
+
+    increment_down_count
+  rescue ::RedisClient::Cluster::ErrorCollection => e
+    raise unless e.errors.values.all? do |err|
+      err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
+    end
+
+    increment_down_count
+  end
+
+  def increment_down_count
+    @down_counter_lock.synchronize { @cluster_down_error_count += 1 }
+  end
+
+  def update_pubsub_message(message)
+    @pubsub_message_lock.synchronize { @last_pubsub_message = message }
+  end
+
+  def refer_pubsub_message
+    @pubsub_message_lock.synchronize { @last_pubsub_message }
+  end
+end