Skip to content

Commit

Permalink
Support v5 and v6
Browse files Browse the repository at this point in the history
  • Loading branch information
guoye-zhang committed Nov 3, 2024
1 parent 95cbcdc commit 9f8400e
Show file tree
Hide file tree
Showing 3 changed files with 712 additions and 118 deletions.
202 changes: 129 additions & 73 deletions Sources/NIOResumableUpload/HTTPResumableUpload.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ final class HTTPResumableUpload {
private(set) var resumePath: String?
/// The current upload offset.
private var offset: Int64 = 0
/// The total size of the upload (if known).
private var uploadSize: Int64?
/// The total length of the upload (if known).
private var uploadLength: Int64?
/// The current request is an upload creation request.
private var requestIsCreation: Bool = false
/// The end of the current request is the end of the upload.
private var requestIsComplete: Bool = true
/// The interop version of the current request
private var interopVersion: HTTPResumableUploadProtocol.InteropVersion = .latest
/// Whether you have received the entire upload.
private var uploadComplete: Bool = false
/// The response has started.
Expand Down Expand Up @@ -150,39 +152,57 @@ extension HTTPResumableUpload {
}
}

private func offsetRetrieving(otherHandler: HTTPResumableUploadHandler) {
private func offsetRetrieving(
otherHandler: HTTPResumableUploadHandler,
version: HTTPResumableUploadProtocol.InteropVersion
) {
self.runInEventLoop {
self.detachUploadHandler(close: true)
let response = HTTPResumableUploadProtocol.offsetRetrievingResponse(
offset: self.offset,
complete: self.uploadComplete
complete: self.uploadComplete,
version: version
)
self.respondAndDetach(response, handler: otherHandler)
}
}

private func saveUploadLength(complete: Bool, contentLength: Int64?, uploadLength: Int64?) -> Bool {
let computedUploadLength = complete ? contentLength.map { self.offset + $0 } : nil
if let knownUploadLength = self.uploadLength {
if let computedUploadLength, knownUploadLength != computedUploadLength {
return false
}
} else {
self.uploadLength = computedUploadLength
}
if let knownUploadLength = self.uploadLength {
if let uploadLength, knownUploadLength != uploadLength {
return false
}
} else {
self.uploadLength = uploadLength
}
return true
}

private func uploadAppending(
otherHandler: HTTPResumableUploadHandler,
channel: Channel,
offset: Int64,
complete: Bool,
contentLength: Int64?,
complete: Bool
uploadLength: Int64?,
version: HTTPResumableUploadProtocol.InteropVersion
) {
self.runInEventLoop {
let conflict: Bool
if self.uploadHandler == nil && self.offset == offset && !self.responseStarted {
if let uploadSize = self.uploadSize {
if let contentLength {
conflict = complete && uploadSize != offset + contentLength
} else {
conflict = true
}
} else {
if let contentLength, complete {
self.uploadSize = offset + contentLength
}
conflict = false
}
conflict = !self.saveUploadLength(
complete: complete,
contentLength: contentLength,
uploadLength: uploadLength
)
} else {
conflict = true
}
Expand All @@ -191,13 +211,15 @@ extension HTTPResumableUpload {
self.destroyChannel(error: HTTPResumableUploadError.badResumption)
let response = HTTPResumableUploadProtocol.conflictResponse(
offset: self.offset,
complete: self.uploadComplete
complete: self.uploadComplete,
version: version
)
self.respondAndDetach(response, handler: otherHandler)
return
}
self.requestIsCreation = false
self.requestIsComplete = complete
self.interopVersion = version
self.attachUploadHandler(otherHandler, channel: channel)
}
}
Expand All @@ -212,70 +234,102 @@ extension HTTPResumableUpload {
private func receiveHead(handler: HTTPResumableUploadHandler, channel: Channel, request: HTTPRequest) {
self.eventLoop.preconditionInEventLoop()

switch HTTPResumableUploadProtocol.identifyRequest(request, in: self.context) {
case .notSupported:
let channel = self.createChannel(handler: handler, parent: channel)
channel.receive(.head(request))
case .uploadCreation(let complete, let contentLength):
self.requestIsCreation = true
self.requestIsComplete = complete
self.uploadSize = complete ? contentLength : nil
let resumePath = self.context.startUpload(self)
self.resumePath = resumePath

let informationalResponse = HTTPResumableUploadProtocol.featureDetectionResponse(
resumePath: resumePath,
in: self.context
)
handler.writeAndFlush(.head(informationalResponse), promise: nil)

let strippedRequest = HTTPResumableUploadProtocol.stripRequest(request)
let channel = self.createChannel(handler: handler, parent: channel)
channel.receive(.head(strippedRequest))
case .offsetRetrieving:
if let path = request.path, let upload = self.context.findUpload(path: path) {
self.uploadHandler = nil
self.uploadHandlerChannel.withLockedValue { $0 = nil }
upload.offsetRetrieving(otherHandler: handler)
} else {
let response = HTTPResumableUploadProtocol.notFoundResponse()
self.respondAndDetach(response, handler: handler)
do {
guard let (type, version) = try HTTPResumableUploadProtocol.identifyRequest(request, in: self.context)
else {
let channel = self.createChannel(handler: handler, parent: channel)
channel.receive(.head(request))
return
}
case .uploadAppending(let offset, let complete, let contentLength):
if let path = request.path, let upload = self.context.findUpload(path: path) {
handler.upload = upload
self.uploadHandler = nil
self.uploadHandlerChannel.withLockedValue { $0 = nil }
upload.uploadAppending(
otherHandler: handler,
channel: channel,
offset: offset,
contentLength: contentLength,
complete: complete
self.interopVersion = version
switch type {
case .uploadCreation(let complete, let contentLength, let uploadLength):
self.requestIsCreation = true
self.requestIsComplete = complete
self.uploadLength = uploadLength
if !self.saveUploadLength(complete: complete, contentLength: contentLength, uploadLength: uploadLength)
{
let response = HTTPResumableUploadProtocol.conflictResponse(
offset: self.offset,
complete: self.uploadComplete,
version: version
)
self.respondAndDetach(response, handler: handler)
return
}
let resumePath = self.context.startUpload(self)
self.resumePath = resumePath

let informationalResponse = HTTPResumableUploadProtocol.featureDetectionResponse(
resumePath: resumePath,
in: self.context,
version: version
)
} else {
let response = HTTPResumableUploadProtocol.notFoundResponse()
self.respondAndDetach(response, handler: handler)
}
case .uploadCancellation:
if let path = request.path, let upload = self.context.findUpload(path: path) {
upload.uploadCancellation()
let response = HTTPResumableUploadProtocol.cancelledResponse()
self.respondAndDetach(response, handler: handler)
} else {
let response = HTTPResumableUploadProtocol.notFoundResponse()
handler.writeAndFlush(.head(informationalResponse), promise: nil)

let strippedRequest = HTTPResumableUploadProtocol.stripRequest(request)
let channel = self.createChannel(handler: handler, parent: channel)
channel.receive(.head(strippedRequest))
case .offsetRetrieving:
if let path = request.path, let upload = self.context.findUpload(path: path) {
self.uploadHandler = nil
self.uploadHandlerChannel.withLockedValue { $0 = nil }
upload.offsetRetrieving(otherHandler: handler, version: version)
} else {
let response = HTTPResumableUploadProtocol.notFoundResponse(version: version)
self.respondAndDetach(response, handler: handler)
}
case .uploadAppending(let offset, let complete, let contentLength, let uploadLength):
if let path = request.path, let upload = self.context.findUpload(path: path) {
handler.upload = upload
self.uploadHandler = nil
self.uploadHandlerChannel.withLockedValue { $0 = nil }
upload.uploadAppending(
otherHandler: handler,
channel: channel,
offset: offset,
complete: complete,
contentLength: contentLength,
uploadLength: uploadLength,
version: version
)
} else {
let response = HTTPResumableUploadProtocol.notFoundResponse(version: version)
self.respondAndDetach(response, handler: handler)
}
case .uploadCancellation:
if let path = request.path, let upload = self.context.findUpload(path: path) {
upload.uploadCancellation()
let response = HTTPResumableUploadProtocol.cancelledResponse(version: version)
self.respondAndDetach(response, handler: handler)
} else {
let response = HTTPResumableUploadProtocol.notFoundResponse(version: version)
self.respondAndDetach(response, handler: handler)
}
case .options:
let response = HTTPResumableUploadProtocol.optionsResponse(version: version)
self.respondAndDetach(response, handler: handler)
}
case .invalid:
} catch {
let response = HTTPResumableUploadProtocol.badRequestResponse()
self.respondAndDetach(response, handler: handler)
}
}

private func receiveBody(body: ByteBuffer) {
private func receiveBody(handler: HTTPResumableUploadHandler, body: ByteBuffer) {
self.eventLoop.preconditionInEventLoop()

self.offset += Int64(body.readableBytes)

if let uploadLength = self.uploadLength, self.offset > uploadLength {
let response = HTTPResumableUploadProtocol.conflictResponse(
offset: self.offset,
complete: self.uploadComplete,
version: self.interopVersion
)
self.respondAndDetach(response, handler: handler)
return
}
self.uploadChannel?.receive(.body(body))
}

Expand All @@ -291,7 +345,8 @@ extension HTTPResumableUpload {
offset: self.offset,
resumePath: resumePath,
forUploadCreation: self.requestIsCreation,
in: self.context
in: self.context,
version: self.interopVersion
)
self.respondAndDetach(response, handler: handler)
}
Expand All @@ -306,7 +361,7 @@ extension HTTPResumableUpload {
case .head(let request):
self.receiveHead(handler: handler, channel: channel, request: request)
case .body(let body):
self.receiveBody(body: body)
self.receiveBody(handler: handler, body: body)
case .end(let trailers):
self.receiveEnd(handler: handler, trailers: trailers)
}
Expand Down Expand Up @@ -362,7 +417,8 @@ extension HTTPResumableUpload {
offset: self.offset,
resumePath: resumePath,
forUploadCreation: self.requestIsCreation,
in: self.context
in: self.context,
version: self.interopVersion
)
uploadHandler.write(.head(response), promise: promise)
case .body, .end:
Expand Down
Loading

0 comments on commit 9f8400e

Please sign in to comment.