From ecb762535a0468ba591e79878f0597ae83dd55e7 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Wed, 9 Aug 2023 22:34:45 +0900 Subject: [PATCH] fix: broken behavior for Pub/Sub --- lib/redis_client/cluster/pub_sub.rb | 15 ++++++++------- test/redis_client/test_cluster.rb | 18 ++++++------------ 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/lib/redis_client/cluster/pub_sub.rb b/lib/redis_client/cluster/pub_sub.rb index 02df8e11..20157cef 100644 --- a/lib/redis_client/cluster/pub_sub.rb +++ b/lib/redis_client/cluster/pub_sub.rb @@ -9,6 +9,7 @@ def initialize(router, command_builder) @router = router @command_builder = command_builder @pubsub_states = {} + @messages = [] end def call(*args, **kwargs) @@ -22,15 +23,15 @@ def call_v(command) def close @pubsub_states.each_value(&:close) @pubsub_states.clear + @messages.clear end def next_event(timeout = nil) return if @pubsub_states.empty? + return @messages.shift unless @messages.empty? - msgs = collect_messages(timeout).compact - return msgs.first if msgs.size < 2 - - msgs + collect_messages(timeout) + @messages.shift end private @@ -45,8 +46,8 @@ def _call(command) pubsub.call_v(command) end - def collect_messages(timeout) # rubocop:disable Metrics/AbcSize - @pubsub_states.each_slice(MAX_THREADS).each_with_object([]) do |chuncked_pubsub_states, acc| + def collect_messages(timeout) + @pubsub_states.each_slice(MAX_THREADS) do |chuncked_pubsub_states| threads = chuncked_pubsub_states.map do |_, v| Thread.new(v) do |pubsub| Thread.current[:reply] = pubsub.next_event(timeout) @@ -57,7 +58,7 @@ def collect_messages(timeout) # rubocop:disable Metrics/AbcSize threads.each do |t| t.join - acc << t[:reply] unless t[:reply].nil? + @messages << t[:reply] unless t[:reply].nil? end end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 8554de07..d79e63b4 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -214,10 +214,8 @@ def test_global_pubsub_with_multiple_channels sub = Fiber.new do |pubsub| pubsub.call('SUBSCRIBE', *Array.new(10) { |i| "g-chan#{i}" }) - assert_equal( - Array.new(10) { |i| ['subscribe', "g-chan#{i}", i + 1] }, - collect_messages(pubsub).sort_by { |e| e[1].to_s } - ) + got = collect_messages(pubsub).sort_by { |e| e[1].to_s } + 10.times { |i| assert_equal(['subscribe', "g-chan#{i}", i + 1], got[i]) } Fiber.yield Fiber.yield(collect_messages(pubsub)) pubsub.call('UNSUBSCRIBE') @@ -229,10 +227,8 @@ def test_global_pubsub_with_multiple_channels cli.pipelined { |pi| 10.times { |i| pi.call('PUBLISH', "g-chan#{i}", i) } } end - assert_equal( - Array.new(10) { |i| ['message', "g-chan#{i}", i.to_s] }, - sub.resume.sort_by { |e| e[1].to_s } - ) + got = sub.resume.sort_by { |e| e[1].to_s } + 10.times { |i| assert_equal(['message', "g-chan#{i}", i.to_s], got[i]) } end def test_sharded_pubsub @@ -285,10 +281,8 @@ def test_sharded_pubsub_with_multiple_channels cli.pipelined { |pi| 10.times { |i| pi.call('SPUBLISH', "s-chan#{i}", i) } } end - assert_equal( - Array.new(10) { |i| ['smessage', "s-chan#{i}", i.to_s] }, - sub.resume.sort_by { |e| e[1].to_s } - ) + got = sub.resume.sort_by { |e| e[1].to_s } + 10.times { |i| assert_equal(['smessage', "s-chan#{i}", i.to_s], got[i]) } end def test_close