diff --git a/.spi.yml b/.spi.yml index b65d0fd9..38cb5aa9 100644 --- a/.spi.yml +++ b/.spi.yml @@ -1,4 +1,4 @@ version: 1 builder: configs: - - documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2] + - documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload] diff --git a/Package.swift b/Package.swift index 56e57858..bc01785d 100644 --- a/Package.swift +++ b/Package.swift @@ -3,7 +3,7 @@ // // This source file is part of the SwiftNIO open source project // -// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors +// Copyright (c) 2017-2024 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -170,6 +170,33 @@ var targets: [PackageDescription.Target] = [ "NIOHTTPTypesHTTP2" ] ), + .target( + name: "NIOResumableUpload", + dependencies: [ + "NIOHTTPTypes", + .product(name: "HTTPTypes", package: "swift-http-types"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "StructuredFieldValues", package: "swift-http-structured-headers"), + .product(name: "Atomics", package: "swift-atomics"), + ] + ), + .executableTarget( + name: "NIOResumableUploadDemo", + dependencies: [ + "NIOResumableUpload", + "NIOHTTPTypesHTTP1", + .product(name: "HTTPTypes", package: "swift-http-types"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), + ] + ), + .testTarget( + name: "NIOResumableUploadTests", + dependencies: [ + "NIOResumableUpload", + .product(name: "NIOEmbedded", package: "swift-nio"), + ] + ), ] let package = Package( @@ -181,11 +208,14 @@ let package = Package( .library(name: "NIOHTTPTypes", targets: ["NIOHTTPTypes"]), .library(name: "NIOHTTPTypesHTTP1", targets: ["NIOHTTPTypesHTTP1"]), .library(name: "NIOHTTPTypesHTTP2", targets: ["NIOHTTPTypesHTTP2"]), + .library(name: "NIOResumableUpload", targets: ["NIOResumableUpload"]), ], dependencies: [ .package(url: "https://github.com/apple/swift-nio.git", from: "2.67.0"), .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.27.0"), - .package(url: "https://github.com/apple/swift-http-types.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-http-types.git", from: "1.3.0"), + .package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.1.0"), + .package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"), ], targets: targets ) diff --git a/README.md b/README.md index 67145547..a400b17b 100644 --- a/README.md +++ b/README.md @@ -58,3 +58,4 @@ On the [`nio-extras-0.1`](https://github.com/apple/swift-nio-extras/tree/nio-ext - [`HTTPToHTTP1ServerCodec`](Sources/NIOHTTPTypesHTTP1/HTTPToHTTP1Codec.swift) A `ChannelHandler` that translates shared HTTP types into HTTP/1 messages for the server side for compatibility purposes. - [`HTTP2FramePayloadToHTTPClientCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the client side. - [`HTTP2FramePayloadToHTTPServerCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the server side. +- [`HTTPResumableUploadHandler`](Sources/NIOResumableUpload/HTTPResumableUploadHandler.swift) A `ChannelHandler` that translates HTTP resumable uploads to regular uploads. diff --git a/Sources/NIOResumableUpload/HTTPResumableUpload.swift b/Sources/NIOResumableUpload/HTTPResumableUpload.swift new file mode 100644 index 00000000..c694a6c5 --- /dev/null +++ b/Sources/NIOResumableUpload/HTTPResumableUpload.swift @@ -0,0 +1,490 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2023-2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOConcurrencyHelpers +import NIOCore +import NIOHTTPTypes + +/// `HTTPResumableUpload` tracks a logical upload. It manages an `HTTPResumableUploadChannel` and +/// connects a series of `HTTPResumableUploadHandler` objects to this channel. +final class HTTPResumableUpload { + private let context: HTTPResumableUploadContext + private let channelConfigurator: (Channel) -> Void + + private var eventLoop: EventLoop! + private var uploadHandler: HTTPResumableUploadHandler? + private let uploadHandlerChannel: NIOLockedValueBox = .init(nil) + private var uploadChannel: HTTPResumableUploadChannel? + + /// The resumption path containing the unique token identifying the upload. + private(set) var resumePath: String? + /// The current upload offset. + private var offset: Int64 = 0 + /// The total length of the upload (if known). + private var uploadLength: Int64? + /// The current request is an upload creation request. + private var requestIsCreation: Bool = false + /// The end of the current request is the end of the upload. + private var requestIsComplete: Bool = true + /// Whether the request is OPTIONS + private var requestIsOptions: Bool = false + /// The interop version of the current request + private var interopVersion: HTTPResumableUploadProtocol.InteropVersion = .latest + /// Whether you have received the entire upload. + private var uploadComplete: Bool = false + /// The response has started. + private var responseStarted: Bool = false + /// The child channel enqueued a read while no upload handler was present. + private var pendingRead: Bool = false + /// Last error that the upload handler delivered. + private var pendingError: Error? + /// Idle time since the last upload handler detached. + private var idleTimer: Scheduled? + + init( + context: HTTPResumableUploadContext, + channelConfigurator: @escaping (Channel) -> Void + ) { + self.context = context + self.channelConfigurator = channelConfigurator + } + + private func createChannel(handler: HTTPResumableUploadHandler, parent: Channel) -> HTTPResumableUploadChannel { + let channel = HTTPResumableUploadChannel( + upload: self, + parent: parent, + channelConfigurator: self.channelConfigurator + ) + channel.start() + self.uploadChannel = channel + return channel + } + + private func destroyChannel(error: Error?) { + if let uploadChannel = self.uploadChannel { + self.context.stopUpload(self) + self.uploadChannel = nil + uploadChannel.end(error: error) + } + } + + private func respondAndDetach(_ response: HTTPResponse, handler: HTTPResumableUploadHandler) { + handler.write(.head(response), promise: nil) + handler.writeAndFlush(.end(nil), promise: nil) + if handler === self.uploadHandler { + detachUploadHandler(close: false) + } + } +} + +// For `HTTPResumableUploadHandler`. +extension HTTPResumableUpload { + /// `HTTPResumableUpload` runs on the same event loop as the initial upload handler that started the upload. + /// - Parameter eventLoop: The event loop to schedule work in. + func scheduleOnEventLoop(_ eventLoop: EventLoop) { + eventLoop.assertInEventLoop() + assert(self.eventLoop == nil) + self.eventLoop = eventLoop + } + + private func runInEventLoop(_ work: @escaping () -> Void) { + if self.eventLoop.inEventLoop { + work() + } else { + self.eventLoop.execute(work) + } + } + + private func runInEventLoop(checkHandler handler: HTTPResumableUploadHandler, _ work: @escaping () -> Void) { + self.runInEventLoop { + if self.uploadHandler === handler { + work() + } + } + } + + func attachUploadHandler(_ handler: HTTPResumableUploadHandler, channel: Channel) { + self.eventLoop.preconditionInEventLoop() + + self.pendingError = nil + self.idleTimer?.cancel() + self.idleTimer = nil + + self.uploadHandler = handler + self.uploadHandlerChannel.withLockedValue { $0 = channel } + self.uploadChannel?.writabilityChanged() + + if self.pendingRead { + self.pendingRead = false + handler.read() + } + } + + private func detachUploadHandler(close: Bool) { + self.eventLoop.preconditionInEventLoop() + + if let uploadHandler = self.uploadHandler { + self.uploadHandler = nil + self.uploadHandlerChannel.withLockedValue { $0 = nil } + self.uploadChannel?.writabilityChanged() + if close { + uploadHandler.close(mode: .all, promise: nil) + } + uploadHandler.detach() + + if self.uploadChannel != nil { + self.idleTimer?.cancel() + self.idleTimer = self.eventLoop.scheduleTask(in: self.context.timeout) { + let error = self.pendingError ?? HTTPResumableUploadError.timeoutWaitingForResumption + self.uploadChannel?.end(error: error) + self.uploadChannel = nil + } + } + } + } + + private func offsetRetrieving( + otherHandler: HTTPResumableUploadHandler, + version: HTTPResumableUploadProtocol.InteropVersion + ) { + self.runInEventLoop { + self.detachUploadHandler(close: true) + let response = HTTPResumableUploadProtocol.offsetRetrievingResponse( + offset: self.offset, + complete: self.uploadComplete, + version: version + ) + self.respondAndDetach(response, handler: otherHandler) + } + } + + private func saveUploadLength(complete: Bool, contentLength: Int64?, uploadLength: Int64?) -> Bool { + let computedUploadLength = complete ? contentLength.map { self.offset + $0 } : nil + if let knownUploadLength = self.uploadLength { + if let computedUploadLength, knownUploadLength != computedUploadLength { + return false + } + } else { + self.uploadLength = computedUploadLength + } + if let knownUploadLength = self.uploadLength { + if let uploadLength, knownUploadLength != uploadLength { + return false + } + } else { + self.uploadLength = uploadLength + } + return true + } + + private func uploadAppending( + otherHandler: HTTPResumableUploadHandler, + channel: Channel, + offset: Int64, + complete: Bool, + contentLength: Int64?, + uploadLength: Int64?, + version: HTTPResumableUploadProtocol.InteropVersion + ) { + self.runInEventLoop { + let conflict: Bool + if self.uploadHandler == nil && self.offset == offset && !self.responseStarted { + conflict = !self.saveUploadLength( + complete: complete, + contentLength: contentLength, + uploadLength: uploadLength + ) + } else { + conflict = true + } + guard !conflict else { + self.detachUploadHandler(close: true) + self.destroyChannel(error: HTTPResumableUploadError.badResumption) + let response = HTTPResumableUploadProtocol.conflictResponse( + offset: self.offset, + complete: self.uploadComplete, + version: version + ) + self.respondAndDetach(response, handler: otherHandler) + return + } + self.requestIsCreation = false + self.requestIsComplete = complete + self.interopVersion = version + self.attachUploadHandler(otherHandler, channel: channel) + } + } + + private func uploadCancellation() { + self.runInEventLoop { + self.detachUploadHandler(close: true) + self.destroyChannel(error: HTTPResumableUploadError.uploadCancelled) + } + } + + private func receiveHead(handler: HTTPResumableUploadHandler, channel: Channel, request: HTTPRequest) { + self.eventLoop.preconditionInEventLoop() + + do { + guard let (type, version) = try HTTPResumableUploadProtocol.identifyRequest(request, in: self.context) + else { + let channel = self.createChannel(handler: handler, parent: channel) + channel.receive(.head(request)) + return + } + self.interopVersion = version + switch type { + case .uploadCreation(let complete, let contentLength, let uploadLength): + self.requestIsCreation = true + self.requestIsComplete = complete + self.uploadLength = uploadLength + if !self.saveUploadLength(complete: complete, contentLength: contentLength, uploadLength: uploadLength) + { + let response = HTTPResumableUploadProtocol.conflictResponse( + offset: self.offset, + complete: self.uploadComplete, + version: version + ) + self.respondAndDetach(response, handler: handler) + return + } + let resumePath = self.context.startUpload(self) + self.resumePath = resumePath + + let informationalResponse = HTTPResumableUploadProtocol.featureDetectionResponse( + resumePath: resumePath, + in: self.context, + version: version + ) + handler.writeAndFlush(.head(informationalResponse), promise: nil) + + let strippedRequest = HTTPResumableUploadProtocol.stripRequest(request) + let channel = self.createChannel(handler: handler, parent: channel) + channel.receive(.head(strippedRequest)) + case .offsetRetrieving: + if let path = request.path, let upload = self.context.findUpload(path: path) { + self.uploadHandler = nil + self.uploadHandlerChannel.withLockedValue { $0 = nil } + upload.offsetRetrieving(otherHandler: handler, version: version) + } else { + let response = HTTPResumableUploadProtocol.notFoundResponse(version: version) + self.respondAndDetach(response, handler: handler) + } + case .uploadAppending(let offset, let complete, let contentLength, let uploadLength): + if let path = request.path, let upload = self.context.findUpload(path: path) { + handler.upload = upload + self.uploadHandler = nil + self.uploadHandlerChannel.withLockedValue { $0 = nil } + upload.uploadAppending( + otherHandler: handler, + channel: channel, + offset: offset, + complete: complete, + contentLength: contentLength, + uploadLength: uploadLength, + version: version + ) + } else { + let response = HTTPResumableUploadProtocol.notFoundResponse(version: version) + self.respondAndDetach(response, handler: handler) + } + case .uploadCancellation: + if let path = request.path, let upload = self.context.findUpload(path: path) { + upload.uploadCancellation() + let response = HTTPResumableUploadProtocol.cancelledResponse(version: version) + self.respondAndDetach(response, handler: handler) + } else { + let response = HTTPResumableUploadProtocol.notFoundResponse(version: version) + self.respondAndDetach(response, handler: handler) + } + case .options: + self.requestIsOptions = true + let channel = self.createChannel(handler: handler, parent: channel) + channel.receive(.head(request)) + } + } catch { + let response = HTTPResumableUploadProtocol.badRequestResponse() + self.respondAndDetach(response, handler: handler) + } + } + + private func receiveBody(handler: HTTPResumableUploadHandler, body: ByteBuffer) { + self.eventLoop.preconditionInEventLoop() + + self.offset += Int64(body.readableBytes) + + if let uploadLength = self.uploadLength, self.offset > uploadLength { + let response = HTTPResumableUploadProtocol.conflictResponse( + offset: self.offset, + complete: self.uploadComplete, + version: self.interopVersion + ) + self.respondAndDetach(response, handler: handler) + return + } + self.uploadChannel?.receive(.body(body)) + } + + private func receiveEnd(handler: HTTPResumableUploadHandler, trailers: HTTPFields?) { + self.eventLoop.preconditionInEventLoop() + + if let resumePath = self.resumePath { + if self.requestIsComplete { + self.uploadComplete = true + self.uploadChannel?.receive(.end(trailers)) + } else { + let response = HTTPResumableUploadProtocol.incompleteResponse( + offset: self.offset, + resumePath: resumePath, + forUploadCreation: self.requestIsCreation, + in: self.context, + version: self.interopVersion + ) + self.respondAndDetach(response, handler: handler) + } + } else { + self.uploadChannel?.receive(.end(trailers)) + } + } + + func receive(handler: HTTPResumableUploadHandler, channel: Channel, part: HTTPRequestPart) { + self.runInEventLoop(checkHandler: handler) { + switch part { + case .head(let request): + self.receiveHead(handler: handler, channel: channel, request: request) + case .body(let body): + self.receiveBody(handler: handler, body: body) + case .end(let trailers): + self.receiveEnd(handler: handler, trailers: trailers) + } + } + } + + func receiveComplete(handler: HTTPResumableUploadHandler) { + self.runInEventLoop(checkHandler: handler) { + self.uploadChannel?.receiveComplete() + } + } + + func writabilityChanged(handler: HTTPResumableUploadHandler) { + self.runInEventLoop(checkHandler: handler) { + self.uploadChannel?.writabilityChanged() + } + } + + func end(handler: HTTPResumableUploadHandler, error: Error?) { + self.runInEventLoop(checkHandler: handler) { + if !self.uploadComplete && self.resumePath != nil { + self.pendingError = error + self.detachUploadHandler(close: false) + } else { + self.destroyChannel(error: error) + self.detachUploadHandler(close: false) + } + } + } +} + +// For `HTTPResumableUploadChannel`. +extension HTTPResumableUpload { + var parentChannel: Channel? { + self.uploadHandlerChannel.withLockedValue { $0 } + } + + func write(_ part: HTTPResponsePart, promise: EventLoopPromise?) { + self.eventLoop.preconditionInEventLoop() + + guard let uploadHandler = self.uploadHandler else { + promise?.fail(HTTPResumableUploadError.parentNotPresent) + self.destroyChannel(error: HTTPResumableUploadError.parentNotPresent) + return + } + + self.responseStarted = true + if let resumePath = self.resumePath { + switch part { + case .head(let head): + let response = HTTPResumableUploadProtocol.processResponse( + head, + offset: self.offset, + resumePath: resumePath, + forUploadCreation: self.requestIsCreation, + in: self.context, + version: self.interopVersion + ) + uploadHandler.write(.head(response), promise: promise) + case .body, .end: + uploadHandler.write(part, promise: promise) + } + } else { + if self.requestIsOptions { + switch part { + case .head(let head): + let response = HTTPResumableUploadProtocol.processOptionsResponse(head) + uploadHandler.write(.head(response), promise: promise) + case .body, .end: + uploadHandler.write(part, promise: promise) + } + } else { + uploadHandler.write(part, promise: promise) + } + } + } + + func flush() { + self.eventLoop.preconditionInEventLoop() + + guard let uploadHandler = self.uploadHandler else { + self.destroyChannel(error: HTTPResumableUploadError.parentNotPresent) + return + } + + uploadHandler.flush() + } + + func read() { + self.eventLoop.preconditionInEventLoop() + + if let handler = self.uploadHandler { + handler.read() + } else { + self.pendingRead = true + } + } + + func close(mode: CloseMode, promise: EventLoopPromise?) { + self.eventLoop.preconditionInEventLoop() + + precondition(mode != .input) + self.destroyChannel(error: nil) + self.uploadHandler?.close(mode: mode, promise: promise) + self.uploadHandler?.detach() + self.uploadHandler = nil + self.idleTimer?.cancel() + self.idleTimer = nil + } +} + +/// Errors produced by resumable upload. +enum HTTPResumableUploadError: Error { + /// An upload cancelation request received. + case uploadCancelled + /// No upload handler is attached. + case parentNotPresent + /// Timed out waiting for an upload handler to attach. + case timeoutWaitingForResumption + /// A resumption request from the client is invalid. + case badResumption +} diff --git a/Sources/NIOResumableUpload/HTTPResumableUploadChannel.swift b/Sources/NIOResumableUpload/HTTPResumableUploadChannel.swift new file mode 100644 index 00000000..16125dde --- /dev/null +++ b/Sources/NIOResumableUpload/HTTPResumableUploadChannel.swift @@ -0,0 +1,254 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2023-2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Atomics +import NIOCore +import NIOHTTPTypes + +/// The child channel that persists across upload resumption attempts, delivering data as if it is +/// a single HTTP upload. +final class HTTPResumableUploadChannel: Channel, ChannelCore { + let upload: HTTPResumableUpload + + let allocator: ByteBufferAllocator + + private let closePromise: EventLoopPromise + + var closeFuture: EventLoopFuture { + self.closePromise.futureResult + } + + private var _pipeline: ChannelPipeline! + + var pipeline: ChannelPipeline { + self._pipeline + } + + var localAddress: SocketAddress? { + self.parent?.localAddress + } + + var remoteAddress: SocketAddress? { + self.parent?.remoteAddress + } + + var parent: Channel? { + self.upload.parentChannel + } + + var isWritable: Bool { + self.parent?.isWritable ?? false + } + + private let _isActiveAtomic: ManagedAtomic = .init(true) + + var isActive: Bool { + self._isActiveAtomic.load(ordering: .relaxed) + } + + var _channelCore: ChannelCore { + self + } + + let eventLoop: EventLoop + + private var autoRead: Bool + + init( + upload: HTTPResumableUpload, + parent: Channel, + channelConfigurator: (Channel) -> Void + ) { + self.upload = upload + self.allocator = parent.allocator + self.closePromise = parent.eventLoop.makePromise() + self.eventLoop = parent.eventLoop + // Only support Channels that implement sync options + self.autoRead = try! parent.syncOptions!.getOption(ChannelOptions.autoRead) + self._pipeline = ChannelPipeline(channel: self) + channelConfigurator(self) + } + + func setOption