diff --git a/Sources/NIOHTTP2/HTTP2Error.swift b/Sources/NIOHTTP2/HTTP2Error.swift index 872f8b12..445c424c 100644 --- a/Sources/NIOHTTP2/HTTP2Error.swift +++ b/Sources/NIOHTTP2/HTTP2Error.swift @@ -301,6 +301,11 @@ public enum NIOHTTP2Errors { public struct ExcessivelyLargeHeaderBlock: NIOHTTP2Error { public init() { } } + + /// The channel does not yet have a stream ID, as it has not reached the network yet. + public struct NoStreamIDAvailable: NIOHTTP2Error { + public init() { } + } } diff --git a/Sources/NIOHTTP2/HTTP2StreamChannel.swift b/Sources/NIOHTTP2/HTTP2StreamChannel.swift index 01bf2b42..358954df 100644 --- a/Sources/NIOHTTP2/HTTP2StreamChannel.swift +++ b/Sources/NIOHTTP2/HTTP2StreamChannel.swift @@ -38,6 +38,9 @@ extension HTTP2StreamChannelOptions.Types { /// stream ID the channel owns. This channel option allows that query. Please note that this channel option /// is *get-only*: that is, it cannot be used with `setOption`. The stream ID for a given `HTTP2StreamChannel` /// is immutable. + /// + /// If a channel is not active, the stream ID will not be present, and attempting to + /// get this channel option will fail. public struct StreamIDOption: ChannelOption { public typealias Value = HTTP2StreamID @@ -123,7 +126,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { internal init(allocator: ByteBufferAllocator, parent: Channel, multiplexer: HTTP2StreamMultiplexer, - streamID: HTTP2StreamID, + streamID: HTTP2StreamID?, targetWindowSize: Int32, outboundBytesHighWatermark: Int, outboundBytesLowWatermark: Int) { @@ -157,7 +160,9 @@ final class HTTP2StreamChannel: Channel, ChannelCore { // 4. Catching errors if they occur. let f = self.parent!.getOption(ChannelOptions.autoRead).flatMap { autoRead -> EventLoopFuture in self.autoRead = autoRead - return initializer?(self, self.streamID) ?? self.eventLoop.makeSucceededFuture(()) + // This initializer callback can only be invoked if we already have a stream ID. + // So we force-unwrap here. + return initializer?(self, self.streamID!) ?? self.eventLoop.makeSucceededFuture(()) }.map { // This force unwrap is safe as parent is assigned in the initializer, and never unassigned. // If parent is not active, we expect to receive a channelActive later. @@ -203,9 +208,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore { internal func networkActivationReceived() { if self.state == .closed { - // Uh-oh: we got an activation but we think we're closed! We need to send a RST_STREAM frame. - let resetFrame = HTTP2Frame(streamID: self.streamID, payload: .rstStream(.cancel)) - self.parent?.writeAndFlush(resetFrame, promise: nil) + // Uh-oh: we got an activation but we think we're closed! We need to send a RST_STREAM frame. We'll only do it if we have a stream ID. + if let streamID = self.streamID { + let resetFrame = HTTP2Frame(streamID: streamID, payload: .rstStream(.cancel)) + self.parent?.writeAndFlush(resetFrame, promise: nil) + } return } self.modifyingState { $0.networkActive() } @@ -294,7 +301,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore { switch option { case _ as HTTP2StreamChannelOptions.Types.StreamIDOption: - return self.streamID as! Option.Value + if let streamID = self.streamID { + return streamID as! Option.Value + } else { + throw NIOHTTP2Errors.NoStreamIDAvailable() + } case _ as ChannelOptions.Types.AutoReadOption: return self.autoRead as! Option.Value default: @@ -324,7 +335,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { public let eventLoop: EventLoop - private let streamID: HTTP2StreamID + internal var streamID: HTTP2StreamID? private var state: StreamChannelState @@ -478,7 +489,8 @@ final class HTTP2StreamChannel: Channel, ChannelCore { } self.modifyingState { $0.beginClosing() } - let resetFrame = HTTP2Frame(streamID: self.streamID, payload: .rstStream(.cancel)) + // We should have a stream ID here, force-unwrap is safe. + let resetFrame = HTTP2Frame(streamID: self.streamID!, payload: .rstStream(.cancel)) self.receiveOutboundFrame(resetFrame, promise: nil) self.multiplexer.childChannelFlush() } @@ -499,7 +511,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { self.eventLoop.execute { self.removeHandlers(channel: self) self.closePromise.succeed(()) - self.multiplexer.childChannelClosed(streamID: self.streamID) + self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self)) } } @@ -520,7 +532,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore { self.eventLoop.execute { self.removeHandlers(channel: self) self.closePromise.fail(error) - self.multiplexer.childChannelClosed(streamID: self.streamID) + self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self)) } } @@ -577,7 +589,8 @@ private extension HTTP2StreamChannel { self.pipeline.fireChannelRead(NIOAny(frame)) if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size) { - let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment)) + // To have a pending read, we must have a stream ID. + let frame = HTTP2Frame(streamID: self.streamID!, payload: .windowUpdate(windowSizeIncrement: increment)) self.receiveOutboundFrame(frame, promise: nil) // This flush should really go away, but we need it for now until we sort out window management. self.multiplexer.childChannelFlush() @@ -661,7 +674,8 @@ internal extension HTTP2StreamChannel { self.tryToRead() if let reason = reason { - let err = NIOHTTP2Errors.StreamClosed(streamID: self.streamID, errorCode: reason) + // To receive from the network, it must be safe to force-unwrap here. + let err = NIOHTTP2Errors.StreamClosed(streamID: self.streamID!, errorCode: reason) self.errorEncountered(error: err) } else { self.closedCleanly() @@ -670,7 +684,8 @@ internal extension HTTP2StreamChannel { func receiveWindowUpdatedEvent(_ windowSize: Int) { if let increment = self.windowManager.newWindowSize(windowSize) { - let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment)) + // To receive from the network, it must be safe to force-unwrap here. + let frame = HTTP2Frame(streamID: self.streamID!, payload: .windowUpdate(windowSizeIncrement: increment)) self.receiveOutboundFrame(frame, promise: nil) // This flush should really go away, but we need it for now until we sort out window management. self.multiplexer.childChannelFlush() @@ -679,7 +694,8 @@ internal extension HTTP2StreamChannel { func initialWindowSizeChanged(delta: Int) { if let increment = self.windowManager.initialWindowSizeChanged(delta: delta) { - let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment)) + // To receive from the network, it must be safe to force-unwrap here. + let frame = HTTP2Frame(streamID: self.streamID!, payload: .windowUpdate(windowSizeIncrement: increment)) self.receiveOutboundFrame(frame, promise: nil) // This flush should really go away, but we need it for now until we sort out window management. self.multiplexer.childChannelFlush() @@ -725,7 +741,7 @@ extension HTTP2StreamChannel { // MARK: Custom String Convertible extension HTTP2StreamChannel { public var description: String { - return "HTTP2StreamChannel(streamID: \(self.streamID), isActive: \(self.isActive), isWritable: \(self.isWritable))" + return "HTTP2StreamChannel(streamID: \(String(describing: self.streamID)), isActive: \(self.isActive), isWritable: \(self.isWritable))" } } diff --git a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift index 746d1c85..c9b0d77a 100644 --- a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift @@ -299,8 +299,7 @@ extension HTTP2StreamMultiplexer { /// `ChannelPipeline` for the newly created channel. public func createStreamChannel(promise: EventLoopPromise?, _ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture) { self.channel.eventLoop.execute { - let streamID = self.nextOutboundStreamID - self.nextOutboundStreamID = HTTP2StreamID(Int32(streamID) + 2) + let streamID = self.requestStreamID() let channel = MultiplexerAbstractChannel( allocator: self.channel.allocator, parent: self.channel, @@ -319,8 +318,12 @@ extension HTTP2StreamMultiplexer { // MARK:- Child to parent calls extension HTTP2StreamMultiplexer { - internal func childChannelClosed(streamID: HTTP2StreamID) { - self.streams.removeValue(forKey: streamID) + internal func childChannelClosed(_ channel: MultiplexerAbstractChannel) { + if let streamID = channel.streamID { + self.streams.removeValue(forKey: streamID) + } else { + preconditionFailure("Child channels always have stream IDs right now.") + } } internal func childChannelWrite(_ frame: HTTP2Frame, promise: EventLoopPromise?) { @@ -330,4 +333,10 @@ extension HTTP2StreamMultiplexer { internal func childChannelFlush() { self.flush(context: context) } + + internal func requestStreamID() -> HTTP2StreamID { + let streamID = self.nextOutboundStreamID + self.nextOutboundStreamID = HTTP2StreamID(Int32(streamID) + 2) + return streamID + } } diff --git a/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift b/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift index b827757d..ec345427 100644 --- a/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift +++ b/Sources/NIOHTTP2/MultiplexerAbstractChannel.swift @@ -41,6 +41,10 @@ struct MultiplexerAbstractChannel { outboundBytesHighWatermark: outboundBytesHighWatermark, outboundBytesLowWatermark: outboundBytesLowWatermark)) } + + init(_ channel: HTTP2StreamChannel) { + self.baseChannel = .frameBased(channel) + } } extension MultiplexerAbstractChannel { @@ -51,6 +55,13 @@ extension MultiplexerAbstractChannel { // MARK: API for HTTP2StreamMultiplexer extension MultiplexerAbstractChannel { + var streamID: HTTP2StreamID? { + switch self.baseChannel { + case .frameBased(let base): + return base.streamID + } + } + var inList: Bool { switch self.baseChannel { case .frameBased(let base):