Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add progressive calls and ability to handle concurrent requests #42

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 35 additions & 23 deletions lib/wampproto/dealer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

module Wampproto
# Wamprpoto Dealer handler
class Dealer
attr_reader :registrations_by_procedure, :registrations_by_session, :pending_calls, :pending_invocations, :id_gen,
class Dealer # rubocop:disable Metrics/ClassLength
PendingInvocation = Struct.new(:caller_id, :callee_id, :call_id, :invocation_id, :receive_progress, :progress)

attr_reader :registrations_by_procedure, :registrations_by_session, :pending_calls, :id_gen,
:sessions

def initialize(id_gen = IdGenerator.new)
@registrations_by_session = {}
@registrations_by_procedure = Hash.new { |h, k| h[k] = {} }
@pending_calls = {}
@pending_invocations = {}
@id_gen = id_gen
@sessions = {}
end
Expand Down Expand Up @@ -63,23 +64,18 @@ def handle_call(session_id, message) # rubocop:disable Metrics/MethodLength, Met

registration_id, callee_id = registrations.first

pending_calls[callee_id] = {} unless pending_calls.include?(callee_id)
pending_invocations[callee_id] = {} unless pending_invocations.include?(callee_id)

# we received call from the "caller" lets call that request_id "1"
# we need to send invocation message to "callee" let call that request_id "10"
# we need "caller" id after we have received yield so that request_id will be "10"
# we need to send request to "caller" to the original request_id 1
request_id = id_gen.next

pending_invocations[callee_id][request_id] = session_id
details = invocation_details_for(session_id, message)

pending_calls[callee_id][session_id] = message.request_id
pending_calls[[callee_id, request_id]] = PendingInvocation.new(
session_id, callee_id, message.request_id, request_id, details[:receive_progress]
)

invocation = Message::Invocation.new(
request_id,
registration_id,
invocation_details_for(session_id, message),
details,
*message.args,
**message.kwargs
)
Expand All @@ -88,26 +84,42 @@ def handle_call(session_id, message) # rubocop:disable Metrics/MethodLength, Met
end

def invocation_details_for(session_id, message)
return {} unless message.options.include?(:disclose_me)
options = {}
return options if message.options.empty?

receive_progress = message.options[:receive_progress]
options.merge!(receive_progress: true) if receive_progress

return options unless message.options.include?(:disclose_me)

session = sessions[session_id]
{ caller: session_id, caller_authid: session.authid, caller_authrole: session.authrole }
options.merge({ caller: session_id, caller_authid: session.authid, caller_authrole: session.authrole })
end

def handle_yield(session_id, message)
calls = pending_calls.fetch(session_id, {})
def handle_yield(session_id, message) # rubocop:disable Metrics/AbcSize
pending_invocation = pending_calls[[session_id, message.request_id]]
error_message = "no pending calls for session #{session_id}"
raise ValueError, error_message if calls.empty?
raise ValueError, error_message if pending_invocation.nil?

invocations = pending_invocations[session_id]
caller_id = invocations.delete(message.request_id).to_i # make steep happy
caller_id = pending_invocation.caller_id
request_id = pending_invocation.call_id
pending_calls.delete([session_id, message.request_id]) unless message.options[:progress]

request_id = calls.delete(caller_id)

result = Message::Result.new(request_id, {}, *message.args, **message.kwargs)
result = Message::Result.new(request_id, result_details_for(session_id, message), *message.args, **message.kwargs)
MessageWithRecipient.new(result, caller_id)
end

def result_details_for(session_id, message)
options = {}
return options if message.options.empty?

pending_invocation = pending_calls[[session_id, message.request_id]]

progress = message.options[:progress] && pending_invocation.receive_progress
options.merge!(progress:) if progress
options
end

def handle_register(session_id, message)
error_message = "cannot register, session #{session_id} doesn't exist"
raise ValueError, error_message unless registrations_by_session.include?(session_id)
Expand Down
5 changes: 3 additions & 2 deletions lib/wampproto/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def send_message(msg) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, M
raise ValueError, "cannot yield for unknown invocation request"
end

invocation_requests.delete(msg.request_id)
invocation_requests.delete(msg.request_id) unless msg.options[:progress]
when Message::Publish
publish_requests[msg.request_id] = msg.request_id if msg.options.fetch(:acknowledge, false)
when Message::Subscribe
Expand All @@ -62,7 +62,8 @@ def receive_message(msg) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
case msg
when Message::Result
error_message = "received RESULT for invalid request_id"
raise ValueError, error_message unless call_requests.delete(msg.request_id)
request_id = msg.details[:progress] ? call_requests.fetch(msg.request_id) : call_requests.delete(msg.request_id)
raise ValueError, error_message unless request_id
when Message::Registered
error_message = "received REGISTERED for invalid request_id"
raise ValueError, error_message unless register_requests.delete(msg.request_id)
Expand Down
10 changes: 5 additions & 5 deletions sig/wampproto/dealer.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ module Wampproto

@sessions: Hash[Integer, SessionDetails]

@pending_calls: Hash[Integer, Hash[Integer, Integer]]
PendingInvocation: untyped

@pending_invocations: Hash[Integer, Hash[Integer, Integer]]
@pending_calls: Hash[[Integer, Integer], untyped]

@id_gen: IdGenerator

Expand All @@ -19,9 +19,7 @@ module Wampproto

attr_reader sessions: Hash[Integer, SessionDetails]

attr_reader pending_calls: Hash[Integer, Hash[Integer, Integer]]

attr_reader pending_invocations: Hash[Integer, Hash[Integer, Integer]]
attr_reader pending_calls: Hash[[Integer, Integer], untyped]

attr_reader id_gen: IdGenerator

Expand All @@ -44,6 +42,8 @@ module Wampproto
def handle_unregister: (Integer session_id, Message::Unregister message) -> (nil | MessageWithRecipient)

def invocation_details_for: (Integer session_id, untyped message) -> Hash[Symbol, untyped]

def result_details_for: (Integer _session_id, untyped message) -> Hash[Symbol, untyped]
end
end

24 changes: 24 additions & 0 deletions spec/wampproto/dealer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,30 @@
expect(call_response.message.details).to include(:caller)
end

context "when progressive call is made" do
let(:args) { [2020, 2021, 2022, 2023] }
let(:progressive_call) do
Wampproto::Message::Call.new(request_id, { receive_progress: true }, procedure, args)
end
let(:call_response) { dealer.receive_message(caller_id, progressive_call) }

it { is_expected.to be_instance_of Wampproto::Message::Invocation }

it "sends progressive call result" do # rubocop:disable RSpec/ExampleLength, RSpec/MultipleExpectations
call_response
args.each_with_index do |_year, index|
progress_yield = Wampproto::Message::Yield.new(request_id + 1, { progress: true }, index + 1)
msg = dealer.receive_message(session_id, progress_yield).message
expect(msg).to be_instance_of(Wampproto::Message::Result)
expect(msg.details).to include(:progress)
end
progress_yield = Wampproto::Message::Yield.new(request_id + 1, {})
msg = dealer.receive_message(session_id, progress_yield).message
expect(msg).to be_instance_of(Wampproto::Message::Result)
expect(msg.details).not_to include(:progress)
end
end

context "when calling unregistered procedure" do
let(:call) { Wampproto::Message::Call.new(request_id, {}, "invalid.procedure", 1) }

Expand Down