Skip to content

Commit

Permalink
Add an explicit #watch method to RedisClient::Cluster
Browse files Browse the repository at this point in the history
This returns a "watcher" object, which can either be used for three
things:

* To add keys to be watched on the same connection (by calling #watch
* To begin a MULTI transaction on the connection (by calling #multi)
* To UNWATCH the connection and return it to its original state
  (by calling... #unwatch)

This means that the following pattern becomes possible in
redis-cluster-client:

```
client.watch(["{slot}k1", "{slot}k2"]) do |watcher|
  # Further reads can be performed with client directly; this is
  # perfectly safe and they will be individually redirected if required
  # as normal.
  # If a read on a slot being watched is redirected, that's also OK,
  # because it means the final EXEC will fail (since the watch got
  # modified).
  current_value = client.call('GET', '{slot}k1')
  some_other_thing = client.call('GET', '{slot}something_unwatched')

  # You can add more keys to the watch if required
  # This could raise a redireciton error, and cause the whole watch
  # block to be re-attempted
  watcher.watch('{slot}differet_key')
  different_value = client.call('GET', '{slot}different_key')

  if do_transaction?
    # Once you're ready to perform a transaction, you can use multi...
    watcher.multi do |tx|
      # tx is now a pipeliend RedisClient::Cluster::Transaction
      # instance, like normal multi
      tx.call('SET', '{slot}k1', 'new_value')
      tx.call('SET', '{slot}k2', 'new_value')
    end
    # At this point, the transaction is committed
  else
    # You can abort the transaction by calling unwatch
    # (this will also happen if an exception is thrown)
    watcher.unwatch
  end
end
```

This interface is what's required to make redis-clustering/redis-rb work
correctly, I think.
  • Loading branch information
KJ Tsanaktsidis committed Feb 21, 2024
1 parent 3230791 commit c7b69df
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 35 deletions.
18 changes: 11 additions & 7 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,23 @@ def pipelined
pipeline.execute
end

def multi(watch: nil)
def multi(watch: nil, &block)
if watch.nil? || watch.empty?
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
yield transaction
return transaction.execute
end

::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: c, slot: slot
)
yield transaction
transaction.execute
watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
watcher.watch(watch) do
watcher.multi(&block)
end
end

def watch(keys)
watcher = ::RedisClient::Cluster::OptimisticLocking.new(@router, @command_builder)
watcher.watch(keys) do
yield watcher
end
end

Expand Down
51 changes: 42 additions & 9 deletions lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,60 @@
class RedisClient
class Cluster
class OptimisticLocking
def initialize(router)
def initialize(router, command_builder)
@router = router
@command_builder = command_builder
@slot = nil
@conn = nil
end

def watch(keys)
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?
def watch(keys, &block)
if @conn
# We're already watching, and the caller wants to watch additional keys
add_to_watch(keys)
else
# First call to #watch
start_watch(keys, &block)
end
end

def unwatch
@conn.call('UNWATCH')
end

def multi
transaction = ::RedisClient::Cluster::Transaction.new(
@router, @command_builder, node: @conn, slot: @slot
)
yield transaction
transaction.execute
end

node = @router.find_primary_node_by_slot(slot)
private

def start_watch(keys)
@slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if @slot.nil?

node = @router.find_primary_node_by_slot(@slot)
@router.handle_redirection(node, retry_count: 1) do |nd|
nd.with do |c|
c.call('WATCH', *keys)
yield(c, slot)
@conn = c
@conn.call('WATCH', *keys)
yield
rescue StandardError
c.call('UNWATCH')
unwatch
raise
end
end
end

private
def add_to_watch(keys)
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "inconsistent watch: #{keys.join(' ')}" if slot != @slot

@conn.call('WATCH', *keys)
end

def find_slot(keys)
return if keys.empty?
Expand Down
14 changes: 0 additions & 14 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
when 'memory' then send_memory_command(method, command, args, &block)
when 'script' then send_script_command(method, command, args, &block)
when 'pubsub' then send_pubsub_command(method, command, args, &block)
when 'watch' then send_watch_command(command, &block)
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
@node.call_all(method, command, args).first.then(&TSF.call(block))
when 'flushall', 'flushdb'
Expand Down Expand Up @@ -311,19 +310,6 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
end
end

# for redis-rb
def send_watch_command(command)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given?

::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot|
transaction = ::RedisClient::Cluster::Transaction.new(
self, @command_builder, node: c, slot: slot
)
yield transaction
transaction.execute
end
end

def update_cluster_info!
@node.reload!
end
Expand Down
12 changes: 7 additions & 5 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,13 @@ def test_transaction_in_race_condition
def test_transaction_with_dedicated_watch_command
@client.call('MSET', '{key}1', '0', '{key}2', '0')

got = @client.call('WATCH', '{key}1', '{key}2') do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
got = @client.watch(['{key}1', '{key}2']) do |watcher|
watcher.multi do |tx|
tx.call('ECHO', 'START')
tx.call('SET', '{key}1', '1')
tx.call('SET', '{key}2', '2')
tx.call('ECHO', 'FINISH')
end
end

assert_equal(%w[START OK OK FINISH], got)
Expand Down

0 comments on commit c7b69df

Please sign in to comment.