diff --git a/Sources/NIOResumableUpload/HTTPResumableUpload.swift b/Sources/NIOResumableUpload/HTTPResumableUpload.swift index c6e1e4f6..9264c9fd 100644 --- a/Sources/NIOResumableUpload/HTTPResumableUpload.swift +++ b/Sources/NIOResumableUpload/HTTPResumableUpload.swift @@ -32,12 +32,14 @@ final class HTTPResumableUpload { private(set) var resumePath: String? /// The current upload offset. private var offset: Int64 = 0 - /// The total size of the upload (if known). - private var uploadSize: Int64? + /// The total length of the upload (if known). + private var uploadLength: Int64? /// The current request is an upload creation request. private var requestIsCreation: Bool = false /// The end of the current request is the end of the upload. private var requestIsComplete: Bool = true + /// The interop version of the current request + private var interopVersion: HTTPResumableUploadProtocol.InteropVersion = .latest /// Whether you have received the entire upload. private var uploadComplete: Bool = false /// The response has started. @@ -150,39 +152,57 @@ extension HTTPResumableUpload { } } - private func offsetRetrieving(otherHandler: HTTPResumableUploadHandler) { + private func offsetRetrieving( + otherHandler: HTTPResumableUploadHandler, + version: HTTPResumableUploadProtocol.InteropVersion + ) { self.runInEventLoop { self.detachUploadHandler(close: true) let response = HTTPResumableUploadProtocol.offsetRetrievingResponse( offset: self.offset, - complete: self.uploadComplete + complete: self.uploadComplete, + version: version ) self.respondAndDetach(response, handler: otherHandler) } } + private func saveUploadLength(complete: Bool, contentLength: Int64?, uploadLength: Int64?) -> Bool { + let computedUploadLength = complete ? contentLength.map { self.offset + $0 } : nil + if let knownUploadLength = self.uploadLength { + if let computedUploadLength, knownUploadLength != computedUploadLength { + return false + } + } else { + self.uploadLength = computedUploadLength + } + if let knownUploadLength = self.uploadLength { + if let uploadLength, knownUploadLength != uploadLength { + return false + } + } else { + self.uploadLength = uploadLength + } + return true + } + private func uploadAppending( otherHandler: HTTPResumableUploadHandler, channel: Channel, offset: Int64, + complete: Bool, contentLength: Int64?, - complete: Bool + uploadLength: Int64?, + version: HTTPResumableUploadProtocol.InteropVersion ) { self.runInEventLoop { let conflict: Bool if self.uploadHandler == nil && self.offset == offset && !self.responseStarted { - if let uploadSize = self.uploadSize { - if let contentLength { - conflict = complete && uploadSize != offset + contentLength - } else { - conflict = true - } - } else { - if let contentLength, complete { - self.uploadSize = offset + contentLength - } - conflict = false - } + conflict = !self.saveUploadLength( + complete: complete, + contentLength: contentLength, + uploadLength: uploadLength + ) } else { conflict = true } @@ -191,13 +211,15 @@ extension HTTPResumableUpload { self.destroyChannel(error: HTTPResumableUploadError.badResumption) let response = HTTPResumableUploadProtocol.conflictResponse( offset: self.offset, - complete: self.uploadComplete + complete: self.uploadComplete, + version: version ) self.respondAndDetach(response, handler: otherHandler) return } self.requestIsCreation = false self.requestIsComplete = complete + self.interopVersion = version self.attachUploadHandler(otherHandler, channel: channel) } } @@ -212,70 +234,102 @@ extension HTTPResumableUpload { private func receiveHead(handler: HTTPResumableUploadHandler, channel: Channel, request: HTTPRequest) { self.eventLoop.preconditionInEventLoop() - switch HTTPResumableUploadProtocol.identifyRequest(request, in: self.context) { - case .notSupported: - let channel = self.createChannel(handler: handler, parent: channel) - channel.receive(.head(request)) - case .uploadCreation(let complete, let contentLength): - self.requestIsCreation = true - self.requestIsComplete = complete - self.uploadSize = complete ? contentLength : nil - let resumePath = self.context.startUpload(self) - self.resumePath = resumePath - - let informationalResponse = HTTPResumableUploadProtocol.featureDetectionResponse( - resumePath: resumePath, - in: self.context - ) - handler.writeAndFlush(.head(informationalResponse), promise: nil) - - let strippedRequest = HTTPResumableUploadProtocol.stripRequest(request) - let channel = self.createChannel(handler: handler, parent: channel) - channel.receive(.head(strippedRequest)) - case .offsetRetrieving: - if let path = request.path, let upload = self.context.findUpload(path: path) { - self.uploadHandler = nil - self.uploadHandlerChannel.withLockedValue { $0 = nil } - upload.offsetRetrieving(otherHandler: handler) - } else { - let response = HTTPResumableUploadProtocol.notFoundResponse() - self.respondAndDetach(response, handler: handler) + do { + guard let (type, version) = try HTTPResumableUploadProtocol.identifyRequest(request, in: self.context) + else { + let channel = self.createChannel(handler: handler, parent: channel) + channel.receive(.head(request)) + return } - case .uploadAppending(let offset, let complete, let contentLength): - if let path = request.path, let upload = self.context.findUpload(path: path) { - handler.upload = upload - self.uploadHandler = nil - self.uploadHandlerChannel.withLockedValue { $0 = nil } - upload.uploadAppending( - otherHandler: handler, - channel: channel, - offset: offset, - contentLength: contentLength, - complete: complete + self.interopVersion = version + switch type { + case .uploadCreation(let complete, let contentLength, let uploadLength): + self.requestIsCreation = true + self.requestIsComplete = complete + self.uploadLength = uploadLength + if !self.saveUploadLength(complete: complete, contentLength: contentLength, uploadLength: uploadLength) + { + let response = HTTPResumableUploadProtocol.conflictResponse( + offset: self.offset, + complete: self.uploadComplete, + version: version + ) + self.respondAndDetach(response, handler: handler) + return + } + let resumePath = self.context.startUpload(self) + self.resumePath = resumePath + + let informationalResponse = HTTPResumableUploadProtocol.featureDetectionResponse( + resumePath: resumePath, + in: self.context, + version: version ) - } else { - let response = HTTPResumableUploadProtocol.notFoundResponse() - self.respondAndDetach(response, handler: handler) - } - case .uploadCancellation: - if let path = request.path, let upload = self.context.findUpload(path: path) { - upload.uploadCancellation() - let response = HTTPResumableUploadProtocol.cancelledResponse() - self.respondAndDetach(response, handler: handler) - } else { - let response = HTTPResumableUploadProtocol.notFoundResponse() + handler.writeAndFlush(.head(informationalResponse), promise: nil) + + let strippedRequest = HTTPResumableUploadProtocol.stripRequest(request) + let channel = self.createChannel(handler: handler, parent: channel) + channel.receive(.head(strippedRequest)) + case .offsetRetrieving: + if let path = request.path, let upload = self.context.findUpload(path: path) { + self.uploadHandler = nil + self.uploadHandlerChannel.withLockedValue { $0 = nil } + upload.offsetRetrieving(otherHandler: handler, version: version) + } else { + let response = HTTPResumableUploadProtocol.notFoundResponse(version: version) + self.respondAndDetach(response, handler: handler) + } + case .uploadAppending(let offset, let complete, let contentLength, let uploadLength): + if let path = request.path, let upload = self.context.findUpload(path: path) { + handler.upload = upload + self.uploadHandler = nil + self.uploadHandlerChannel.withLockedValue { $0 = nil } + upload.uploadAppending( + otherHandler: handler, + channel: channel, + offset: offset, + complete: complete, + contentLength: contentLength, + uploadLength: uploadLength, + version: version + ) + } else { + let response = HTTPResumableUploadProtocol.notFoundResponse(version: version) + self.respondAndDetach(response, handler: handler) + } + case .uploadCancellation: + if let path = request.path, let upload = self.context.findUpload(path: path) { + upload.uploadCancellation() + let response = HTTPResumableUploadProtocol.cancelledResponse(version: version) + self.respondAndDetach(response, handler: handler) + } else { + let response = HTTPResumableUploadProtocol.notFoundResponse(version: version) + self.respondAndDetach(response, handler: handler) + } + case .options: + let response = HTTPResumableUploadProtocol.optionsResponse(version: version) self.respondAndDetach(response, handler: handler) } - case .invalid: + } catch { let response = HTTPResumableUploadProtocol.badRequestResponse() self.respondAndDetach(response, handler: handler) } } - private func receiveBody(body: ByteBuffer) { + private func receiveBody(handler: HTTPResumableUploadHandler, body: ByteBuffer) { self.eventLoop.preconditionInEventLoop() self.offset += Int64(body.readableBytes) + + if let uploadLength = self.uploadLength, self.offset > uploadLength { + let response = HTTPResumableUploadProtocol.conflictResponse( + offset: self.offset, + complete: self.uploadComplete, + version: self.interopVersion + ) + self.respondAndDetach(response, handler: handler) + return + } self.uploadChannel?.receive(.body(body)) } @@ -291,7 +345,8 @@ extension HTTPResumableUpload { offset: self.offset, resumePath: resumePath, forUploadCreation: self.requestIsCreation, - in: self.context + in: self.context, + version: self.interopVersion ) self.respondAndDetach(response, handler: handler) } @@ -306,7 +361,7 @@ extension HTTPResumableUpload { case .head(let request): self.receiveHead(handler: handler, channel: channel, request: request) case .body(let body): - self.receiveBody(body: body) + self.receiveBody(handler: handler, body: body) case .end(let trailers): self.receiveEnd(handler: handler, trailers: trailers) } @@ -362,7 +417,8 @@ extension HTTPResumableUpload { offset: self.offset, resumePath: resumePath, forUploadCreation: self.requestIsCreation, - in: self.context + in: self.context, + version: self.interopVersion ) uploadHandler.write(.head(response), promise: promise) case .body, .end: diff --git a/Sources/NIOResumableUpload/HTTPResumableUploadProtocol.swift b/Sources/NIOResumableUpload/HTTPResumableUploadProtocol.swift index 826fe878..9657b4b8 100644 --- a/Sources/NIOResumableUpload/HTTPResumableUploadProtocol.swift +++ b/Sources/NIOResumableUpload/HTTPResumableUploadProtocol.swift @@ -20,18 +20,32 @@ import StructuredFieldValues /// Draft document: /// https://datatracker.ietf.org/doc/draft-ietf-httpbis-resumable-upload/01/ enum HTTPResumableUploadProtocol { - private static let currentInteropVersion = "3" + enum InteropVersion: Int, Comparable { + case v3 = 3 + case v5 = 5 + case v6 = 6 - static func featureDetectionResponse(resumePath: String, in context: HTTPResumableUploadContext) -> HTTPResponse { + static let latest: Self = .v6 + + static func < (lhs: Self, rhs: Self) -> Bool { + lhs.rawValue < rhs.rawValue + } + } + + static func featureDetectionResponse( + resumePath: String, + in context: HTTPResumableUploadContext, + version: InteropVersion + ) -> HTTPResponse { var response = HTTPResponse(status: .init(code: 104, reasonPhrase: "Upload Resumption Supported")) - response.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + response.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" response.headerFields[.location] = context.origin + resumePath return response } - static func offsetRetrievingResponse(offset: Int64, complete: Bool) -> HTTPResponse { + static func offsetRetrievingResponse(offset: Int64, complete: Bool, version: InteropVersion) -> HTTPResponse { var response = HTTPResponse(status: .noContent) - response.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + response.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" response.headerFields.uploadIncomplete = !complete response.headerFields.uploadOffset = offset response.headerFields[.cacheControl] = "no-store" @@ -42,10 +56,11 @@ enum HTTPResumableUploadProtocol { offset: Int64, resumePath: String, forUploadCreation: Bool, - in context: HTTPResumableUploadContext + in context: HTTPResumableUploadContext, + version: InteropVersion ) -> HTTPResponse { var response = HTTPResponse(status: .created) - response.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + response.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" if forUploadCreation { response.headerFields[.location] = context.origin + resumePath } @@ -54,22 +69,29 @@ enum HTTPResumableUploadProtocol { return response } - static func cancelledResponse() -> HTTPResponse { + static func optionsResponse(version: InteropVersion) -> HTTPResponse { + var response = HTTPResponse(status: .ok) + response.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" + response.headerFields.uploadLimit = .init(minSize: 0) + return response + } + + static func cancelledResponse(version: InteropVersion) -> HTTPResponse { var response = HTTPResponse(status: .noContent) - response.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + response.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" return response } - static func notFoundResponse() -> HTTPResponse { + static func notFoundResponse(version: InteropVersion) -> HTTPResponse { var response = HTTPResponse(status: .notFound) - response.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + response.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" response.headerFields[.contentLength] = "0" return response } - static func conflictResponse(offset: Int64, complete: Bool) -> HTTPResponse { + static func conflictResponse(offset: Int64, complete: Bool, version: InteropVersion) -> HTTPResponse { var response = HTTPResponse(status: .conflict) - response.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + response.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" response.headerFields.uploadIncomplete = !complete response.headerFields.uploadOffset = offset response.headerFields[.contentLength] = "0" @@ -78,55 +100,95 @@ enum HTTPResumableUploadProtocol { static func badRequestResponse() -> HTTPResponse { var response = HTTPResponse(status: .badRequest) - response.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + response.headerFields[.uploadDraftInteropVersion] = "\(InteropVersion.latest.rawValue)" response.headerFields[.contentLength] = "0" return response } enum RequestType { - case notSupported - case uploadCreation(complete: Bool, contentLength: Int64?) + case uploadCreation(complete: Bool, contentLength: Int64?, uploadLength: Int64?) case offsetRetrieving - case uploadAppending(offset: Int64, complete: Bool, contentLength: Int64?) + case uploadAppending(offset: Int64, complete: Bool, contentLength: Int64?, uploadLength: Int64?) case uploadCancellation - case invalid + case options + } + + enum InvalidRequestError: Error { + case unsupportedInteropVersion + case unknownMethod + case invalidPath + case missingHeaderField + case extraHeaderField } - static func identifyRequest(_ request: HTTPRequest, in context: HTTPResumableUploadContext) -> RequestType { - if request.headerFields[.uploadDraftInteropVersion] != self.currentInteropVersion { - return .notSupported + static func identifyRequest( + _ request: HTTPRequest, + in context: HTTPResumableUploadContext + ) throws -> (RequestType, InteropVersion)? { + guard let versionValue = request.headerFields[.uploadDraftInteropVersion] else { + return nil + } + guard let versionNumber = Int(versionValue), + let version = InteropVersion(rawValue: versionNumber) + else { + throw InvalidRequestError.unsupportedInteropVersion + } + let complete: Bool? + if version >= .v5 { + complete = request.headerFields.uploadComplete + } else { + complete = request.headerFields.uploadIncomplete.map { !$0 } } - let complete = request.headerFields.uploadIncomplete.map { !$0 } let offset = request.headerFields.uploadOffset let contentLength = request.headerFields[.contentLength].flatMap(Int64.init) + let uploadLength = request.headerFields.uploadLength + if request.method == .options { + guard complete == nil && offset == nil && uploadLength == nil else { + throw InvalidRequestError.extraHeaderField + } + return (.options, version) + } if let path = request.path, context.isResumption(path: path) { switch request.method { case .head: - guard complete == nil && offset == nil else { - return .invalid + guard complete == nil && offset == nil && uploadLength == nil else { + throw InvalidRequestError.extraHeaderField } - return .offsetRetrieving + return (.offsetRetrieving, version) case .patch: guard let offset else { - return .invalid + throw InvalidRequestError.missingHeaderField } - return .uploadAppending(offset: offset, complete: complete ?? true, contentLength: contentLength) + if version >= .v6 && request.headerFields[.contentType] != "application/partial-upload" { + throw InvalidRequestError.missingHeaderField + } + return ( + .uploadAppending( + offset: offset, + complete: complete ?? true, + contentLength: contentLength, + uploadLength: uploadLength + ), version + ) case .delete: - guard complete == nil && offset == nil else { - return .invalid + guard complete == nil && offset == nil && uploadLength == nil else { + throw InvalidRequestError.extraHeaderField } - return .uploadCancellation + return (.uploadCancellation, version) default: - return .invalid + throw InvalidRequestError.unknownMethod } } else { if let complete { if let offset, offset != 0 { - return .invalid + throw InvalidRequestError.invalidPath } - return .uploadCreation(complete: complete, contentLength: contentLength) + return ( + .uploadCreation(complete: complete, contentLength: contentLength, uploadLength: uploadLength), + version + ) } else { - return .notSupported + return nil } } } @@ -143,10 +205,11 @@ enum HTTPResumableUploadProtocol { offset: Int64, resumePath: String, forUploadCreation: Bool, - in context: HTTPResumableUploadContext + in context: HTTPResumableUploadContext, + version: InteropVersion ) -> HTTPResponse { var finalResponse = response - finalResponse.headerFields[.uploadDraftInteropVersion] = self.currentInteropVersion + finalResponse.headerFields[.uploadDraftInteropVersion] = "\(version.rawValue)" if forUploadCreation { finalResponse.headerFields[.location] = context.origin + resumePath } @@ -158,16 +221,48 @@ enum HTTPResumableUploadProtocol { extension HTTPField.Name { fileprivate static let uploadDraftInteropVersion = Self("Upload-Draft-Interop-Version")! + fileprivate static let uploadComplete = Self("Upload-Complete")! fileprivate static let uploadIncomplete = Self("Upload-Incomplete")! fileprivate static let uploadOffset = Self("Upload-Offset")! + fileprivate static let uploadLength = Self("Upload-Length")! + fileprivate static let uploadLimit = Self("Upload-Limit")! } extension HTTPFields { - private struct UploadIncompleteFieldValue: StructuredFieldValue { + private struct BoolFieldValue: StructuredFieldValue { static var structuredFieldType: StructuredFieldValues.StructuredFieldType { .item } var item: Bool } + fileprivate var uploadComplete: Bool? { + get { + guard let headerValue = self[.uploadComplete] else { + return nil + } + do { + let value = try StructuredFieldValueDecoder().decode( + BoolFieldValue.self, + from: Array(headerValue.utf8) + ) + return value.item + } catch { + return nil + } + } + + set { + if let newValue { + let value = String( + decoding: try! StructuredFieldValueEncoder().encode(BoolFieldValue(item: newValue)), + as: UTF8.self + ) + self[.uploadComplete] = value + } else { + self[.uploadComplete] = nil + } + } + } + fileprivate var uploadIncomplete: Bool? { get { guard let headerValue = self[.uploadIncomplete] else { @@ -175,7 +270,7 @@ extension HTTPFields { } do { let value = try StructuredFieldValueDecoder().decode( - UploadIncompleteFieldValue.self, + BoolFieldValue.self, from: Array(headerValue.utf8) ) return value.item @@ -187,7 +282,7 @@ extension HTTPFields { set { if let newValue { let value = String( - decoding: try! StructuredFieldValueEncoder().encode(UploadIncompleteFieldValue(item: newValue)), + decoding: try! StructuredFieldValueEncoder().encode(BoolFieldValue(item: newValue)), as: UTF8.self ) self[.uploadIncomplete] = value @@ -197,7 +292,7 @@ extension HTTPFields { } } - private struct UploadOffsetFieldValue: StructuredFieldValue { + private struct Int64FieldValue: StructuredFieldValue { static var structuredFieldType: StructuredFieldValues.StructuredFieldType { .item } var item: Int64 } @@ -209,7 +304,7 @@ extension HTTPFields { } do { let value = try StructuredFieldValueDecoder().decode( - UploadOffsetFieldValue.self, + Int64FieldValue.self, from: Array(headerValue.utf8) ) return value.item @@ -221,7 +316,7 @@ extension HTTPFields { set { if let newValue { let value = String( - decoding: try! StructuredFieldValueEncoder().encode(UploadOffsetFieldValue(item: newValue)), + decoding: try! StructuredFieldValueEncoder().encode(Int64FieldValue(item: newValue)), as: UTF8.self ) self[.uploadOffset] = value @@ -230,4 +325,79 @@ extension HTTPFields { } } } + + fileprivate var uploadLength: Int64? { + get { + guard let headerValue = self[.uploadLength] else { + return nil + } + do { + let value = try StructuredFieldValueDecoder().decode( + Int64FieldValue.self, + from: Array(headerValue.utf8) + ) + return value.item + } catch { + return nil + } + } + + set { + if let newValue { + let value = String( + decoding: try! StructuredFieldValueEncoder().encode(Int64FieldValue(item: newValue)), + as: UTF8.self + ) + self[.uploadLength] = value + } else { + self[.uploadLength] = nil + } + } + } + + fileprivate struct UploadLimitFieldValue: StructuredFieldValue { + static var structuredFieldType: StructuredFieldValues.StructuredFieldType { .dictionary } + var maxSize: Int64? + var minSize: Int64? + var maxAppendSize: Int64? + var minAppendSize: Int64? + var expires: Int64? + + enum CodingKeys: String, CodingKey { + case maxSize = "max-size" + case minSize = "min-size" + case maxAppendSize = "max-append-size" + case minAppendSize = "min-append-size" + case expires = "expires" + } + } + + fileprivate var uploadLimit: UploadLimitFieldValue? { + get { + guard let headerValue = self[.uploadLimit] else { + return nil + } + do { + let value = try StructuredFieldValueDecoder().decode( + UploadLimitFieldValue.self, + from: Array(headerValue.utf8) + ) + return value + } catch { + return nil + } + } + + set { + if let newValue { + let value = String( + decoding: try! StructuredFieldValueEncoder().encode(newValue), + as: UTF8.self + ) + self[.uploadLimit] = value + } else { + self[.uploadLimit] = nil + } + } + } } diff --git a/Tests/NIOResumableUploadTests/NIOResumableUploadTests.swift b/Tests/NIOResumableUploadTests/NIOResumableUploadTests.swift index 44b88492..c44f7e26 100644 --- a/Tests/NIOResumableUploadTests/NIOResumableUploadTests.swift +++ b/Tests/NIOResumableUploadTests/NIOResumableUploadTests.swift @@ -67,7 +67,33 @@ final class NIOResumableUploadTests: XCTestCase { XCTAssertTrue(try channel.finish().isClean) } - func testResumableUploadUninterrupted() throws { + func testOptions() throws { + let channel = EmbeddedChannel() + let recorder = InboundRecorder() + + let context = HTTPResumableUploadContext(origin: "https://example.com") + try channel.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [recorder])).wait() + + var request = HTTPRequest(method: .options, scheme: "https", authority: "example.com", path: "/") + request.headerFields[.uploadDraftInteropVersion] = "6" + try channel.writeInbound(HTTPRequestPart.head(request)) + try channel.writeInbound(HTTPRequestPart.end(nil)) + + let responsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response) = responsePart else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response.status, .ok) + XCTAssertEqual(response.headerFields[.uploadLimit], "min-size=0") + guard let responsePart = try channel.readOutbound(as: HTTPResponsePart.self), case .end = responsePart else { + XCTFail("Part is not response end") + return + } + XCTAssertTrue(try channel.finish().isClean) + } + + func testResumableUploadUninterruptedV3() throws { let channel = EmbeddedChannel() let recorder = InboundRecorder() @@ -99,7 +125,72 @@ final class NIOResumableUploadTests: XCTestCase { XCTAssertTrue(try channel.finish().isClean) } - func testResumableUploadInterrupted() throws { + func testResumableUploadUninterruptedV5() throws { + let channel = EmbeddedChannel() + let recorder = InboundRecorder() + + let context = HTTPResumableUploadContext(origin: "https://example.com") + try channel.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [recorder])).wait() + + var request = HTTPRequest(method: .post, scheme: "https", authority: "example.com", path: "/") + request.headerFields[.uploadDraftInteropVersion] = "5" + request.headerFields[.uploadComplete] = "?1" + request.headerFields[.contentLength] = "5" + try channel.writeInbound(HTTPRequestPart.head(request)) + try channel.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "Hello"))) + try channel.writeInbound(HTTPRequestPart.end(nil)) + + XCTAssertEqual(recorder.receivedFrames.count, 3) + var expectedRequest = request + expectedRequest.headerFields[.uploadIncomplete] = nil + XCTAssertEqual(recorder.receivedFrames[0], HTTPRequestPart.head(expectedRequest)) + XCTAssertEqual(recorder.receivedFrames[1], HTTPRequestPart.body(ByteBuffer(string: "Hello"))) + XCTAssertEqual(recorder.receivedFrames[2], HTTPRequestPart.end(nil)) + + let responsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response) = responsePart else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response.status.code, 104) + XCTAssertNotNil(response.headerFields[.location]) + XCTAssertTrue(try channel.finish().isClean) + } + + func testResumableUploadUninterruptedV6() throws { + let channel = EmbeddedChannel() + let recorder = InboundRecorder() + + let context = HTTPResumableUploadContext(origin: "https://example.com") + try channel.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [recorder])).wait() + + var request = HTTPRequest(method: .post, scheme: "https", authority: "example.com", path: "/") + request.headerFields[.uploadDraftInteropVersion] = "6" + request.headerFields[.uploadComplete] = "?1" + request.headerFields[.contentLength] = "5" + request.headerFields[.uploadLength] = "5" + try channel.writeInbound(HTTPRequestPart.head(request)) + try channel.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "Hello"))) + try channel.writeInbound(HTTPRequestPart.end(nil)) + + XCTAssertEqual(recorder.receivedFrames.count, 3) + var expectedRequest = request + expectedRequest.headerFields[.uploadIncomplete] = nil + XCTAssertEqual(recorder.receivedFrames[0], HTTPRequestPart.head(expectedRequest)) + XCTAssertEqual(recorder.receivedFrames[1], HTTPRequestPart.body(ByteBuffer(string: "Hello"))) + XCTAssertEqual(recorder.receivedFrames[2], HTTPRequestPart.end(nil)) + + let responsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response) = responsePart else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response.status.code, 104) + XCTAssertNotNil(response.headerFields[.location]) + XCTAssertTrue(try channel.finish().isClean) + } + + func testResumableUploadInterruptedV3() throws { let channel = EmbeddedChannel() let recorder = InboundRecorder() @@ -161,7 +252,136 @@ final class NIOResumableUploadTests: XCTestCase { XCTAssertTrue(try channel.finish().isClean) } - func testResumableUploadChunked() throws { + func testResumableUploadInterruptedV5() throws { + let channel = EmbeddedChannel() + let recorder = InboundRecorder() + + let context = HTTPResumableUploadContext(origin: "https://example.com") + try channel.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [recorder])).wait() + + var request = HTTPRequest(method: .post, scheme: "https", authority: "example.com", path: "/") + request.headerFields[.uploadDraftInteropVersion] = "5" + request.headerFields[.uploadComplete] = "?1" + request.headerFields[.contentLength] = "5" + request.headerFields[.uploadLength] = "5" + try channel.writeInbound(HTTPRequestPart.head(request)) + try channel.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "He"))) + channel.pipeline.fireErrorCaught(POSIXError(.ENOTCONN)) + + let responsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response) = responsePart else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response.status.code, 104) + let location = try XCTUnwrap(response.headerFields[.location]) + let resumptionPath = try XCTUnwrap(URLComponents(string: location)?.path) + + let channel2 = EmbeddedChannel() + try channel2.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request2 = HTTPRequest(method: .head, scheme: "https", authority: "example.com", path: resumptionPath) + request2.headerFields[.uploadDraftInteropVersion] = "3" + try channel2.writeInbound(HTTPRequestPart.head(request2)) + try channel2.writeInbound(HTTPRequestPart.end(nil)) + let responsePart2 = try channel2.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response2) = responsePart2 else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response2.status.code, 204) + XCTAssertEqual(response2.headerFields[.uploadOffset], "2") + XCTAssertEqual(try channel2.readOutbound(as: HTTPResponsePart.self), .end(nil)) + XCTAssertTrue(try channel2.finish().isClean) + + let channel3 = EmbeddedChannel() + try channel3.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request3 = HTTPRequest(method: .patch, scheme: "https", authority: "example.com", path: resumptionPath) + request3.headerFields[.uploadDraftInteropVersion] = "5" + request3.headerFields[.uploadComplete] = "?1" + request3.headerFields[.uploadOffset] = "2" + request3.headerFields[.contentLength] = "3" + request3.headerFields[.uploadLength] = "5" + try channel3.writeInbound(HTTPRequestPart.head(request3)) + try channel3.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "llo"))) + try channel3.writeInbound(HTTPRequestPart.end(nil)) + + XCTAssertEqual(recorder.receivedFrames.count, 4) + var expectedRequest = request + expectedRequest.headerFields[.uploadIncomplete] = nil + XCTAssertEqual(recorder.receivedFrames[0], HTTPRequestPart.head(expectedRequest)) + XCTAssertEqual(recorder.receivedFrames[1], HTTPRequestPart.body(ByteBuffer(string: "He"))) + XCTAssertEqual(recorder.receivedFrames[2], HTTPRequestPart.body(ByteBuffer(string: "llo"))) + XCTAssertEqual(recorder.receivedFrames[3], HTTPRequestPart.end(nil)) + XCTAssertTrue(try channel3.finish().isClean) + XCTAssertTrue(try channel.finish().isClean) + } + + func testResumableUploadInterruptedV6() throws { + let channel = EmbeddedChannel() + let recorder = InboundRecorder() + + let context = HTTPResumableUploadContext(origin: "https://example.com") + try channel.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [recorder])).wait() + + var request = HTTPRequest(method: .post, scheme: "https", authority: "example.com", path: "/") + request.headerFields[.uploadDraftInteropVersion] = "6" + request.headerFields[.uploadComplete] = "?1" + request.headerFields[.contentLength] = "5" + request.headerFields[.uploadLength] = "5" + try channel.writeInbound(HTTPRequestPart.head(request)) + try channel.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "He"))) + channel.pipeline.fireErrorCaught(POSIXError(.ENOTCONN)) + + let responsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response) = responsePart else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response.status.code, 104) + let location = try XCTUnwrap(response.headerFields[.location]) + let resumptionPath = try XCTUnwrap(URLComponents(string: location)?.path) + + let channel2 = EmbeddedChannel() + try channel2.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request2 = HTTPRequest(method: .head, scheme: "https", authority: "example.com", path: resumptionPath) + request2.headerFields[.uploadDraftInteropVersion] = "3" + try channel2.writeInbound(HTTPRequestPart.head(request2)) + try channel2.writeInbound(HTTPRequestPart.end(nil)) + let responsePart2 = try channel2.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response2) = responsePart2 else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response2.status.code, 204) + XCTAssertEqual(response2.headerFields[.uploadOffset], "2") + XCTAssertEqual(try channel2.readOutbound(as: HTTPResponsePart.self), .end(nil)) + XCTAssertTrue(try channel2.finish().isClean) + + let channel3 = EmbeddedChannel() + try channel3.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request3 = HTTPRequest(method: .patch, scheme: "https", authority: "example.com", path: resumptionPath) + request3.headerFields[.uploadDraftInteropVersion] = "6" + request3.headerFields[.uploadComplete] = "?1" + request3.headerFields[.uploadOffset] = "2" + request3.headerFields[.contentLength] = "3" + request3.headerFields[.uploadLength] = "5" + request3.headerFields[.contentType] = "application/partial-upload" + try channel3.writeInbound(HTTPRequestPart.head(request3)) + try channel3.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "llo"))) + try channel3.writeInbound(HTTPRequestPart.end(nil)) + + XCTAssertEqual(recorder.receivedFrames.count, 4) + var expectedRequest = request + expectedRequest.headerFields[.uploadIncomplete] = nil + XCTAssertEqual(recorder.receivedFrames[0], HTTPRequestPart.head(expectedRequest)) + XCTAssertEqual(recorder.receivedFrames[1], HTTPRequestPart.body(ByteBuffer(string: "He"))) + XCTAssertEqual(recorder.receivedFrames[2], HTTPRequestPart.body(ByteBuffer(string: "llo"))) + XCTAssertEqual(recorder.receivedFrames[3], HTTPRequestPart.end(nil)) + XCTAssertTrue(try channel3.finish().isClean) + XCTAssertTrue(try channel.finish().isClean) + } + + func testResumableUploadChunkedV3() throws { let channel = EmbeddedChannel() let recorder = InboundRecorder() @@ -230,10 +450,158 @@ final class NIOResumableUploadTests: XCTestCase { XCTAssertTrue(try channel3.finish().isClean) XCTAssertTrue(try channel.finish().isClean) } + + func testResumableUploadChunkedV5() throws { + let channel = EmbeddedChannel() + let recorder = InboundRecorder() + + let context = HTTPResumableUploadContext(origin: "https://example.com") + try channel.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [recorder])).wait() + + var request = HTTPRequest(method: .post, scheme: "https", authority: "example.com", path: "/") + request.headerFields[.uploadDraftInteropVersion] = "5" + request.headerFields[.uploadComplete] = "?0" + request.headerFields[.contentLength] = "2" + request.headerFields[.uploadLength] = "5" + try channel.writeInbound(HTTPRequestPart.head(request)) + try channel.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "He"))) + try channel.writeInbound(HTTPRequestPart.end(nil)) + + let responsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response) = responsePart else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response.status.code, 104) + let location = try XCTUnwrap(response.headerFields[.location]) + let resumptionPath = try XCTUnwrap(URLComponents(string: location)?.path) + + let finalResponsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let finalResponse) = finalResponsePart else { + XCTFail("Part is not final response headers") + return + } + XCTAssertEqual(finalResponse.status.code, 201) + XCTAssertEqual(try channel.readOutbound(as: HTTPResponsePart.self), .end(nil)) + + let channel2 = EmbeddedChannel() + try channel2.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request2 = HTTPRequest(method: .head, scheme: "https", authority: "example.com", path: resumptionPath) + request2.headerFields[.uploadDraftInteropVersion] = "5" + try channel2.writeInbound(HTTPRequestPart.head(request2)) + try channel2.writeInbound(HTTPRequestPart.end(nil)) + let responsePart2 = try channel2.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response2) = responsePart2 else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response2.status.code, 204) + XCTAssertEqual(response2.headerFields[.uploadOffset], "2") + XCTAssertEqual(try channel2.readOutbound(as: HTTPResponsePart.self), .end(nil)) + XCTAssertTrue(try channel2.finish().isClean) + + let channel3 = EmbeddedChannel() + try channel3.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request3 = HTTPRequest(method: .patch, scheme: "https", authority: "example.com", path: resumptionPath) + request3.headerFields[.uploadDraftInteropVersion] = "5" + request3.headerFields[.uploadComplete] = "?1" + request3.headerFields[.uploadOffset] = "2" + request3.headerFields[.contentLength] = "3" + request3.headerFields[.uploadLength] = "5" + try channel3.writeInbound(HTTPRequestPart.head(request3)) + try channel3.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "llo"))) + try channel3.writeInbound(HTTPRequestPart.end(nil)) + + XCTAssertEqual(recorder.receivedFrames.count, 4) + var expectedRequest = request + expectedRequest.headerFields[.uploadIncomplete] = nil + XCTAssertEqual(recorder.receivedFrames[0], HTTPRequestPart.head(expectedRequest)) + XCTAssertEqual(recorder.receivedFrames[1], HTTPRequestPart.body(ByteBuffer(string: "He"))) + XCTAssertEqual(recorder.receivedFrames[2], HTTPRequestPart.body(ByteBuffer(string: "llo"))) + XCTAssertEqual(recorder.receivedFrames[3], HTTPRequestPart.end(nil)) + XCTAssertTrue(try channel3.finish().isClean) + XCTAssertTrue(try channel.finish().isClean) + } + + func testResumableUploadChunkedV6() throws { + let channel = EmbeddedChannel() + let recorder = InboundRecorder() + + let context = HTTPResumableUploadContext(origin: "https://example.com") + try channel.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [recorder])).wait() + + var request = HTTPRequest(method: .post, scheme: "https", authority: "example.com", path: "/") + request.headerFields[.uploadDraftInteropVersion] = "6" + request.headerFields[.uploadComplete] = "?0" + request.headerFields[.contentLength] = "2" + request.headerFields[.uploadLength] = "5" + try channel.writeInbound(HTTPRequestPart.head(request)) + try channel.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "He"))) + try channel.writeInbound(HTTPRequestPart.end(nil)) + + let responsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response) = responsePart else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response.status.code, 104) + let location = try XCTUnwrap(response.headerFields[.location]) + let resumptionPath = try XCTUnwrap(URLComponents(string: location)?.path) + + let finalResponsePart = try channel.readOutbound(as: HTTPResponsePart.self) + guard case .head(let finalResponse) = finalResponsePart else { + XCTFail("Part is not final response headers") + return + } + XCTAssertEqual(finalResponse.status.code, 201) + XCTAssertEqual(try channel.readOutbound(as: HTTPResponsePart.self), .end(nil)) + + let channel2 = EmbeddedChannel() + try channel2.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request2 = HTTPRequest(method: .head, scheme: "https", authority: "example.com", path: resumptionPath) + request2.headerFields[.uploadDraftInteropVersion] = "6" + try channel2.writeInbound(HTTPRequestPart.head(request2)) + try channel2.writeInbound(HTTPRequestPart.end(nil)) + let responsePart2 = try channel2.readOutbound(as: HTTPResponsePart.self) + guard case .head(let response2) = responsePart2 else { + XCTFail("Part is not response headers") + return + } + XCTAssertEqual(response2.status.code, 204) + XCTAssertEqual(response2.headerFields[.uploadOffset], "2") + XCTAssertEqual(try channel2.readOutbound(as: HTTPResponsePart.self), .end(nil)) + XCTAssertTrue(try channel2.finish().isClean) + + let channel3 = EmbeddedChannel() + try channel3.pipeline.addHandler(HTTPResumableUploadHandler(context: context, handlers: [])).wait() + var request3 = HTTPRequest(method: .patch, scheme: "https", authority: "example.com", path: resumptionPath) + request3.headerFields[.uploadDraftInteropVersion] = "6" + request3.headerFields[.uploadComplete] = "?1" + request3.headerFields[.uploadOffset] = "2" + request3.headerFields[.contentLength] = "3" + request3.headerFields[.uploadLength] = "5" + request3.headerFields[.contentType] = "application/partial-upload" + try channel3.writeInbound(HTTPRequestPart.head(request3)) + try channel3.writeInbound(HTTPRequestPart.body(ByteBuffer(string: "llo"))) + try channel3.writeInbound(HTTPRequestPart.end(nil)) + + XCTAssertEqual(recorder.receivedFrames.count, 4) + var expectedRequest = request + expectedRequest.headerFields[.uploadIncomplete] = nil + XCTAssertEqual(recorder.receivedFrames[0], HTTPRequestPart.head(expectedRequest)) + XCTAssertEqual(recorder.receivedFrames[1], HTTPRequestPart.body(ByteBuffer(string: "He"))) + XCTAssertEqual(recorder.receivedFrames[2], HTTPRequestPart.body(ByteBuffer(string: "llo"))) + XCTAssertEqual(recorder.receivedFrames[3], HTTPRequestPart.end(nil)) + XCTAssertTrue(try channel3.finish().isClean) + XCTAssertTrue(try channel.finish().isClean) + } } extension HTTPField.Name { fileprivate static let uploadDraftInteropVersion = Self("Upload-Draft-Interop-Version")! + fileprivate static let uploadComplete = Self("Upload-Complete")! fileprivate static let uploadIncomplete = Self("Upload-Incomplete")! fileprivate static let uploadOffset = Self("Upload-Offset")! + fileprivate static let uploadLength = Self("Upload-Length")! + fileprivate static let uploadLimit = Self("Upload-Limit")! }