Skip to content

Commit

Permalink
Streams implementation shouldn't unduly leak global_id
Browse files Browse the repository at this point in the history
  • Loading branch information
benlangfeld committed Dec 3, 2018
1 parent c15ed2d commit 1dd89d4
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions lib/message_bus/backends/redis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ def backlog(channel, last_id = nil)
items = redis.xrange backlog_key, start, "+"

items.map do |_id, (_, payload)|
MessageBus::Message.decode(payload)
m = MessageBus::Message.decode(payload)
m.global_id = -1
m
end
end

Expand All @@ -206,16 +208,9 @@ def global_backlog(last_id = nil)

# (see Base#get_message)
def get_message(channel, message_id)
redis = pub_redis
backlog_key = backlog_key(channel)

items = redis.xrange backlog_key, "0-#{message_id}", "0-#{message_id}"
if items && items[0]
_id, (_, payload) = items[0]
MessageBus::Message.decode(payload)
else
nil
end
message = _get_message(channel, message_id)
message.global_id = -1 if message
message
end

# (see Base#subscribe)
Expand All @@ -228,7 +223,7 @@ def subscribe(channel, last_id = nil)
# we need to translate this to a global id, at least give it a shot
# we are subscribing on global and global is always going to be bigger than local
# so worst case is a replay of a few messages
message = get_message(channel, last_id)
message = _get_message(channel, last_id)
if message
last_id = message.global_id
end
Expand Down Expand Up @@ -341,11 +336,24 @@ def unsubscribe_key
"__mb_unsubscribe_n"
end

def _get_message(channel, message_id)
redis = pub_redis
backlog_key = backlog_key(channel)

items = redis.xrange backlog_key, "0-#{message_id}", "0-#{message_id}"
if items && items[0]
_id, (_, payload) = items[0]
MessageBus::Message.decode(payload)
else
nil
end
end

def message_from_global_backlog(payload)
pipe = payload.index "|"
message_id = payload[0..pipe].to_i
channel = payload[pipe + 1..-1]
get_message(channel, message_id)
_get_message(channel, message_id)
end

def cached_eval(redis, script, script_sha1, params)
Expand Down

0 comments on commit 1dd89d4

Please sign in to comment.