diff --git a/lib/message_bus/backends/redis_streams.rb b/lib/message_bus/backends/redis_streams.rb index c030af76..bad42cc0 100644 --- a/lib/message_bus/backends/redis_streams.rb +++ b/lib/message_bus/backends/redis_streams.rb @@ -181,7 +181,9 @@ def backlog(channel, last_id = nil) items = redis.xrange backlog_key, start, "+" items.map do |_id, (_, payload)| - MessageBus::Message.decode(payload) + m = MessageBus::Message.decode(payload) + m.global_id = -1 + m end end @@ -206,16 +208,9 @@ def global_backlog(last_id = nil) # (see Base#get_message) def get_message(channel, message_id) - redis = pub_redis - backlog_key = backlog_key(channel) - - items = redis.xrange backlog_key, "0-#{message_id}", "0-#{message_id}" - if items && items[0] - _id, (_, payload) = items[0] - MessageBus::Message.decode(payload) - else - nil - end + message = _get_message(channel, message_id) + message.global_id = -1 if message + message end # (see Base#subscribe) @@ -228,7 +223,7 @@ def subscribe(channel, last_id = nil) # we need to translate this to a global id, at least give it a shot # we are subscribing on global and global is always going to be bigger than local # so worst case is a replay of a few messages - message = get_message(channel, last_id) + message = _get_message(channel, last_id) if message last_id = message.global_id end @@ -341,11 +336,24 @@ def unsubscribe_key "__mb_unsubscribe_n" end + def _get_message(channel, message_id) + redis = pub_redis + backlog_key = backlog_key(channel) + + items = redis.xrange backlog_key, "0-#{message_id}", "0-#{message_id}" + if items && items[0] + _id, (_, payload) = items[0] + MessageBus::Message.decode(payload) + else + nil + end + end + def message_from_global_backlog(payload) pipe = payload.index "|" message_id = payload[0..pipe].to_i channel = payload[pipe + 1..-1] - get_message(channel, message_id) + _get_message(channel, message_id) end def cached_eval(redis, script, script_sha1, params)