diff --git a/lib/wamp/message_handler.rb b/lib/wamp/message_handler.rb index 0ad553e..cdfc9f6 100644 --- a/lib/wamp/message_handler.rb +++ b/lib/wamp/message_handler.rb @@ -2,6 +2,11 @@ require_relative "message_handler/base" +require_relative "message_handler/hello" +require_relative "message_handler/welcome" +require_relative "message_handler/challenge" +require_relative "message_handler/goodbye" + require_relative "message_handler/subscribe" require_relative "message_handler/subscribed" require_relative "message_handler/unsubscribe" @@ -32,9 +37,9 @@ module MessageHandler # instantiate correct handler module ClassMethods def resolve(data, connection) - return handle_when_not_joined(data, connection) unless connection.joiner.joined? + # return handle_when_not_joined(data, connection) unless connection.joiner.joined? - message = connection.session.receive(data) + message = connection.joiner.serializer.deserialize(data) klass_name = demodulize(message.class.name) klass = constantize("Wamp::MessageHandler::#{klass_name}") klass.new(message, connection) diff --git a/lib/wamp/message_handler/api.rb b/lib/wamp/message_handler/api.rb index c9eea64..cda0018 100644 --- a/lib/wamp/message_handler/api.rb +++ b/lib/wamp/message_handler/api.rb @@ -8,17 +8,11 @@ module MessageHandler class Api extend Forwardable - attr_reader :connection, :session_id + attr_reader :connection, :session_id, :id_gen def initialize(connection) @connection = connection - end - - def send_protocol_violation(text, *args, **kwargs) - message = Message::Abort.new({ message: text }, "wamp.error.protocol_violation", *args, **kwargs) - manager = Manager::Event::Abort.new(message, self) - connection.transmit(message.payload) - manager.emit_event(message) + @id_gen = Wampproto::IdGenerator.new end def subscribe(topic, handler, options = {}, &block) @@ -63,25 +57,10 @@ def unregister(registration_id, &block) action.send_message(&block) end - def on_message(message) - manager = Manager::Event.resolve(message, self) - manager.emit_event(message) - end - - def create_request_id - next_request_id - end - private def next_request_id - @next_request_id = create_request_id_generator unless defined?(@next_request_id) - @next_request_id.call - end - - def create_request_id_generator - request_id = 0 - -> { request_id += 1 } + id_gen.next end end end diff --git a/lib/wamp/message_handler/base.rb b/lib/wamp/message_handler/base.rb index d4d785a..f900a00 100644 --- a/lib/wamp/message_handler/base.rb +++ b/lib/wamp/message_handler/base.rb @@ -14,6 +14,16 @@ def initialize(message, connection) @connection = connection end + def handle + raise NotImplementedError + end + + def send_message + raise NotImplementedError + end + + private + def stored_data @stored_data ||= store.delete(store_key) || {} end @@ -30,14 +40,6 @@ def identity message.request_id end - def handle - raise NotImplementedError - end - - def send_message - raise NotImplementedError - end - def deliver_response callback = stored_data.fetch(:callback, proc {}) return unless callback @@ -45,6 +47,10 @@ def deliver_response callback.call(message) end + def validate_received_message + connection.session.receive_message(message) + end + def send_serialized(message) connection.transmit session.send_message(message) end diff --git a/lib/wamp/message_handler/call.rb b/lib/wamp/message_handler/call.rb index c5dc5a3..7c99beb 100644 --- a/lib/wamp/message_handler/call.rb +++ b/lib/wamp/message_handler/call.rb @@ -4,18 +4,10 @@ module Wamp module MessageHandler # Call class Call < Base - # Call and Result share the same request_id - # Invocation and Yield share the same request_id - def handle - invocation = find_connection.handle_call(message) - connection.invocation_requests[invocation.request_id] = message.request_id - connection.transmit invocation - end - def send_message(handler) - send_serialized message + store[store_key] = { handler: handler, callback: handler } - store[store_key] = { handler: handler } + send_serialized message end end end diff --git a/lib/wamp/message_handler/challenge.rb b/lib/wamp/message_handler/challenge.rb new file mode 100644 index 0000000..74bfb4f --- /dev/null +++ b/lib/wamp/message_handler/challenge.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Challenge + class Challenge < Hello + def handle + connection.transmit connection.joiner.receive(connection.joiner.serializer.serialize(message)) + end + end + end +end diff --git a/lib/wamp/message_handler/error.rb b/lib/wamp/message_handler/error.rb index 8f0c268..ebfd884 100644 --- a/lib/wamp/message_handler/error.rb +++ b/lib/wamp/message_handler/error.rb @@ -5,7 +5,9 @@ module MessageHandler # Call handler with error message class Error < Base def handle - stored_data[:handler].call(message) + validate_received_message + + stored_data[:callback].call(message) end end end diff --git a/lib/wamp/message_handler/event.rb b/lib/wamp/message_handler/event.rb index 1562f66..d9f51c1 100644 --- a/lib/wamp/message_handler/event.rb +++ b/lib/wamp/message_handler/event.rb @@ -5,6 +5,8 @@ module MessageHandler # publish event to subscriber class Event < Base def handle + validate_received_message + store[alt_store_key].fetch(:handler).call(message) end diff --git a/lib/wamp/message_handler/goodbye.rb b/lib/wamp/message_handler/goodbye.rb new file mode 100644 index 0000000..8d5729b --- /dev/null +++ b/lib/wamp/message_handler/goodbye.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # send unregister message + class Goodbye < Base + def send_message(&callback) + store[store_key] = { callback: callback } + + send_serialized message + end + + def handle + goodbye = Wampproto::Message::Goodbye.new({}, "wamp.close.goodbye_and_out") + send_serialized goodbye + end + end + end +end diff --git a/lib/wamp/message_handler/hello.rb b/lib/wamp/message_handler/hello.rb index 02a4797..24c3c65 100644 --- a/lib/wamp/message_handler/hello.rb +++ b/lib/wamp/message_handler/hello.rb @@ -3,13 +3,16 @@ module Wamp module MessageHandler # Hello - class Hello - def initialize - @message = message - @connection = connection + class Hello < Base + def handle + msg, is_welcome = connection.acceptor.receive(connection.serializer.serialize(message)) + connection.transmit msg + connection.router.attach_client(connection) if is_welcome end - def send_message; end + def send_message + connection.transmit connection.joiner.send_hello + end end end end diff --git a/lib/wamp/message_handler/invocation.rb b/lib/wamp/message_handler/invocation.rb index 0e1a2e4..23a4f08 100644 --- a/lib/wamp/message_handler/invocation.rb +++ b/lib/wamp/message_handler/invocation.rb @@ -5,6 +5,7 @@ module MessageHandler # Call class Invocation < Base def handle + connection.session.receive_message(message) data = store.fetch(alt_store_key) send_yield_message data.fetch(:handler) diff --git a/lib/wamp/message_handler/publish.rb b/lib/wamp/message_handler/publish.rb index 446f5b6..d18c49b 100644 --- a/lib/wamp/message_handler/publish.rb +++ b/lib/wamp/message_handler/publish.rb @@ -5,11 +5,9 @@ module MessageHandler # Publish message class Publish < Base def send_message(&callback) - send_serialized message - - return unless message.options[:acknowledge] + store[store_key] = { callback: callback } if message.options[:acknowledge] - store[store_key] = { callback: callback } + send_serialized message end end end diff --git a/lib/wamp/message_handler/published.rb b/lib/wamp/message_handler/published.rb index e82ba61..f5c3386 100644 --- a/lib/wamp/message_handler/published.rb +++ b/lib/wamp/message_handler/published.rb @@ -5,6 +5,8 @@ module MessageHandler # Published confirmation message class Published < Base def handle + validate_received_message + deliver_response end end diff --git a/lib/wamp/message_handler/registered.rb b/lib/wamp/message_handler/registered.rb index 4a381de..5168ace 100644 --- a/lib/wamp/message_handler/registered.rb +++ b/lib/wamp/message_handler/registered.rb @@ -5,6 +5,8 @@ module MessageHandler # Registered callback class Registered < Base def handle + validate_received_message + store[alt_store_key] = { handler: stored_data.fetch(:handler), procedure: stored_data.fetch(:procedure) } store_procedure diff --git a/lib/wamp/message_handler/result.rb b/lib/wamp/message_handler/result.rb index a889590..7165cf8 100644 --- a/lib/wamp/message_handler/result.rb +++ b/lib/wamp/message_handler/result.rb @@ -5,6 +5,7 @@ module MessageHandler # Result class Result < Base def handle + validate_received_message stored_data.fetch(:handler).call(message) end end diff --git a/lib/wamp/message_handler/subscribed.rb b/lib/wamp/message_handler/subscribed.rb index 7ea15e3..206efb6 100644 --- a/lib/wamp/message_handler/subscribed.rb +++ b/lib/wamp/message_handler/subscribed.rb @@ -5,6 +5,8 @@ module MessageHandler # Receive subscribed class Subscribed < Base def handle + validate_received_message + store[alt_store_key] = { handler: stored_data.fetch(:handler), topic: stored_data.fetch(:topic) } store_topic diff --git a/lib/wamp/message_handler/unregistered.rb b/lib/wamp/message_handler/unregistered.rb index 9a93d09..96af562 100644 --- a/lib/wamp/message_handler/unregistered.rb +++ b/lib/wamp/message_handler/unregistered.rb @@ -5,6 +5,8 @@ module MessageHandler # callback for unregister message class Unregistered < Base def handle + validate_received_message + delete_procedure store.delete(alt_store_key) deliver_response diff --git a/lib/wamp/message_handler/unsubscribed.rb b/lib/wamp/message_handler/unsubscribed.rb index 5354c89..7d49b2a 100644 --- a/lib/wamp/message_handler/unsubscribed.rb +++ b/lib/wamp/message_handler/unsubscribed.rb @@ -5,6 +5,7 @@ module MessageHandler # Receive unsubscribed class Unsubscribed < Base def handle + validate_received_message delete_topic store.delete(alt_store_key) deliver_response diff --git a/lib/wamp/message_handler/welcome.rb b/lib/wamp/message_handler/welcome.rb new file mode 100644 index 0000000..118ed54 --- /dev/null +++ b/lib/wamp/message_handler/welcome.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +module Wamp + module MessageHandler + # Welcome + class Welcome < Base + def handle + connection.executor&.call(connection.api) + end + end + end +end diff --git a/lib/wamp/message_handler/yield.rb b/lib/wamp/message_handler/yield.rb index fcf83e6..06d67d7 100644 --- a/lib/wamp/message_handler/yield.rb +++ b/lib/wamp/message_handler/yield.rb @@ -3,18 +3,7 @@ module Wamp module MessageHandler # Yield - class Yield - def initialize - @message = message - @connection = connection - end - - def handle - call_request_id = connection.invocation_requests.delete(message.request_id) - handler = call_requests.delete(call_request_id) - handler.call(message) - end - + class Yield < Base def send_message(handler) connection.transmit message connection.call_requests[message.request_id] = handler diff --git a/lib/wamp/router/registrations.rb b/lib/wamp/router/registrations.rb deleted file mode 100644 index 28246e6..0000000 --- a/lib/wamp/router/registrations.rb +++ /dev/null @@ -1,122 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Router - # Handle Procedure Registrations - class Registrations - @registrations = {} - @registration_ids = {} - class << self - def register(message, session) - return procedure_already_registered(message) if check_registered?(message) - - registration_id = create_or_update_registration(message, session) - - Wamp::Message::Registered.new(message.request_id, registration_id) - end - - def check_registered?(message) - invocation_policy = message.options.fetch(:invoke, :single) - return @registrations.include?(message.procedure) if invocation_policy == "single" - - registration = @registrations[message.procedure] - return false unless registration - - return true if registration[:message].options.fetch(:invoke, :single) != invocation_policy - - false - end - - def clean_registrations(session) - @registrations.each_key { |procedure| clean_registration(procedure, session) } - end - - def clean_registration(procedure, session) - registration = @registrations[procedure] - sessions = registration[:sessions] - if sessions.one? && sessions.include?(session) - puts "Removing Registration #{registration[:registration_id]}, procedure: #{procedure}" - @registrations.delete(procedure) - elsif sessions.include?(session) - puts "Removing Session #{session.session_id}, procedure: #{procedure}" - sessions.delete(session) - end - end - - def clean_registration_by_id(registration_id, session) - procedure = @registration_ids[registration_id] - return unless procedure - - clean_registration(procedure, session) - end - - def create_or_update_registration(message, session) - registration = @registrations[message.procedure] || {} - registration.empty? ? create_registration(message, session) : update_registration(registration, session) - end - - def create_registration(message, session) - registration_id = create_registration_id(message.procedure) - @registrations[message.procedure] = { - message: message, - registration_id: registration_id, - sessions: [session] - } - registration_id - end - - def update_registration(registration, session) - registration[:sessions] << session - registration.fetch(:registration_id) - end - - def invoke(message, caller_session) - unless @registrations.include?(message.procedure) - return Manager::Event.resolve(no_such_procedure(message), caller_session) - end - - registration = @registrations.fetch(message.procedure) - registration_id = registration[:registration_id] - callee_session = find_session(registration) - - Wamp::Message::Invocation.new(message.request_id * 2000, registration_id, {}, *message.args, **message.kwargs) - .then { |msg| Manager::Event.resolve(msg, callee_session) } - end - - def find_session(registration) - sessions = registration.fetch(:sessions) - index = find_session_index(registration, sessions.length) - sessions[index] - end - - def find_session_index(registration, session_length) - invocation_policy = registration.fetch(:message).options.fetch(:invoke, :single).intern - index = { single: 0, first: 0, last: -1, random: rand(0..(session_length - 1)) }[invocation_policy] - return index if index - - cycle_index = registration.fetch(:cycle_index, 0) - registration[:cycle_index] = cycle_index < session_length - 1 ? cycle_index + 1 : 0 - cycle_index - end - - def procedure_already_registered(message) - Message::Error.new(Message::Type::REGISTER, message.request_id, {}, "wamp.error.procedure_already_exists") - end - - def no_such_procedure(message) - Message::Error.new(Message::Type::CALL, message.request_id, {}, "wamp.error.no_such_procedure") - end - - def create_registration_id(procedure) - id = rand(100_000..(2**53)) - if @registration_ids.include?(id) - create_registration_id(procedure) - else - @registration_ids[id] = procedure - id - end - end - end - end - end -end diff --git a/lib/wamp/router/session/base.rb b/lib/wamp/router/session/base.rb deleted file mode 100644 index 6260509..0000000 --- a/lib/wamp/router/session/base.rb +++ /dev/null @@ -1,72 +0,0 @@ -# frozen_string_literal: true - -module Wamp - module Router - module Session - # handle session establishment - class Base - REALMS = ["realm1"].freeze - attr_reader :hello - - def initialize(hello) - @hello = hello - end - - def authenticate(message) - return protocol_violation if message.instance_of?(Message::Hello) - - return unless auth_method == "ticket" - - return welcome_message if message.signature == "hello" - - send_abort - end - - def send_abort - Message::Abort.new({ message: "Not Authorized" }, "wamp.error.not_authorized") - end - - def protocol_violation - Message::Abort.new( - { message: "Received HELLO message after session was established" }, - "wamp.error.protocol_violation" - ) - end - - def handle_auth - realm = find_realm(hello.realm) - return realm_missing unless realm - - handle_correct_auth - end - - def handle_correct_auth - if auth_method == "ticket" - Message::Challenge.new("ticket", {}) - else - welcome_message - end - end - - def auth_method - authmethods = [*hello.details[:authmethods]] - authmethods.first - end - - def welcome_message - Message::Welcome.new(Router.create_identifier, { roles: { broker: {} } }) - end - - def realm_missing - Wamp::Message::Abort.new( - { message: "The realm does not exists." }, "wamp.error.no_such_realm" - ) - end - - def find_realm(realm) - realm if REALMS.include?(realm) - end - end - end - end -end