diff --git a/lib/wampproto/dealer.rb b/lib/wampproto/dealer.rb index 377e33a..3488e77 100644 --- a/lib/wampproto/dealer.rb +++ b/lib/wampproto/dealer.rb @@ -2,15 +2,16 @@ module Wampproto # Wamprpoto Dealer handler - class Dealer - attr_reader :registrations_by_procedure, :registrations_by_session, :pending_calls, :pending_invocations, :id_gen, + class Dealer # rubocop:disable Metrics/ClassLength + PendingInvocation = Struct.new(:caller_id, :callee_id, :call_id, :invocation_id, :receive_progress, :progress) + + attr_reader :registrations_by_procedure, :registrations_by_session, :pending_calls, :id_gen, :sessions def initialize(id_gen = IdGenerator.new) @registrations_by_session = {} @registrations_by_procedure = Hash.new { |h, k| h[k] = {} } @pending_calls = {} - @pending_invocations = {} @id_gen = id_gen @sessions = {} end @@ -63,23 +64,18 @@ def handle_call(session_id, message) # rubocop:disable Metrics/MethodLength, Met registration_id, callee_id = registrations.first - pending_calls[callee_id] = {} unless pending_calls.include?(callee_id) - pending_invocations[callee_id] = {} unless pending_invocations.include?(callee_id) - - # we received call from the "caller" lets call that request_id "1" - # we need to send invocation message to "callee" let call that request_id "10" - # we need "caller" id after we have received yield so that request_id will be "10" - # we need to send request to "caller" to the original request_id 1 request_id = id_gen.next - pending_invocations[callee_id][request_id] = session_id + details = invocation_details_for(session_id, message) - pending_calls[callee_id][session_id] = message.request_id + pending_calls[[callee_id, request_id]] = PendingInvocation.new( + session_id, callee_id, message.request_id, request_id, details[:receive_progress] + ) invocation = Message::Invocation.new( request_id, registration_id, - invocation_details_for(session_id, message), + details, *message.args, **message.kwargs ) @@ -88,26 +84,42 @@ def handle_call(session_id, message) # rubocop:disable Metrics/MethodLength, Met end def invocation_details_for(session_id, message) - return {} unless message.options.include?(:disclose_me) + options = {} + return options if message.options.empty? + + receive_progress = message.options[:receive_progress] + options.merge!(receive_progress: true) if receive_progress + + return options unless message.options.include?(:disclose_me) session = sessions[session_id] - { caller: session_id, caller_authid: session.authid, caller_authrole: session.authrole } + options.merge({ caller: session_id, caller_authid: session.authid, caller_authrole: session.authrole }) end - def handle_yield(session_id, message) - calls = pending_calls.fetch(session_id, {}) + def handle_yield(session_id, message) # rubocop:disable Metrics/AbcSize + pending_invocation = pending_calls[[session_id, message.request_id]] error_message = "no pending calls for session #{session_id}" - raise ValueError, error_message if calls.empty? + raise ValueError, error_message if pending_invocation.nil? - invocations = pending_invocations[session_id] - caller_id = invocations.delete(message.request_id).to_i # make steep happy + caller_id = pending_invocation.caller_id + request_id = pending_invocation.call_id + pending_calls.delete([session_id, message.request_id]) unless message.options[:progress] - request_id = calls.delete(caller_id) - - result = Message::Result.new(request_id, {}, *message.args, **message.kwargs) + result = Message::Result.new(request_id, result_details_for(session_id, message), *message.args, **message.kwargs) MessageWithRecipient.new(result, caller_id) end + def result_details_for(session_id, message) + options = {} + return options if message.options.empty? + + pending_invocation = pending_calls[[session_id, message.request_id]] + + progress = message.options[:progress] && pending_invocation.receive_progress + options.merge!(progress:) if progress + options + end + def handle_register(session_id, message) error_message = "cannot register, session #{session_id} doesn't exist" raise ValueError, error_message unless registrations_by_session.include?(session_id) diff --git a/lib/wampproto/session.rb b/lib/wampproto/session.rb index 54752b6..714c2f3 100644 --- a/lib/wampproto/session.rb +++ b/lib/wampproto/session.rb @@ -36,7 +36,7 @@ def send_message(msg) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, M raise ValueError, "cannot yield for unknown invocation request" end - invocation_requests.delete(msg.request_id) + invocation_requests.delete(msg.request_id) unless msg.options[:progress] when Message::Publish publish_requests[msg.request_id] = msg.request_id if msg.options.fetch(:acknowledge, false) when Message::Subscribe @@ -62,7 +62,8 @@ def receive_message(msg) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize case msg when Message::Result error_message = "received RESULT for invalid request_id" - raise ValueError, error_message unless call_requests.delete(msg.request_id) + request_id = msg.details[:progress] ? call_requests.fetch(msg.request_id) : call_requests.delete(msg.request_id) + raise ValueError, error_message unless request_id when Message::Registered error_message = "received REGISTERED for invalid request_id" raise ValueError, error_message unless register_requests.delete(msg.request_id) diff --git a/sig/wampproto/dealer.rbs b/sig/wampproto/dealer.rbs index d7f8adb..69f20d4 100644 --- a/sig/wampproto/dealer.rbs +++ b/sig/wampproto/dealer.rbs @@ -7,9 +7,9 @@ module Wampproto @sessions: Hash[Integer, SessionDetails] - @pending_calls: Hash[Integer, Hash[Integer, Integer]] + PendingInvocation: untyped - @pending_invocations: Hash[Integer, Hash[Integer, Integer]] + @pending_calls: Hash[[Integer, Integer], untyped] @id_gen: IdGenerator @@ -19,9 +19,7 @@ module Wampproto attr_reader sessions: Hash[Integer, SessionDetails] - attr_reader pending_calls: Hash[Integer, Hash[Integer, Integer]] - - attr_reader pending_invocations: Hash[Integer, Hash[Integer, Integer]] + attr_reader pending_calls: Hash[[Integer, Integer], untyped] attr_reader id_gen: IdGenerator @@ -44,6 +42,8 @@ module Wampproto def handle_unregister: (Integer session_id, Message::Unregister message) -> (nil | MessageWithRecipient) def invocation_details_for: (Integer session_id, untyped message) -> Hash[Symbol, untyped] + + def result_details_for: (Integer _session_id, untyped message) -> Hash[Symbol, untyped] end end diff --git a/spec/wampproto/dealer_spec.rb b/spec/wampproto/dealer_spec.rb index 7a6a59f..3bc520f 100644 --- a/spec/wampproto/dealer_spec.rb +++ b/spec/wampproto/dealer_spec.rb @@ -93,6 +93,30 @@ expect(call_response.message.details).to include(:caller) end + context "when progressive call is made" do + let(:args) { [2020, 2021, 2022, 2023] } + let(:progressive_call) do + Wampproto::Message::Call.new(request_id, { receive_progress: true }, procedure, args) + end + let(:call_response) { dealer.receive_message(caller_id, progressive_call) } + + it { is_expected.to be_instance_of Wampproto::Message::Invocation } + + it "sends progressive call result" do # rubocop:disable RSpec/ExampleLength, RSpec/MultipleExpectations + call_response + args.each_with_index do |_year, index| + progress_yield = Wampproto::Message::Yield.new(request_id + 1, { progress: true }, index + 1) + msg = dealer.receive_message(session_id, progress_yield).message + expect(msg).to be_instance_of(Wampproto::Message::Result) + expect(msg.details).to include(:progress) + end + progress_yield = Wampproto::Message::Yield.new(request_id + 1, {}) + msg = dealer.receive_message(session_id, progress_yield).message + expect(msg).to be_instance_of(Wampproto::Message::Result) + expect(msg.details).not_to include(:progress) + end + end + context "when calling unregistered procedure" do let(:call) { Wampproto::Message::Call.new(request_id, {}, "invalid.procedure", 1) }