Skip to content

Commit

Permalink
Refactor bridges
Browse files Browse the repository at this point in the history
There's a lot to this and unfortunately it is in a single commit—it is a result of the exploration required to make this work.

Some notable changes:

* The reactor no longer has a selector. Instead, the scheduler has an event loop for managing io select across reactors. This simplifies the reactor to only dealing with routines. IO concerns are abstracted to both the event loop and the io routine.
* Bridges now build on top of the above to manage the existing io routine.
* The scheduler and routine now handle wakeups with a dedicated coordinator, implemented as a thread queue. This has proven to be fairly simple to work with and avoids the race conditions that conditions can have because the signal is persisted to the queue—if the coordinator is signals before the listener subscribes the listener will still receive the signal when it gets around to listening.
  • Loading branch information
bryanp committed May 5, 2023
1 parent 2537cfa commit 129c571
Show file tree
Hide file tree
Showing 17 changed files with 360 additions and 299 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ Goru::Scheduler.go(io: io, intent: :r) { |routine|

## Bridges

Goru supports coordinates buffered io using bridges:
Goru supports coordinated buffered io using bridges:

```ruby
writer = Goru::Channel.new
Expand All @@ -224,8 +224,8 @@ Goru::Scheduler.go(io: io, intent: :r) { |routine|

Using bridges, the io routine is only called again when two conditions are met:

1. The io object is writable.
2. The channel has data available for reading.
1. The io object matches the bridged intent (e.g. it is writable).
2. The channel is in the correct state to reciprocate the intent (e.g. it has data).

See the [server example](./examples/server.rb) for a more complete use-case.

Expand Down
6 changes: 4 additions & 2 deletions examples/channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def initialize(channel:)
routine << value
routine.update(:sleep)
puts "wrote: #{value}"
else
end

if values.empty?
channel.close
routine.finished
end
Expand All @@ -55,7 +57,7 @@ def initialize(channel:)
loop do
if reader.received == writer.writable
break
elsif Time.now - start > 5
elsif Time.now - start > 10
fail "timed out"
else
sleep(0.1)
Expand Down
16 changes: 9 additions & 7 deletions examples/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

sleep(1)

puts "making requests..."
puts "got: #{HTTP.get("http://localhost:4242").status}"
puts "got: #{HTTP.get("http://localhost:4242").status}"
puts "got: #{HTTP.get("http://localhost:4242").status}"

puts "shutting down..."
server.stop
begin
puts "making requests..."
puts "got: #{HTTP.timeout(1).get("http://localhost:4242").status}"
puts "got: #{HTTP.timeout(1).get("http://localhost:4242").status}"
puts "got: #{HTTP.timeout(1).get("http://localhost:4242").status}"
ensure
puts "shutting down..."
server.stop
end
53 changes: 53 additions & 0 deletions lib/goru/bridge.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

module Goru
# [public]
#
class Bridge
def initialize(routine:, channel:)
@routine = routine
@channel = channel
@channel.add_observer(self)
update_status
end

# [public]
#
attr_reader :status

# [public]
#
private def set_status(status)
@status = status
status_changed
end

# [public]
#
def update_status
# noop
end

private def status_changed
case @status
when :ready
@routine.bridged
when :finished
@channel.remove_observer(self)
@routine.unbridge
end
end

def channel_received
update_status
end

def channel_read
update_status
end

def channel_closed
update_status
end
end
end
23 changes: 23 additions & 0 deletions lib/goru/bridges/readable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require_relative "../bridge"

module Goru
module Bridges
class Readable < Bridge
private def update_status
status = if @routine.status == :finished
:finished
elsif @channel.full?
:idle
elsif @channel.closed?
:finished
else
:ready
end

set_status(status)
end
end
end
end
23 changes: 23 additions & 0 deletions lib/goru/bridges/writable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require_relative "../bridge"

module Goru
module Bridges
class Writable < Bridge
private def update_status
status = if @routine.status == :finished
:finished
elsif @channel.any?
:ready
elsif @channel.closed?
:finished
else
:idle
end

set_status(status)
end
end
end
end
56 changes: 56 additions & 0 deletions lib/goru/io_event_loop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

require "nio"

module Goru
# [public]
#
class IOEventLoop
def initialize
@commands = []
@selector = NIO::Selector.new
@stopped = false
end

# [public]
#
def run
until @stopped
while (command = @commands.shift)
action, routine = command

case action
when :register
monitor = @selector.register(routine.io, routine.intent)
monitor.value = routine.method(:wakeup)
routine.monitor = monitor
when :deregister
routine.monitor = nil
routine.monitor&.close
end
end

@selector.select do |monitor|
monitor.value.call
end
end
ensure
@selector.close
end

# [public]
#
def stop
@stopped = true
@selector.wakeup
rescue IOError
end

# [public]
#
def <<(tuple)
@commands << tuple
@selector.wakeup
end
end
end
Loading

0 comments on commit 129c571

Please sign in to comment.