Skip to content

Commit

Permalink
fix: don't call the block twice during the transaction (#318)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Feb 17, 2024
1 parent 86c0e01 commit d1d5483
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 24 deletions.
6 changes: 4 additions & 2 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ def pipelined
pipeline.execute
end

def multi(watch: nil, &block)
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
def multi(watch: nil)
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, watch)
yield transaction
transaction.execute
end

def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block)
Expand Down
5 changes: 5 additions & 0 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ def find_node_key_by_key(key, seed: nil, primary: false)
end
end

def find_primary_node_by_slot(slot)
node_key = @node.find_node_key_of_primary(slot)
find_node(node_key)
end

def find_node_key(command, seed: nil)
key = @command.extract_first_key(command)
find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command))
Expand Down
160 changes: 140 additions & 20 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
@@ -1,56 +1,176 @@
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/pipeline'
require 'redis_client/cluster/key_slot_converter'

class RedisClient
class Cluster
class Transaction
ConsistencyError = Class.new(::RedisClient::Error)

def initialize(router, command_builder)
def initialize(router, command_builder, watch)
@router = router
@command_builder = command_builder
@node_key = nil
@watch = watch
@retryable = true
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
@buffer = []
@node = nil
end

def call(*command, **kwargs, &_)
def call(*command, **kwargs, &block)
command = @command_builder.generate(command, kwargs)
ensure_node_key(command)
if prepare(command)
@pipeline.call_v(command, &block)
else
@buffer << -> { @pipeline.call_v(command, &block) }
end
end

def call_v(command, &_)
def call_v(command, &block)
command = @command_builder.generate(command)
ensure_node_key(command)
if prepare(command)
@pipeline.call_v(command, &block)
else
@buffer << -> { @pipeline.call_v(command, &block) }
end
end

def call_once(*command, **kwargs, &_)
def call_once(*command, **kwargs, &block)
@retryable = false
command = @command_builder.generate(command, kwargs)
ensure_node_key(command)
if prepare(command)
@pipeline.call_once_v(command, &block)
else
@buffer << -> { @pipeline.call_once_v(command, &block) }
end
end

def call_once_v(command, &_)
def call_once_v(command, &block)
@retryable = false
command = @command_builder.generate(command)
ensure_node_key(command)
if prepare(command)
@pipeline.call_once_v(command, &block)
else
@buffer << -> { @pipeline.call_once_v(command, &block) }
end
end

def execute(watch: nil, &block)
yield self
raise ArgumentError, 'empty transaction' if @node_key.nil?
def execute
@buffer.each(&:call)

node = @router.find_node(@node_key)
@router.try_delegate(node, :multi, watch: watch, &block)
raise ArgumentError, 'empty transaction' if @pipeline._empty?
raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil?
raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch?

settle
end

private

def ensure_node_key(command)
def watch?
!@watch.nil? && !@watch.empty?
end

def safe_watch?
return true unless watch?
return false if @node.nil?

slots = @watch.map do |k|
return false if k.nil? || k.empty?

::RedisClient::Cluster::KeySlotConverter.convert(k)
end

return false if slots.uniq.size != 1

@router.find_primary_node_by_slot(slots.first) == @node
end

def prepare(command)
return true unless @node.nil?

node_key = @router.find_primary_node_key(command)
raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil?
return false if node_key.nil?

@node = @router.find_node(node_key)
@pipeline.call('WATCH', *@watch) if watch?
@pipeline.call('MULTI')
@buffer.each(&:call)
@buffer.clear
true
end

def settle
@pipeline.call('EXEC')
@pipeline.call('UNWATCH') if watch?
send_transaction(@node, redirect: true)
end

@node_key ||= node_key
raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key
def send_transaction(client, redirect:)
case client
when ::RedisClient then send_pipeline(client, redirect: redirect)
when ::RedisClient::Pooled then client.with { |c| send_pipeline(c, redirect: redirect) }
else raise NotImplementedError, "#{client.class.name}#multi for cluster client"
end
end

def send_pipeline(client, redirect:)
results = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection|
commands = @pipeline._commands
client.middlewares.call_pipelined(commands, client.config) do
connection.call_pipelined(commands, nil)
rescue ::RedisClient::CommandError => e
return handle_command_error!(commands, e) if redirect

raise
end
end

@pipeline._coerce!(results)
results[watch? ? -2 : -1]
end

def handle_command_error!(commands, err)
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
elsif err.message.start_with?('MOVED', 'ASK')
ensure_the_same_node!(commands)
handle_redirection(err)
else
raise err
end
end

def ensure_the_same_node!(commands)
commands.each do |command|
node_key = @router.find_primary_node_key(command)
next if node_key.nil?

node = @router.find_node(node_key)
next if @node == node

raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}"
end
end

def handle_redirection(err)
if err.message.start_with?('MOVED')
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: false)
elsif err.message.start_with?('ASK')
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: false) : err
else
raise err
end
end

nil
def try_asking(node)
node.call('ASKING') == 'OK'
rescue StandardError
false
end
end
end
Expand Down
52 changes: 50 additions & 2 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def test_transaction_with_empty_block
assert_raises(LocalJumpError) { @client.multi }
end

def test_transaction_with_keyless_commands
def test_transaction_with_only_keyless_commands
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi do |t|
t.call('ECHO', 'foo')
Expand All @@ -234,7 +234,7 @@ def test_transaction_without_hashtag
end
end

assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi do |t|
t.call('MSET', 'key1', '1', 'key2', '2')
t.call('MSET', 'key1', '1', 'key3', '3')
Expand All @@ -247,6 +247,54 @@ def test_transaction_without_hashtag
end
end

def test_transaction_with_watch
@client.call('MSET', '{key}1', '0', '{key}2', '0')

got = @client.multi(watch: %w[{key}1 {key}2]) do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
end

assert_equal(%w[START OK OK FINISH], got)
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_with_unsafe_watch
@client.call('MSET', '{key}1', '0', '{key}2', '0')

assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi(watch: %w[key1 key2]) do |tx|
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
end
end

assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi(watch: %w[{hey}1 {hey}2]) do |tx|
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
end
end

assert_equal(%w[0 0], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_with_meaningless_watch
@client.call('MSET', '{key}1', '0', '{key}2', '0')

got = @client.multi(watch: %w[{key}3 {key}4]) do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
end

assert_equal(%w[START OK OK FINISH], got)
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_pubsub_without_subscription
pubsub = @client.pubsub
assert_nil(pubsub.next_event(0.01))
Expand Down
Loading

0 comments on commit d1d5483

Please sign in to comment.