From 020e322a65299d46c426b20f858edca61fba028a Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 30 Apr 2020 17:28:49 +0100 Subject: [PATCH] Added NIOHTTPRequestCompressor to compress requests (#88) * Added NIOHTTPRequestCompressor to compress requests Also moved common code from request and response compressor into separate NIOHTTPCompression enum. * Updates after comments from @weissi Also reinstated public enum HTTPResponseCompressor.CompressionError * algorithms are now let not var * Catch situation where head is flushed before anything else comes through Content-encoding was not being set Added additional tests for header values * Added documentation around 5 bytes added to buffer size and add them * Renaming NIOHTTPCompressionSetting to NIOCompression Also NIOHTTPCompressionSetting.CompressionAlgorithm is NIOCompression.Algorithm NIOHTTPCompressionSetting.CompressionError is NIOCompression.Error Algorithm now conforms to Equatable * Forgot to run generate_linux_tests * Fix typos --- .../NIOHTTPCompression/HTTPCompression.swift | 156 +++++++ .../HTTPRequestCompressor.swift | 175 ++++++++ .../HTTPResponseCompressor.swift | 151 ++----- Tests/LinuxMain.swift | 1 + .../HTTPRequestCompressorTest+XCTest.swift | 42 ++ .../HTTPRequestCompressorTest.swift | 391 ++++++++++++++++++ 6 files changed, 792 insertions(+), 124 deletions(-) create mode 100644 Sources/NIOHTTPCompression/HTTPCompression.swift create mode 100644 Sources/NIOHTTPCompression/HTTPRequestCompressor.swift create mode 100644 Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest+XCTest.swift create mode 100644 Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest.swift diff --git a/Sources/NIOHTTPCompression/HTTPCompression.swift b/Sources/NIOHTTPCompression/HTTPCompression.swift new file mode 100644 index 00000000..53ccb934 --- /dev/null +++ b/Sources/NIOHTTPCompression/HTTPCompression.swift @@ -0,0 +1,156 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import CNIOExtrasZlib +import NIO + +public enum NIOCompression { + + public struct Algorithm: CustomStringConvertible, Equatable { + fileprivate enum AlgorithmEnum: String { + case gzip + case deflate + } + fileprivate let algorithm: AlgorithmEnum + + /// return as String + public var description: String { return algorithm.rawValue } + + public static let gzip = Algorithm(algorithm: .gzip) + public static let deflate = Algorithm(algorithm: .deflate) + } + + public struct Error: Swift.Error, CustomStringConvertible, Equatable { + fileprivate enum ErrorEnum: String { + case uncompressedWritesPending + case noDataToWrite + } + fileprivate let error: ErrorEnum + + /// return as String + public var description: String { return error.rawValue } + + public static let uncompressedWritesPending = Error(error: .uncompressedWritesPending) + public static let noDataToWrite = Error(error: .noDataToWrite) + } + + struct Compressor { + private var stream = z_stream() + var isActive = false + + init() { } + + /// Set up the encoder for compressing data according to a specific + /// algorithm. 
+ mutating func initialize(encoding: Algorithm) { + assert(!isActive) + isActive = true + // zlib docs say: The application must initialize zalloc, zfree and opaque before calling the init function. + stream.zalloc = nil + stream.zfree = nil + stream.opaque = nil + + let windowBits: Int32 + switch encoding.algorithm { + case .deflate: + windowBits = 15 + case .gzip: + windowBits = 16 + 15 + } + + let rc = CNIOExtrasZlib_deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowBits, 8, Z_DEFAULT_STRATEGY) + precondition(rc == Z_OK, "Unexpected return from zlib init: \(rc)") + } + + mutating func compress(inputBuffer: inout ByteBuffer, allocator: ByteBufferAllocator, finalise: Bool) -> ByteBuffer { + assert(isActive) + let flags = finalise ? Z_FINISH : Z_SYNC_FLUSH + // don't compress an empty buffer if we aren't finishing the compress + guard inputBuffer.readableBytes > 0 || finalise == true else { return allocator.buffer(capacity: 0) } + // deflateBound() provides an upper limit on the number of bytes the input can + // compress to. We add 5 bytes to handle the fact that Z_SYNC_FLUSH will append + // an empty stored block that is 5 bytes long. + // From zlib docs (https://www.zlib.net/manual.html) + // If the parameter flush is set to Z_SYNC_FLUSH, all pending output is flushed to the output buffer and the output is + // aligned on a byte boundary, so that the decompressor can get all input data available so far. (In particular avail_in + // is zero after the call if enough output space has been provided before the call.) Flushing may degrade compression for + // some compression algorithms and so it should be used only when necessary. This completes the current deflate block and + // follows it with an empty stored block that is three bits plus filler bits to the next byte, followed by four bytes + // (00 00 ff ff). + let bufferSize = Int(deflateBound(&stream, UInt(inputBuffer.readableBytes))) + var outputBuffer = allocator.buffer(capacity: bufferSize + 5) + stream.oneShotDeflate(from: &inputBuffer, to: &outputBuffer, flag: flags) + return outputBuffer + } + + mutating func shutdown() { + assert(isActive) + isActive = false + deflateEnd(&stream) + } + + mutating func shutdownIfActive() { + if isActive { + isActive = false + deflateEnd(&stream) + } + } + } +} + +extension z_stream { + /// Executes deflate from one buffer to another buffer. The advantage of this method is that it + /// will ensure that the stream is "safe" after each call (that is, that the stream does not have + /// pointers to byte buffers any longer). + mutating func oneShotDeflate(from: inout ByteBuffer, to: inout ByteBuffer, flag: Int32) { + defer { + self.avail_in = 0 + self.next_in = nil + self.avail_out = 0 + self.next_out = nil + } + + from.readWithUnsafeMutableReadableBytes { dataPtr in + let typedPtr = dataPtr.baseAddress!.assumingMemoryBound(to: UInt8.self) + let typedDataPtr = UnsafeMutableBufferPointer(start: typedPtr, + count: dataPtr.count) + + self.avail_in = UInt32(typedDataPtr.count) + self.next_in = typedDataPtr.baseAddress! + + let rc = deflateToBuffer(buffer: &to, flag: flag) + precondition(rc == Z_OK || rc == Z_STREAM_END, "One-shot compression failed: \(rc)") + + return typedDataPtr.count - Int(self.avail_in) + } + } + + /// A private function that sets the deflate target buffer and then calls deflate. + /// This relies on having the input set by the previous caller: it will use whatever input was + /// configured. 
+    private mutating func deflateToBuffer(buffer: inout ByteBuffer, flag: Int32) -> Int32 {
+        var rc = Z_OK
+
+        buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: buffer.capacity) { outputPtr in
+            let typedOutputPtr = UnsafeMutableBufferPointer(start: outputPtr.baseAddress!.assumingMemoryBound(to: UInt8.self),
+                                                            count: outputPtr.count)
+            self.avail_out = UInt32(typedOutputPtr.count)
+            self.next_out = typedOutputPtr.baseAddress!
+            rc = deflate(&self, flag)
+            return typedOutputPtr.count - Int(self.avail_out)
+        }
+
+        return rc
+    }
+}
diff --git a/Sources/NIOHTTPCompression/HTTPRequestCompressor.swift b/Sources/NIOHTTPCompression/HTTPRequestCompressor.swift
new file mode 100644
index 00000000..8f6a36c7
--- /dev/null
+++ b/Sources/NIOHTTPCompression/HTTPRequestCompressor.swift
@@ -0,0 +1,175 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the SwiftNIO open source project
+//
+// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import CNIOExtrasZlib
+import NIO
+import NIOHTTP1
+
+/// NIOHTTPRequestCompressor is an outbound channel handler that handles automatic streaming compression of
+/// HTTP requests.
+///
+/// This compressor supports gzip and deflate. It works best if many writes are made between flushes.
+///
+/// Note that this compressor performs the compression on the event loop thread. This means that compressing
+/// some resources, particularly those that do not benefit from compression or that could have been compressed
+/// ahead-of-time instead of dynamically, could be a waste of CPU time and latency for relatively minimal
+/// benefit. This channel handler should be present in the pipeline only for dynamically-generated and
+/// highly-compressible content, which will see the biggest benefits from streaming compression.
+public final class NIOHTTPRequestCompressor: ChannelOutboundHandler, RemovableChannelHandler {
+    public typealias OutboundIn = HTTPClientRequestPart
+    public typealias OutboundOut = HTTPClientRequestPart
+
+    /// Handler state
+    enum State {
+        /// handler hasn't started
+        case idle
+        /// handler has received a head
+        case head(HTTPRequestHead)
+        /// handler has received a head and a body, but hasn't written anything yet
+        case body(HTTPRequestHead, ByteBuffer)
+        /// handler has written the head and some of the body out.
+        case partialBody(ByteBuffer)
+        /// handler has finished
+        case end
+    }
+
+    /// encoding algorithm to use
+    var encoding: NIOCompression.Algorithm
+    /// handler state
+    var state: State
+    /// compression handler
+    var compressor: NIOCompression.Compressor
+    /// pending write promise
+    var pendingWritePromise: EventLoopPromise<Void>!
+ + /// Initialize a NIOHTTPRequestCompressor + /// - Parameter encoding: Compression algorithm to use + public init(encoding: NIOCompression.Algorithm) { + self.encoding = encoding + self.state = .idle + self.compressor = NIOCompression.Compressor() + } + + public func handlerAdded(context: ChannelHandlerContext) { + pendingWritePromise = context.eventLoop.makePromise() + } + + public func handlerRemoved(context: ChannelHandlerContext) { + pendingWritePromise.fail(NIOCompression.Error.uncompressedWritesPending) + compressor.shutdownIfActive() + } + + /// Write to channel + /// - Parameters: + /// - context: Channel handle context which this handler belongs to + /// - data: Data being sent through the channel + /// - promise: The eventloop promise that should be notified when the operation completes + public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + pendingWritePromise.futureResult.cascade(to: promise) + + let httpData = unwrapOutboundIn(data) + switch httpData { + case .head(let head): + switch state { + case .idle: + state = .head(head) + default: + preconditionFailure("Unexpected HTTP head") + } + compressor.initialize(encoding: self.encoding) + + case .body(let buffer): + switch state { + case .head(var head): + // We only have a head, this is the first body part + guard case .byteBuffer(let part) = buffer else { preconditionFailure("Expected a ByteBuffer") } + // now we have a body lets add the content-encoding header + head.headers.replaceOrAdd(name: "Content-Encoding", value: self.encoding.description) + state = .body(head, part) + case .body(let head, var body): + // we have a head and a body, extend the body with this body part + guard case .byteBuffer(var part) = buffer else { preconditionFailure("Expected a ByteBuffer") } + body.writeBuffer(&part) + state = .body(head, body) + case .partialBody(var body): + // we have a partial body, extend the partial body with this body part + guard case .byteBuffer(var part) = buffer else { preconditionFailure("Expected a ByteBuffer") } + body.writeBuffer(&part) + state = .partialBody(body) + default: + preconditionFailure("Unexpected Body") + } + + case .end: + switch state { + case .head(let head): + // only found a head + context.write(wrapOutboundOut(.head(head)), promise: nil) + context.write(data, promise: pendingWritePromise) + case .body(var head, var body): + // have head and the whole of the body. Compress body, set content length header and write it all out, including the end + let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: true) + head.headers.replaceOrAdd(name: "Content-Length", value: outputBuffer.readableBytes.description) + context.write(wrapOutboundOut(.head(head)), promise: nil) + context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: nil) + context.write(data, promise: pendingWritePromise) + case .partialBody(var body): + // have a section of the body. 
Compress that section of the body and write it out along with the end + let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: true) + context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: nil) + context.write(data, promise: pendingWritePromise) + default: + preconditionFailure("Unexpected End") + } + state = .end + compressor.shutdown() + } + } + + public func flush(context: ChannelHandlerContext) { + switch state { + case .head(var head): + // given we are flushing the head now we have to assume we have a body and set Content-Encoding + head.headers.replaceOrAdd(name: "Content-Encoding", value: self.encoding.description) + head.headers.remove(name: "Content-Length") + head.headers.replaceOrAdd(name: "Transfer-Encoding", value: "chunked") + context.write(wrapOutboundOut(.head(head)), promise: pendingWritePromise) + state = .partialBody(context.channel.allocator.buffer(capacity: 0)) + + case .body(var head, var body): + // Write out head with transfer-encoding set to "chunked" as we cannot set the content length + // Compress and write out what we have of the the body + let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: false) + head.headers.remove(name: "Content-Length") + head.headers.replaceOrAdd(name: "Transfer-Encoding", value: "chunked") + context.write(wrapOutboundOut(.head(head)), promise: nil) + context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: pendingWritePromise) + state = .partialBody(context.channel.allocator.buffer(capacity: 0)) + + case .partialBody(var body): + // Compress and write out what we have of the body + let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: false) + context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: pendingWritePromise) + state = .partialBody(context.channel.allocator.buffer(capacity: 0)) + + default: + context.flush() + return + } + // reset pending write promise + pendingWritePromise = context.eventLoop.makePromise() + context.flush() + } +} + diff --git a/Sources/NIOHTTPCompression/HTTPResponseCompressor.swift b/Sources/NIOHTTPCompression/HTTPResponseCompressor.swift index ad113fc6..108b1c28 100644 --- a/Sources/NIOHTTPCompression/HTTPResponseCompressor.swift +++ b/Sources/NIOHTTPCompression/HTTPResponseCompressor.swift @@ -63,20 +63,13 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne public typealias OutboundIn = HTTPServerResponsePart public typealias OutboundOut = HTTPServerResponsePart + public enum CompressionError: Error { case uncompressedWritesPending case noDataToWrite } - - fileprivate enum CompressionAlgorithm: String { - case gzip = "gzip" - case deflate = "deflate" - } - - // Private variable for storing stream data. - private var stream = z_stream() - - private var algorithm: CompressionAlgorithm? + + private var compressor: NIOCompression.Compressor // A queue of accept headers. 
private var acceptQueue = CircularBuffer<[Substring]>(initialCapacity: 8) @@ -88,6 +81,7 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne public init(initialByteBufferCapacity: Int = 1024) { self.initialByteBufferCapacity = initialByteBufferCapacity + self.compressor = NIOCompression.Compressor() } public func handlerAdded(context: ChannelHandlerContext) { @@ -97,10 +91,7 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne public func handlerRemoved(context: ChannelHandlerContext) { pendingWritePromise?.fail(CompressionError.uncompressedWritesPending) - if algorithm != nil { - deinitializeEncoder() - algorithm = nil - } + compressor.shutdownIfActive() } public func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -115,19 +106,18 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne let httpData = unwrapOutboundIn(data) switch httpData { case .head(var responseHead): - algorithm = compressionAlgorithm() - guard algorithm != nil else { + guard let algorithm = compressionAlgorithm() else { context.write(wrapOutboundOut(.head(responseHead)), promise: promise) return } // Previous handlers in the pipeline might have already set this header even though // they should not as it is compressor responsibility to decide what encoding to use - responseHead.headers.replaceOrAdd(name: "Content-Encoding", value: algorithm!.rawValue) - initializeEncoder(encoding: algorithm!) + responseHead.headers.replaceOrAdd(name: "Content-Encoding", value: algorithm.description) + compressor.initialize(encoding: algorithm) pendingResponse.bufferResponseHead(responseHead) pendingWritePromise.futureResult.cascade(to: promise) case .body(let body): - if algorithm != nil { + if compressor.isActive { pendingResponse.bufferBodyPart(body) pendingWritePromise.futureResult.cascade(to: promise) } else { @@ -136,7 +126,7 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne case .end: // This compress is not done in flush because we need to be done with the // compressor now. - guard algorithm != nil else { + guard compressor.isActive else { context.write(data, promise: promise) return } @@ -144,8 +134,7 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne pendingResponse.bufferResponseEnd(httpData) pendingWritePromise.futureResult.cascade(to: promise) emitPendingWrites(context: context) - algorithm = nil - deinitializeEncoder() + compressor.shutdown() } } @@ -158,7 +147,7 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne /// /// Returns the compression algorithm to use, or nil if the next response /// should not be compressed. - private func compressionAlgorithm() -> CompressionAlgorithm? { + private func compressionAlgorithm() -> NIOCompression.Algorithm? { let acceptHeaders = acceptQueue.removeFirst() var gzipQValue: Float = -1 @@ -187,38 +176,12 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChanne return nil } - /// Set up the encoder for compressing data according to a specific - /// algorithm. - private func initializeEncoder(encoding: CompressionAlgorithm) { - // zlib docs say: The application must initialize zalloc, zfree and opaque before calling the init function. 
- stream.zalloc = nil - stream.zfree = nil - stream.opaque = nil - - let windowBits: Int32 - switch encoding { - case .deflate: - windowBits = 15 - case .gzip: - windowBits = 16 + 15 - } - - let rc = CNIOExtrasZlib_deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowBits, 8, Z_DEFAULT_STRATEGY) - precondition(rc == Z_OK, "Unexpected return from zlib init: \(rc)") - } - - private func deinitializeEncoder() { - // We deliberately discard the result here because we just want to free up - // the pending data. - deflateEnd(&stream) - } - /// Emits all pending buffered writes to the network, optionally compressing the /// data. Resets the pending write buffer and promise. /// /// Called either when a HTTP end message is received or our flush() method is called. private func emitPendingWrites(context: ChannelHandlerContext) { - let writesToEmit = pendingResponse.flush(compressor: &stream, allocator: context.channel.allocator) + let writesToEmit = pendingResponse.flush(compressor: &compressor, allocator: context.channel.allocator) var pendingPromise = pendingWritePromise if let writeHead = writesToEmit.0 { @@ -302,24 +265,6 @@ private struct PartialHTTPResponse { body.reserveCapacity(initialBufferSize) } - mutating private func compressBody(compressor: inout z_stream, allocator: ByteBufferAllocator, flag: Int32) -> ByteBuffer? { - guard body.readableBytes > 0 else { - return nil - } - - // deflateBound() provides an upper limit on the number of bytes the input can - // compress to. We add 5 bytes to handle the fact that Z_SYNC_FLUSH will append - // an empty stored block that is 5 bytes long. - let bufferSize = Int(deflateBound(&compressor, UInt(body.readableBytes))) - var outputBuffer = allocator.buffer(capacity: bufferSize) - - // Now do the one-shot compression. All the data should have been consumed. - compressor.oneShotDeflate(from: &body, to: &outputBuffer, flag: flag) - precondition(body.readableBytes == 0) - precondition(outputBuffer.readableBytes > 0) - return outputBuffer - } - /// Flushes the buffered data into its constituent parts. /// /// Returns a three-tuple of a HTTP response head, compressed body bytes, and any end that @@ -333,66 +278,24 @@ private struct PartialHTTPResponse { /// /// Calling this function resets the buffer, freeing any excess memory allocated in the internal /// buffer and losing all copies of the other HTTP data. At this point it may freely be reused. - mutating func flush(compressor: inout z_stream, allocator: ByteBufferAllocator) -> (HTTPResponseHead?, ByteBuffer?, HTTPServerResponsePart?) { - let flag = mustFlush ? Z_FINISH : Z_SYNC_FLUSH - - let body = compressBody(compressor: &compressor, allocator: allocator, flag: flag) - if let bodyLength = body?.readableBytes, isCompleteResponse && bodyLength > 0 { - head!.headers.remove(name: "transfer-encoding") - head!.headers.replaceOrAdd(name: "content-length", value: "\(bodyLength)") - } else if head != nil && head!.status.mayHaveResponseBody { - head!.headers.remove(name: "content-length") - head!.headers.replaceOrAdd(name: "transfer-encoding", value: "chunked") + mutating func flush(compressor: inout NIOCompression.Compressor, allocator: ByteBufferAllocator) -> (HTTPResponseHead?, ByteBuffer?, HTTPServerResponsePart?) { + var outputBody: ByteBuffer? 
= nil + if self.body.readableBytes > 0 { + let compressedBody = compressor.compress(inputBuffer: &self.body, allocator: allocator, finalise: mustFlush) + if isCompleteResponse { + head!.headers.remove(name: "transfer-encoding") + head!.headers.replaceOrAdd(name: "content-length", value: "\(compressedBody.readableBytes)") + } + else if head != nil && head!.status.mayHaveResponseBody { + head!.headers.remove(name: "content-length") + head!.headers.replaceOrAdd(name: "transfer-encoding", value: "chunked") + } + outputBody = compressedBody } - let response = (head, body, end) + let response = (head, outputBody, end) clear() return response } } -private extension z_stream { - /// Executes deflate from one buffer to another buffer. The advantage of this method is that it - /// will ensure that the stream is "safe" after each call (that is, that the stream does not have - /// pointers to byte buffers any longer). - mutating func oneShotDeflate(from: inout ByteBuffer, to: inout ByteBuffer, flag: Int32) { - defer { - self.avail_in = 0 - self.next_in = nil - self.avail_out = 0 - self.next_out = nil - } - - from.readWithUnsafeMutableReadableBytes { dataPtr in - let typedPtr = dataPtr.baseAddress!.assumingMemoryBound(to: UInt8.self) - let typedDataPtr = UnsafeMutableBufferPointer(start: typedPtr, - count: dataPtr.count) - - self.avail_in = UInt32(typedDataPtr.count) - self.next_in = typedDataPtr.baseAddress! - - let rc = deflateToBuffer(buffer: &to, flag: flag) - precondition(rc == Z_OK || rc == Z_STREAM_END, "One-shot compression failed: \(rc)") - - return typedDataPtr.count - Int(self.avail_in) - } - } - - /// A private function that sets the deflate target buffer and then calls deflate. - /// This relies on having the input set by the previous caller: it will use whatever input was - /// configured. - private mutating func deflateToBuffer(buffer: inout ByteBuffer, flag: Int32) -> Int32 { - var rc = Z_OK - - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: buffer.capacity) { outputPtr in - let typedOutputPtr = UnsafeMutableBufferPointer(start: outputPtr.baseAddress!.assumingMemoryBound(to: UInt8.self), - count: outputPtr.count) - self.avail_out = UInt32(typedOutputPtr.count) - self.next_out = typedOutputPtr.baseAddress! - rc = deflate(&self, flag) - return typedOutputPtr.count - Int(self.avail_out) - } - - return rc - } -} diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 114b265c..7f3c77a9 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -30,6 +30,7 @@ import XCTest testCase(DebugInboundEventsHandlerTest.allTests), testCase(DebugOutboundEventsHandlerTest.allTests), testCase(FixedLengthFrameDecoderTest.allTests), + testCase(HTTPRequestCompressorTest.allTests), testCase(HTTPRequestDecompressorTest.allTests), testCase(HTTPResponseCompressorTest.allTests), testCase(HTTPResponseDecompressorTest.allTests), diff --git a/Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest+XCTest.swift b/Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest+XCTest.swift new file mode 100644 index 00000000..b5eb1224 --- /dev/null +++ b/Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest+XCTest.swift @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. 
and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// HTTPRequestCompressorTest+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension HTTPRequestCompressorTest { + + static var allTests : [(String, (HTTPRequestCompressorTest) -> () throws -> Void)] { + return [ + ("testGzipContentEncoding", testGzipContentEncoding), + ("testDeflateContentEncoding", testDeflateContentEncoding), + ("testOneBuffer", testOneBuffer), + ("testMultipleBuffers", testMultipleBuffers), + ("testMultipleBuffersDeflate", testMultipleBuffersDeflate), + ("testMultipleBuffersWithFlushes", testMultipleBuffersWithFlushes), + ("testFlushAfterHead", testFlushAfterHead), + ("testFlushBeforeEnd", testFlushBeforeEnd), + ("testDoubleFlush", testDoubleFlush), + ("testNoBody", testNoBody), + ] + } +} + diff --git a/Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest.swift b/Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest.swift new file mode 100644 index 00000000..0b76d033 --- /dev/null +++ b/Tests/NIOHTTPCompressionTests/HTTPRequestCompressorTest.swift @@ -0,0 +1,391 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import XCTest +import CNIOExtrasZlib +import NIO +import NIOHTTP1 +@testable import NIOHTTPCompression + +class HTTPRequestCompressorTest: XCTestCase { + + func compressionChannel(_ compression: NIOCompression.Algorithm = .gzip) throws -> EmbeddedChannel { + let channel = EmbeddedChannel() + //XCTAssertNoThrow(try channel.pipeline.addHandler(HTTPRequestEncoder(), name: "encoder").wait()) + XCTAssertNoThrow(try channel.pipeline.addHandler(NIOHTTPRequestCompressor(encoding: compression), name: "compressor").wait()) + return channel + } + + func write(body: [ByteBuffer], to channel: EmbeddedChannel) throws { + let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/") + try write(head: requestHead, body: body, to: channel) + } + + func write(head: HTTPRequestHead, body: [ByteBuffer], to channel: EmbeddedChannel) throws { + var promiseArray = PromiseArray(on: channel.eventLoop) + channel.pipeline.write(NIOAny(HTTPClientRequestPart.head(head)), promise: promiseArray.makePromise()) + + for bodyChunk in body { + channel.pipeline.write(NIOAny(HTTPClientRequestPart.body(.byteBuffer(bodyChunk))), promise: promiseArray.makePromise()) + } + channel.pipeline.write(NIOAny(HTTPClientRequestPart.end(nil)), promise: promiseArray.makePromise()) + channel.pipeline.flush() + + try promiseArray.waitUntilComplete() + } + + func writeWithIntermittantFlush(body: [ByteBuffer], to channel: EmbeddedChannel) throws { + let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/") + try 
writeWithIntermittantFlush(head: requestHead, body: body, to: channel) + } + + func writeWithIntermittantFlush(head: HTTPRequestHead, body: [ByteBuffer], to channel: EmbeddedChannel) throws { + var promiseArray = PromiseArray(on: channel.eventLoop) + var count = 3 + channel.pipeline.write(NIOAny(HTTPClientRequestPart.head(head)), promise: promiseArray.makePromise()) + + for bodyChunk in body { + channel.pipeline.write( + NIOAny(HTTPClientRequestPart.body(.byteBuffer(bodyChunk))), + promise: promiseArray.makePromise() + ) + count -= 1 + if count == 0 { + channel.pipeline.flush() + count = 3 + } + } + channel.pipeline.write(NIOAny(HTTPClientRequestPart.end(nil)), promise: promiseArray.makePromise()) + channel.pipeline.flush() + + try promiseArray.waitUntilComplete() + } + + func read(from channel: EmbeddedChannel) throws -> (head: HTTPRequestHead, body: ByteBuffer) { + var requestHead: HTTPRequestHead! + var byteBuffer = channel.allocator.buffer(capacity: 0) + channel.pipeline.read() + loop: while let requestPart: HTTPClientRequestPart = try channel.readOutbound() { + switch requestPart { + case .head(let head): + requestHead = head + + case .body(let data): + if case .byteBuffer(var buffer) = data { + byteBuffer.writeBuffer(&buffer) + } + case .end: + break loop + } + } + return (head: requestHead, body: byteBuffer) + } + + func readVerifyPart(from channel: EmbeddedChannel, verify: (HTTPClientRequestPart)->()) throws { + channel.pipeline.read() + loop: while let requestPart: HTTPClientRequestPart = try channel.readOutbound() { + verify(requestPart) + } + } + + func testGzipContentEncoding() throws { + let channel = try compressionChannel() + var buffer = ByteBufferAllocator().buffer(capacity: 0) + buffer.writeString("Test") + + _ = try write(body: [buffer], to: channel) + try readVerifyPart(from: channel) { part in + if case .head(let head) = part { + XCTAssertEqual(head.headers["Content-Encoding"].first, "gzip") + } + } + } + + func testDeflateContentEncoding() throws { + let channel = try compressionChannel(.deflate) + var buffer = ByteBufferAllocator().buffer(capacity: 0) + buffer.writeString("Test") + + _ = try write(body: [buffer], to: channel) + try readVerifyPart(from: channel) { part in + if case .head(let head) = part { + XCTAssertEqual(head.headers["Content-Encoding"].first, "deflate") + } + } + } + + func testOneBuffer() throws { + let channel = try compressionChannel() + var buffer = ByteBufferAllocator().buffer(capacity: 1024 * Int.bitWidth / 8) + for _ in 0..<1024 { + buffer.writeInteger(Int.random(in: Int.min...Int.max)) + } + + _ = try write(body: [buffer], to: channel) + var result = try read(from: channel) + var uncompressedBuffer = ByteBufferAllocator().buffer(capacity: buffer.readableBytes) + z_stream.decompressGzip(compressedBytes: &result.body, outputBuffer: &uncompressedBuffer) + + XCTAssertEqual(buffer, uncompressedBuffer) + XCTAssertEqual(result.head.headers["content-encoding"].first, "gzip") + } + + func testMultipleBuffers() throws { + let channel = try compressionChannel() + var buffers: [ByteBuffer] = [] + var buffersConcat = ByteBufferAllocator().buffer(capacity: 16 * 1024 * Int.bitWidth / 8) + for _ in 0..<16 { + var buffer = ByteBufferAllocator().buffer(capacity: 1024 * Int.bitWidth / 8) + for _ in 0..<1024 { + buffer.writeInteger(Int.random(in: Int.min...Int.max)) + } + buffers.append(buffer) + buffersConcat.writeBuffer(&buffer) + } + + try write(body: buffers, to: channel) + var result = try read(from: channel) + var uncompressedBuffer = 
ByteBufferAllocator().buffer(capacity: buffersConcat.readableBytes) + z_stream.decompressGzip(compressedBytes: &result.body, outputBuffer: &uncompressedBuffer) + + XCTAssertEqual(buffersConcat, uncompressedBuffer) + XCTAssertEqual(result.head.headers["content-encoding"].first, "gzip") + } + + func testMultipleBuffersDeflate() throws { + let channel = try compressionChannel(.deflate) + var buffers: [ByteBuffer] = [] + var buffersConcat = ByteBufferAllocator().buffer(capacity: 16 * 1024 * Int.bitWidth / 8) + for _ in 0..<16 { + var buffer = ByteBufferAllocator().buffer(capacity: 1024 * Int.bitWidth / 8) + for _ in 0..<1024 { + buffer.writeInteger(Int.random(in: Int.min...Int.max)) + } + buffers.append(buffer) + buffersConcat.writeBuffer(&buffer) + } + + try write(body: buffers, to: channel) + var result = try read(from: channel) + var uncompressedBuffer = ByteBufferAllocator().buffer(capacity: buffersConcat.readableBytes) + z_stream.decompressDeflate(compressedBytes: &result.body, outputBuffer: &uncompressedBuffer) + + XCTAssertEqual(buffersConcat, uncompressedBuffer) + XCTAssertEqual(result.head.headers["content-encoding"].first, "deflate") + } + + func testMultipleBuffersWithFlushes() throws { + let channel = try compressionChannel() + var buffers: [ByteBuffer] = [] + var buffersConcat = ByteBufferAllocator().buffer(capacity: 16 * 1024 * Int.bitWidth / 8) + for _ in 0..<16 { + var buffer = ByteBufferAllocator().buffer(capacity: 1024 * Int.bitWidth / 8) + for _ in 0..<1024 { + buffer.writeInteger(Int.random(in: Int.min...Int.max)) + } + buffers.append(buffer) + buffersConcat.writeBuffer(&buffer) + } + + try writeWithIntermittantFlush(body: buffers, to: channel) + var result = try read(from: channel) + var uncompressedBuffer = ByteBufferAllocator().buffer(capacity: buffersConcat.readableBytes) + z_stream.decompressGzip(compressedBytes: &result.body, outputBuffer: &uncompressedBuffer) + + XCTAssertEqual(buffersConcat, uncompressedBuffer) + XCTAssertEqual(result.head.headers["content-encoding"].first, "gzip") + XCTAssertEqual(result.head.headers["transfer-encoding"].first, "chunked") + XCTAssertNil(result.head.headers["content-size"].first) + } + + func testFlushAfterHead() throws { + let channel = try compressionChannel() + var buffer = ByteBufferAllocator().buffer(capacity: 1024 * Int.bitWidth / 8) + for _ in 0..<1024 { + buffer.writeInteger(Int.random(in: Int.min...Int.max)) + } + + let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/") + var promiseArray = PromiseArray(on: channel.eventLoop) + channel.pipeline.write(NIOAny(HTTPClientRequestPart.head(requestHead)), promise: promiseArray.makePromise()) + channel.pipeline.flush() + channel.pipeline.write(NIOAny(HTTPClientRequestPart.body(.byteBuffer(buffer))), promise: promiseArray.makePromise()) + channel.pipeline.write(NIOAny(HTTPClientRequestPart.end(nil)), promise: promiseArray.makePromise()) + channel.pipeline.flush() + try promiseArray.waitUntilComplete() + + var result = try read(from: channel) + var uncompressedBuffer = ByteBufferAllocator().buffer(capacity: buffer.readableBytes) + z_stream.decompressGzip(compressedBytes: &result.body, outputBuffer: &uncompressedBuffer) + + XCTAssertEqual(buffer, uncompressedBuffer) + XCTAssertEqual(result.head.headers["content-encoding"].first, "gzip") + } + + func testFlushBeforeEnd() throws { + let channel = try compressionChannel() + var buffer = ByteBufferAllocator().buffer(capacity: 1024 * Int.bitWidth / 8) + for _ in 0..<1024 { + 
+            buffer.writeInteger(Int.random(in: Int.min...Int.max))
+        }
+
+        let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/")
+        var promiseArray = PromiseArray(on: channel.eventLoop)
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.head(requestHead)), promise: promiseArray.makePromise())
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.body(.byteBuffer(buffer))), promise: promiseArray.makePromise())
+        channel.pipeline.flush()
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.end(nil)), promise: promiseArray.makePromise())
+        channel.pipeline.flush()
+        try promiseArray.waitUntilComplete()
+
+        var result = try read(from: channel)
+        var uncompressedBuffer = ByteBufferAllocator().buffer(capacity: buffer.readableBytes)
+        z_stream.decompressGzip(compressedBytes: &result.body, outputBuffer: &uncompressedBuffer)
+
+        XCTAssertEqual(buffer, uncompressedBuffer)
+        XCTAssertEqual(result.head.headers["content-encoding"].first, "gzip")
+    }
+
+    func testDoubleFlush() throws {
+        let channel = try compressionChannel()
+        var buffer = ByteBufferAllocator().buffer(capacity: 1024 * Int.bitWidth / 8)
+        for _ in 0..<1024 {
+            buffer.writeInteger(Int.random(in: Int.min...Int.max))
+        }
+
+        let algo = NIOCompression.Algorithm.gzip
+        if algo == NIOCompression.Algorithm.deflate {
+            print("Hello")
+        }
+        let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/")
+        var promiseArray = PromiseArray(on: channel.eventLoop)
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.head(requestHead)), promise: promiseArray.makePromise())
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.body(.byteBuffer(buffer))), promise: promiseArray.makePromise())
+        channel.pipeline.flush()
+        channel.pipeline.flush()
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.end(nil)), promise: promiseArray.makePromise())
+        channel.pipeline.flush()
+        try promiseArray.waitUntilComplete()
+
+        var result = try read(from: channel)
+        var uncompressedBuffer = ByteBufferAllocator().buffer(capacity: buffer.readableBytes)
+        z_stream.decompressGzip(compressedBytes: &result.body, outputBuffer: &uncompressedBuffer)
+
+        XCTAssertEqual(buffer, uncompressedBuffer)
+        XCTAssertEqual(result.head.headers["content-encoding"].first, "gzip")
+    }
+
+    func testNoBody() throws {
+        let channel = try compressionChannel()
+
+        let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/")
+        var promiseArray = PromiseArray(on: channel.eventLoop)
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.head(requestHead)), promise: promiseArray.makePromise())
+        channel.pipeline.write(NIOAny(HTTPClientRequestPart.end(nil)), promise: promiseArray.makePromise())
+        channel.pipeline.flush()
+        try promiseArray.waitUntilComplete()
+
+        try readVerifyPart(from: channel) { part in
+            switch part {
+            case .head(let head):
+                XCTAssertNil(head.headers["Content-Encoding"].first)
+            case .body:
+                XCTFail("Shouldn't return a body")
+            case .end:
+                break
+            }
+        }
+    }
+}
+
+struct PromiseArray {
+    var promises: [EventLoopPromise<Void>]
+    let eventLoop: EventLoop
+
+    init(on eventLoop: EventLoop) {
+        self.promises = []
+        self.eventLoop = eventLoop
+    }
+
+    mutating func makePromise() -> EventLoopPromise<Void> {
+        let promise: EventLoopPromise<Void> = eventLoop.makePromise()
+        self.promises.append(promise)
+        return promise
+    }
+
+    func waitUntilComplete() throws {
+        let resultFutures = promises.map { $0.futureResult }
+        _ = try EventLoopFuture.whenAllComplete(resultFutures, on: eventLoop).wait()
+    }
+}
+
+private extension ByteBuffer {
+    @discardableResult
+    mutating func withUnsafeMutableReadableUInt8Bytes<T>(_ body: (UnsafeMutableBufferPointer<UInt8>) throws -> T) rethrows -> T {
+        return try self.withUnsafeMutableReadableBytes { (ptr: UnsafeMutableRawBufferPointer) -> T in
+            let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
+            let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
+            return try body(inputBufferPointer)
+        }
+    }
+
+    @discardableResult
+    mutating func writeWithUnsafeMutableUInt8Bytes(_ body: (UnsafeMutableBufferPointer<UInt8>) throws -> Int) rethrows -> Int {
+        return try self.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { (ptr: UnsafeMutableRawBufferPointer) -> Int in
+            let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
+            let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
+            return try body(inputBufferPointer)
+        }
+    }
+}
+
+private extension z_stream {
+    static func decompressDeflate(compressedBytes: inout ByteBuffer, outputBuffer: inout ByteBuffer) {
+        decompress(compressedBytes: &compressedBytes, outputBuffer: &outputBuffer, windowSize: 15)
+    }
+
+    static func decompressGzip(compressedBytes: inout ByteBuffer, outputBuffer: inout ByteBuffer) {
+        decompress(compressedBytes: &compressedBytes, outputBuffer: &outputBuffer, windowSize: 16 + 15)
+    }
+
+    private static func decompress(compressedBytes: inout ByteBuffer, outputBuffer: inout ByteBuffer, windowSize: Int32) {
+        compressedBytes.withUnsafeMutableReadableUInt8Bytes { inputPointer in
+            outputBuffer.writeWithUnsafeMutableUInt8Bytes { outputPointer -> Int in
+                var stream = z_stream()
+
+                // zlib requires we initialize next_in, avail_in, zalloc, zfree and opaque before calling inflateInit2.
+                stream.next_in = inputPointer.baseAddress!
+                stream.avail_in = UInt32(inputPointer.count)
+                stream.next_out = outputPointer.baseAddress!
+                stream.avail_out = UInt32(outputPointer.count)
+                stream.zalloc = nil
+                stream.zfree = nil
+                stream.opaque = nil
+
+                var rc = CNIOExtrasZlib_inflateInit2(&stream, windowSize)
+                precondition(rc == Z_OK)
+
+                rc = inflate(&stream, Z_FINISH)
+                XCTAssertEqual(rc, Z_STREAM_END)
+                XCTAssertEqual(stream.avail_in, 0)
+
+                rc = inflateEnd(&stream)
+                XCTAssertEqual(rc, Z_OK)
+
+                return outputPointer.count - Int(stream.avail_out)
+            }
+        }
+    }
+}
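
For readers who want to try the new handler, a minimal client-side sketch follows. It is not part of the patch: the host, port, URI and request body are illustrative assumptions, and it only shows NIOHTTPRequestCompressor added behind the standard HTTP client handlers so outbound HTTPClientRequestPart values pass through it before reaching the request encoder.

import NIO
import NIOHTTP1
import NIOHTTPCompression

// Minimal usage sketch (assumed host/port, not part of the patch): connect to a
// hypothetical server and send a POST whose body is gzip-compressed on the way out.
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { try? group.syncShutdownGracefully() }

let bootstrap = ClientBootstrap(group: group)
    .channelInitializer { channel in
        // addHTTPClientHandlers() installs the HTTP request encoder and response decoder;
        // adding the compressor afterwards means outbound request parts reach it first,
        // so it can rewrite headers and compress the body before encoding.
        channel.pipeline.addHTTPClientHandlers().flatMap {
            channel.pipeline.addHandler(NIOHTTPRequestCompressor(encoding: .gzip))
        }
    }

let channel = try bootstrap.connect(host: "localhost", port: 8080).wait()

var head = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .POST, uri: "/upload")
head.headers.add(name: "Host", value: "localhost")
var body = channel.allocator.buffer(capacity: 64)
body.writeString("some highly compressible request body ...")

// Because nothing is flushed until .end, the compressor buffers the whole body,
// sets Content-Encoding: gzip and an accurate Content-Length, then writes it out.
channel.write(NIOAny(HTTPClientRequestPart.head(head)), promise: nil)
channel.write(NIOAny(HTTPClientRequestPart.body(.byteBuffer(body))), promise: nil)
try channel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil))).wait()
try channel.close().wait()

If a flush happens before .end, the handler instead drops Content-Length and sends the request with Transfer-Encoding: chunked, as the flush() implementation in the patch shows.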