From 129c5718d625baa98410cb34e39900aa211ebf46 Mon Sep 17 00:00:00 2001 From: Bryan Powell Date: Wed, 3 May 2023 08:05:47 -0700 Subject: [PATCH] Refactor bridges MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There's a lot to this and unfortunately it is in a single commit—it is a result of the exploration required to make this work. Some notable changes: * The reactor no longer has a selector. Instead, the scheduler has an event loop for managing io select across reactors. This simplifies the reactor to only dealing with routines. IO concerns are abstracted to both the event loop and the io routine. * Bridges now build on top of the above to manage the existing io routine. * The scheduler and routine now handle wakeups with a dedicated coordinator, implemented as a thread queue. This has proven to be fairly simple to work with and avoids the race conditions that conditions can have because the signal is persisted to the queue—if the coordinator is signals before the listener subscribes the listener will still receive the signal when it gets around to listening. --- README.md | 6 +- examples/channels.rb | 6 +- examples/server.rb | 16 ++- lib/goru/bridge.rb | 53 ++++++++ lib/goru/bridges/readable.rb | 23 ++++ lib/goru/bridges/writable.rb | 23 ++++ lib/goru/io_event_loop.rb | 56 ++++++++ lib/goru/reactor.rb | 189 +++++++------------------- lib/goru/routine.rb | 12 ++ lib/goru/routines/bridge.rb | 29 ---- lib/goru/routines/bridges/readable.rb | 31 ----- lib/goru/routines/bridges/writable.rb | 31 ----- lib/goru/routines/io.rb | 87 +++++++++--- lib/goru/scheduler.rb | 40 +++--- spec/features/io/bridging_spec.rb | 12 +- spec/features/io/non_blocking_spec.rb | 9 +- spec/support/server.rb | 36 +++-- 17 files changed, 360 insertions(+), 299 deletions(-) create mode 100644 lib/goru/bridge.rb create mode 100644 lib/goru/bridges/readable.rb create mode 100644 lib/goru/bridges/writable.rb create mode 100644 lib/goru/io_event_loop.rb delete mode 100644 lib/goru/routines/bridge.rb delete mode 100644 lib/goru/routines/bridges/readable.rb delete mode 100644 lib/goru/routines/bridges/writable.rb diff --git a/README.md b/README.md index e1b8fd6..25992d5 100644 --- a/README.md +++ b/README.md @@ -203,7 +203,7 @@ Goru::Scheduler.go(io: io, intent: :r) { |routine| ## Bridges -Goru supports coordinates buffered io using bridges: +Goru supports coordinated buffered io using bridges: ```ruby writer = Goru::Channel.new @@ -224,8 +224,8 @@ Goru::Scheduler.go(io: io, intent: :r) { |routine| Using bridges, the io routine is only called again when two conditions are met: -1. The io object is writable. -2. The channel has data available for reading. +1. The io object matches the bridged intent (e.g. it is writable). +2. The channel is in the correct state to reciprocate the intent (e.g. it has data). See the [server example](./examples/server.rb) for a more complete use-case. diff --git a/examples/channels.rb b/examples/channels.rb index 7c9f884..e2ec8cc 100644 --- a/examples/channels.rb +++ b/examples/channels.rb @@ -37,7 +37,9 @@ def initialize(channel:) routine << value routine.update(:sleep) puts "wrote: #{value}" - else + end + + if values.empty? channel.close routine.finished end @@ -55,7 +57,7 @@ def initialize(channel:) loop do if reader.received == writer.writable break - elsif Time.now - start > 5 + elsif Time.now - start > 10 fail "timed out" else sleep(0.1) diff --git a/examples/server.rb b/examples/server.rb index da0d541..9431c68 100644 --- a/examples/server.rb +++ b/examples/server.rb @@ -7,10 +7,12 @@ sleep(1) -puts "making requests..." -puts "got: #{HTTP.get("http://localhost:4242").status}" -puts "got: #{HTTP.get("http://localhost:4242").status}" -puts "got: #{HTTP.get("http://localhost:4242").status}" - -puts "shutting down..." -server.stop +begin + puts "making requests..." + puts "got: #{HTTP.timeout(1).get("http://localhost:4242").status}" + puts "got: #{HTTP.timeout(1).get("http://localhost:4242").status}" + puts "got: #{HTTP.timeout(1).get("http://localhost:4242").status}" +ensure + puts "shutting down..." + server.stop +end diff --git a/lib/goru/bridge.rb b/lib/goru/bridge.rb new file mode 100644 index 0000000..2d458e9 --- /dev/null +++ b/lib/goru/bridge.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +module Goru + # [public] + # + class Bridge + def initialize(routine:, channel:) + @routine = routine + @channel = channel + @channel.add_observer(self) + update_status + end + + # [public] + # + attr_reader :status + + # [public] + # + private def set_status(status) + @status = status + status_changed + end + + # [public] + # + def update_status + # noop + end + + private def status_changed + case @status + when :ready + @routine.bridged + when :finished + @channel.remove_observer(self) + @routine.unbridge + end + end + + def channel_received + update_status + end + + def channel_read + update_status + end + + def channel_closed + update_status + end + end +end diff --git a/lib/goru/bridges/readable.rb b/lib/goru/bridges/readable.rb new file mode 100644 index 0000000..c2278a8 --- /dev/null +++ b/lib/goru/bridges/readable.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require_relative "../bridge" + +module Goru + module Bridges + class Readable < Bridge + private def update_status + status = if @routine.status == :finished + :finished + elsif @channel.full? + :idle + elsif @channel.closed? + :finished + else + :ready + end + + set_status(status) + end + end + end +end diff --git a/lib/goru/bridges/writable.rb b/lib/goru/bridges/writable.rb new file mode 100644 index 0000000..c1abc5b --- /dev/null +++ b/lib/goru/bridges/writable.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require_relative "../bridge" + +module Goru + module Bridges + class Writable < Bridge + private def update_status + status = if @routine.status == :finished + :finished + elsif @channel.any? + :ready + elsif @channel.closed? + :finished + else + :idle + end + + set_status(status) + end + end + end +end diff --git a/lib/goru/io_event_loop.rb b/lib/goru/io_event_loop.rb new file mode 100644 index 0000000..12b03a4 --- /dev/null +++ b/lib/goru/io_event_loop.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require "nio" + +module Goru + # [public] + # + class IOEventLoop + def initialize + @commands = [] + @selector = NIO::Selector.new + @stopped = false + end + + # [public] + # + def run + until @stopped + while (command = @commands.shift) + action, routine = command + + case action + when :register + monitor = @selector.register(routine.io, routine.intent) + monitor.value = routine.method(:wakeup) + routine.monitor = monitor + when :deregister + routine.monitor = nil + routine.monitor&.close + end + end + + @selector.select do |monitor| + monitor.value.call + end + end + ensure + @selector.close + end + + # [public] + # + def stop + @stopped = true + @selector.wakeup + rescue IOError + end + + # [public] + # + def <<(tuple) + @commands << tuple + @selector.wakeup + end + end +end diff --git a/lib/goru/reactor.rb b/lib/goru/reactor.rb index e8294e5..f6af985 100644 --- a/lib/goru/reactor.rb +++ b/lib/goru/reactor.rb @@ -1,11 +1,8 @@ # frozen_string_literal: true -require "nio" - require "timers/group" require "timers/wait" -require_relative "routines/bridge" require_relative "routines/io" module Goru @@ -16,17 +13,11 @@ def initialize(queue:, scheduler:) @queue = queue @scheduler = scheduler @routines = Set.new - @bridges = Set.new @timers = Timers::Group.new - @selector = NIO::Selector.new @stopped = false @status = nil - @mutex = Mutex.new - @cleanup = { - bridges: [], - monitors: [], - routines: [] - } + @coordinator = Thread::Queue.new + @commands = [] end # [public] @@ -39,138 +30,80 @@ def run set_status(:running) until @stopped - cleanup + while (command = @commands.shift) + action, routine = command + + case action + when :adopt + routine.reactor = self + @routines << routine + routine.adopted + when :cleanup + @routines.delete(routine) + end + end @routines.each do |routine| - call_routine(routine) + routine.call if routine.ready? end begin - wait_for_routine(block: false) + if (routine = @queue.pop(true)) + adopt_routine(routine) + end rescue ThreadError interval = @timers.wait_interval - if interval.nil? - if @routines.empty? - if @selector.empty? - become_idle - else - wait_for_bridge do - wait_for_selector - end - end - else - wait_for_bridge do - wait_for_selector(0) - end - end - elsif interval > 0 - if @selector.empty? - wait_for_interval(interval) - else - wait_for_bridge(interval) do - wait_for_selector(interval) + if interval.nil? && @routines.empty? + set_status(:idle) + @scheduler.signal + wait; set_status(:running) + else + if interval.nil? + wait unless @routines.any?(&:ready?) + elsif interval > 0 + Timers::Wait.for(interval) do |remaining| + break if wait(timeout: remaining) end end end - - @timers.fire end + + @timers.fire end ensure - @selector.close + @timers.cancel + @coordinator.close set_status(:finished) end - private def cleanup - while (routine = @cleanup[:bridges].shift) - @bridges.delete(routine) - end - - while (routine = @cleanup[:monitors].shift) - routine.monitor.close - end - - while (routine = @cleanup[:routines].shift) - @routines.delete(routine) - end - end - - private def become_idle - set_status(:idle) - @scheduler.signal(self) - wait_for_routine - set_status(:running) - end - - private def wait_for_selector(timeout = nil) - @selector.select(timeout) do |monitor| - # The routine is called directly rather than on the next tick of `run`. - # This works because io routines are not added to `@routines` like non-io routines are. - # - monitor.value.call - end - end - - private def wait_for_bridge(interval = nil) - if @bridges.any?(&:applicable?) && @bridges.none?(&:ready?) - if interval.nil? - wait_for_routine - elsif interval > 0 - wait_for_interval(interval) - end - else - yield - end - end - - private def wait_for_interval(timeout) - Timers::Wait.for(timeout) do |remaining| - break if wait_for_routine(timeout: remaining) - rescue ThreadError - # nothing to do - end - end - - private def wait_for_routine(block: true, timeout: nil) + private def wait(timeout: nil) if timeout - if (routine = @queue.pop(timeout: timeout)) - adopt_routine(routine) - end - elsif (routine = @queue.pop(!block)) - adopt_routine(routine) + @coordinator.pop(timeout: timeout) + else + @coordinator.pop end end # [public] # def finished? - @mutex.synchronize do - @status == :idle || @status == :stopped - end - end - - # [public] - # - def signal - unless @selector.empty? - @selector.wakeup - end + @status == :idle || @status == :stopped end # [public] # def wakeup - signal - @queue << :wakeup + @coordinator << :wakeup end # [public] # def stop @stopped = true - @selector.wakeup - rescue IOError + wakeup + rescue ClosedQueueError + # nothing to do end # [public] @@ -184,48 +117,22 @@ def asleep_for(seconds) # [public] # def adopt_routine(routine) - case routine - when Routines::IO - monitor = @selector.register(routine.io, routine.intent) - monitor.value = routine - routine.monitor = monitor - routine.reactor = self - when Routines::Bridge - routine.reactor = self - @bridges << routine - when Routine - routine.reactor = self - @routines << routine - end + command(:adopt, routine) end # [public] # def routine_finished(routine) - cleanup_key = case routine - when Routines::Bridge - :bridges - when Routines::IO - :monitors - else - :routines - end - - @cleanup[cleanup_key] << routine - signal + command(:cleanup, routine) end - private def set_status(status) - @mutex.synchronize do - @status = status - end + private def command(action, routine) + @commands << [action, routine] + wakeup end - private def call_routine(routine) - case routine.status - when :ready - routine.call - end + private def set_status(status) + @status = status end end end diff --git a/lib/goru/routine.rb b/lib/goru/routine.rb index 6ded37a..ee1aa80 100644 --- a/lib/goru/routine.rb +++ b/lib/goru/routine.rb @@ -85,6 +85,12 @@ def sleep(seconds) end end + # [public] + # + def ready? + @status == :ready + end + # [public] # private def set_status(status) @@ -100,5 +106,11 @@ def sleep(seconds) @reactor&.routine_finished(self) end end + + # [public] + # + def adopted + # noop + end end end diff --git a/lib/goru/routines/bridge.rb b/lib/goru/routines/bridge.rb deleted file mode 100644 index 0787086..0000000 --- a/lib/goru/routines/bridge.rb +++ /dev/null @@ -1,29 +0,0 @@ -# frozen_string_literal: true - -require_relative "channel" - -module Goru - module Routines - # [public] - # - class Bridge < Channel - def initialize(routine:, channel:) - @routine = routine - - super(channel: channel) - end - - # [public] - # - def ready? - @status == :ready - end - - # [public] - # - def call - # noop - end - end - end -end diff --git a/lib/goru/routines/bridges/readable.rb b/lib/goru/routines/bridges/readable.rb deleted file mode 100644 index a06544a..0000000 --- a/lib/goru/routines/bridges/readable.rb +++ /dev/null @@ -1,31 +0,0 @@ -# frozen_string_literal: true - -require_relative "../bridge" - -module Goru - module Routines - module Bridges - class Readable < Bridge - # [public] - # - def applicable? - @routine.intent == :r - end - - private def update_status - status = if @routine.status == :finished - :finished - elsif @channel.full? - :idle - elsif @channel.closed? - :finished - else - :ready - end - - set_status(status) - end - end - end - end -end diff --git a/lib/goru/routines/bridges/writable.rb b/lib/goru/routines/bridges/writable.rb deleted file mode 100644 index 7bbf697..0000000 --- a/lib/goru/routines/bridges/writable.rb +++ /dev/null @@ -1,31 +0,0 @@ -# frozen_string_literal: true - -require_relative "../bridge" - -module Goru - module Routines - module Bridges - class Writable < Bridge - # [public] - # - def applicable? - @routine.intent == :w - end - - private def update_status - status = if @routine.status == :finished - :finished - elsif @channel.any? - :ready - elsif @channel.closed? - :finished - else - :idle - end - - set_status(status) - end - end - end - end -end diff --git a/lib/goru/routines/io.rb b/lib/goru/routines/io.rb index 728cf89..c0c89b4 100644 --- a/lib/goru/routines/io.rb +++ b/lib/goru/routines/io.rb @@ -1,22 +1,22 @@ # frozen_string_literal: true require_relative "../routine" -require_relative "bridges/readable" -require_relative "bridges/writable" +require_relative "../bridges/readable" +require_relative "../bridges/writable" module Goru module Routines # [public] # class IO < Routine - def initialize(state = nil, io:, intent:, &block) + def initialize(state = nil, io:, intent:, event_loop:, &block) super(state, &block) @io = io @intent = normalize_intent(intent) - @status = :selecting + @event_loop = event_loop + @status = :orphaned @monitor = nil - @finishers = [] end # [public] @@ -25,10 +25,52 @@ def initialize(state = nil, io:, intent:, &block) attr_accessor :monitor + # [public] + # + def adopted + set_status(:ready) + end + + # [public] + # + def wakeup + # Keep this io from being selected again until the underlying routine is called. + # Interests are reset in `#call`. + # + @monitor&.interests = nil + + set_status(:io_ready) + @reactor.wakeup + end + + READY_STATUSES = [:io_ready, :ready].freeze + READY_BRIDGE_STATUSES = [nil, :ready].freeze + + # [public] + # + def ready? + READY_STATUSES.include?(@status) && READY_BRIDGE_STATUSES.include?(@bridge&.status) + end + + def call + super + + @monitor&.interests = @intent + end + # [public] # def accept @io.accept_nonblock + rescue Errno::EAGAIN + handle_io_error; nil + rescue Errno::ECONNRESET + finished; nil + end + + def handle_io_error + set_status(:selecting) + @event_loop << [:register, self] unless @monitor end # [public] @@ -45,9 +87,10 @@ def read(bytes) else result end + rescue Errno::EAGAIN + handle_io_error; nil rescue Errno::ECONNRESET - finished - nil + finished; nil end # [public] @@ -64,9 +107,10 @@ def write(data) else result end + rescue Errno::EAGAIN + handle_io_error; nil rescue Errno::ECONNRESET - finished - nil + finished; nil end # [public] @@ -75,27 +119,26 @@ def intent=(intent) intent = normalize_intent(intent) validate_intent!(intent) - @monitor.interests = intent + @monitor&.interests = intent @intent = intent end # [public] # def bridge(state = nil, intent:, channel:, &block) + raise "routine is already bridged" if @bridge + intent = normalize_intent(intent) validate_intent!(intent) self.intent = intent - bridge = case intent + @bridge = case intent when :r Bridges::Readable.new(routine: self, channel: channel) when :w Bridges::Writable.new(routine: self, channel: channel) end - on_finished { bridge.finished } - @reactor.adopt_routine(bridge) - routine = case intent when :r Routines::Channels::Readable.new(state, channel: channel, &block) @@ -104,21 +147,29 @@ def bridge(state = nil, intent:, channel:, &block) end @reactor.adopt_routine(routine) - @reactor.signal + @reactor.wakeup routine end # [public] # - def on_finished(&block) - @finishers << block + def bridged + @reactor.wakeup + end + + # [public] + # + def unbridge + @bridge = nil + @reactor.wakeup end private def status_changed case @status when :finished - @finishers.each(&:call) + @event_loop << [:deregister, self] + @bridge&.finished end super diff --git a/lib/goru/scheduler.rb b/lib/goru/scheduler.rb index e0671af..15a8316 100644 --- a/lib/goru/scheduler.rb +++ b/lib/goru/scheduler.rb @@ -4,6 +4,7 @@ require "is/global" require_relative "channel" +require_relative "io_event_loop" require_relative "reactor" require_relative "routines/channel" require_relative "routines/io" @@ -16,7 +17,6 @@ module Goru # class Scheduler include Is::Global - include MonitorMixin class << self # Prevent issues when including `Goru` at the toplevel. @@ -37,9 +37,9 @@ def default_scheduler_count def initialize(count: self.class.default_scheduler_count) super() - @stopping = false + @stopped = false @routines = Thread::Queue.new - @condition = new_cond + @coordinator = Thread::Queue.new @reactors = count.times.map { Reactor.new(queue: @routines, scheduler: self) @@ -52,6 +52,11 @@ def initialize(count: self.class.default_scheduler_count) end } } + + @io_event_loop = IOEventLoop.new + @threads << Thread.new { + @io_event_loop.run + } end # [public] @@ -60,7 +65,7 @@ def go(state = nil, io: nil, channel: nil, intent: nil, &block) raise ArgumentError, "cannot set both `io` and `channel`" if io && channel routine = if io - Routines::IO.new(state, io: io, intent: intent, &block) + Routines::IO.new(state, io: io, intent: intent, event_loop: @io_event_loop, &block) elsif channel case intent when :r @@ -73,7 +78,7 @@ def go(state = nil, io: nil, channel: nil, intent: nil, &block) end @routines << routine - @reactors.each(&:signal) + @reactors.each(&:wakeup) routine end @@ -81,10 +86,8 @@ def go(state = nil, io: nil, channel: nil, intent: nil, &block) # [public] # def wait - synchronize do - @condition.wait_until do - @stopping - end + until @stopped + @coordinator.pop end rescue Interrupt ensure @@ -94,22 +97,25 @@ def wait # [public] # def stop - @stopping = true + @stopped = true @routines.close + @io_event_loop.stop @reactors.each(&:stop) @threads.each(&:join) end # [public] # - def signal(reactor) - synchronize do - if @reactors.all?(&:finished?) - @stopping = true - end - - @condition.signal + def signal + if @reactors.all?(&:finished?) + @stopped = true end + + wakeup + end + + def wakeup + @coordinator << :wakeup end end end diff --git a/spec/features/io/bridging_spec.rb b/spec/features/io/bridging_spec.rb index 1759af0..18fd811 100644 --- a/spec/features/io/bridging_spec.rb +++ b/spec/features/io/bridging_spec.rb @@ -18,11 +18,13 @@ # wait a second for the server to start sleep(0.25) - statuses = 3.times.map { - HTTP.get("http://localhost:4242").status.to_i - } - - server.stop + begin + statuses = 3.times.map { + HTTP.timeout(1).get("http://localhost:4242").status.to_i + } + ensure + server.stop + end expect(statuses.count).to eq(3) expect(statuses.uniq.count).to eq(1) diff --git a/spec/features/io/non_blocking_spec.rb b/spec/features/io/non_blocking_spec.rb index dbff77b..b882cac 100644 --- a/spec/features/io/non_blocking_spec.rb +++ b/spec/features/io/non_blocking_spec.rb @@ -14,7 +14,9 @@ def initialize end def start - @routine = @scheduler.go(io: TCPServer.new("localhost", 4242), intent: :r) { |server_routine| + @server = TCPServer.new("localhost", 4243) + + @routine = @scheduler.go(io: @server, intent: :r) { |server_routine| if (client_io = server_routine.accept) state = {delegate: Delegate.new} state[:parser] = LLHttp::Parser.new(state[:delegate]) @@ -29,6 +31,7 @@ def start client_routine.state[:delegate].reset client_routine.state[:parser].reset + client_routine.finished end end } @@ -38,6 +41,8 @@ def start def stop @routine.finished + @scheduler.stop + @server.close end end @@ -53,7 +58,7 @@ def stop sleep(0.25) statuses = 100.times.map { - HTTP.get("http://localhost:4242").status.to_i + HTTP.get("http://localhost:4243").status.to_i } server.stop diff --git a/spec/support/server.rb b/spec/support/server.rb index 4e8cc29..213fe7b 100644 --- a/spec/support/server.rb +++ b/spec/support/server.rb @@ -32,7 +32,7 @@ def accept(routine:) writer = Goru::Channel.new state = {delegate: delegate, parser: parser, writer: writer} - go(state, io: client_io, intent: :r) { |client_routine| + go(state, io: client_io, intent: :r) do |client_routine| client_routine.debug = true case client_routine.intent @@ -41,7 +41,13 @@ def accept(routine:) when :w write(routine: client_routine) end - } + rescue => error + $stderr << "!!! #{error}\n" + $stderr << "#{error.backtrace.join("\n")}\n" + + client_io.close + client_routine.finished + end end end @@ -62,26 +68,30 @@ def write(routine:) routine.state[:delegate].reset routine.state[:parser].reset routine.finished + else + fail "tried to write but no data was available" end end def dispatch(routine:) writer = routine.state[:writer] - routine.bridge({mode: :sleep}, intent: :w, channel: writer) { |bridge_routine| + data = [ + "HTTP/1.1 204 No Content\r\n", + "Content-Length: 0\r\n", + "\r\n", + ] + + routine.bridge(intent: :w, channel: writer) { |bridge_routine| bridge_routine.debug = true - case bridge_routine.state[:mode] - when :sleep - bridge_routine.sleep(rand) - bridge_routine.state[:mode] = :write - else - bridge_routine << <<~RESPONSE - HTTP/1.1 204 No Content\r - Content-Length: 0\r - \r - RESPONSE + # Write data 5% of the time... + # + if 5 >= rand(1..100) + bridge_routine << data.shift + end + if data.empty? bridge_routine.finished writer.close end