Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: shorten blocking timeout seconds, add a test case for Valkey #366

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
matrix:
include:
- {redis: '7.2', ruby: '3.3'}
- {redis: '7.2', ruby: '3.3', compose: compose.valkey.yaml}
- {redis: '7.2', ruby: '3.3', compose: compose.ssl.yaml}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis'}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
Expand Down Expand Up @@ -60,7 +61,7 @@ jobs:
ruby-version: ${{ matrix.ruby || '3.3' }}
bundler-cache: true
- name: Pull Docker images
run: docker pull redis:$REDIS_VERSION
run: docker compose -f $DOCKER_COMPOSE_FILE pull
- name: Run containers
run: docker compose -f $DOCKER_COMPOSE_FILE up -d
- name: Wait for Redis cluster to be ready
Expand Down Expand Up @@ -91,7 +92,7 @@ jobs:
host_ip_addr=$(ip a | grep eth0 | grep inet | awk '{print $2}' | cut -d'/' -f1)
echo "HOST_IP_ADDR=$host_ip_addr" >> $GITHUB_ENV
- name: Pull Docker images
run: docker pull redis:$REDIS_VERSION
run: docker compose -f $DOCKER_COMPOSE_FILE pull
- name: Run containers
run: docker compose -f $DOCKER_COMPOSE_FILE up -d
env:
Expand Down Expand Up @@ -162,7 +163,7 @@ jobs:
ruby-version: '3.3'
bundler-cache: true
- name: Pull Docker images
run: docker pull redis:$REDIS_VERSION
run: docker compose -f $DOCKER_COMPOSE_FILE pull
- name: Run containers
run: docker compose -f $DOCKER_COMPOSE_FILE up -d
- name: Wait for Redis cluster to be ready
Expand Down Expand Up @@ -222,7 +223,7 @@ jobs:
ruby-version: '3.3'
bundler-cache: true
- name: Pull Docker images
run: docker pull redis:$REDIS_VERSION
run: docker compose -f $DOCKER_COMPOSE_FILE pull
- name: Run containers
run: docker compose -f $DOCKER_COMPOSE_FILE up -d
- name: Wait for Redis cluster to be ready
Expand Down Expand Up @@ -260,7 +261,7 @@ jobs:
ruby-version: '3.3'
bundler-cache: true
- name: Pull Docker images
run: docker pull redis:$REDIS_VERSION
run: docker compose -f $DOCKER_COMPOSE_FILE pull
- name: Run containers
run: docker compose -f $DOCKER_COMPOSE_FILE up -d
- name: Wait for Redis cluster to be ready
Expand Down Expand Up @@ -316,7 +317,7 @@ jobs:
sudo sysctl -w net.ipv4.tcp_max_syn_backlog=1024 # backlog setting
sudo sysctl -w net.core.somaxconn=1024 # up the number of connections per port
- name: Pull Docker images
run: docker pull redis:$REDIS_VERSION
run: docker compose -f $DOCKER_COMPOSE_FILE pull
- name: Run containers
run: docker compose -f $DOCKER_COMPOSE_FILE up -d
- name: Print memory info
Expand Down
86 changes: 86 additions & 0 deletions compose.valkey.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
---
services:
node1: &node
image: "valkey/valkey:${REDIS_VERSION:-7}"
command: >
valkey-server
--maxmemory 64mb
--maxmemory-policy allkeys-lru
--appendonly yes
--cluster-enabled yes
--cluster-config-file nodes.conf
--cluster-node-timeout 5000
restart: "${RESTART_POLICY:-always}"
healthcheck:
test: ["CMD", "valkey-cli", "ping"]
interval: "7s"
timeout: "5s"
retries: 10
ports:
- "6379:6379"
node2:
<<: *node
ports:
- "6380:6379"
node3:
<<: *node
ports:
- "6381:6379"
node4:
<<: *node
ports:
- "6382:6379"
node5:
<<: *node
ports:
- "6383:6379"
node6:
<<: *node
ports:
- "6384:6379"
clustering:
image: "valkey/valkey:${REDIS_VERSION:-7}"
command: >
bash -c "apt-get update > /dev/null
&& apt-get install --no-install-recommends --no-install-suggests -y dnsutils > /dev/null
&& rm -rf /var/lib/apt/lists/*
&& yes yes | valkey-cli --cluster create
$$(dig node1 +short):6379
$$(dig node2 +short):6379
$$(dig node3 +short):6379
$$(dig node4 +short):6379
$$(dig node5 +short):6379
$$(dig node6 +short):6379
--cluster-replicas 1"
depends_on:
node1:
condition: service_healthy
node2:
condition: service_healthy
node3:
condition: service_healthy
node4:
condition: service_healthy
node5:
condition: service_healthy
node6:
condition: service_healthy
ruby:
image: "ruby:${RUBY_VERSION:-3}"
restart: always
working_dir: /client
volumes:
- .:/client
command:
- ruby
- "-e"
- 'Signal.trap(:INT, "EXIT"); Signal.trap(:TERM, "EXIT"); loop { sleep 1 }'
environment:
REDIS_HOST: node1
cap_drop:
- ALL
healthcheck:
test: ["CMD", "ruby", "-e", "'puts 1'"]
interval: "5s"
timeout: "3s"
retries: 3
22 changes: 21 additions & 1 deletion test/benchmark_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,24 @@
when 'hiredis' then require 'hiredis-client'
end

class BenchmarkWrapper < Minitest::Benchmark; end
class BenchmarkWrapper < Minitest::Benchmark
private

def swap_timeout(client, timeout:)
return if client.nil?

node = client.instance_variable_get(:@router)&.instance_variable_get(:@node)
raise 'The client must be initialized.' if node.nil?

updater = lambda do |c, t|
c.read_timeout = t
c.config.instance_variable_set(:@read_timeout, t)
end

regular_timeout = node.first.read_timeout
node.each { |cli| updater.call(cli, timeout) }
result = yield client
node.each { |cli| updater.call(cli, regular_timeout) }
result
end
end
8 changes: 6 additions & 2 deletions test/benchmark_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def bench_pipeline_get
def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
swap_timeout(@client, timeout: 0.1) do |client|
client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
end
end
end

Expand Down Expand Up @@ -109,6 +111,8 @@ def new_cluster_client
def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@cluster_client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
swap_timeout(@cluster_client, timeout: 0.1) do |cluster_client|
cluster_client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
end
end
end
17 changes: 16 additions & 1 deletion test/cluster_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,9 @@ def wait_failover(clients, primary_node_key:, replica_node_key:, max_attempts:)
def wait_replication_delay(clients, replica_size:, timeout:)
timeout_msec = timeout.to_i * 1000
wait_for_state(clients, max_attempts: clients.size + 1) do |client|
client.blocking_call(timeout, 'WAIT', replica_size, timeout_msec - 100) if primary_client?(client)
swap_timeout(client, timeout: 0.1) do |cli|
cli.blocking_call(timeout, 'WAIT', replica_size, timeout_msec - 100) if primary_client?(cli)
end
true
rescue ::RedisClient::ConnectionError
true
Expand Down Expand Up @@ -503,4 +505,17 @@ def print_debug(msg)

p msg
end

def swap_timeout(client, timeout:)
updater = lambda do |c, t|
c.read_timeout = t
c.config.instance_variable_set(:@read_timeout, t)
end

regular_timeout = client.read_timeout
updater.call(client, timeout)
result = yield client
updater.call(client, regular_timeout)
result
end
end
53 changes: 35 additions & 18 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,20 @@ def test_blocking_call
client_side_timeout = TEST_REDIS_MAJOR_VERSION < 6 ? 2.0 : 1.5
server_side_timeout = TEST_REDIS_MAJOR_VERSION < 6 ? '1' : '0.5'

assert_equal(%w[foo world], @client.blocking_call(client_side_timeout, 'BRPOP', 'foo', server_side_timeout), 'Case: 1st')
swap_timeout(@client, timeout: 0.1) do |client|
assert_equal(%w[foo world], client.blocking_call(client_side_timeout, 'BRPOP', 'foo', server_side_timeout), 'Case: 1st')

# FIXME: too flaky, just a workaround
got = @client.blocking_call(client_side_timeout, 'BRPOP', 'foo', server_side_timeout)
if got.nil?
assert_nil(got, 'Case: 2nd')
else
assert_equal(%w[foo hello], got, 'Case: 2nd')
end
# FIXME: too flaky, just a workaround
got = client.blocking_call(client_side_timeout, 'BRPOP', 'foo', server_side_timeout)
if got.nil?
assert_nil(got, 'Case: 2nd')
else
assert_equal(%w[foo hello], got, 'Case: 2nd')
end

assert_nil(@client.blocking_call(client_side_timeout, 'BRPOP', 'foo', server_side_timeout), 'Case: 3rd')
assert_raises(::RedisClient::ReadTimeoutError, 'Case: 4th') { @client.blocking_call(0.1, 'BRPOP', 'foo', 0) }
assert_nil(client.blocking_call(client_side_timeout, 'BRPOP', 'foo', server_side_timeout), 'Case: 3rd')
assert_raises(::RedisClient::ReadTimeoutError, 'Case: 4th') { client.blocking_call(0.1, 'BRPOP', 'foo', 0) }
end
end

def test_scan
Expand Down Expand Up @@ -146,12 +148,16 @@ def test_pipelined
want = %w[PONG] + (0..9).map(&:to_s) + [%w[list 2]]
client_side_timeout = TEST_REDIS_MAJOR_VERSION < 6 ? 1.5 : 1.0
server_side_timeout = TEST_REDIS_MAJOR_VERSION < 6 ? '1' : '0.5'
got = @client.pipelined do |pipeline|
pipeline.call_once('PING')
10.times { |i| pipeline.call('GET', "string#{i}") }
pipeline.blocking_call(client_side_timeout, 'BRPOP', 'list', server_side_timeout)

swap_timeout(@client, timeout: 0.1) do |client|
got = client.pipelined do |pipeline|
pipeline.call_once('PING')
10.times { |i| pipeline.call('GET', "string#{i}") }
pipeline.blocking_call(client_side_timeout, 'BRPOP', 'list', server_side_timeout)
end

assert_equal(want, got)
end
assert_equal(want, got)
end

def test_pipelined_with_errors
Expand Down Expand Up @@ -815,8 +821,12 @@ def test_circuit_breakers
)
).new_client

assert_raises(::RedisClient::ReadTimeoutError) { cli.blocking_call(0.1, 'BRPOP', 'foo', 0) }
assert_raises(::RedisClient::CircuitBreaker::OpenCircuitError) { cli.blocking_call(0.1, 'BRPOP', 'foo', 0) }
cli.call('echo', 'init')

swap_timeout(cli, timeout: 0.1) do |c|
assert_raises(::RedisClient::ReadTimeoutError) { c.blocking_call(0.1, 'BRPOP', 'foo', 0) }
assert_raises(::RedisClient::CircuitBreaker::OpenCircuitError) { c.blocking_call(0.1, 'BRPOP', 'foo', 0) }
end

cli&.close
end
Expand Down Expand Up @@ -875,7 +885,9 @@ def test_initialization_delayed
def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
swap_timeout(@client, timeout: 0.1) do |client|
client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
end
end

def collect_messages(pubsub, size:, max_attempts: 30, timeout: 1.0)
Expand Down Expand Up @@ -912,6 +924,7 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
slow_command_timeout: TEST_TIMEOUT_SEC,
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
Expand All @@ -930,6 +943,7 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar
replica: true,
replica_affinity: :random,
fixed_hostname: TEST_FIXED_HOSTNAME,
slow_command_timeout: TEST_TIMEOUT_SEC,
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
Expand All @@ -948,6 +962,7 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar
replica: true,
replica_affinity: :random_with_primary,
fixed_hostname: TEST_FIXED_HOSTNAME,
slow_command_timeout: TEST_TIMEOUT_SEC,
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
Expand All @@ -966,6 +981,7 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar
replica: true,
replica_affinity: :latency,
fixed_hostname: TEST_FIXED_HOSTNAME,
slow_command_timeout: TEST_TIMEOUT_SEC,
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
Expand All @@ -982,6 +998,7 @@ def new_test_client(custom: { captured_commands: @captured_commands }, middlewar
config = ::RedisClient::ClusterConfig.new(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
slow_command_timeout: TEST_TIMEOUT_SEC,
middlewares: middlewares,
custom: custom,
**TEST_GENERIC_OPTIONS,
Expand Down
4 changes: 3 additions & 1 deletion test/test_against_cluster_broken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def test_a_primary_is_down
def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
swap_timeout(@client, timeout: 0.1) do |client|
client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
end
end

def do_test_a_node_is_down(sacrifice, number_of_keys:)
Expand Down
4 changes: 3 additions & 1 deletion test/test_against_cluster_scale.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ def test_02_scale_in
def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
swap_timeout(@client, timeout: 0.1) do |client|
client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
end
end

def build_cluster_controller(nodes, shard_size:)
Expand Down
6 changes: 5 additions & 1 deletion test/test_against_cluster_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
def wait_for_replication
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
@client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
swap_timeout(@client, timeout: 0.1) do |client|
client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
rescue RedisClient::Cluster::ErrorCollection => e
raise unless e.errors.values.all? { |err| err.message.start_with?('ERR WAIT cannot be used with replica instances') }
end
end

def fetch_cluster_info(key)
Expand Down
Loading