forked from apple/swift-nio-extras
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HTTPResponseCompressor.swift
319 lines (277 loc) · 13.1 KB
/
HTTPResponseCompressor.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import CNIOExtrasZlib
import NIOCore
import NIOHTTP1
extension StringProtocol {
    /// Reports whether `self` begins with exactly the same bytes as `prefix`.
    ///
    /// The comparison is made on the UTF-8 views of both strings, so two strings that are
    /// canonically equivalent but encoded differently (for example a precomposed character
    /// versus a base character plus combining mark) do not match.
    ///
    /// - note: Because no unicode normalisation is involved, this is expected to be cheaper
    ///   than a normalising prefix check such as `hasPrefix`.
    ///
    /// - parameters:
    ///     - prefix: The string expected at the very beginning of `self`.
    /// - returns: `true` if the UTF-8 encoding of `self` starts with the UTF-8 encoding of
    ///   `prefix`, `false` otherwise. An empty `prefix` always matches.
    func startsWithExactly<S: StringProtocol>(_ prefix: S) -> Bool {
        let expectedBytes = prefix.utf8
        return utf8.starts(with: expectedBytes)
    }
}
/// Given a single header element (for example `gzip;q=0.5`), extracts the q value if there is
/// one present. If one is not present, returns the default q value, 1.0.
///
/// Everything after the first `;` is treated as the parameter section, and the text after the
/// first `=` inside that section is parsed as the q value.
///
/// - parameters:
///     - text: One element of an `Accept-Encoding` style header value.
/// - returns: `1` when there is no parameter section (or it is empty); the parsed value when it
///   lies in `0...1`; and `0` when the parameter section is malformed (no `=`), is not a
///   number, is NaN, or is out of range.
private func qValueFromHeader<S: StringProtocol>(_ text: S) -> Float {
    let headerParts = text.split(separator: ";", maxSplits: 1, omittingEmptySubsequences: false)
    guard headerParts.count > 1 && !headerParts[1].isEmpty else {
        return 1
    }

    // We have a parameter section. This text comes straight from the peer, so it may not contain
    // an "=" at all (e.g. "gzip;q"). Subscripting the split result blindly would trap in that
    // case, so a malformed parameter is treated like an unparsable value: q = 0.
    let parameterParts = headerParts[1].split(separator: "=", maxSplits: 1, omittingEmptySubsequences: false)
    guard parameterParts.count == 2 else {
        return 0
    }

    let qValue = Float(parameterParts[1]) ?? 0
    if qValue < 0 || qValue > 1 || qValue.isNaN {
        return 0
    }
    return qValue
}
/// A ``HTTPResponseCompressor`` is a duplex channel handler that handles automatic streaming compression of
/// HTTP responses. It respects the client's Accept-Encoding preferences, including q-values if present,
/// and ensures that clients are served the compression algorithm that works best for them.
///
/// This compressor supports gzip and deflate. It works best if many writes are made between flushes.
///
/// Note that this compressor performs the compression on the event loop thread. This means that compressing
/// some resources, particularly those that do not benefit from compression or that could have been compressed
/// ahead-of-time instead of dynamically, could be a waste of CPU time and latency for relatively minimal
/// benefit. This channel handler should be present in the pipeline only for dynamically-generated and
/// highly-compressible content, which will see the biggest benefits from streaming compression.
public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChannelHandler {
    /// This class accepts `HTTPServerRequestPart` inbound.
    public typealias InboundIn = HTTPServerRequestPart
    /// This class emits `HTTPServerRequestPart` inbound.
    public typealias InboundOut = HTTPServerRequestPart
    /// This class accepts `HTTPServerResponsePart` outbound.
    public typealias OutboundIn = HTTPServerResponsePart
    /// This class emits `HTTPServerResponsePart` outbound.
    public typealias OutboundOut = HTTPServerResponsePart

    /// Errors which can occur when compressing.
    public enum CompressionError: Error {
        /// Writes were still pending when shutdown.
        case uncompressedWritesPending
        /// Data was somehow lost without being written.
        case noDataToWrite
    }

    private var compressor: NIOCompression.Compressor

    /// A queue of accept headers. One entry is appended for every request head read and one entry
    /// is consumed for every response head written.
    private var acceptQueue = CircularBuffer<[Substring]>(initialCapacity: 8)

    /// The response parts buffered since the last emit. Assigned in `handlerAdded`.
    private var pendingResponse: PartialHTTPResponse!

    /// The promise completed by the next emitted write; the promises of all buffered writes are
    /// cascaded from it. Assigned in `handlerAdded` and replaced after every emit.
    private var pendingWritePromise: EventLoopPromise<Void>!

    private let initialByteBufferCapacity: Int

    /// Initialise a ``HTTPResponseCompressor``
    /// - Parameter initialByteBufferCapacity: Initial size of buffer to allocate when handler is first added.
    public init(initialByteBufferCapacity: Int = 1024) {
        self.initialByteBufferCapacity = initialByteBufferCapacity
        self.compressor = NIOCompression.Compressor()
    }

    /// Setup and add to the pipeline.
    /// - Parameter context: Calling context.
    public func handlerAdded(context: ChannelHandlerContext) {
        pendingResponse = PartialHTTPResponse(bodyBuffer: context.channel.allocator.buffer(capacity: initialByteBufferCapacity))
        pendingWritePromise = context.eventLoop.makePromise()
    }

    /// Remove channel handler from the pipeline.
    ///
    /// Fails the currently pending write promise and shuts the compressor down if it is active.
    /// - Parameter context: Calling context
    public func handlerRemoved(context: ChannelHandlerContext) {
        pendingWritePromise?.fail(CompressionError.uncompressedWritesPending)
        compressor.shutdownIfActive()
    }

    /// Records the `accept-encoding` values of every request head, then forwards the read unchanged.
    /// - Parameters:
    ///   - context: Calling context.
    ///   - data: The inbound `HTTPServerRequestPart`.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        if case .head(let requestHead) = unwrapInboundIn(data) {
            acceptQueue.append(requestHead.headers[canonicalForm: "accept-encoding"])
        }
        context.fireChannelRead(data)
    }

    /// Buffers response parts for compression, or forwards them untouched when the response is
    /// not being compressed.
    /// - Parameters:
    ///   - context: Calling context.
    ///   - data: The outbound `HTTPServerResponsePart`.
    ///   - promise: Promise to complete once the (possibly buffered) write has been written.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let httpData = unwrapOutboundIn(data)
        switch httpData {
        case .head(var responseHead):
            // `compressionAlgorithm()` is deliberately the first guard condition: it consumes the
            // accept-queue entry for this response even when the status cannot carry a body.
            guard let algorithm = compressionAlgorithm(), responseHead.status.mayHaveResponseBody else {
                context.write(wrapOutboundOut(.head(responseHead)), promise: promise)
                return
            }
            // Previous handlers in the pipeline might have already set this header even though
            // they should not as it is compressor responsibility to decide what encoding to use
            responseHead.headers.replaceOrAdd(name: "Content-Encoding", value: algorithm.description)
            compressor.initialize(encoding: algorithm)
            pendingResponse.bufferResponseHead(responseHead)
            pendingWritePromise.futureResult.cascade(to: promise)
        case .body(let body):
            if compressor.isActive {
                pendingResponse.bufferBodyPart(body)
                pendingWritePromise.futureResult.cascade(to: promise)
            } else {
                context.write(data, promise: promise)
            }
        case .end:
            // This compress is not done in flush because we need to be done with the
            // compressor now.
            guard compressor.isActive else {
                context.write(data, promise: promise)
                return
            }
            pendingResponse.bufferResponseEnd(httpData)
            pendingWritePromise.futureResult.cascade(to: promise)
            emitPendingWrites(context: context)
            compressor.shutdown()
        }
    }

    /// Emits everything buffered so far and then forwards the flush.
    /// - Parameter context: Calling context.
    public func flush(context: ChannelHandlerContext) {
        emitPendingWrites(context: context)
        context.flush()
    }

    /// Determines the compression algorithm to use for the next response.
    ///
    /// Returns the compression algorithm to use, or nil if the next response
    /// should not be compressed.
    private func compressionAlgorithm() -> NIOCompression.Algorithm? {
        // A response head can be written when no request head has been recorded (for example an
        // error response produced before any request was successfully parsed). Removing from an
        // empty queue would trap, so such a response is simply not compressed.
        guard !acceptQueue.isEmpty else {
            return nil
        }
        let acceptHeaders = acceptQueue.removeFirst()

        // -1 means "not mentioned by the client".
        var gzipQValue: Float = -1
        var deflateQValue: Float = -1
        var anyQValue: Float = -1
        for acceptHeader in acceptHeaders {
            if acceptHeader.startsWithExactly("gzip") || acceptHeader.startsWithExactly("x-gzip") {
                gzipQValue = qValueFromHeader(acceptHeader)
            } else if acceptHeader.startsWithExactly("deflate") {
                deflateQValue = qValueFromHeader(acceptHeader)
            } else if acceptHeader.startsWithExactly("*") {
                anyQValue = qValueFromHeader(acceptHeader)
            }
        }

        if gzipQValue > 0 || deflateQValue > 0 {
            return gzipQValue > deflateQValue ? .gzip : .deflate
        } else if anyQValue > 0 {
            // Though gzip is usually less well compressed than deflate, it has slightly
            // wider support because it's unambiguous. We therefore default to that unless
            // the client has expressed a preference.
            return .gzip
        }
        return nil
    }

    /// Emits all pending buffered writes to the network, optionally compressing the
    /// data. Resets the pending write buffer and promise.
    ///
    /// Called either when a HTTP end message is received or our flush() method is called.
    private func emitPendingWrites(context: ChannelHandlerContext) {
        let writesToEmit = pendingResponse.flush(compressor: &compressor, allocator: context.channel.allocator)

        // The pending promise is attached to the first write that is actually emitted; any later
        // writes in this batch go out without a promise.
        var pendingPromise = pendingWritePromise
        if let writeHead = writesToEmit.0 {
            context.write(wrapOutboundOut(.head(writeHead)), promise: pendingPromise)
            pendingPromise = nil
        }
        if let writeBody = writesToEmit.1 {
            context.write(wrapOutboundOut(.body(.byteBuffer(writeBody))), promise: pendingPromise)
            pendingPromise = nil
        }
        if let writeEnd = writesToEmit.2 {
            context.write(wrapOutboundOut(writeEnd), promise: pendingPromise)
            pendingPromise = nil
        }

        // If we still have the pending promise, we never emitted a write. Fail the promise,
        // as anything that is listening for its data somehow lost it.
        if let stillPendingPromise = pendingPromise {
            stillPendingPromise.fail(CompressionError.noDataToWrite)
        }

        // Reset the pending promise.
        pendingWritePromise = context.eventLoop.makePromise()
    }
}
// Declares an explicitly *unavailable* `Sendable` conformance for `HTTPResponseCompressor`,
// i.e. states that the handler must not be treated as `Sendable`.
// The declaration is only compiled when building with Swift 5.6 or later.
#if swift(>=5.6)
@available(*, unavailable)
extension HTTPResponseCompressor: Sendable {}
#endif
/// Accumulates the parts of a HTTP response (head, body bytes and end) that were written
/// before a flush.
///
/// The idea is to gather as much of a response as possible before compressing, and to compress
/// as few times as possible. In the ideal case the whole response is available at once, which
/// lets the content length be rewritten instead of forcing the response to be chunked. Keeping
/// that bookkeeping in one entity makes it easier to do the right thing.
private struct PartialHTTPResponse {
    /// The buffered response head, if one has been buffered since the last flush.
    var head: HTTPResponseHead?
    /// The uncompressed body bytes buffered since the last flush.
    var body: ByteBuffer
    /// The buffered response end, if one has been buffered since the last flush.
    var end: HTTPServerResponsePart?
    /// Capacity of the body buffer handed to `init`, re-reserved every time the state is cleared.
    private let initialBufferSize: Int

    /// `true` when both the head and the end of the response are currently buffered.
    var isCompleteResponse: Bool {
        guard self.head != nil else {
            return false
        }
        return self.end != nil
    }

    /// `true` when the end of the response is buffered, meaning the compressor has to be finalised.
    var mustFlush: Bool {
        if self.end == nil {
            return false
        }
        return true
    }

    /// Creates an empty partial response that uses `bodyBuffer` as its body storage.
    init(bodyBuffer: ByteBuffer) {
        self.initialBufferSize = bodyBuffer.capacity
        self.body = bodyBuffer
    }

    /// Stores the response head. It is a precondition that no head is currently buffered.
    mutating func bufferResponseHead(_ head: HTTPResponseHead) {
        precondition(self.head == nil)
        self.head = head
    }

    /// Appends the bytes of `bodyPart` to the buffered body. File regions are not supported and trap.
    mutating func bufferBodyPart(_ bodyPart: IOData) {
        switch bodyPart {
        case .fileRegion:
            fatalError("Cannot currently compress file regions")
        case .byteBuffer(var incomingBytes):
            self.body.writeBuffer(&incomingBytes)
        }
    }

    /// Stores the response end. It is a precondition that no end is currently buffered, and
    /// passing any part other than `.end` traps.
    mutating func bufferResponseEnd(_ end: HTTPServerResponsePart) {
        precondition(self.end == nil)
        if case .end = end {
            self.end = end
        } else {
            fatalError("Buffering wrong entity type: \(end)")
        }
    }

    /// Drops the buffered head and end, empties the body buffer and reserves its initial capacity again.
    private mutating func clear() {
        self.body.clear()
        self.body.reserveCapacity(self.initialBufferSize)
        self.end = nil
        self.head = nil
    }

    /// Flushes the buffered data into its constituent parts.
    ///
    /// Returns a three-tuple of a HTTP response head, compressed body bytes, and any end that
    /// may have been buffered. Each element is optional.
    ///
    /// When a head is returned and compression ran in this call, its headers have been adjusted:
    /// with the complete response available, `transfer-encoding` is removed and `content-length`
    /// is set to the compressed size; otherwise, if the status may carry a body, `content-length`
    /// is removed and `transfer-encoding: chunked` is set. A nil head means no head was buffered.
    ///
    /// The body is nil when there were no readable bytes buffered and no end was buffered, in
    /// which case the compressor is not invoked at all.
    ///
    /// After this call the buffered state has been cleared and the value may be reused.
    mutating func flush(compressor: inout NIOCompression.Compressor, allocator: ByteBufferAllocator) -> (HTTPResponseHead?, ByteBuffer?, HTTPServerResponsePart?) {
        let isFinalChunk = self.mustFlush
        var compressedOutput: ByteBuffer?

        if isFinalChunk || self.body.readableBytes > 0 {
            let compressed = compressor.compress(inputBuffer: &self.body, allocator: allocator, finalise: isFinalChunk)

            if var bufferedHead = self.head {
                if self.end != nil {
                    // Head and end are both present: the exact compressed length is known.
                    bufferedHead.headers.remove(name: "transfer-encoding")
                    bufferedHead.headers.replaceOrAdd(name: "content-length", value: "\(compressed.readableBytes)")
                } else if bufferedHead.status.mayHaveResponseBody {
                    // More body may follow, so the final length is unknown.
                    bufferedHead.headers.remove(name: "content-length")
                    bufferedHead.headers.replaceOrAdd(name: "transfer-encoding", value: "chunked")
                }
                self.head = bufferedHead
            }

            compressedOutput = compressed
        }

        let flushedParts = (self.head, compressedOutput, self.end)
        self.clear()
        return flushedParts
    }
}