Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: patch some minor changes #395

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ jobs:
- {redis: '7.2', ruby: '3.3', driver: 'hiredis'}
- {redis: '7.2', ruby: '3.3', driver: 'hiredis', compose: compose.ssl.yaml}
- {redis: '7.2', ruby: '3.3', compose: compose.replica.yaml, replica: '2'}
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_down}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {redis: '8', ruby: '3.3', compose: compose.valkey.yaml, replica: '2'}
- {redis: '7.2', ruby: '3.2', compose: compose.auth.yaml}
- {redis: '7.0', ruby: '3.1'}
- {redis: '6.2', ruby: '3.0'}
- {redis: '5.0', ruby: '2.7'}
- {task: test_cluster_down}
- {task: test_cluster_broken, restart: 'no', startup: '6'}
- {task: test_cluster_state, pattern: 'PrimaryOnly', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'Pooled', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandom', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadRandomWithPrimary', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {task: test_cluster_state, pattern: 'ScaleReadLatency', compose: compose.valkey.yaml, redis: '8', replica: '2', startup: '9'}
- {ruby: 'jruby'}
- {ruby: 'truffleruby'}
- {task: test_cluster_scale, pattern: 'Single', compose: compose.scale.yaml, startup: '8'}
Expand Down
66 changes: 36 additions & 30 deletions bin/pubsub
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,57 @@ require 'bundler/setup'
require 'redis_cluster_client'

module PubSubDebug
WAIT_SEC = 2.0

module_function

def spawn_publisher(cli, chan)
Thread.new(cli, chan) do |r, c|
role = ' Publisher'
def spawn_publisher(client, channel)
Thread.new(client, channel) do |cli, chan|
role = 'Publisher'
i = 0

loop do
handle_errors(role) do
msg = format('%05d', i)
r.call('spublish', c, msg)
log "#{role}: sent: #{msg}"
cli.call('spublish', chan, i)
log(role, :spublish, chan, i)
i += 1
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end

def spawn_subscriber(cli, chan) # rubocop:disable Metrics/AbcSize
Thread.new(cli, chan) do |r, c|
def spawn_subscriber(client, channel) # rubocop:disable Metrics/AbcSize
Thread.new(client, channel) do |cli, chan|
role = 'Subscriber'
ps = nil

loop do
ps = r.pubsub
ps.call('ssubscribe', c)
log "#{role}: done: subscription started to #{c}"
ps = cli.pubsub
ps.call('ssubscribe', chan)
break
rescue StandardError => e
log "#{role}: init: #{e.class}: #{e.message}"
log(role, :init, e.class, e.message)
ps&.close
ensure
sleep 1.0
sleep WAIT_SEC
end

loop do
handle_errors('Subscriber') do
e = ps.next_event(0.01)
log "#{role}: recv: #{e.nil? ? 'nil' : e}"
ps.call('ssubscribe', c) if !e.nil? && e.first == 'sunsubscribe'
event = ps.next_event(WAIT_SEC)
log(role, *event) unless event.nil?
case event&.first
when 'sunsubscribe' then ps.call('ssubscribe', chan)
end
end
ensure
sleep 1.0
end
rescue StandardError, SignalException => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
ps&.close
raise
end
Expand All @@ -64,23 +64,26 @@ module PubSubDebug
def handle_errors(role)
yield
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
log "#{role}: recv: #{e.class}"
log(role, e.class)
rescue RedisClient::CommandError => e
log "#{role}: recv: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise unless e.message.start_with?('CLUSTERDOWN')
rescue StandardError => e
log "#{role}: recv: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise
end

def log(msg)
print "#{msg}\n"
def log(*texts)
return if texts.nil? || texts.empty?

message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ')
print "#{message}\n"
end
end

clients = Array.new(2) { RedisClient.cluster(connect_with_original_config: true).new_client }
nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze
clients = Array.new(6) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze
threads = []
channel = 'chan1'

Signal.trap(:INT) do
threads.each(&:exit)
Expand All @@ -89,6 +92,9 @@ Signal.trap(:INT) do
exit 0
end

threads << PubSubDebug.spawn_subscriber(clients[0], channel)
threads << PubSubDebug.spawn_publisher(clients[1], channel)
%w[chan1 chan2 chan3].each_with_index do |channel, i|
threads << PubSubDebug.spawn_subscriber(clients[i], channel)
threads << PubSubDebug.spawn_publisher(clients[i + 3], channel)
end

threads.each(&:join)
86 changes: 50 additions & 36 deletions bin/singlepiptx
Original file line number Diff line number Diff line change
Expand Up @@ -5,96 +5,101 @@ require 'bundler/setup'
require 'redis_cluster_client'

module SinglePipTxDebug
WAIT_SEC = 2.0

module_function

def spawn_single(cli)
Thread.new(cli) do |r|
role = ' Single'
def spawn_single(client, key)
Thread.new(client, key) do |cli, k|
role = 'Single'

loop do
handle_errors(role) do
reply = r.call('incr', 'single')
log "#{role}: #{reply}"
reply = cli.call('incr', k)
log(role, k, reply)
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end

def spawn_pipeline(cli)
Thread.new(cli) do |r|
role = ' Pipeline'
def spawn_pipeline(client, key)
Thread.new(client, key) do |cli, k|
role = 'Pipeline'

loop do
handle_errors(role) do
reply = r.pipelined do |pi|
pi.call('incr', 'pipeline')
pi.call('incr', 'pipeline')
reply = cli.pipelined do |pi|
pi.call('incr', k)
pi.call('incr', k)
end

log "#{role}: #{reply}"
log(role, k, reply.last)
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end

def spawn_transaction(cli)
Thread.new(cli) do |r|
def spawn_transaction(client, key)
Thread.new(client, key) do |cli, k|
role = 'Transaction'
i = 0

loop do
handle_errors(role) do
reply = r.multi(watch: i.odd? ? %w[transaction] : nil) do |tx|
i += 1
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
reply = cli.multi(watch: i.odd? ? [k] : nil) do |tx|
tx.call('incr', k)
tx.call('incr', k)
end

log "#{role}: #{reply}"
log(role, k, reply.last)
i += 1
end
ensure
sleep 1.0
sleep WAIT_SEC
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
log(role, :dead, e.class, e.message)
raise
end
end

def handle_errors(role) # rubocop:disable Metrics/AbcSize
yield
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
log "#{role}: #{e.class}"
log(role, e.class)
rescue RedisClient::CommandError => e
log "#{role}: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise unless e.message.start_with?('CLUSTERDOWN')
rescue RedisClient::Cluster::ErrorCollection => e
log "#{role}: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise unless e.errors.values.all? do |err|
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
end
rescue StandardError => e
log "#{role}: #{e.class}: #{e.message}"
log(role, e.class, e.message)
raise
end

def log(msg)
print "#{msg}\n"
def log(*texts)
return if texts.nil? || texts.empty?

message = texts.map { |text| "#{' ' * [15 - text.to_s.size, 0].max}#{text}" }.join(': ')
print "#{message}\n"
end
end

clients = Array.new(3) { RedisClient.cluster(connect_with_original_config: true).new_client }
nodes = (6379..6384).map { |port| "redis://127.0.0.1:#{port}" }.freeze
clients = Array.new(9) { RedisClient.cluster(nodes: nodes, connect_with_original_config: true).new_client }.freeze
threads = []

Signal.trap(:INT) do
Expand All @@ -104,7 +109,16 @@ Signal.trap(:INT) do
exit 0
end

threads << SinglePipTxDebug.spawn_single(clients[0])
threads << SinglePipTxDebug.spawn_pipeline(clients[1])
threads << SinglePipTxDebug.spawn_transaction(clients[2])
%w[single1 single3 single4].each_with_index do |key, i|
threads << SinglePipTxDebug.spawn_single(clients[i], key)
end

%w[pipeline1 pipeline2 pipeline4].each_with_index do |key, i|
threads << SinglePipTxDebug.spawn_pipeline(clients[i + 3], key)
end

%w[transaction1 transaction3 transaction4].each_with_index do |key, i|
threads << SinglePipTxDebug.spawn_transaction(clients[i + 6], key)
end

threads.each(&:join)
6 changes: 3 additions & 3 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class Cluster

attr_reader :config

def initialize(config, pool: nil, concurrency: nil, **kwargs)
@config = config
def initialize(config = nil, pool: nil, concurrency: nil, **kwargs)
@config = config.nil? ? ClusterConfig.new(**kwargs) : config
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
@command_builder = config.command_builder
@command_builder = @config.command_builder

@pool = pool
@kwargs = kwargs
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M
work_group.push(i, raw_client) do |client|
regular_timeout = client.read_timeout
client.read_timeout = @config.slow_command_timeout > 0.0 ? @config.slow_command_timeout : regular_timeout
reply = client.call('CLUSTER', 'NODES')
reply = client.call_once('CLUSTER', 'NODES')
client.read_timeout = regular_timeout
parse_cluster_node_reply(reply)
rescue StandardError => e
Expand Down
5 changes: 3 additions & 2 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,9 @@ def wait_for_replication
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
swap_timeout(@client, timeout: 0.1) do |client|
client&.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
rescue RedisClient::ConnectionError
# ignore
rescue RedisClient::Cluster::ErrorCollection => e
# FIXME: flaky in jruby on #test_pubsub_with_wrong_command
raise unless e.errors.values.all? { |err| err.is_a?(::RedisClient::ConnectionError) }
end
end

Expand Down
Loading