diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb index 11bac8a..42f55c5 100644 --- a/lib/redis_client/cluster/command.rb +++ b/lib/redis_client/cluster/command.rb @@ -46,21 +46,28 @@ def load(nodes, slow_command_timeout: -1) # rubocop:disable Metrics/AbcSize private - def parse_command_reply(rows) # rubocop:disable Metrics/CyclomaticComplexity + def parse_command_reply(rows) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity rows&.each_with_object({}) do |row, acc| next if row.first.nil? + # TODO: in redis 7.0 or later, subcommand information included in the command reply + pos = case row.first when 'eval', 'evalsha', 'zinterstore', 'zunionstore' then 3 - when 'object' then 2 + when 'object', 'xgroup' then 2 when 'migrate', 'xread', 'xreadgroup' then 0 else row[3] end + writable = case row.first + when 'xgroup' then true + else row[2].include?('write') + end + acc[row.first] = ::RedisClient::Cluster::Command::Detail.new( first_key_position: pos, key_step: row[5], - write?: row[2].include?('write'), + write?: writable, readonly?: row[2].include?('readonly') ) end.freeze || EMPTY_HASH @@ -115,8 +122,11 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo end def determine_optional_key_position(command, option_name) - idx = command.map { |e| e.to_s.downcase(:ascii) }.index(option_name) - idx.nil? ? 0 : idx + 1 + command.each_with_index do |e, i| + return i + 1 if e.to_s.downcase(:ascii) == option_name + end + + 0 end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index a48897a..2b0b6e6 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -711,6 +711,47 @@ def test_other_pubsub_commands ps.close end + def test_stream_commands + @client.call('xadd', '{stream}1', '*', 'mesage', 'foo') + @client.call('xadd', '{stream}1', '*', 'mesage', 'bar') + @client.call('xadd', '{stream}2', '*', 'mesage', 'baz') + @client.call('xadd', '{stream}2', '*', 'mesage', 'zap') + wait_for_replication + + consumer = new_test_client + got = consumer.call('xread', 'streams', '{stream}1', '{stream}2', '0', '0') + consumer.close + + got = got.to_h if TEST_REDIS_MAJOR_VERSION < 6 + + assert_equal('foo', got.fetch('{stream}1')[0][1][1]) + assert_equal('bar', got.fetch('{stream}1')[1][1][1]) + assert_equal('baz', got.fetch('{stream}2')[0][1][1]) + assert_equal('zap', got.fetch('{stream}2')[1][1][1]) + end + + def test_stream_group_commands + @client.call('xadd', '{stream}1', '*', 'task', 'data1') + @client.call('xadd', '{stream}1', '*', 'task', 'data2') + @client.call('xgroup', 'create', '{stream}1', 'worker', '0') + wait_for_replication + + consumer1 = new_test_client + consumer2 = new_test_client + got1 = consumer1.call('xreadgroup', 'group', 'worker', 'consumer1', 'count', '1', 'streams', '{stream}1', '>') + got2 = consumer2.call('xreadgroup', 'group', 'worker', 'consumer2', 'count', '1', 'streams', '{stream}1', '>') + consumer1.close + consumer2.close + + if TEST_REDIS_MAJOR_VERSION < 6 + got1 = got1.to_h + got2 = got2.to_h + end + + assert_equal('data1', got1.fetch('{stream}1')[0][1][1]) + assert_equal('data2', got2.fetch('{stream}1')[0][1][1]) + end + def test_with_method assert_raises(NotImplementedError) { @client.with } end