diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb index 258efd49..90e00e4d 100644 --- a/lib/redis_client/cluster/pipeline.rb +++ b/lib/redis_client/cluster/pipeline.rb @@ -161,15 +161,13 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met end end - all_replies = errors = nil + all_replies = errors = required_redirections = nil work_group.each do |node_key, v| case v when ::RedisClient::Cluster::Pipeline::RedirectionNeeded - all_replies ||= Array.new(@size) - pipeline = @pipelines[node_key] - v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) } - pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] } + required_redirections ||= {} + required_redirections[node_key] = v when StandardError errors ||= {} errors[node_key] = v @@ -180,9 +178,15 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met end work_group.close - raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil? + required_redirections&.each do |node_key, v| + all_replies ||= Array.new(@size) + pipeline = @pipelines[node_key] + v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) } + pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] } + end + all_replies end diff --git a/test/test_against_cluster_state.rb b/test/test_against_cluster_state.rb index 9e940438..813ba4df 100644 --- a/test/test_against_cluster_state.rb +++ b/test/test_against_cluster_state.rb @@ -59,6 +59,31 @@ def test_the_state_of_cluster_resharding_with_pipelining end end + def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection + # This test is excercising a very delicate race condition; i think the use of @client to set + # the keys in do_resharding_test is actually causing the race condition not to happen, so this + # test is actually performing the resharding on its own. + key_count = 10 + key_count.times do |i| + key = "key#{i}" + slot = ::RedisClient::Cluster::KeySlotConverter.convert(key) + src, dest = @controller.select_resharding_target(slot) + @controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + @controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest) + end + + res = @client.pipelined do |p| + key_count.times do |i| + p.call_v(['SET', "key#{i}", "value#{i}"]) + end + end + + key_count.times do |i| + assert_equal('OK', res[i]) + assert_equal("value#{i}", @client.call_v(['GET', "key#{i}"])) + end + end + private def wait_for_replication