Skip to content

Commit

Permalink
Fix #pipelined concurrently using the same Redis connection (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
KJTsanaktsidis authored Nov 17, 2023
1 parent ed2bb7a commit 658103d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
16 changes: 10 additions & 6 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,13 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
end
end

all_replies = errors = nil
all_replies = errors = required_redirections = nil

work_group.each do |node_key, v|
case v
when ::RedisClient::Cluster::Pipeline::RedirectionNeeded
all_replies ||= Array.new(@size)
pipeline = @pipelines[node_key]
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
required_redirections ||= {}
required_redirections[node_key] = v
when StandardError
errors ||= {}
errors[node_key] = v
Expand All @@ -180,9 +178,15 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
end

work_group.close

raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?

required_redirections&.each do |node_key, v|
all_replies ||= Array.new(@size)
pipeline = @pipelines[node_key]
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
end

all_replies
end

Expand Down
25 changes: 25 additions & 0 deletions test/test_against_cluster_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,31 @@ def test_the_state_of_cluster_resharding_with_pipelining
end
end

def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
# This test is excercising a very delicate race condition; i think the use of @client to set
# the keys in do_resharding_test is actually causing the race condition not to happen, so this
# test is actually performing the resharding on its own.
key_count = 10
key_count.times do |i|
key = "key#{i}"
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
src, dest = @controller.select_resharding_target(slot)
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
@controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
end

res = @client.pipelined do |p|
key_count.times do |i|
p.call_v(['SET', "key#{i}", "value#{i}"])
end
end

key_count.times do |i|
assert_equal('OK', res[i])
assert_equal("value#{i}", @client.call_v(['GET', "key#{i}"]))
end
end

private

def wait_for_replication
Expand Down

0 comments on commit 658103d

Please sign in to comment.