Skip to content

Commit

Permalink
Added NIOHTTPRequestCompressor to compress requests (#88)
Browse files Browse the repository at this point in the history
* Added NIOHTTPRequestCompressor to compress requests

Also moved common code from the request and response compressors into a separate NIOHTTPCompression enum.

* Updates after comments from @weissi

Also reinstated public enum HTTPResponseCompressor.CompressionError

* algorithms are now let not var

* Catch situation where head is flushed before anything else comes through

Content-encoding was not being set
Added additional tests for header values

* Added documentation around 5 bytes added to buffer size and add them

* Renaming NIOHTTPCompressionSetting to NIOCompression

Also:
NIOHTTPCompressionSetting.CompressionAlgorithm is now NIOCompression.Algorithm
NIOHTTPCompressionSetting.CompressionError is now NIOCompression.Error
Algorithm now conforms to Equatable

* Forgot to run generate_linux_tests

* Fix typos
  • Loading branch information
adam-fowler authored Apr 30, 2020
1 parent 0f26138 commit 020e322
Show file tree
Hide file tree
Showing 6 changed files with 792 additions and 124 deletions.
156 changes: 156 additions & 0 deletions Sources/NIOHTTPCompression/HTTPCompression.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import CNIOExtrasZlib
import NIO

public enum NIOCompression {

    /// Identifies a compression algorithm the compressor can apply.
    ///
    /// Exposes `gzip` and `deflate`; `description` yields the lowercase token
    /// matching the algorithm name.
    public struct Algorithm: CustomStringConvertible, Equatable {
        fileprivate enum Kind: String {
            case gzip
            case deflate
        }
        fileprivate let kind: Kind

        /// The algorithm name as a String ("gzip" or "deflate").
        public var description: String { return kind.rawValue }

        public static let gzip = Algorithm(kind: .gzip)
        public static let deflate = Algorithm(kind: .deflate)
    }

    /// Errors raised by the compression handlers.
    public struct Error: Swift.Error, CustomStringConvertible, Equatable {
        fileprivate enum Kind: String {
            case uncompressedWritesPending
            case noDataToWrite
        }
        fileprivate let kind: Kind

        /// The error name as a String.
        public var description: String { return kind.rawValue }

        public static let uncompressedWritesPending = Error(kind: .uncompressedWritesPending)
        public static let noDataToWrite = Error(kind: .noDataToWrite)
    }

    /// Thin wrapper around a zlib deflate stream.
    struct Compressor {
        private var stream = z_stream()
        /// True between `initialize(encoding:)` and `shutdown()`/`shutdownIfActive()`.
        var isActive = false

        init() { }

        /// Set up the encoder for compressing data according to a specific
        /// algorithm. Must not be called while the compressor is already active.
        mutating func initialize(encoding: Algorithm) {
            assert(!isActive)
            isActive = true
            // zlib docs say: The application must initialize zalloc, zfree and opaque
            // before calling the init function.
            stream.zalloc = nil
            stream.zfree = nil
            stream.opaque = nil

            // windowBits of 15 produces a plain deflate stream; adding 16 asks
            // zlib to wrap the same data in a gzip container instead.
            let windowBits: Int32
            switch encoding.kind {
            case .deflate:
                windowBits = 15
            case .gzip:
                windowBits = 16 + 15
            }

            let rc = CNIOExtrasZlib_deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowBits, 8, Z_DEFAULT_STRATEGY)
            precondition(rc == Z_OK, "Unexpected return from zlib init: \(rc)")
        }

        /// Compress the readable bytes of `inputBuffer` into a freshly allocated buffer.
        /// - Parameters:
        ///   - inputBuffer: Buffer whose readable bytes are consumed by the compressor.
        ///   - allocator: Allocator used for the output buffer.
        ///   - finalise: Pass `true` to finish the deflate stream (Z_FINISH) instead
        ///     of performing an intermediate flush (Z_SYNC_FLUSH).
        /// - Returns: A buffer holding the compressed output (possibly empty).
        mutating func compress(inputBuffer: inout ByteBuffer, allocator: ByteBufferAllocator, finalise: Bool) -> ByteBuffer {
            assert(isActive)
            // An empty, non-final write has nothing to flush, so skip zlib entirely.
            if inputBuffer.readableBytes == 0 && !finalise {
                return allocator.buffer(capacity: 0)
            }
            let flag = finalise ? Z_FINISH : Z_SYNC_FLUSH
            // deflateBound() provides an upper limit on the number of bytes the input can
            // compress to. We add 5 bytes to handle the fact that Z_SYNC_FLUSH will append
            // an empty stored block that is 5 bytes long.
            // From zlib docs (https://www.zlib.net/manual.html):
            // If the parameter flush is set to Z_SYNC_FLUSH, all pending output is flushed to
            // the output buffer and the output is aligned on a byte boundary, so that the
            // decompressor can get all input data available so far. (In particular avail_in
            // is zero after the call if enough output space has been provided before the
            // call.) Flushing may degrade compression for some compression algorithms and so
            // it should be used only when necessary. This completes the current deflate block
            // and follows it with an empty stored block that is three bits plus filler bits
            // to the next byte, followed by four bytes (00 00 ff ff).
            let upperBound = Int(deflateBound(&stream, UInt(inputBuffer.readableBytes)))
            var outputBuffer = allocator.buffer(capacity: upperBound + 5)
            stream.oneShotDeflate(from: &inputBuffer, to: &outputBuffer, flag: flag)
            return outputBuffer
        }

        /// Tear down the zlib stream. The compressor must currently be active.
        mutating func shutdown() {
            assert(isActive)
            isActive = false
            deflateEnd(&stream)
        }

        /// Tear down the zlib stream only if it was initialized; safe to call when idle.
        mutating func shutdownIfActive() {
            guard isActive else { return }
            isActive = false
            deflateEnd(&stream)
        }
    }
}

extension z_stream {
    /// Executes deflate from one buffer to another buffer. The advantage of this method is that it
    /// will ensure that the stream is "safe" after each call (that is, that the stream does not have
    /// pointers to byte buffers any longer).
    ///
    /// - Parameters:
    ///   - from: Buffer whose readable bytes are fed to zlib; the bytes actually
    ///     consumed are marked as read.
    ///   - to: Buffer that receives the compressed output.
    ///   - flag: zlib flush flag forwarded to `deflate` (e.g. Z_SYNC_FLUSH or Z_FINISH).
    mutating func oneShotDeflate(from: inout ByteBuffer, to: inout ByteBuffer, flag: Int32) {
        // Always null out the stream's input/output pointers on exit so the
        // z_stream never retains dangling pointers into the byte buffers.
        defer {
            self.avail_in = 0
            self.next_in = nil
            self.avail_out = 0
            self.next_out = nil
        }

        from.readWithUnsafeMutableReadableBytes { dataPtr in
            let typedPtr = dataPtr.baseAddress!.assumingMemoryBound(to: UInt8.self)
            let typedDataPtr = UnsafeMutableBufferPointer(start: typedPtr,
                                                          count: dataPtr.count)

            // Point zlib's input at the readable bytes of `from`.
            self.avail_in = UInt32(typedDataPtr.count)
            self.next_in = typedDataPtr.baseAddress!

            let rc = deflateToBuffer(buffer: &to, flag: flag)
            // Z_OK for an intermediate flush, Z_STREAM_END when Z_FINISH completed
            // the stream; anything else is a programming error.
            precondition(rc == Z_OK || rc == Z_STREAM_END, "One-shot compression failed: \(rc)")

            // Report how many input bytes zlib consumed so the buffer's reader
            // index advances by exactly that amount.
            return typedDataPtr.count - Int(self.avail_in)
        }
    }

    /// A private function that sets the deflate target buffer and then calls deflate.
    /// This relies on having the input set by the previous caller: it will use whatever input was
    /// configured.
    ///
    /// - Parameters:
    ///   - buffer: Output buffer; its writer index advances by the bytes zlib produced.
    ///   - flag: zlib flush flag forwarded to `deflate`.
    /// - Returns: The raw zlib return code from `deflate`.
    private mutating func deflateToBuffer(buffer: inout ByteBuffer, flag: Int32) -> Int32 {
        var rc = Z_OK

        // `buffer.capacity` was sized by the caller (deflateBound + 5), so one
        // deflate call is expected to consume all pending input.
        buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: buffer.capacity) { outputPtr in
            let typedOutputPtr = UnsafeMutableBufferPointer(start: outputPtr.baseAddress!.assumingMemoryBound(to: UInt8.self),
                                                            count: outputPtr.count)
            self.avail_out = UInt32(typedOutputPtr.count)
            self.next_out = typedOutputPtr.baseAddress!
            rc = deflate(&self, flag)
            // Bytes written = space offered minus space zlib left unused.
            return typedOutputPtr.count - Int(self.avail_out)
        }

        return rc
    }
}
175 changes: 175 additions & 0 deletions Sources/NIOHTTPCompression/HTTPRequestCompressor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import CNIOExtrasZlib
import NIO
import NIOHTTP1

/// NIOHTTPResponseCompressor is an outbound channel handler that handles automatic streaming compression of
/// HTTP requests.
///
/// This compressor supports gzip and deflate. It works best if many writes are made between flushes.
///
/// Note that this compressor performs the compression on the event loop thread. This means that compressing
/// some resources, particularly those that do not benefit from compression or that could have been compressed
/// ahead-of-time instead of dynamically, could be a waste of CPU time and latency for relatively minimal
/// benefit. This channel handler should be present in the pipeline only for dynamically-generated and
/// highly-compressible content, which will see the biggest benefits from streaming compression.
/// NIOHTTPRequestCompressor is an outbound channel handler that handles automatic streaming compression of
/// HTTP requests.
///
/// This compressor supports gzip and deflate. It works best if many writes are made between flushes.
///
/// Note that this compressor performs the compression on the event loop thread. This means that compressing
/// some resources, particularly those that do not benefit from compression or that could have been compressed
/// ahead-of-time instead of dynamically, could be a waste of CPU time and latency for relatively minimal
/// benefit. This channel handler should be present in the pipeline only for dynamically-generated and
/// highly-compressible content, which will see the biggest benefits from streaming compression.
public final class NIOHTTPRequestCompressor: ChannelOutboundHandler, RemovableChannelHandler {
    public typealias OutboundIn = HTTPClientRequestPart
    public typealias OutboundOut = HTTPClientRequestPart

    /// Handler state
    enum State {
        /// handler hasn't started
        case idle
        /// handler has received a head
        case head(HTTPRequestHead)
        /// handler has received a head and a body, but hasn't written anything yet
        case body(HTTPRequestHead, ByteBuffer)
        /// handler has written the head and some of the body out.
        case partialBody(ByteBuffer)
        /// handler has finished
        case end
    }

    /// encoding algorithm to use
    var encoding: NIOCompression.Algorithm
    /// handler state
    var state: State
    /// compression handler
    var compressor: NIOCompression.Compressor
    /// pending write promise; writes are buffered until flush/end, so their promises
    /// are cascaded onto this one and it is failed/succeeded collectively.
    var pendingWritePromise: EventLoopPromise<Void>!

    /// Initialize a NIOHTTPRequestCompressor
    /// - Parameter encoding: Compression algorithm to use
    public init(encoding: NIOCompression.Algorithm) {
        self.encoding = encoding
        self.state = .idle
        self.compressor = NIOCompression.Compressor()
    }

    public func handlerAdded(context: ChannelHandlerContext) {
        pendingWritePromise = context.eventLoop.makePromise()
    }

    public func handlerRemoved(context: ChannelHandlerContext) {
        // Fail any writes that were buffered but never flushed, and release the
        // zlib stream if it was ever initialized.
        pendingWritePromise.fail(NIOCompression.Error.uncompressedWritesPending)
        compressor.shutdownIfActive()
    }

    /// Write to channel
    ///
    /// Body parts are accumulated (or compressed and forwarded, once a flush has
    /// already emitted the head) and the caller's promise is attached to the shared
    /// pending-write promise, which completes when the buffered writes go out.
    /// - Parameters:
    ///   - context: Channel handle context which this handler belongs to
    ///   - data: Data being sent through the channel
    ///   - promise: The eventloop promise that should be notified when the operation completes
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        pendingWritePromise.futureResult.cascade(to: promise)

        let httpData = unwrapOutboundIn(data)
        switch httpData {
        case .head(let head):
            switch state {
            case .idle:
                state = .head(head)
            default:
                preconditionFailure("Unexpected HTTP head")
            }
            // Start a fresh zlib stream for this request's body.
            compressor.initialize(encoding: self.encoding)

        case .body(let buffer):
            switch state {
            case .head(var head):
                // We only have a head, this is the first body part
                guard case .byteBuffer(let part) = buffer else { preconditionFailure("Expected a ByteBuffer") }
                // now we have a body lets add the content-encoding header
                head.headers.replaceOrAdd(name: "Content-Encoding", value: self.encoding.description)
                state = .body(head, part)
            case .body(let head, var body):
                // we have a head and a body, extend the body with this body part
                guard case .byteBuffer(var part) = buffer else { preconditionFailure("Expected a ByteBuffer") }
                body.writeBuffer(&part)
                state = .body(head, body)
            case .partialBody(var body):
                // we have a partial body, extend the partial body with this body part
                guard case .byteBuffer(var part) = buffer else { preconditionFailure("Expected a ByteBuffer") }
                body.writeBuffer(&part)
                state = .partialBody(body)
            default:
                preconditionFailure("Unexpected Body")
            }

        case .end:
            switch state {
            case .head(let head):
                // only found a head: no body parts ever arrived, so the head goes
                // out unmodified (no Content-Encoding) followed by the end part.
                context.write(wrapOutboundOut(.head(head)), promise: nil)
                context.write(data, promise: pendingWritePromise)
            case .body(var head, var body):
                // have head and the whole of the body. Compress body, set content length header and write it all out, including the end
                let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: true)
                head.headers.replaceOrAdd(name: "Content-Length", value: outputBuffer.readableBytes.description)
                context.write(wrapOutboundOut(.head(head)), promise: nil)
                context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: nil)
                context.write(data, promise: pendingWritePromise)
            case .partialBody(var body):
                // have a section of the body. Compress that section of the body and write it out along with the end
                let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: true)
                context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: nil)
                context.write(data, promise: pendingWritePromise)
            default:
                preconditionFailure("Unexpected End")
            }
            state = .end
            compressor.shutdown()
        }
    }

    /// Flush buffered parts to the channel.
    ///
    /// If only a head (or head plus some body) has been buffered, the final body
    /// size is unknown, so Content-Length is dropped in favour of chunked
    /// Transfer-Encoding before writing out.
    public func flush(context: ChannelHandlerContext) {
        switch state {
        case .head(var head):
            // given we are flushing the head now we have to assume we have a body and set Content-Encoding
            head.headers.replaceOrAdd(name: "Content-Encoding", value: self.encoding.description)
            head.headers.remove(name: "Content-Length")
            head.headers.replaceOrAdd(name: "Transfer-Encoding", value: "chunked")
            context.write(wrapOutboundOut(.head(head)), promise: pendingWritePromise)
            state = .partialBody(context.channel.allocator.buffer(capacity: 0))

        case .body(var head, var body):
            // Write out head with transfer-encoding set to "chunked" as we cannot set the content length
            // Compress and write out what we have of the the body
            let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: false)
            head.headers.remove(name: "Content-Length")
            head.headers.replaceOrAdd(name: "Transfer-Encoding", value: "chunked")
            context.write(wrapOutboundOut(.head(head)), promise: nil)
            context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: pendingWritePromise)
            state = .partialBody(context.channel.allocator.buffer(capacity: 0))

        case .partialBody(var body):
            // Compress and write out what we have of the body
            let outputBuffer = compressor.compress(inputBuffer: &body, allocator: context.channel.allocator, finalise: false)
            context.write(wrapOutboundOut(.body(.byteBuffer(outputBuffer))), promise: pendingWritePromise)
            state = .partialBody(context.channel.allocator.buffer(capacity: 0))

        default:
            // nothing buffered; just propagate the flush
            context.flush()
            return
        }
        // reset pending write promise so subsequent writes get a fresh collector
        pendingWritePromise = context.eventLoop.makePromise()
        context.flush()
    }
}

Loading

0 comments on commit 020e322

Please sign in to comment.