Skip to content

Commit

Permalink
Raise errors from #pipelined instead of returning them
Browse files Browse the repository at this point in the history
It seems redis-cluster-client intentionally decides to return errors
from #pipelined, rather than raising them. I note that in
#37
it was written:

>> In Grab’s Redis Cluster library, the function
>> Pipeline(PipelineReadOnly) returns a response with an error for
>> individual reply.
>> Instead of returning nil or an error message when err != nil, we could
>> check for errors for each result so successful queries are not affected.
>> This might have minimised the outage’s business impact.
>
> I prefer the above behavior.

It's certainly true that returning the individual results from
pipelined gives strictly more information than raising an exception.
UNFORTUNATELY, redis-client chooses the _opposite_ option; if any query
caused an error, after processing the entire pipeline it raises rather
than returns; see:
https://github.com/redis-rb/redis-client/blob/932c5e8909ede7575d56a117773be39f40e2da88/lib/redis_client/connection_mixin.rb#L60

I think consistency with redis-client is very valuable to have for
redis-cluster-client, so I would suggest that changing the behaviour
here (perhaps with a major version bump?) might be a good idea?

Alternatively, althought I think this would be even _more_ confusing
IMO, we could keep the error handling as-is here but patch it in
redis-rb to raise instead of return.
  • Loading branch information
KJ Tsanaktsidis committed Nov 17, 2023
1 parent c00ee01 commit 6c0ca9a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 25 deletions.
21 changes: 9 additions & 12 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,31 @@ def get_block(inner_index)
end

::RedisClient::ConnectionMixin.module_eval do
def call_pipelined_aware_of_redirection(commands, timeouts) # rubocop:disable Metrics/AbcSize
def call_pipelined_aware_of_redirection(commands, timeouts) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
size = commands.size
results = Array.new(commands.size)
@pending_reads += size
write_multi(commands)

redirection_indices = nil
exception = nil
size.times do |index|
timeout = timeouts && timeouts[index]
result = read(timeout)
@pending_reads -= 1
if result.is_a?(CommandError)
if result.is_a?(::RedisClient::Error)
result._set_command(commands[index])
if result.message.start_with?('MOVED', 'ASK')
if result.is_a?(::RedisClient::CommandError) && result.message.start_with?('MOVED', 'ASK')
redirection_indices ||= []
redirection_indices << index
else
exception ||= result
end
end

results[index] = result
end

raise exception if exception
return results if redirection_indices.nil?

err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
Expand Down Expand Up @@ -217,21 +220,15 @@ def handle_redirection(err, pipeline, inner_index)

if err.message.start_with?('MOVED')
node = @router.assign_redirection_node(err.message)
try_redirection(node, pipeline, inner_index)
redirect_command(node, pipeline, inner_index)
elsif err.message.start_with?('ASK')
node = @router.assign_asking_node(err.message)
try_asking(node) ? try_redirection(node, pipeline, inner_index) : err
try_asking(node) ? redirect_command(node, pipeline, inner_index) : err
else
err
end
end

def try_redirection(node, pipeline, inner_index)
redirect_command(node, pipeline, inner_index)
rescue StandardError => e
e
end

def redirect_command(node, pipeline, inner_index)
method = pipeline.get_callee_method(inner_index)
command = pipeline.get_command(inner_index)
Expand Down
20 changes: 7 additions & 13 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,16 @@ def test_pipelined
end

def test_pipelined_with_errors
got = @client.pipelined do |pipeline|
10.times do |i|
pipeline.call('SET', "string#{i}", i)
pipeline.call('SET', "string#{i}", i, 'too many args')
pipeline.call('SET', "string#{i}", i + 10)
assert_raises(RedisClient::Cluster::ErrorCollection) do
@client.pipelined do |pipeline|
10.times do |i|
pipeline.call('SET', "string#{i}", i)
pipeline.call('SET', "string#{i}", i, 'too many args')
pipeline.call('SET', "string#{i}", i + 10)
end
end
end

assert_equal(30, got.size)

10.times do |i|
assert_equal('OK', got[(3 * i) + 0])
assert_instance_of(::RedisClient::CommandError, got[(3 * i) + 1])
assert_equal('OK', got[(3 * i) + 2])
end

wait_for_replication

10.times { |i| assert_equal((i + 10).to_s, @client.call('GET', "string#{i}")) }
Expand Down

0 comments on commit 6c0ca9a

Please sign in to comment.