From cdc69608b1c1448593a7f1dff9344922670beee0 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Tue, 8 Oct 2024 15:06:55 +0900 Subject: [PATCH] wip --- .github/workflows/test.yaml | 14 ++--- bin/pubsub | 66 +++++++++++++----------- bin/singlepiptx | 86 ++++++++++++++++++------------- lib/redis_client/cluster.rb | 6 +-- lib/redis_client/cluster/node.rb | 2 +- test/redis_client/test_cluster.rb | 5 +- test/test_against_cluster_down.rb | 84 ++++++++++++++++-------------- 7 files changed, 144 insertions(+), 119 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 11abf69..4a3b2d6 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -32,18 +32,18 @@ jobs: - {redis: '7.2', ruby: '3.3', driver: 'hiredis'} - {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml} - {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'} - - {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} - - {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} - - {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} - - {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} - - {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_down} + - {task: test_cluster_broken, restart: 'no', startup: '6'} - {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'} - {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml} - {redis: '7.0', ruby: '3.1'} - {redis: '6.2', ruby: '3.0'} - {redis: '5.0', ruby: '2.7'} - - {task: test_cluster_down} - - {task: test_cluster_broken, restart: 'no', startup: '6'} + - {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} + - {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'} - {ruby: 'jruby'} - {ruby: 'truffleruby'} - {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'} diff --git a/bin/pubsub b/bin/pubsub index d043a71..54c33b8 100755 --- a/bin/pubsub +++ b/bin/pubsub @@ -5,57 +5,57 @@ require 'bundler/setup' require 'redis_cluster_client' module PubSubDebug + WAIT_SEC = 2.0 + module_function - def spawn_publisher(cli, chan) - Thread.new(cli, chan) do |r, c| - role = ' Publisher' + def spawn_publisher(client, channel) + Thread.new(client, channel) do |cli, chan| + role = 'Publisher' i = 0 loop do handle_errors(role) do - msg = format('%05d', i) - r.call('spublish', c, msg) - log "#{role}: sent: #{msg}" + cli.call('spublish', chan, i) + log(role, :spublish, chan, i) i += 1 end ensure - sleep 1.0 + sleep WAIT_SEC end rescue StandardError => e - log "#{role}: dead: #{e.class}: #{e.message}" + log(role, :dead, e.class, e.message) raise end end - def spawn_subscriber(cli, chan) # rubocop:disable Metrics/AbcSize - Thread.new(cli, chan) do |r, c| + def spawn_subscriber(client, channel) # rubocop:disable Metrics/AbcSize + Thread.new(client, channel) do |cli, chan| role = 'Subscriber' ps = nil loop do - ps = r.pubsub - ps.call('ssubscribe', c) - log "#{role}: done: subscription started to #{c}" + ps = cli.pubsub + ps.call('ssubscribe', chan) break rescue StandardError => e - log "#{role}: init: #{e.class}: #{e.message}" + log(role, :init, e.class, e.message) ps&.close ensure - sleep 1.0 + sleep WAIT_SEC end loop do handle_errors('Subscriber') do - e = ps.next_event(0.01) - log "#{role}: recv: #{e.nil? ? 'nil' : e}" - ps.call('ssubscribe', c) if !e.nil? && e.first == 'sunsubscribe' + event = ps.next_event(WAIT_SEC) + log(role, *event) unless event.nil? + case event&.first + when 'sunsubscribe' then ps.call('ssubscribe', chan) + end end - ensure - sleep 1.0 end rescue StandardError, SignalException => e - log "#{role}: dead: #{e.class}: #{e.message}" + log(role, :dead, e.class, e.message) ps&.close raise end @@ -64,23 +64,26 @@ module PubSubDebug def handle_errors(role) yield rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e - log "#{role}: recv: #{e.class}" + log(role, e.class) rescue RedisClient::CommandError => e - log "#{role}: recv: #{e.class}: #{e.message}" + log(role, e.class, e.message) raise unless e.message.start_with?('CLUSTERDOWN') rescue StandardError => e - log "#{role}: recv: #{e.class}: #{e.message}" + log(role, e.class, e.message) raise end - def log(msg) - print "#{msg}\n" + def log(*texts) + return if texts.nil? || texts.empty? + + message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ') + print "#{message}\n" end end -clients = Array.new(2) { RedisClient.cluster(connect_with_original_config: true).new_client } +nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze +clients = Array.new(6) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze threads = [] -channel = 'chan1' Signal.trap(:INT) do threads.each(&:exit) @@ -89,6 +92,9 @@ Signal.trap(:INT) do exit 0 end -threads << PubSubDebug.spawn_subscriber(clients[0], channel) -threads << PubSubDebug.spawn_publisher(clients[1], channel) +%w[chan1 chan2 chan3].each_with_index do |channel, i| + threads << PubSubDebug.spawn_subscriber(clients[i], channel) + threads << PubSubDebug.spawn_publisher(clients[i + 3], channel) +end + threads.each(&:join) diff --git a/bin/singlepiptx b/bin/singlepiptx index 15a64a9..8054fc9 100755 --- a/bin/singlepiptx +++ b/bin/singlepiptx @@ -5,69 +5,70 @@ require 'bundler/setup' require 'redis_cluster_client' module SinglePipTxDebug + WAIT_SEC = 2.0 + module_function - def spawn_single(cli) - Thread.new(cli) do |r| - role = ' Single' + def spawn_single(client, key) + Thread.new(client, key) do |cli, k| + role = 'Single' loop do handle_errors(role) do - reply = r.call('incr', 'single') - log "#{role}: #{reply}" + reply = cli.call('incr', k) + log(role, k, reply) end ensure - sleep 1.0 + sleep WAIT_SEC end rescue StandardError => e - log "#{role}: dead: #{e.class}: #{e.message}" + log(role, :dead, e.class, e.message) raise end end - def spawn_pipeline(cli) - Thread.new(cli) do |r| - role = ' Pipeline' + def spawn_pipeline(client, key) + Thread.new(client, key) do |cli, k| + role = 'Pipeline' loop do handle_errors(role) do - reply = r.pipelined do |pi| - pi.call('incr', 'pipeline') - pi.call('incr', 'pipeline') + reply = cli.pipelined do |pi| + pi.call('incr', k) + pi.call('incr', k) end - log "#{role}: #{reply}" + log(role, k, reply.last) end ensure - sleep 1.0 + sleep WAIT_SEC end rescue StandardError => e - log "#{role}: dead: #{e.class}: #{e.message}" + log(role, :dead, e.class, e.message) raise end end - def spawn_transaction(cli) - Thread.new(cli) do |r| + def spawn_transaction(client, key) + Thread.new(client, key) do |cli, k| role = 'Transaction' i = 0 loop do handle_errors(role) do - reply = r.multi(watch: i.odd? ? %w[transaction] : nil) do |tx| - i += 1 - tx.call('incr', 'transaction') - tx.call('incr', 'transaction') - tx.call('incr', 'transaction') + reply = cli.multi(watch: i.odd? ? [k] : nil) do |tx| + tx.call('incr', k) + tx.call('incr', k) end - log "#{role}: #{reply}" + log(role, k, reply.last) + i += 1 end ensure - sleep 1.0 + sleep WAIT_SEC end rescue StandardError => e - log "#{role}: dead: #{e.class}: #{e.message}" + log(role, :dead, e.class, e.message) raise end end @@ -75,26 +76,30 @@ module SinglePipTxDebug def handle_errors(role) # rubocop:disable Metrics/AbcSize yield rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e - log "#{role}: #{e.class}" + log(role, e.class) rescue RedisClient::CommandError => e - log "#{role}: #{e.class}: #{e.message}" + log(role, e.class, e.message) raise unless e.message.start_with?('CLUSTERDOWN') rescue RedisClient::Cluster::ErrorCollection => e - log "#{role}: #{e.class}: #{e.message}" + log(role, e.class, e.message) raise unless e.errors.values.all? do |err| err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError) end rescue StandardError => e - log "#{role}: #{e.class}: #{e.message}" + log(role, e.class, e.message) raise end - def log(msg) - print "#{msg}\n" + def log(*texts) + return if texts.nil? || texts.empty? + + message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ') + print "#{message}\n" end end -clients = Array.new(3) { RedisClient.cluster(connect_with_original_config: true).new_client } +nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze +clients = Array.new(9) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze threads = [] Signal.trap(:INT) do @@ -104,7 +109,16 @@ Signal.trap(:INT) do exit 0 end -threads << SinglePipTxDebug.spawn_single(clients[0]) -threads << SinglePipTxDebug.spawn_pipeline(clients[1]) -threads << SinglePipTxDebug.spawn_transaction(clients[2]) +%w[single1 single3 single4].each_with_index do |key, i| + threads << SinglePipTxDebug.spawn_single(clients[i], key) +end + +%w[pipeline1 pipeline2 pipeline4].each_with_index do |key, i| + threads << SinglePipTxDebug.spawn_pipeline(clients[i + 3], key) +end + +%w[transaction1 transaction3 transaction4].each_with_index do |key, i| + threads << SinglePipTxDebug.spawn_transaction(clients[i + 6], key) +end + threads.each(&:join) diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index ceacde1..d31de89 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -15,10 +15,10 @@ class Cluster attr_reader :config - def initialize(config, pool: nil, concurrency: nil, **kwargs) - @config = config + def initialize(config = nil, pool: nil, concurrency: nil, **kwargs) + @config = config.nil? ? ClusterConfig.new(**kwargs) : config @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {})) - @command_builder = config.command_builder + @command_builder = @config.command_builder @pool = pool @kwargs = kwargs diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index 8ff62ce..80a4008 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -309,7 +309,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M work_group.push(i, raw_client) do |client| regular_timeout = client.read_timeout client.read_timeout = @config.slow_command_timeout > 0.0 ? @config.slow_command_timeout : regular_timeout - reply = client.call('CLUSTER', 'NODES') + reply = client.call_once('CLUSTER', 'NODES') client.read_timeout = regular_timeout parse_cluster_node_reply(reply) rescue StandardError => e diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 7a787e2..d20f629 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -892,8 +892,9 @@ def wait_for_replication server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i swap_timeout(@client, timeout: 0.1) do |client| client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout) - rescue RedisClient::ConnectionError - # ignore + rescue RedisClient::Cluster::ErrorCollection => e + # FIXME: flaky in jruby on #test_pubsub_with_wrong_command + raise unless e.errors.values.all? { |err| err.is_a?(::RedisClient::ConnectionError) } end end diff --git a/test/test_against_cluster_down.rb b/test/test_against_cluster_down.rb index cf8b78a..771c76f 100644 --- a/test/test_against_cluster_down.rb +++ b/test/test_against_cluster_down.rb @@ -4,7 +4,12 @@ class TestAgainstClusterDown < TestingWrapper WAIT_SEC = 0.1 - NUMBER_OF_JOBS = 5 + CASES = %w[Single Pipeline Transaction Subscriber Publisher].freeze + SINGLE_KEYS = %w[single1 single3 single4].freeze + PIPELINE_KEYS = %w[pipeline1 pipeline2 pipeline4].freeze + TRANSACTION_KEYS = %w[transaction1 transaction3 transaction4].freeze + CHANNELS = %w[chan1 chan2 chan3].freeze + NUMBER_OF_JOBS = SINGLE_KEYS.size + PIPELINE_KEYS.size + TRANSACTION_KEYS.size + CHANNELS.size * 2 def setup @captured_commands = ::Middlewares::CommandCapture::CommandBuffer.new @@ -28,12 +33,13 @@ def teardown end def test_recoverability_from_cluster_down - cases = %w[Single Pipeline Transaction Subscriber Publisher] - @threads << spawn_single(@clients[0], @recorders[0]) - @threads << spawn_pipeline(@clients[1], @recorders[1]) - @threads << spawn_transaction(@clients[2], @recorders[2]) - @threads << spawn_subscriber(@clients[3], @recorders[3]) - @threads << spawn_publisher(@clients[4], @recorders[4]) + SINGLE_KEYS.each_with_index { |key, i| @threads << spawn_single(@clients[i], @recorders[i], key) } + PIPELINE_KEYS.each_with_index { |key, i| @threads << spawn_pipeline(@clients[i + 3], @recorders[i + 3], key) } + TRANSACTION_KEYS.each_with_index { |key, i| @threads << spawn_transaction(@clients[i + 6], @recorders[i + 6], key) } + CHANNELS.each_with_index do |channel, i| + @threads << spawn_subscriber(@clients[i + 9], @recorders[i + 9], channel) + @threads << spawn_publisher(@clients[i + 12], @recorders[i + 12], channel) + end wait_for_jobs_to_be_stable system('docker compose --progress quiet down', exception: true) @@ -50,7 +56,7 @@ def test_recoverability_from_cluster_down wait_for_jobs_to_be_stable @values_b = @recorders.map { |r| r.get.to_i } @recorders.each_with_index do |_, i| - assert(@values_a[i] < @values_b[i], "#{cases[i]}: #{@values_a[i]} < #{@values_b[i]}") + assert(@values_a[i] < @values_b[i], "#{CASES[i]}: #{@values_a[i]} < #{@values_b[i]}") end end @@ -80,13 +86,12 @@ def build_controller ) end - def spawn_single(cli, rec) - Thread.new(cli, rec) do |c, r| + def spawn_single(client, recorder, key) + Thread.new(client, recorder, key) do |cli, rec, k| loop do handle_errors do - c.call('incr', 'single') - reply = c.call('incr', 'single') - r.set(reply) + reply = cli.call('incr', k) + rec.set(reply) end ensure sleep WAIT_SEC @@ -94,16 +99,16 @@ def spawn_single(cli, rec) end end - def spawn_pipeline(cli, rec) - Thread.new(cli, rec) do |c, r| + def spawn_pipeline(client, recorder, key) + Thread.new(client, recorder, key) do |cli, rec, k| loop do handle_errors do - reply = c.pipelined do |pi| - pi.call('incr', 'pipeline') - pi.call('incr', 'pipeline') + reply = cli.pipelined do |pi| + pi.call('incr', k) + pi.call('incr', k) end - r.set(reply[1]) + rec.set(reply.last) end ensure sleep WAIT_SEC @@ -111,18 +116,18 @@ def spawn_pipeline(cli, rec) end end - def spawn_transaction(cli, rec) - Thread.new(cli, rec) do |c, r| + def spawn_transaction(client, recorder, key) + Thread.new(client, recorder, key) do |cli, rec, k| i = 0 loop do handle_errors do - reply = c.multi(watch: i.odd? ? %w[transaction] : nil) do |tx| - i += 1 - tx.call('incr', 'transaction') - tx.call('incr', 'transaction') + reply = cli.multi(watch: i.odd? ? [k] : nil) do |tx| + tx.call('incr', k) + tx.call('incr', k) end - r.set(reply[1]) + rec.set(reply.last) + i += 1 end ensure sleep WAIT_SEC @@ -130,28 +135,29 @@ def spawn_transaction(cli, rec) end end - def spawn_publisher(cli, rec) - Thread.new(cli, rec) do |c, r| + def spawn_publisher(client, recorder, channel) + Thread.new(client, recorder, channel) do |cli, rec, chan| i = 0 loop do handle_errors do - c.call('spublish', 'chan', i) - r.set(i) - i += 1 + cli.call('spublish', chan, i) end + + rec.set(i) + i += 1 ensure sleep WAIT_SEC end end end - def spawn_subscriber(cli, rec) - Thread.new(cli, rec) do |c, r| + def spawn_subscriber(client, recorder, channel) + Thread.new(client, recorder, channel) do |cli, rec, chan| ps = nil loop do - ps = c.pubsub - ps.call('ssubscribe', 'chan') + ps = cli.pubsub + ps.call('ssubscribe', chan) break rescue StandardError ps&.close @@ -161,14 +167,12 @@ def spawn_subscriber(cli, rec) loop do handle_errors do - event = ps.next_event(0.01) + event = ps.next_event(WAIT_SEC) case event&.first - when 'smessage' then r.set(event[2]) - when 'sunsubscribe' then ps.call('ssubscribe', 'chan') + when 'smessage' then rec.set(event[2]) + when 'sunsubscribe' then ps.call('ssubscribe', chan) end end - ensure - sleep WAIT_SEC end rescue StandardError, SignalException ps&.close