diff --git a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift index 6e794380..fcac381a 100644 --- a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift @@ -433,7 +433,7 @@ extension HTTP2StreamMultiplexer { } internal func childChannelFlush() { - self.flush(context: context) + self.flush(context: self.context) } /// Requests a `HTTP2StreamID` for the given `Channel`. diff --git a/Sources/NIOHTTP2/HTTP2ToHTTP1Codec.swift b/Sources/NIOHTTP2/HTTP2ToHTTP1Codec.swift index 6d74b715..cad5356f 100644 --- a/Sources/NIOHTTP2/HTTP2ToHTTP1Codec.swift +++ b/Sources/NIOHTTP2/HTTP2ToHTTP1Codec.swift @@ -274,7 +274,7 @@ fileprivate struct BaseServerCodec { } } - mutating func processOutboundData(_ data: HTTPServerResponsePart, allocator: ByteBufferAllocator) throws -> HTTP2Frame.FramePayload { + mutating func processOutboundData(_ data: HTTPServerResponsePart, allocator: ByteBufferAllocator) -> HTTP2Frame.FramePayload { switch data { case .head(let head): let h1 = HTTPHeaders(responseHead: head) @@ -350,15 +350,9 @@ public final class HTTP2ToHTTP1ServerCodec: ChannelInboundHandler, ChannelOutbou public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { let responsePart = self.unwrapOutboundIn(data) - - do { - let transformedPayload = try self.baseCodec.processOutboundData(responsePart, allocator: context.channel.allocator) - let part = HTTP2Frame(streamID: self.streamID, payload: transformedPayload) - context.write(self.wrapOutboundOut(part), promise: promise) - } catch { - promise?.fail(error) - context.fireErrorCaught(error) - } + let transformedPayload = self.baseCodec.processOutboundData(responsePart, allocator: context.channel.allocator) + let part = HTTP2Frame(streamID: self.streamID, payload: transformedPayload) + context.write(self.wrapOutboundOut(part), promise: promise) } } @@ -409,14 +403,8 @@ public final class HTTP2FramePayloadToHTTP1ServerCodec: ChannelInboundHandler, C public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { let responsePart = self.unwrapOutboundIn(data) - - do { - let transformedPayload = try self.baseCodec.processOutboundData(responsePart, allocator: context.channel.allocator) - context.write(self.wrapOutboundOut(transformedPayload), promise: promise) - } catch { - promise?.fail(error) - context.fireErrorCaught(error) - } + let transformedPayload = self.baseCodec.processOutboundData(responsePart, allocator: context.channel.allocator) + context.write(self.wrapOutboundOut(transformedPayload), promise: promise) } } diff --git a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift index 057dd2b8..78ff3000 100644 --- a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift +++ b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift @@ -73,6 +73,7 @@ extension HTTP2FramePayloadStreamMultiplexerTests { ("testMultiplexerModifiesStreamChannelWritabilityBasedOnFixedSizeTokensAndChannelWritability", testMultiplexerModifiesStreamChannelWritabilityBasedOnFixedSizeTokensAndChannelWritability), ("testStreamChannelToleratesFailingInitializer", testStreamChannelToleratesFailingInitializer), ("testInboundChannelWindowSizeIsCustomisable", testInboundChannelWindowSizeIsCustomisable), + ("testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer", testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer), ] } } diff --git a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift index 214c3d22..fef405a6 100644 --- a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift @@ -1716,4 +1716,50 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { XCTAssertNoThrow(try channel.finish(acceptAlreadyClosed: false)) } + @available(*, deprecated, message: "Deprecated so deprecated functionality can be tested without warnings") + func testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer() throws { + let frameRecorder = FrameWriteRecorder() + XCTAssertNoThrow(try self.channel.pipeline.addHandler(frameRecorder).wait()) + + let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: self.channel, inboundStreamInitializer: nil) + XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) + XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil)) + + // Create a payload based stream. + let streamAPromise = self.channel.eventLoop.makePromise(of: Channel.self) + multiplexer.createStreamChannel(promise: streamAPromise) { channel in + return channel.eventLoop.makeSucceededFuture(()) + } + self.channel.embeddedEventLoop.run() + let streamA = try assertNoThrowWithValue(try streamAPromise.futureResult.wait()) + // We haven't written on the stream yet: it shouldn't have a stream ID. + XCTAssertThrowsError(try streamA.getOption(HTTP2StreamChannelOptions.streamID).wait()) { error in + XCTAssert(error is NIOHTTP2Errors.NoStreamIDAvailable) + } + + // Create a frame based stream. + let streamBPromise = self.channel.eventLoop.makePromise(of: Channel.self) + multiplexer.createStreamChannel(promise: streamBPromise) { channel, streamID in + // stream A doesn't have an ID yet. + XCTAssertEqual(streamID, HTTP2StreamID(1)) + return channel.eventLoop.makeSucceededFuture(()) + } + self.channel.embeddedEventLoop.run() + let streamB = try assertNoThrowWithValue(try streamBPromise.futureResult.wait()) + + // Do some writes on A and B. + let headers = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")]) + let headersPayload = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: false)) + + // (We checked the streamID above.) + XCTAssertNoThrow(try streamB.writeAndFlush(HTTP2Frame(streamID: 1, payload: headersPayload)).wait()) + + // Write on stream A. + XCTAssertNoThrow(try streamA.writeAndFlush(headersPayload).wait()) + // Stream A must have an ID now. + XCTAssertEqual(try streamA.getOption(HTTP2StreamChannelOptions.streamID).wait(), HTTP2StreamID(3)) + + frameRecorder.flushedWrites.assertFramesMatch([HTTP2Frame(streamID: 1, payload: headersPayload), + HTTP2Frame(streamID: 3, payload: headersPayload)]) + } } diff --git a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests+XCTest.swift b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests+XCTest.swift index 32470e18..b6902516 100644 --- a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests+XCTest.swift +++ b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests+XCTest.swift @@ -73,6 +73,7 @@ extension SimpleClientServerFramePayloadStreamTests { ("testNoStreamWindowUpdateOnEndStreamFrameFromServer", testNoStreamWindowUpdateOnEndStreamFrameFromServer), ("testNoStreamWindowUpdateOnEndStreamFrameFromClient", testNoStreamWindowUpdateOnEndStreamFrameFromClient), ("testGreasedSettingsAreTolerated", testGreasedSettingsAreTolerated), + ("testStreamCreationOrder", testStreamCreationOrder), ] } } diff --git a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift index 92b38496..fddbf6bc 100644 --- a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift +++ b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift @@ -1881,4 +1881,35 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { let settings = nioDefaultSettings + [HTTP2Setting(parameter: .init(extensionSetting: 0xfafa), value: 0xf0f0f0f0)] XCTAssertNoThrow(try self.basicHTTP2Connection(clientSettings: settings)) } + + func testStreamCreationOrder() throws { + try self.basicHTTP2Connection() + let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: self.clientChannel, inboundStreamInitializer: nil) + XCTAssertNoThrow(try self.clientChannel.pipeline.addHandler(multiplexer).wait()) + + let streamAPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self) + multiplexer.createStreamChannel(promise: streamAPromise) { channel in + return channel.eventLoop.makeSucceededFuture(()) + } + self.clientChannel.embeddedEventLoop.run() + let streamA = try assertNoThrowWithValue(try streamAPromise.futureResult.wait()) + + let streamBPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self) + multiplexer.createStreamChannel(promise: streamBPromise) { channel in + return channel.eventLoop.makeSucceededFuture(()) + } + self.clientChannel.embeddedEventLoop.run() + let streamB = try assertNoThrowWithValue(try streamBPromise.futureResult.wait()) + + let headers = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")]) + let headersFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: false)) + + // Write on 'B' first. + XCTAssertNoThrow(try streamB.writeAndFlush(headersFramePayload).wait()) + XCTAssertEqual(try streamB.getOption(HTTP2StreamChannelOptions.streamID).wait(), HTTP2StreamID(1)) + + // Now write on stream 'A'. This would fail on frame-based stream channel. + XCTAssertNoThrow(try streamA.writeAndFlush(headersFramePayload).wait()) + XCTAssertEqual(try streamA.getOption(HTTP2StreamChannelOptions.streamID).wait(), HTTP2StreamID(3)) + } } diff --git a/Tests/NIOHTTP2Tests/SimpleClientServerTests+XCTest.swift b/Tests/NIOHTTP2Tests/SimpleClientServerTests+XCTest.swift index 27182a96..48be2005 100644 --- a/Tests/NIOHTTP2Tests/SimpleClientServerTests+XCTest.swift +++ b/Tests/NIOHTTP2Tests/SimpleClientServerTests+XCTest.swift @@ -32,6 +32,7 @@ extension SimpleClientServerTests { ("testStreamMultiplexerAcknowledgesSettingsBasedFlowControlChanges", testStreamMultiplexerAcknowledgesSettingsBasedFlowControlChanges), ("testNoStreamWindowUpdateOnEndStreamFrameFromServer", testNoStreamWindowUpdateOnEndStreamFrameFromServer), ("testNoStreamWindowUpdateOnEndStreamFrameFromClient", testNoStreamWindowUpdateOnEndStreamFrameFromClient), + ("testStreamCreationOrder", testStreamCreationOrder), ] } } diff --git a/Tests/NIOHTTP2Tests/SimpleClientServerTests.swift b/Tests/NIOHTTP2Tests/SimpleClientServerTests.swift index 68763c37..3261e100 100644 --- a/Tests/NIOHTTP2Tests/SimpleClientServerTests.swift +++ b/Tests/NIOHTTP2Tests/SimpleClientServerTests.swift @@ -319,4 +319,44 @@ class SimpleClientServerTests: XCTestCase { XCTAssertNoThrow(XCTAssertTrue(try self.clientChannel.finish().isClean)) XCTAssertNoThrow(XCTAssertTrue(try self.serverChannel.finish().isClean)) } + + @available(*, deprecated, message: "Deprecated so deprecated functionality can be tested without warnings") + func testStreamCreationOrder() throws { + try self.basicHTTP2Connection() + let multiplexer = HTTP2StreamMultiplexer(mode: .client, channel: self.clientChannel) + XCTAssertNoThrow(try self.clientChannel.pipeline.addHandler(multiplexer).wait()) + + let streamAPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self) + multiplexer.createStreamChannel(promise: streamAPromise) { channel, _ in + return channel.eventLoop.makeSucceededFuture(()) + } + self.clientChannel.embeddedEventLoop.run() + let streamA = try assertNoThrowWithValue(try streamAPromise.futureResult.wait()) + + let streamBPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self) + multiplexer.createStreamChannel(promise: streamBPromise) { channel, _ in + return channel.eventLoop.makeSucceededFuture(()) + } + self.clientChannel.embeddedEventLoop.run() + let streamB = try assertNoThrowWithValue(try streamBPromise.futureResult.wait()) + + let headers = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")]) + let headersFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: false)) + + // Write on 'B' first. + let streamBHeadersWritten = streamB.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture in + let frame = HTTP2Frame(streamID: streamID, payload: headersFramePayload) + return streamB.writeAndFlush(frame) + } + XCTAssertNoThrow(try streamBHeadersWritten.wait()) + + // Now write on stream 'A', it will fail. (This failure motivated the frame-payload based stream channel.) + let streamAHeadersWritten = streamA.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture in + let frame = HTTP2Frame(streamID: streamID, payload: headersFramePayload) + return streamA.writeAndFlush(frame) + } + XCTAssertThrowsError(try streamAHeadersWritten.wait()) { error in + XCTAssert(error is NIOHTTP2Errors.StreamIDTooSmall) + } + } }