Skip to content

Commit

Permalink
Merge pull request #30 from xconnio/broker
Browse files Browse the repository at this point in the history
Broker Implementation
  • Loading branch information
rubyonrails3 authored May 9, 2024
2 parents 0996e0b + e110aca commit 6693d7a
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/wampproto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require_relative "wampproto/session"
require_relative "wampproto/message_with_recipient"
require_relative "wampproto/dealer"
require_relative "wampproto/broker"

module Wampproto
class Error < StandardError; end
Expand Down
139 changes: 139 additions & 0 deletions lib/wampproto/broker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# frozen_string_literal: true

module Wampproto
# Wampproto broker implementation
class Broker # rubocop:disable Metrics/ClassLength
attr_reader :subscriptions_by_session, :subscriptions_by_topic, :id_gen

def initialize(id_gen = IdGenerator.new)
@id_gen = id_gen
@subscriptions_by_session = {}
@subscriptions_by_topic = {}
end

def add_session(session_id)
error_message = "cannot add session twice"
raise KeyError, error_message if subscriptions_by_session.include?(session_id)

subscriptions_by_session[session_id] = {}
end

def remove_session(session_id)
error_message = "cannot remove non-existing session"
raise KeyError, error_message unless subscriptions_by_session.include?(session_id)

subscriptions = subscriptions_by_session.delete(session_id) || {}
subscriptions.each do |subscription_id, topic|
remove_topic_subscriber(topic, subscription_id, session_id)
end
end

def subscription?(topic)
subscriptions = subscriptions_by_topic[topic]
return false unless subscriptions

subscriptions.any?
end

def receive_message(session_id, message)
case message
when Message::Subscribe then handle_subscribe(session_id, message)
when Message::Unsubscribe then handle_unsubscribe(session_id, message)
when Message::Publish then handle_publish(session_id, message)
else
raise ValueError, "message type not supported"
end
end

def handle_publish(session_id, message) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
error_message = "cannot publish, session #{session_id} doesn't exist"
raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

subscriptions = subscriptions_by_topic.fetch(message.topic, {})
if subscriptions.empty?
error = Message::Error.new(Message::Type::PUBLISH, message.request_id, {}, "wamp.error.no_such_subscription")
return MessageWithRecipient.new(error, session_id)
end

publication_id = id_gen.next

messages = []
if message.options[:acknowledge]
published = Message::Published.new(message.request_id, publication_id)
messages << MessageWithRecipient.new(published, session_id)
end
subscription_id, session_ids = subscriptions.first

event = Message::Event.new(subscription_id, publication_id, {}, *message.args, **message.kwargs)

session_ids.each_with_object(messages) do |recipient_id, list|
list << MessageWithRecipient.new(event, recipient_id) unless session_id == recipient_id
end
end

def handle_subscribe(session_id, message)
error_message = "cannot subscribe, session #{session_id} doesn't exist"
raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

subscription_id = find_subscription_id_from(message.topic)
add_topic_subscriber(message.topic, subscription_id, session_id)
subscriptions_by_session[session_id][subscription_id] = message.topic

subscribed = Message::Subscribed.new(message.request_id, subscription_id)
MessageWithRecipient.new(subscribed, session_id)
end

def handle_unsubscribe(session_id, message) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
error_message = "cannot unsubscribe, session #{session_id} doesn't exist"
raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

subscriptions = subscriptions_by_session.fetch(session_id)

unless subscriptions.include?(message.subscription_id)
error = Message::Error.new(Message::Type::UNSUBSCRIBE, message.request_id, {},
"wamp.error.no_such_subscription")
return MessageWithRecipient.new(error, session_id)
end

topic = subscriptions.fetch(message.subscription_id)

remove_topic_subscriber(topic, message.subscription_id, session_id)
subscriptions_by_session[session_id].delete(message.subscription_id)

unsubscribed = Message::Unsubscribed.new(message.request_id)
MessageWithRecipient.new(unsubscribed, session_id)
end

private

def find_subscription_id_from(topic)
subscription_id, = subscriptions_by_topic.fetch(topic, {}).first
return subscription_id if subscription_id

id_gen.next
end

def remove_topic_subscriber(topic, subscription_id, session_id)
subscriptions = subscriptions_by_topic.fetch(topic, {})
return if subscriptions.empty?

if subscriptions.one? && subscriptions[subscription_id].include?(session_id)
return subscriptions_by_topic.delete(topic)
end

subscriptions_by_topic[topic][subscription_id].delete(session_id)
end

def add_topic_subscriber(topic, subscription_id, session_id)
subscriptions = subscriptions_by_topic.fetch(topic, {})
if subscriptions.empty?
subscriptions[subscription_id] = [session_id]
else
sessions = subscriptions.fetch(subscription_id, [])
sessions << session_id unless sessions.include?(session_id)
subscriptions[subscription_id] = sessions
end
subscriptions_by_topic[topic] = subscriptions
end
end
end
2 changes: 1 addition & 1 deletion lib/wampproto/dealer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def handle_unregister(session_id, message) # rubocop:disable Metrics/AbcSize, Me
registrations = registrations_by_session.fetch(session_id)

unless registrations.include?(message.registration_id)
error = Message::Error.new(Message::Type::CALL, message.request_id, {}, "wamp.error.no_such_registration")
error = Message::Error.new(Message::Type::UNREGISTER, message.request_id, {}, "wamp.error.no_such_registration")
return MessageWithRecipient.new(error, session_id)
end

Expand Down
40 changes: 40 additions & 0 deletions sig/wampproto/broker.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module Wampproto
# Wampproto broker implementation
class Broker
@id_gen: untyped

@subscriptions_by_session: Hash[Integer, Hash[Integer, String]]

@subscriptions_by_topic: Hash[String, Hash[Integer, Array[Integer]]]

attr_reader subscriptions_by_session: Hash[Integer, Hash[Integer, String]]

attr_reader subscriptions_by_topic: Hash[String, Hash[Integer, Array[Integer]]]

attr_reader id_gen: IdGenerator

def initialize: (?IdGenerator id_gen) -> void

def add_session: (Integer session_id) -> untyped

def remove_session: (Integer session_id) -> untyped

def subscription?: (String topic) -> (false | untyped)

def receive_message: (Integer session_id, untyped message) -> untyped

def handle_publish: (Integer session_id, Message::Publish message) -> untyped

def handle_subscribe: (Integer session_id, Message::Subscribe message) -> untyped

def handle_unsubscribe: (Integer session_id, Message::Unsubscribe message) -> untyped

private

def find_subscription_id_from: (String topic) -> Integer

def remove_topic_subscriber: (String topic, Integer subscription_id, Integer session_id) -> untyped

def add_topic_subscriber: (String topic, Integer subscription_id, Integer session_id) -> untyped
end
end
92 changes: 92 additions & 0 deletions spec/wampproto/broker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# frozen_string_literal: true

RSpec.describe Wampproto::Broker do
let(:session_id) { 123_456 }
let(:request_id) { 1 }
let(:topic) { "com.hello.first" }
let(:broker) { described_class.new }
let(:subscribe) { Wampproto::Message::Subscribe.new(request_id, {}, topic) }

context "when session is added" do
before { broker.add_session(session_id) }

context "when topic is subscribed" do
subject { subscribe_response.message }

let(:subscribe_response) { broker.receive_message(session_id, subscribe) }

it { is_expected.to be_instance_of Wampproto::Message::Subscribed }

context "when second session subscribes to topic" do
subject { next_subscribe_response.message }

let(:subscription_id) { subscribe_response.message.subscription_id }
let(:next_subscription_id) { next_subscribe_response.message.subscription_id }
let(:next_session_id) { 445_666 }
let(:next_subscribe) { Wampproto::Message::Subscribe.new(request_id, {}, topic) }
let(:next_subscribe_response) { broker.receive_message(next_session_id, next_subscribe) }

before do
broker.add_session(next_session_id)
subscribe_response
end

it { is_expected.to be_instance_of Wampproto::Message::Subscribed }

it "share the same subscription_id" do
expect(subscription_id).to eq(next_subscription_id)
end
end

context "when topic is unsubscribed" do
subject { unsubscribe_response.message }

before { subscribe_response }

let(:unsubscribe) do
Wampproto::Message::Unsubscribe.new(request_id + 1, subscribe_response.message.subscription_id)
end

let(:unsubscribe_response) { broker.receive_message(session_id, unsubscribe) }

it { is_expected.to be_an_instance_of Wampproto::Message::Unsubscribed }
end

context "when topic is published" do
let(:publisher_id) { 333 }

before { broker.add_session(publisher_id) }

context "when acknowledge option is passed" do
subject { publish_response }

let(:publish) { Wampproto::Message::Publish.new(request_id, { acknowledge: true }, topic) }
let(:publish_response) { broker.receive_message(publisher_id, publish) }

before { subscribe_response }

it { is_expected.to be_instance_of Array }

it "includes two messages" do
expect(publish_response.length).to eq 2
end
end

context "when acknowledge option is missing" do
subject { publish_response }

let(:publish) { Wampproto::Message::Publish.new(request_id, {}, topic) }
let(:publish_response) { broker.receive_message(publisher_id, publish) }

before { subscribe_response }

it { is_expected.to be_instance_of Array }

it "includes two messages" do
expect(publish_response.length).to eq 1
end
end
end
end
end
end

0 comments on commit 6693d7a

Please sign in to comment.