Skip to content

Commit

Permalink
Responsiveness under Working Conditions
Browse files Browse the repository at this point in the history
Implementation of https://datatracker.ietf.org/doc/draft-ietf-ippm-responsiveness/ (draft 5) with
flexible download and upload handlers to suit other use cases as well.
  • Loading branch information
ehaydenr committed Dec 17, 2024
1 parent 5c79780 commit e73abfd
Show file tree
Hide file tree
Showing 10 changed files with 921 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .spi.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 1
builder:
configs:
- documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload]
- documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload, NIOHTTPResponsiveness]
42 changes: 42 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,44 @@ var targets: [PackageDescription.Target] = [
.product(name: "NIOEmbedded", package: "swift-nio"),
]
),
.target(
name: "NIOHTTPResponsiveness",
dependencies: [
"NIOHTTPTypes",
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "HTTPTypes", package: "swift-http-types"),
],
swiftSettings: [
.enableExperimentalFeature("StrictConcurrency")
]
),
.testTarget(
name: "NIOHTTPResponsivenessTests",
dependencies: [
"NIOHTTPResponsiveness",
"NIOHTTPTypes",
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "HTTPTypes", package: "swift-http-types"),
],
swiftSettings: [
.enableExperimentalFeature("StrictConcurrency")
]
),
.executableTarget(
name: "NIOHTTPResponsivenessServer",
dependencies: [
"NIOHTTPResponsiveness",
"NIOHTTPTypesHTTP1",
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOHTTP1", package: "swift-nio"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
],
swiftSettings: [
.enableExperimentalFeature("StrictConcurrency")
]
),
]

let package = Package(
Expand All @@ -209,13 +247,17 @@ let package = Package(
.library(name: "NIOHTTPTypesHTTP1", targets: ["NIOHTTPTypesHTTP1"]),
.library(name: "NIOHTTPTypesHTTP2", targets: ["NIOHTTPTypesHTTP2"]),
.library(name: "NIOResumableUpload", targets: ["NIOResumableUpload"]),
.library(name: "NIOHTTPResponsiveness", targets: ["NIOHTTPResponsiveness"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.67.0"),
.package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.27.0"),
.package(url: "https://github.com/apple/swift-http-types.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.1.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.27.0"),
.package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.4.0"),

],
targets: targets
)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ On the [`nio-extras-0.1`](https://github.com/apple/swift-nio-extras/tree/nio-ext
- [`HTTP2FramePayloadToHTTPClientCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the client side.
- [`HTTP2FramePayloadToHTTPServerCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the server side.
- [`HTTPResumableUploadHandler`](Sources/NIOResumableUpload/HTTPResumableUploadHandler.swift) A `ChannelHandler` that translates HTTP resumable uploads to regular uploads.
- [`HTTPDrippingDownloadHandler`](Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift) A `ChannelHandler` that sends a configurable stream of zeroes to a client.
- [`HTTPReceiveDiscardHandler`](Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift) A `ChannelHandler` that receives arbitrary bytes from a client and discards them.
228 changes: 228 additions & 0 deletions Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HTTPTypes
import NIOCore
import NIOHTTPTypes

/// HTTP request handler sending a configurable stream of zeroes
public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler {
public typealias InboundIn = HTTPRequestPart
public typealias OutboundOut = HTTPResponsePart
public typealias OutboundIn = Never

// Predefine buffer to reuse over and over again when sending chunks to requester. NIO allows
// us to give it reference counted buffers. Reusing like this allows us to avoid allocations.
static let downloadBodyChunk = ByteBuffer(repeating: 0, count: 65536)

private var frequency: TimeAmount
private var size: Int
private var count: Int
private var delay: TimeAmount
private var code: HTTPResponse.Status

private enum Phase {
/// We haven't gotten the request head - nothing to respond to
case waitingOnHead
/// We got the request head and are delaying the response
case delayingResponse
/// We're dripping response chunks to the peer, tracking how many chunks we have left
case dripping(DrippingState)
/// We either sent everything to the client or the request ended short
case done
}

private struct DrippingState {
var chunksLeft: Int
var currentChunkBytesLeft: Int
}

private var phase = Phase.waitingOnHead
private var scheduled: Scheduled<Void>?
private var pendingRead = false
private var pendingWrite = false
private var activelyWritingChunk = false

/// Initializes an `HTTPDrippingDownloadHandler`.
/// - Parameters:
/// - count: How many chunks should be sent. Note that the underlying HTTP
/// stack may split or combine these chunks into data frames as
/// they see fit.
/// - size: How large each chunk should be
/// - frequency: How much time to wait between sending each chunk
/// - delay: How much time to wait before sending the first chunk
/// - code: What HTTP status code to send
public init(
count: Int = 0,
size: Int = 0,
frequency: TimeAmount = .zero,
delay: TimeAmount = .zero,
code: HTTPResponse.Status = .ok
) {
self.frequency = frequency
self.size = size
self.count = count
self.delay = delay
self.code = code
}

public convenience init?(queryArgsString: Substring.UTF8View) {
self.init()

let pairs = queryArgsString.split(separator: UInt8(ascii: "&"))
for pair in pairs {
var pairParts = pair.split(separator: UInt8(ascii: "="), maxSplits: 1).makeIterator()
guard let first = pairParts.next(), let second = pairParts.next() else {
continue
}

guard let secondNum = Int(Substring(second)) else {
return nil
}

switch Substring(first) {
case "frequency":
self.frequency = .seconds(Int64(secondNum))
case "size":
self.size = secondNum
case "count":
self.count = secondNum
case "delay":
self.delay = .seconds(Int64(secondNum))
case "code":
self.code = HTTPResponse.Status(code: secondNum)
default:
continue
}
}
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let part = self.unwrapInboundIn(data)

switch part {
case .head:
self.phase = .delayingResponse

if self.delay == TimeAmount.zero {
// If no delay, we might as well start responding immediately
self.onResponseDelayCompleted(context: context)
} else {
let this = NIOLoopBound(self, eventLoop: context.eventLoop)
let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)
self.scheduled = context.eventLoop.scheduleTask(in: self.delay) {
this.value.onResponseDelayCompleted(context: loopBoundContext.value)
}
}
case .body, .end:
return
}
}

private func onResponseDelayCompleted(context: ChannelHandlerContext) {
guard case .delayingResponse = self.phase else {
return
}

var head = HTTPResponse(status: self.code)

// If the length isn't too big, let's include a content length header
if case (let contentLength, false) = self.size.multipliedReportingOverflow(by: self.count) {
head.headerFields = HTTPFields(dictionaryLiteral: (.contentLength, "\(contentLength)"))
}

context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
self.phase = .dripping(
DrippingState(
chunksLeft: self.count,
currentChunkBytesLeft: self.size
)
)

self.writeChunk(context: context)
}

public func channelInactive(context: ChannelHandlerContext) {
self.phase = .done
self.scheduled?.cancel()
}

public func channelWritabilityChanged(context: ChannelHandlerContext) {
if case .dripping = self.phase, self.pendingWrite && context.channel.isWritable {
self.writeChunk(context: context)
}
}

private func writeChunk(context: ChannelHandlerContext) {
// Make sure we don't accidentally reenter
if self.activelyWritingChunk {
return
}
self.activelyWritingChunk = true
defer {
self.activelyWritingChunk = false
}

// If we're not dripping the response body, there's nothing to do
guard case .dripping(var drippingState) = self.phase else {
return
}

// If we've sent all chunks, send end and be done
if drippingState.chunksLeft < 1 {
self.phase = .done
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
return
}

var dataWritten = false
while drippingState.currentChunkBytesLeft > 0, context.channel.isWritable {
let toSend = min(drippingState.currentChunkBytesLeft, HTTPDrippingDownloadHandler.downloadBodyChunk.readableBytes)
let buffer = HTTPDrippingDownloadHandler.downloadBodyChunk.getSlice(
at: HTTPDrippingDownloadHandler.downloadBodyChunk.readerIndex,
length: toSend
)!
context.write(self.wrapOutboundOut(.body(buffer)), promise: nil)
drippingState.currentChunkBytesLeft -= toSend
dataWritten = true
}
if dataWritten {
context.flush()
}

// If we weren't able to send the full chunk, it's because the channel isn't writable. We yield until it is
if drippingState.currentChunkBytesLeft > 0 {
self.pendingWrite = true
self.phase = .dripping(drippingState)
return
}

// We sent the full chunk. If we have no more chunks to write, we're done!
drippingState.chunksLeft -= 1
if drippingState.chunksLeft == 0 {
self.phase = .done
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
return
}

// More chunks to write.. Kick off timer
drippingState.currentChunkBytesLeft = self.size
self.phase = .dripping(drippingState)
let this = NIOLoopBound(self, eventLoop: context.eventLoop)
let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)
self.scheduled = context.eventLoop.scheduleTask(in: self.frequency) {
this.value.writeChunk(context: loopBoundContext.value)
}
}
}
90 changes: 90 additions & 0 deletions Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HTTPTypes
import NIOCore
import NIOHTTPTypes

/// HTTP request handler that receives arbitrary bytes and discards them
public final class HTTPReceiveDiscardHandler: ChannelInboundHandler {

public typealias InboundIn = HTTPRequestPart
public typealias OutboundOut = HTTPResponsePart

private let expectation: Int?
private var expectationViolated = false
private var received = 0

/// Initializes `HTTPReceiveDiscardHandler`
/// - Parameter expectation: how many bytes should be expected. If more
/// bytes are received than expected, an error status code will
/// be sent to the client
public init(expectation: Int?) {
self.expectation = expectation
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let part = self.unwrapInboundIn(data)

switch part {
case .head:
return
case .body(let buffer):
self.received += buffer.readableBytes

// If the expectation is violated, send 4xx
if let expectation = self.expectation, self.received > expectation {
self.onExpectationViolated(context: context, expectation: expectation)
}
case .end:
if self.expectationViolated {
// Already flushed a response, nothing else to do
return
}

if let expectation = self.expectation, self.received != expectation {
self.onExpectationViolated(context: context, expectation: expectation)
return
}

let responseBody = ByteBuffer(string: "Received \(self.received) bytes")
self.writeSimpleResponse(context: context, status: .ok, body: responseBody)
}
}

private func onExpectationViolated(context: ChannelHandlerContext, expectation: Int) {
self.expectationViolated = true

let body = ByteBuffer(
string:
"Received in excess of expectation; expected(\(expectation)) received(\(self.received))"
)
self.writeSimpleResponse(context: context, status: .ok, body: body)
}

private func writeSimpleResponse(
context: ChannelHandlerContext,
status: HTTPResponse.Status,
body: ByteBuffer
) {
let bodyLen = body.readableBytes
let responseHead = HTTPResponse(
status: status,
headerFields: HTTPFields(dictionaryLiteral: (.contentLength, "\(bodyLen)"))
)
context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
context.write(self.wrapOutboundOut(.body(body)), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
}
}
Loading

0 comments on commit e73abfd

Please sign in to comment.