fix: ensure recoverability from cluster state changes (#379)
supercaracal authored Sep 26, 2024
1 parent 1967399 commit 8543134
Showing 14 changed files with 399 additions and 174 deletions.
19 changes: 10 additions & 9 deletions .github/workflows/test.yaml
@@ -33,21 +33,22 @@ jobs:
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
- {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'}
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
- {task: test_cluster_broken, redis: '7.2', restart: 'no', startup: '6'}
- {task: test_cluster_broken, redis: '6.2', restart: 'no', startup: '6'}
- {task: test_cluster_scale, redis: '7.2', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, redis: '6.2', compose: compose.scale.yaml, startup: '8'}
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {redis: '7.0', ruby: '3.1'}
- {redis: '6.2', ruby: '3.0'}
- {redis: '5.0', ruby: '2.7'}
- {task: test_cluster_state, redis: '8', pattern: 'PrimaryOnly', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'Pooled', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_state, redis: '8', pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, replica: '2', startup: '9'}
- {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'Pipeline', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'Transaction', compose: compose.scale.yaml, startup: '8'}
- {task: test_cluster_scale, pattern: 'PubSub', compose: compose.scale.yaml, startup: '8'}
- {ruby: 'jruby'}
- {ruby: 'truffleruby'}
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
env:
REDIS_VERSION: ${{ matrix.redis || '7.2' }}
DOCKER_COMPOSE_FILE: ${{ matrix.compose || 'compose.yaml' }}
8 changes: 8 additions & 0 deletions .rubocop.yml
@@ -14,6 +14,14 @@ Metrics/AbcSize:
Exclude:
- 'test/**/*'

Metrics/CyclomaticComplexity:
Exclude:
- 'test/**/*'

Metrics/PerceivedComplexity:
Exclude:
- 'test/**/*'

Metrics/ClassLength:
Max: 500

2 changes: 1 addition & 1 deletion lib/redis_client/cluster/errors.rb
@@ -32,7 +32,7 @@ class ErrorCollection < ::RedisClient::Error
def initialize(errors)
@errors = {}
if !errors.is_a?(Hash) || errors.empty?
super('')
super(errors.to_s)
return
end

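The change above stops discarding the constructor argument when ErrorCollection is built from something other than a non-empty Hash: the old super('') produced an empty message, while errors.to_s keeps the raw value visible. A minimal sketch of that branch, assuming the redis-client gem is loaded; the aggregation in the else branch is an approximation, not the gem's exact wording.

require 'redis_client'

class ErrorCollectionSketch < ::RedisClient::Error
  attr_reader :errors

  def initialize(errors)
    @errors = {}
    if !errors.is_a?(Hash) || errors.empty?
      # Before this commit: super('') -- the argument was thrown away.
      super(errors.to_s)
      return
    end

    @errors = errors
    super(errors.map { |node_key, error| "#{node_key}: #{error.message}" }.join(', '))
  end
end

ErrorCollectionSketch.new(nil).message     #=> ""
ErrorCollectionSketch.new([:boom]).message #=> "[:boom]"
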
8 changes: 7 additions & 1 deletion lib/redis_client/cluster/optimistic_locking.rb
@@ -11,7 +11,7 @@ def initialize(router)
@asking = false
end

def watch(keys)
def watch(keys) # rubocop:disable Metrics/AbcSize
slot = find_slot(keys)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" if slot.nil?

@@ -32,7 +32,13 @@ def watch(keys)
c.call('UNWATCH')
raise
end
rescue ::RedisClient::CommandError => e
@router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
raise
end
rescue ::RedisClient::ConnectionError
@router.renew_cluster_state
raise
end
end
end
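With the hunk above, a failed watch now refreshes the cached cluster topology before propagating the error: a CLUSTERDOWN Hash slot not served reply means the slot map is stale, and a dropped connection means a node went away. A minimal sketch of the pattern, assuming the redis-client gem is loaded; router and client stand in for the gem's internal router and node connection and are not its public API.

require 'redis_client'

def watch_with_recovery(router, client, keys)
  client.call('WATCH', *keys)
  yield
rescue ::RedisClient::CommandError => e
  # A stale slot map surfaces as CLUSTERDOWN; renew the topology, then re-raise
  # so the caller can decide whether to retry the whole transaction.
  router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
  raise
rescue ::RedisClient::ConnectionError
  # A dead connection also invalidates the cached topology.
  router.renew_cluster_state
  raise
end
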
50 changes: 40 additions & 10 deletions lib/redis_client/cluster/pipeline.rb
@@ -55,8 +55,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
results = Array.new(commands.size)
@pending_reads += size
write_multi(commands)
redirection_indices = nil
first_exception = nil
redirection_indices = stale_cluster_state = first_exception = nil

size.times do |index|
timeout = timeouts && timeouts[index]
@@ -73,18 +72,31 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
elsif exception
first_exception ||= result
end

stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN Hash slot not served')
end

results[index] = result
end

raise first_exception if exception && first_exception
return results if redirection_indices.nil?
if redirection_indices
err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
err.replies = results
err.indices = redirection_indices
err.first_exception = first_exception
raise err
end

if stale_cluster_state
err = ::RedisClient::Cluster::Pipeline::StaleClusterState.new
err.replies = results
err.first_exception = first_exception
raise err
end

raise first_exception if first_exception

err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
err.replies = results
err.indices = redirection_indices
raise err
results
end
end

@@ -98,8 +110,12 @@ def ensure_connected_cluster_scoped(retryable: true, &block)

ReplySizeError = Class.new(::RedisClient::Error)

class StaleClusterState < ::RedisClient::Error
attr_accessor :replies, :first_exception
end

class RedirectionNeeded < ::RedisClient::Error
attr_accessor :replies, :indices
attr_accessor :replies, :indices, :first_exception
end

def initialize(router, command_builder, concurrent_worker, exception:, seed: Random.new_seed)
@@ -166,14 +182,18 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
end
end

all_replies = errors = required_redirections = nil
all_replies = errors = required_redirections = cluster_state_errors = nil

work_group.each do |node_key, v|
case v
when ::RedisClient::Cluster::Pipeline::RedirectionNeeded
required_redirections ||= {}
required_redirections[node_key] = v
when ::RedisClient::Cluster::Pipeline::StaleClusterState
cluster_state_errors ||= {}
cluster_state_errors[node_key] = v
when StandardError
cluster_state_errors ||= {} if v.is_a?(::RedisClient::ConnectionError)
errors ||= {}
errors[node_key] = v
else
Expand All @@ -183,15 +203,25 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
end

work_group.close
@router.renew_cluster_state if cluster_state_errors
raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?

required_redirections&.each do |node_key, v|
raise v.first_exception if v.first_exception

all_replies ||= Array.new(@size)
pipeline = @pipelines[node_key]
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
end

cluster_state_errors&.each do |node_key, v|
raise v.first_exception if v.first_exception

all_replies ||= Array.new(@size)
@pipelines[node_key].outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
end

all_replies
end

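The pipeline now distinguishes three per-node outcomes: redirections (RedirectionNeeded, which additionally carries first_exception), a stale cluster view (the new StaleClusterState, raised when a reply starts with CLUSTERDOWN Hash slot not served), and plain errors, with connection errors also counted as evidence of a stale view. The error objects keep the partial replies so execute can renew the topology once and still stitch the successful replies back into their outer positions. A condensed, illustrative sketch of that classification, assuming the redis-client gem is loaded; the names below are not the gem's public API and redirection handling is omitted.

require 'redis_client'

StaleClusterStateSketch = Class.new(::RedisClient::Error) do
  attr_accessor :replies, :first_exception
end

def classify_outcomes(outcomes, router)
  errors = cluster_state_errors = nil

  outcomes.each do |node_key, outcome|
    case outcome
    when StaleClusterStateSketch
      # A CLUSTERDOWN reply arrived mid-pipeline; the partial replies stay on the error.
      (cluster_state_errors ||= {})[node_key] = outcome
    when StandardError
      cluster_state_errors ||= {} if outcome.is_a?(::RedisClient::ConnectionError)
      (errors ||= {})[node_key] = outcome
    end
  end

  # Renew the slot map once, after every node's outcome has been drained.
  router.renew_cluster_state if cluster_state_errors
  raise errors.values.first unless errors.nil? # the gem wraps these in ErrorCollection

  cluster_state_errors
end
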
65 changes: 41 additions & 24 deletions lib/redis_client/cluster/pub_sub.rb
@@ -24,6 +24,8 @@ def ensure_worker
def close
@worker.exit if @worker&.alive?
@client.close
rescue ::RedisClient::ConnectionError
# ignore
end

private
@@ -51,27 +53,33 @@ def initialize(router, command_builder)
@command_builder = command_builder
@queue = SizedQueue.new(BUF_SIZE)
@state_dict = {}
@commands = []
end

def call(*args, **kwargs)
_call(@command_builder.generate(args, kwargs))
command = @command_builder.generate(args, kwargs)
_call(command)
@commands << command
nil
end

def call_v(command)
_call(@command_builder.generate(command))
command = @command_builder.generate(command)
_call(command)
@commands << command
nil
end

def close
@state_dict.each_value(&:close)
@state_dict.clear
@commands.clear
@queue.clear
@queue.close
nil
end

def next_event(timeout = nil)
def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
@state_dict.each_value(&:ensure_worker)
max_duration = calc_max_duration(timeout)
starting = obtain_current_time
@@ -80,6 +88,13 @@ def next_event(timeout = nil)
break if max_duration > 0 && obtain_current_time - starting > max_duration

case event = @queue.pop(true)
when ::RedisClient::CommandError
if event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
@router.renew_cluster_state
break start_over
end

raise event
when StandardError then raise event
when Array then break event
end
Expand All @@ -99,13 +114,26 @@ def _call(command)
end
end

def call_to_single_state(command)
def call_to_single_state(command, retry_count: 1)
node_key = @router.find_node_key(command)
try_call(node_key, command)
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
@state_dict[node_key].call(command)
rescue ::RedisClient::ConnectionError
@state_dict[node_key].close
@state_dict.delete(node_key)
@router.renew_cluster_state
retry_count -= 1
retry_count >= 0 ? retry : raise
end

def call_to_all_states(command)
@state_dict.each_value { |s| s.call(command) }
@state_dict.each do |node_key, state|
state.call(command)
rescue ::RedisClient::ConnectionError
@state_dict[node_key].close
@state_dict.delete(node_key)
@router.renew_cluster_state
end
end

def call_for_sharded_states(command)
@@ -116,31 +144,20 @@ def call_for_sharded_states(command)
end
end

def try_call(node_key, command, retry_count: 1)
add_state(node_key).call(command)
rescue ::RedisClient::CommandError => e
raise if !e.message.start_with?('MOVED') || retry_count <= 0

# for sharded pub/sub
node_key = e.message.split[2]
retry_count -= 1
retry
end

def add_state(node_key)
return @state_dict[node_key] if @state_dict.key?(node_key)

state = State.new(@router.find_node(node_key).pubsub, @queue)
@state_dict[node_key] = state
end

def obtain_current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
end

def calc_max_duration(timeout)
timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000
end

def start_over
@state_dict.each_value(&:close)
@state_dict.clear
@commands.each { |command| _call(command) }
nil
end
end
end
end
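
The pub/sub layer now records every subscription command it issues (@commands); when a MOVED or CLUSTERDOWN error surfaces in next_event it renews the cluster topology and replays the recorded commands via start_over, and dropped connections in the call paths likewise trigger a topology refresh and a retry. An illustrative sketch of that replay idea, assuming the redis-client gem is loaded; router mirrors the shape of the gem's internal router and none of these names are its public API.

require 'redis_client'

class ResubscribingPubSubSketch
  def initialize(router)
    @router = router
    @commands = []     # every subscription command issued so far
    @connections = {}  # node_key => raw pub/sub connection
  end

  def call(*command)
    dispatch(command)
    @commands << command # remember it so it can be replayed after a topology change
    nil
  end

  # Tear down per-node state and re-issue every remembered subscription against
  # the freshly discovered topology (the gem renews the slot map in #next_event
  # before invoking its equivalent of this method).
  def start_over
    @connections.each_value(&:close)
    @connections.clear
    @commands.each { |command| dispatch(command) } # replay without re-recording
    nil
  end

  private

  def dispatch(command)
    node_key = @router.find_node_key(command)
    conn = (@connections[node_key] ||= @router.find_node(node_key).pubsub)
    conn.call(*command)
  end
end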
(Diffs for the remaining 8 of the 14 changed files are not shown in this view.)
