Skip to content

Commit

Permalink
Expose synchronize and flush for better control over native IO.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Apr 21, 2024
1 parent f80180b commit 11e42e4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
15 changes: 13 additions & 2 deletions lib/protocol/http2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,14 @@ def ignore_frame?(frame)
end
end

def synchronize
yield
end

# Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.
def read_frame
frame = @framer.read_frame(@local_settings.maximum_frame_size)

# puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})"
# puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"

Expand Down Expand Up @@ -207,12 +212,18 @@ def receive_goaway(frame)
end

def write_frame(frame)
@framer.write_frame(frame)
synchronize do
@framer.write_frame(frame)
@framer.flush
end
end

def write_frames
if @framer
yield @framer
synchronize do
yield @framer
@framer.flush
end
else
raise EOFError, "Connection closed!"
end
Expand Down
16 changes: 10 additions & 6 deletions lib/protocol/http2/framer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def initialize(stream, frames = FRAMES)
@frames = frames
end

def flush
@stream.flush
end

def close
@stream.close
end
Expand Down Expand Up @@ -69,7 +73,7 @@ def read_frame(maximum_frame_size = MAXIMUM_ALLOWED_FRAME_SIZE)
# Read the header:
length, type, flags, stream_id = read_header

# Async.logger.debug(self) {"read_frame: length=#{length} type=#{type} flags=#{flags} stream_id=#{stream_id} -> klass=#{@frames[type].inspect}"}
# Console.debug(self) {"read_frame: length=#{length} type=#{type} flags=#{flags} stream_id=#{stream_id} -> klass=#{@frames[type].inspect}"}

# Allocate the frame:
klass = @frames[type] || Frame
Expand All @@ -78,19 +82,19 @@ def read_frame(maximum_frame_size = MAXIMUM_ALLOWED_FRAME_SIZE)
# Read the payload:
frame.read(@stream, maximum_frame_size)

# Async.logger.debug(self, name: "read") {frame.inspect}
# Console.debug(self, name: "read") {frame.inspect}

return frame
end

# Write a frame to the underlying IO.
# After writing one or more frames, you should call flush to ensure the frames are sent to the remote peer.
# @parameter frame [Frame] the frame to write.
def write_frame(frame)
# Async.logger.debug(self, name: "write") {frame.inspect}
# Console.debug(self, name: "write") {frame.inspect}

frame.write(@stream)

# Don't call @stream.flush here because it can cause significant contention if there is a semaphore around this method.
# @stream.flush

return frame
end

Expand Down

0 comments on commit 11e42e4

Please sign in to comment.