Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't call the block twice during the transaction #318

Merged
merged 6 commits into from
Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ def pipelined
pipeline.execute
end

def multi(watch: nil, &block)
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
def multi(watch: nil)
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, watch)
yield transaction
transaction.execute
end

def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block)
Expand Down
5 changes: 5 additions & 0 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ def find_node_key_by_key(key, seed: nil, primary: false)
end
end

def find_primary_node_by_slot(slot)
node_key = @node.find_node_key_of_primary(slot)
find_node(node_key)
end

def find_node_key(command, seed: nil)
key = @command.extract_first_key(command)
find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command))
Expand Down
160 changes: 140 additions & 20 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
@@ -1,56 +1,176 @@
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/pipeline'
require 'redis_client/cluster/key_slot_converter'

class RedisClient
class Cluster
class Transaction
ConsistencyError = Class.new(::RedisClient::Error)

def initialize(router, command_builder)
def initialize(router, command_builder, watch)
@router = router
@command_builder = command_builder
@node_key = nil
@watch = watch
@retryable = true
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
@buffer = []
@node = nil
end

def call(*command, **kwargs, &_)
def call(*command, **kwargs, &block)
command = @command_builder.generate(command, kwargs)
ensure_node_key(command)
if prepare(command)
@pipeline.call_v(command, &block)
else
@buffer << -> { @pipeline.call_v(command, &block) }
end
end

def call_v(command, &_)
def call_v(command, &block)
command = @command_builder.generate(command)
ensure_node_key(command)
if prepare(command)
@pipeline.call_v(command, &block)
else
@buffer << -> { @pipeline.call_v(command, &block) }
end
end

def call_once(*command, **kwargs, &_)
def call_once(*command, **kwargs, &block)
@retryable = false
command = @command_builder.generate(command, kwargs)
ensure_node_key(command)
if prepare(command)
@pipeline.call_once_v(command, &block)
else
@buffer << -> { @pipeline.call_once_v(command, &block) }
end
end

def call_once_v(command, &_)
def call_once_v(command, &block)
@retryable = false
command = @command_builder.generate(command)
ensure_node_key(command)
if prepare(command)
@pipeline.call_once_v(command, &block)
else
@buffer << -> { @pipeline.call_once_v(command, &block) }
end
end

def execute(watch: nil, &block)
yield self
raise ArgumentError, 'empty transaction' if @node_key.nil?
def execute
@buffer.each(&:call)

node = @router.find_node(@node_key)
@router.try_delegate(node, :multi, watch: watch, &block)
raise ArgumentError, 'empty transaction' if @pipeline._empty?
raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil?
raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch?

settle
end

private

def ensure_node_key(command)
def watch?
!@watch.nil? && !@watch.empty?
end

def safe_watch?
return true unless watch?
return false if @node.nil?

slots = @watch.map do |k|
return false if k.nil? || k.empty?

::RedisClient::Cluster::KeySlotConverter.convert(k)
end

return false if slots.uniq.size != 1

@router.find_primary_node_by_slot(slots.first) == @node
end

def prepare(command)
return true unless @node.nil?

node_key = @router.find_primary_node_key(command)
raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil?
return false if node_key.nil?

@node = @router.find_node(node_key)
@pipeline.call('WATCH', *@watch) if watch?
@pipeline.call('MULTI')
@buffer.each(&:call)
@buffer.clear
true
end

def settle
@pipeline.call('EXEC')
@pipeline.call('UNWATCH') if watch?
send_transaction(@node, redirect: true)
end

@node_key ||= node_key
raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key
def send_transaction(client, redirect:)
case client
when ::RedisClient then send_pipeline(client, redirect: redirect)
when ::RedisClient::Pooled then client.with { |c| send_pipeline(c, redirect: redirect) }
else raise NotImplementedError, "#{client.class.name}#multi for cluster client"
end
end

def send_pipeline(client, redirect:)
results = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection|
commands = @pipeline._commands
client.middlewares.call_pipelined(commands, client.config) do
connection.call_pipelined(commands, nil)
rescue ::RedisClient::CommandError => e
return handle_command_error!(commands, e) if redirect

raise
end
end

@pipeline._coerce!(results)
results[watch? ? -2 : -1]
end

def handle_command_error!(commands, err)
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
elsif err.message.start_with?('MOVED', 'ASK')
ensure_the_same_node!(commands)
handle_redirection(err)
else
raise err
end
end

def ensure_the_same_node!(commands)
commands.each do |command|
node_key = @router.find_primary_node_key(command)
next if node_key.nil?

node = @router.find_node(node_key)
next if @node == node

raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}"
end
end

def handle_redirection(err)
if err.message.start_with?('MOVED')
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: false)
elsif err.message.start_with?('ASK')
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: false) : err
else
raise err
end
end

nil
def try_asking(node)
node.call('ASKING') == 'OK'
rescue StandardError
false
end
end
end
Expand Down
52 changes: 50 additions & 2 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def test_transaction_with_empty_block
assert_raises(LocalJumpError) { @client.multi }
end

def test_transaction_with_keyless_commands
def test_transaction_with_only_keyless_commands
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi do |t|
t.call('ECHO', 'foo')
Expand All @@ -234,7 +234,7 @@ def test_transaction_without_hashtag
end
end

assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi do |t|
t.call('MSET', 'key1', '1', 'key2', '2')
t.call('MSET', 'key1', '1', 'key3', '3')
Expand All @@ -247,6 +247,54 @@ def test_transaction_without_hashtag
end
end

def test_transaction_with_watch
@client.call('MSET', '{key}1', '0', '{key}2', '0')

got = @client.multi(watch: %w[{key}1 {key}2]) do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
end

assert_equal(%w[START OK OK FINISH], got)
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_with_unsafe_watch
@client.call('MSET', '{key}1', '0', '{key}2', '0')

assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi(watch: %w[key1 key2]) do |tx|
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
end
end

assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
@client.multi(watch: %w[{hey}1 {hey}2]) do |tx|
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
end
end

assert_equal(%w[0 0], @client.call('MGET', '{key}1', '{key}2'))
end

def test_transaction_with_meaningless_watch
@client.call('MSET', '{key}1', '0', '{key}2', '0')

got = @client.multi(watch: %w[{key}3 {key}4]) do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
end

assert_equal(%w[START OK OK FINISH], got)
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
end

def test_pubsub_without_subscription
pubsub = @client.pubsub
assert_nil(pubsub.next_event(0.01))
Expand Down
Loading