Skip to content

Commit

Permalink
Make HTTP2StreamChannel generic over a message type (#218)
Browse files Browse the repository at this point in the history
Motivation:

As part of #214 we need to make `HTTP2StreamChannel` generic over the type
of message it expects to read and write. This allows us to define an
`HTTP2StreamChannel` which uses `HTTP2Frame.FramePayload` as its
currency type, rather than `HTTP2Frame`.

Modifications:

- Make `HTTP2StreamChannel` generic over `Message` which is constrained
  by conformance to `HTTP2FrameConvertible` and
  `HTTP2FramePayloadConvertible`
- `HTTP2StreamChannel` now expects to read in `Message`s (as opposed to
  `HTTP2Frame`s) and have `Message`s written in to it.
- Added typealiases for frame and payload based stream channels.
- Added support for the payload based channel in
  `MultiplexerAbstractChannel` (although one can't be created yet).

Result:

We can create payload based stream channels.

Co-authored-by: Cory Benfield <[email protected]>
  • Loading branch information
glbrntt and Lukasa authored Jul 29, 2020
1 parent 04706e7 commit cde820e
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 59 deletions.
8 changes: 8 additions & 0 deletions Sources/NIOHTTP2/HTTP2Frame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ public struct HTTP2Frame {
}

extension HTTP2Frame: HTTP2FrameConvertible, HTTP2FramePayloadConvertible {
init(http2Frame: HTTP2Frame) {
self = http2Frame
}

func makeHTTP2Frame(streamID: HTTP2StreamID) -> HTTP2Frame {
assert(self.streamID == streamID, "streamID does not match")
return self
Expand All @@ -264,6 +268,10 @@ extension HTTP2Frame.FramePayload: HTTP2FrameConvertible, HTTP2FramePayloadConve
return self
}

init(http2Frame: HTTP2Frame) {
self = http2Frame.payload
}

func makeHTTP2Frame(streamID: HTTP2StreamID) -> HTTP2Frame {
return HTTP2Frame(streamID: streamID, payload: self)
}
Expand Down
39 changes: 39 additions & 0 deletions Sources/NIOHTTP2/HTTP2FrameConvertible.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
//===----------------------------------------------------------------------===//

protocol HTTP2FrameConvertible {
/// Initialize `Self` from an `HTTP2Frame`.
init(http2Frame: HTTP2Frame)

/// Makes an `HTTPFrame` with the given `streamID`.
///
/// - Parameter streamID: The `streamID` to use when constructing the frame.
Expand All @@ -23,3 +26,39 @@ protocol HTTP2FramePayloadConvertible {
/// Makes a `HTTP2Frame.FramePayload`.
var payload: HTTP2Frame.FramePayload { get }
}

extension HTTP2FrameConvertible where Self: HTTP2FramePayloadConvertible {
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
/// Here we concern ourselves only with per-stream frames: that is, `HEADERS`, `DATA`,
/// `WINDOW_UDPATE`, `RST_STREAM`, and I guess `PRIORITY`. As a simple heuristic we
/// hard code fixed lengths for fixed length frames, use a calculated length for
/// variable length frames, and just ignore encoded headers because it's not worth doing a better
/// job.
var estimatedFrameSize: Int {
let frameHeaderSize = 9

switch self.payload {
case .data(let d):
let paddingBytes = d.paddingBytes.map { $0 + 1 } ?? 0
return d.data.readableBytes + paddingBytes + frameHeaderSize
case .headers(let h):
let paddingBytes = h.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .priority:
return frameHeaderSize + 5
case .pushPromise(let p):
// Like headers, this is variably size, and we just ignore the encoded headers because
// it's not worth having a heuristic.
let paddingBytes = p.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .rstStream:
return frameHeaderSize + 4
case .windowUpdate:
return frameHeaderSize + 4
default:
// Unknown or unexpected control frame: say 9 bytes.
return frameHeaderSize
}
}
}
82 changes: 34 additions & 48 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ private enum StreamChannelState {
}
}

/// An `HTTP2StreamChannel` which deals in `HTTPFrame`s.
typealias HTTP2FrameBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame>

final class HTTP2StreamChannel: Channel, ChannelCore {
/// An `HTTP2StreamChannel` which reads and writes `HTTPFrame.FramePayload`s.
typealias HTTP2PayloadBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame.FramePayload>

final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2FrameConvertible>: Channel, ChannelCore {
internal init(allocator: ByteBufferAllocator,
parent: Channel,
multiplexer: HTTP2StreamMultiplexer,
Expand Down Expand Up @@ -350,7 +355,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
/// In the future this buffer will be used to manage interactions with read() and even, one day,
/// with flow control. For now, though, all this does is hold frames until we have set the
/// channel up.
private var pendingReads: CircularBuffer<HTTP2Frame> = CircularBuffer(initialCapacity: 8)
private var pendingReads: CircularBuffer<Message> = CircularBuffer(initialCapacity: 8)

/// Whether `autoRead` is enabled. By default, all `HTTP2StreamChannel` objects inherit their `autoRead`
/// state from their parent.
Expand All @@ -364,7 +369,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
///
/// To correctly respect flushes, we deliberately withold frames from the parent channel until this
/// stream is flushed, at which time we deliver them all. This buffer holds the pending ones.
private var pendingWrites: MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)
private var pendingWrites: MarkedCircularBuffer<(Message, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)

/// A list node used to hold stream channels.
internal var streamChannelListNode: StreamChannelListNode = StreamChannelListNode()
Expand All @@ -390,13 +395,13 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
return
}

let frame = self.unwrapData(data, as: HTTP2Frame.self)
let outbound = self.unwrapData(data, as: Message.self)

// We need a promise to attach our flow control callback to.
// Regardless of whether the write succeeded or failed, we don't count
// the bytes any longer.
let promise = promise ?? self.eventLoop.makePromise()
let writeSize = frame.bufferBytes
let writeSize = outbound.estimatedFrameSize

// Right now we deal with this math by just attaching a callback to all promises. This is going
// to be annoyingly expensive, but for now it's the most straightforward approach.
Expand All @@ -405,7 +410,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.changeWritability(to: value)
}
}
self.pendingWrites.append((frame, promise))
self.pendingWrites.append((outbound, promise))

// Ok, we can make an outcall now, which means we can safely deal with the flow control.
if case .changed(newValue: let value) = self.writabilityManager.bufferedBytes(writeSize) {
Expand Down Expand Up @@ -511,7 +516,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.eventLoop.execute {
self.removeHandlers(channel: self)
self.closePromise.succeed(())
self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self))
if let streamID = self.streamID {
self.multiplexer.childChannelClosed(streamID: streamID)
} else {
self.multiplexer.childChannelClosed(channelID: ObjectIdentifier(self))
}
}
}

Expand All @@ -532,7 +541,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.eventLoop.execute {
self.removeHandlers(channel: self)
self.closePromise.fail(error)
self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self))
if let streamID = self.streamID {
self.multiplexer.childChannelClosed(streamID: streamID)
} else {
self.multiplexer.childChannelClosed(channelID: ObjectIdentifier(self))
}
}
}

Expand Down Expand Up @@ -606,9 +619,16 @@ private extension HTTP2StreamChannel {
return
}

// Get a streamID from the multiplexer if we haven't got one already.
if self.streamID == nil {
self.streamID = self.multiplexer.requestStreamID()
}

while self.pendingWrites.hasMark {
let write = self.pendingWrites.removeFirst()
self.receiveOutboundFrame(write.0, promise: write.1)
let (outbound, promise) = self.pendingWrites.removeFirst()
// This unwrap is okay: we just ensured that `self.streamID` was set above.
let frame = outbound.makeHTTP2Frame(streamID: self.streamID!)
self.receiveOutboundFrame(frame, promise: promise)
}
self.multiplexer.childChannelFlush()
}
Expand All @@ -634,18 +654,20 @@ internal extension HTTP2StreamChannel {
return
}

let message = Message(http2Frame: frame)

if self.unsatisfiedRead {
// We don't need to account for this frame in the window manager: it's being delivered
// straight into the pipeline.
self.pipeline.fireChannelRead(NIOAny(frame))
self.pipeline.fireChannelRead(NIOAny(message))
} else {
// Record the size of the frame so that when we receive a window update event our
// calculation on whether we emit a WINDOW_UPDATE frame is based on the bytes we have
// actually delivered into the pipeline.
if case .data(let dataPayload) = frame.payload {
self.windowManager.bufferedFrameReceived(size: dataPayload.data.readableBytes)
}
self.pendingReads.append(frame)
self.pendingReads.append(message)
}
}

Expand Down Expand Up @@ -744,39 +766,3 @@ extension HTTP2StreamChannel {
return "HTTP2StreamChannel(streamID: \(String(describing: self.streamID)), isActive: \(self.isActive), isWritable: \(self.isWritable))"
}
}

extension HTTP2Frame {
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
/// Here we concern ourselves only with per-stream frames: that is, `HEADERS`, `DATA`,
/// `WINDOW_UDPATE`, `RST_STREAM`, and I guess `PRIORITY`. As a simple heuristic we
/// hard code fixed lengths for fixed length frames, use a calculated length for
/// variable length frames, and just ignore encoded headers because it's not worth doing a better
/// job.
fileprivate var bufferBytes: Int {
let frameHeaderSize = 9

switch self.payload {
case .data(let d):
let paddingBytes = d.paddingBytes.map { $0 + 1 } ?? 0
return d.data.readableBytes + paddingBytes + frameHeaderSize
case .headers(let h):
let paddingBytes = h.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .priority:
return frameHeaderSize + 5
case .pushPromise(let p):
// Like headers, this is variably size, and we just ignore the encoded headers because
// it's not worth having a heuristic.
let paddingBytes = p.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .rstStream:
return frameHeaderSize + 4
case .windowUpdate:
return frameHeaderSize + 4
default:
// Unknown or unexpected control frame: say 9 bytes.
return frameHeaderSize
}
}
}
12 changes: 6 additions & 6 deletions Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,12 @@ extension HTTP2StreamMultiplexer {

// MARK:- Child to parent calls
extension HTTP2StreamMultiplexer {
internal func childChannelClosed(_ channel: MultiplexerAbstractChannel) {
if let streamID = channel.streamID {
self.streams.removeValue(forKey: streamID)
} else {
preconditionFailure("Child channels always have stream IDs right now.")
}
internal func childChannelClosed(streamID: HTTP2StreamID) {
self.streams.removeValue(forKey: streamID)
}

internal func childChannelClosed(channelID: ObjectIdentifier) {
preconditionFailure("We don't currently support closing channels by 'channelID'")
}

internal func childChannelWrite(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) {
Expand Down
Loading

0 comments on commit cde820e

Please sign in to comment.