Skip to content

Commit

Permalink
Update HTTP2StreamChannel to tolerate nil stream ID. (#217)
Browse files Browse the repository at this point in the history
Motivation:

As part of the work in #214, HTTP2StreamID will need to be able to
tolerate not knowing its stream ID. For now this is not a problem, but
we should think about everywhere we use the stream ID and make sure we
have a solid principle in mind. This lays the groundwork for future
enhancements.

Modifications:

- Update HTTP2StreamChannel to hold an optional stream ID.
- Update interface to HTTP2StreamMultiplexer to tolerate closing
  channels without stream IDs (we'll need it later).
- Move the "get a new stream ID" functionality to a method on
  HTTP2StreamMultiplexer.
- Validate we behave correctly with optional stream IDs in the channel.

Result:

We can tolerate stream IDs being not present.
  • Loading branch information
Lukasa authored Jul 28, 2020
1 parent 125cab7 commit edd373d
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 19 deletions.
5 changes: 5 additions & 0 deletions Sources/NIOHTTP2/HTTP2Error.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,11 @@ public enum NIOHTTP2Errors {
public struct ExcessivelyLargeHeaderBlock: NIOHTTP2Error {
public init() { }
}

/// The channel does not yet have a stream ID, as it has not reached the network yet.
public struct NoStreamIDAvailable: NIOHTTP2Error {
public init() { }
}
}


Expand Down
46 changes: 31 additions & 15 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ extension HTTP2StreamChannelOptions.Types {
/// stream ID the channel owns. This channel option allows that query. Please note that this channel option
/// is *get-only*: that is, it cannot be used with `setOption`. The stream ID for a given `HTTP2StreamChannel`
/// is immutable.
///
/// If a channel is not active, the stream ID will not be present, and attempting to
/// get this channel option will fail.
public struct StreamIDOption: ChannelOption {
public typealias Value = HTTP2StreamID

Expand Down Expand Up @@ -123,7 +126,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
internal init(allocator: ByteBufferAllocator,
parent: Channel,
multiplexer: HTTP2StreamMultiplexer,
streamID: HTTP2StreamID,
streamID: HTTP2StreamID?,
targetWindowSize: Int32,
outboundBytesHighWatermark: Int,
outboundBytesLowWatermark: Int) {
Expand Down Expand Up @@ -157,7 +160,9 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
// 4. Catching errors if they occur.
let f = self.parent!.getOption(ChannelOptions.autoRead).flatMap { autoRead -> EventLoopFuture<Void> in
self.autoRead = autoRead
return initializer?(self, self.streamID) ?? self.eventLoop.makeSucceededFuture(())
// This initializer callback can only be invoked if we already have a stream ID.
// So we force-unwrap here.
return initializer?(self, self.streamID!) ?? self.eventLoop.makeSucceededFuture(())
}.map {
// This force unwrap is safe as parent is assigned in the initializer, and never unassigned.
// If parent is not active, we expect to receive a channelActive later.
Expand Down Expand Up @@ -203,9 +208,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore {

internal func networkActivationReceived() {
if self.state == .closed {
// Uh-oh: we got an activation but we think we're closed! We need to send a RST_STREAM frame.
let resetFrame = HTTP2Frame(streamID: self.streamID, payload: .rstStream(.cancel))
self.parent?.writeAndFlush(resetFrame, promise: nil)
// Uh-oh: we got an activation but we think we're closed! We need to send a RST_STREAM frame. We'll only do it if we have a stream ID.
if let streamID = self.streamID {
let resetFrame = HTTP2Frame(streamID: streamID, payload: .rstStream(.cancel))
self.parent?.writeAndFlush(resetFrame, promise: nil)
}
return
}
self.modifyingState { $0.networkActive() }
Expand Down Expand Up @@ -294,7 +301,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore {

switch option {
case _ as HTTP2StreamChannelOptions.Types.StreamIDOption:
return self.streamID as! Option.Value
if let streamID = self.streamID {
return streamID as! Option.Value
} else {
throw NIOHTTP2Errors.NoStreamIDAvailable()
}
case _ as ChannelOptions.Types.AutoReadOption:
return self.autoRead as! Option.Value
default:
Expand Down Expand Up @@ -324,7 +335,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {

public let eventLoop: EventLoop

private let streamID: HTTP2StreamID
internal var streamID: HTTP2StreamID?

private var state: StreamChannelState

Expand Down Expand Up @@ -478,7 +489,8 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
}

self.modifyingState { $0.beginClosing() }
let resetFrame = HTTP2Frame(streamID: self.streamID, payload: .rstStream(.cancel))
// We should have a stream ID here, force-unwrap is safe.
let resetFrame = HTTP2Frame(streamID: self.streamID!, payload: .rstStream(.cancel))
self.receiveOutboundFrame(resetFrame, promise: nil)
self.multiplexer.childChannelFlush()
}
Expand All @@ -499,7 +511,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.eventLoop.execute {
self.removeHandlers(channel: self)
self.closePromise.succeed(())
self.multiplexer.childChannelClosed(streamID: self.streamID)
self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self))
}
}

Expand All @@ -520,7 +532,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.eventLoop.execute {
self.removeHandlers(channel: self)
self.closePromise.fail(error)
self.multiplexer.childChannelClosed(streamID: self.streamID)
self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self))
}
}

Expand Down Expand Up @@ -577,7 +589,8 @@ private extension HTTP2StreamChannel {
self.pipeline.fireChannelRead(NIOAny(frame))

if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size) {
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
// To have a pending read, we must have a stream ID.
let frame = HTTP2Frame(streamID: self.streamID!, payload: .windowUpdate(windowSizeIncrement: increment))
self.receiveOutboundFrame(frame, promise: nil)
// This flush should really go away, but we need it for now until we sort out window management.
self.multiplexer.childChannelFlush()
Expand Down Expand Up @@ -661,7 +674,8 @@ internal extension HTTP2StreamChannel {
self.tryToRead()

if let reason = reason {
let err = NIOHTTP2Errors.StreamClosed(streamID: self.streamID, errorCode: reason)
// To receive from the network, it must be safe to force-unwrap here.
let err = NIOHTTP2Errors.StreamClosed(streamID: self.streamID!, errorCode: reason)
self.errorEncountered(error: err)
} else {
self.closedCleanly()
Expand All @@ -670,7 +684,8 @@ internal extension HTTP2StreamChannel {

func receiveWindowUpdatedEvent(_ windowSize: Int) {
if let increment = self.windowManager.newWindowSize(windowSize) {
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
// To receive from the network, it must be safe to force-unwrap here.
let frame = HTTP2Frame(streamID: self.streamID!, payload: .windowUpdate(windowSizeIncrement: increment))
self.receiveOutboundFrame(frame, promise: nil)
// This flush should really go away, but we need it for now until we sort out window management.
self.multiplexer.childChannelFlush()
Expand All @@ -679,7 +694,8 @@ internal extension HTTP2StreamChannel {

func initialWindowSizeChanged(delta: Int) {
if let increment = self.windowManager.initialWindowSizeChanged(delta: delta) {
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
// To receive from the network, it must be safe to force-unwrap here.
let frame = HTTP2Frame(streamID: self.streamID!, payload: .windowUpdate(windowSizeIncrement: increment))
self.receiveOutboundFrame(frame, promise: nil)
// This flush should really go away, but we need it for now until we sort out window management.
self.multiplexer.childChannelFlush()
Expand Down Expand Up @@ -725,7 +741,7 @@ extension HTTP2StreamChannel {
// MARK: Custom String Convertible
extension HTTP2StreamChannel {
public var description: String {
return "HTTP2StreamChannel(streamID: \(self.streamID), isActive: \(self.isActive), isWritable: \(self.isWritable))"
return "HTTP2StreamChannel(streamID: \(String(describing: self.streamID)), isActive: \(self.isActive), isWritable: \(self.isWritable))"
}
}

Expand Down
17 changes: 13 additions & 4 deletions Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ extension HTTP2StreamMultiplexer {
/// `ChannelPipeline` for the newly created channel.
public func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture<Void>) {
self.channel.eventLoop.execute {
let streamID = self.nextOutboundStreamID
self.nextOutboundStreamID = HTTP2StreamID(Int32(streamID) + 2)
let streamID = self.requestStreamID()
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
Expand All @@ -319,8 +318,12 @@ extension HTTP2StreamMultiplexer {

// MARK:- Child to parent calls
extension HTTP2StreamMultiplexer {
internal func childChannelClosed(streamID: HTTP2StreamID) {
self.streams.removeValue(forKey: streamID)
internal func childChannelClosed(_ channel: MultiplexerAbstractChannel) {
if let streamID = channel.streamID {
self.streams.removeValue(forKey: streamID)
} else {
preconditionFailure("Child channels always have stream IDs right now.")
}
}

internal func childChannelWrite(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) {
Expand All @@ -330,4 +333,10 @@ extension HTTP2StreamMultiplexer {
internal func childChannelFlush() {
self.flush(context: context)
}

internal func requestStreamID() -> HTTP2StreamID {
let streamID = self.nextOutboundStreamID
self.nextOutboundStreamID = HTTP2StreamID(Int32(streamID) + 2)
return streamID
}
}
11 changes: 11 additions & 0 deletions Sources/NIOHTTP2/MultiplexerAbstractChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct MultiplexerAbstractChannel {
outboundBytesHighWatermark: outboundBytesHighWatermark,
outboundBytesLowWatermark: outboundBytesLowWatermark))
}

init(_ channel: HTTP2StreamChannel) {
self.baseChannel = .frameBased(channel)
}
}

extension MultiplexerAbstractChannel {
Expand All @@ -51,6 +55,13 @@ extension MultiplexerAbstractChannel {

// MARK: API for HTTP2StreamMultiplexer
extension MultiplexerAbstractChannel {
var streamID: HTTP2StreamID? {
switch self.baseChannel {
case .frameBased(let base):
return base.streamID
}
}

var inList: Bool {
switch self.baseChannel {
case .frameBased(let base):
Expand Down

0 comments on commit edd373d

Please sign in to comment.