Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing option builders for resumable uploads and cache options #739

Merged
merged 4 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.github.jan.supabase.storage

import io.github.jan.supabase.storage.resumable.Fingerprint
import io.github.jan.supabase.storage.resumable.ResumableClient
import io.github.jan.supabase.storage.resumable.ResumableUpload
import io.ktor.util.cio.readChannel
import io.ktor.utils.io.discard
import java.io.File
Expand All @@ -14,18 +15,18 @@ import kotlin.io.path.fileSize
* If there is an url in the cache for the given [Fingerprint], the upload will be continued.
* @param file The file to upload
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun ResumableClient.createOrContinueUpload(path: String, file: File, upsert: Boolean = false) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePath, file.length(), path, upsert)
suspend fun ResumableClient.createOrContinueUpload(path: String, file: File, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePath, file.length(), path, options)

/**
* Creates a new resumable upload or continues an existing one.
* If there is an url in the cache for the given [Fingerprint], the upload will be continued.
* @param file The file to upload
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun ResumableClient.createOrContinueUpload(path: String, file: Path, upsert: Boolean = false) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePathString(), file.fileSize(), path, upsert)
suspend fun ResumableClient.createOrContinueUpload(path: String, file: Path, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePathString(), file.fileSize(), path, options)

/**
* Reads pending uploads from the cache and creates a new [ResumableUpload] for each of them. This done in parallel, so you can start the uploads independently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import io.ktor.utils.io.jvm.javaio.toByteReadChannel
* Creates a new upload or continues an existing one from the given [uri]
* @param path The path to upload the file to
* @param uri The uri of the file to upload (make sure you have access to it)
* @param upsert Whether to overwrite an existing file
* @param options The options for the upload
*/
suspend fun ResumableClient.createOrContinueUpload(path: String, uri: Uri, upsert: Boolean = false) = createOrContinueUpload(uri.createByteReader(), uri.toString(), uri.contentSize, path, upsert)
suspend fun ResumableClient.createOrContinueUpload(path: String, uri: Uri, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload(uri.createByteReader(), uri.toString(), uri.contentSize, path, options)

@SuppressLint("Recycle")
private suspend fun Uri.createByteReader(): suspend (Long) -> ByteReadChannel = { offset: Long ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ sealed interface BucketApi {
* Creates a signed url to upload without authentication.
* These urls are valid for 2 hours.
* @param path The path to create an url for
* @param upsert Whether to upsert the file if it already exists
*/
suspend fun createSignedUploadUrl(path: String): UploadSignedUrl
suspend fun createSignedUploadUrl(path: String, upsert: Boolean = false): UploadSignedUrl

/**
* Creates a signed url to download without authentication. The url will expire after [expiresIn]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ internal class BucketApiImpl(override val bucketId: String, val storage: Storage
)
}

override suspend fun createSignedUploadUrl(path: String): UploadSignedUrl {
val result = storage.api.post("object/upload/sign/$bucketId/$path")
override suspend fun createSignedUploadUrl(path: String, upsert: Boolean): UploadSignedUrl {
val result = storage.api.post("object/upload/sign/$bucketId/$path") {
header(UPSERT_HEADER, upsert.toString())
}
val urlPath = result.body<JsonObject>()["url"]?.jsonPrimitive?.content?.substring(1)
?: error("Expected a url in create upload signed url response")
val url = Url(storage.resolveUrl(urlPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@ import kotlinx.serialization.Serializable
* @param path The storage path
* @param bucketId The bucket id
* @param expiresAt The time the url expires
* @param upsert Whether the entry should be updated if it already exists
* @param contentType The content type of the file
*/
@Serializable
data class ResumableCacheEntry(val url: String, val path: String, val bucketId: String, val expiresAt: Instant)
data class ResumableCacheEntry(
val url: String,
val path: String,
val bucketId: String,
val expiresAt: Instant,
val upsert: Boolean = false, //for compatibility with the old cache
val contentType: String = "application/octet-stream"
)

/**
* A pair of a [Fingerprint] and a [ResumableCacheEntry]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.github.jan.supabase.auth.Auth
import io.github.jan.supabase.logging.d
import io.github.jan.supabase.storage.BucketApi
import io.github.jan.supabase.storage.Storage
import io.github.jan.supabase.storage.UploadOptionBuilder
import io.github.jan.supabase.storage.resumable.ResumableClient.Companion.TUS_VERSION
import io.github.jan.supabase.storage.storage
import io.ktor.client.request.bearerAuth
Expand Down Expand Up @@ -41,18 +42,18 @@ sealed interface ResumableClient {
* @param channel A function that takes the offset of the upload and returns a [ByteReadChannel] that reads the data to upload from the given offset
* @param size The size of the data to upload
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun createOrContinueUpload(channel: suspend (offset: Long) -> ByteReadChannel, source: String, size: Long, path: String, upsert: Boolean = false): ResumableUpload
suspend fun createOrContinueUpload(channel: suspend (offset: Long) -> ByteReadChannel, source: String, size: Long, path: String, options: UploadOptionBuilder.() -> Unit = {}): ResumableUpload

/**
* Creates a new resumable upload or continues an existing one.
* If there is an url in the cache for the given [Fingerprint], the upload will be continued.
* @param data The data to upload as a [ByteArray]
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun createOrContinueUpload(data: ByteArray, source: String, path: String, upsert: Boolean = false) = createOrContinueUpload({ ByteReadChannel(data).apply { discard(it) } }, source, data.size.toLong(), path)
suspend fun createOrContinueUpload(data: ByteArray, source: String, path: String, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload({ ByteReadChannel(data).apply { discard(it) } }, source, data.size.toLong(), path)

/**
* Reads pending uploads from the cache and creates a new [ResumableUpload] for each of them. This done in parallel, so you can start the downloads independently.
Expand Down Expand Up @@ -94,23 +95,24 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
source: String,
size: Long,
path: String,
upsert: Boolean
options: UploadOptionBuilder.() -> Unit
): ResumableUpload {
val cachedEntry = cache.get(Fingerprint(source, size))
if(cachedEntry != null) {
Storage.logger.d { "Found cached upload for $path" }
return resumeUpload(channel, cachedEntry, source, path, size)
}
return createUpload(channel, source, path, size, upsert)
return createUpload(channel, source, path, size, options)
}

private suspend fun createUpload(channel: suspend (Long) -> ByteReadChannel, source: String, path: String, size: Long, upsert: Boolean): ResumableUploadImpl {
private suspend fun createUpload(channel: suspend (Long) -> ByteReadChannel, source: String, path: String, size: Long, options: UploadOptionBuilder.() -> Unit): ResumableUploadImpl {
val uploadOptions = UploadOptionBuilder(storageApi.supabaseClient.storage.serializer).apply(options)
val response = httpClient.post(url) {
header("Upload-Metadata", encodeMetadata(createMetadata(path)))
header("Upload-Metadata", encodeMetadata(createMetadata(path, uploadOptions.contentType)))
bearerAuth(accessTokenOrApiKey())
header("Upload-Length", size)
header("Tus-Resumable", TUS_VERSION)
header("x-upsert", upsert.toString())
header("x-upsert", uploadOptions.upsert)
}
when(response.status) {
HttpStatusCode.Conflict -> error("Specified path already exists. Consider setting upsert to true")
Expand All @@ -120,7 +122,7 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
}
val uploadUrl = response.headers["Location"] ?: error("No upload url found")
val fingerprint = Fingerprint(source, size)
val cacheEntry = ResumableCacheEntry(uploadUrl, path, storageApi.bucketId, Clock.System.now() + 1.days)
val cacheEntry = ResumableCacheEntry(uploadUrl, path, storageApi.bucketId, Clock.System.now() + 1.days, uploadOptions.upsert, uploadOptions.contentType.toString())
cache.set(fingerprint, cacheEntry)
return ResumableUploadImpl(fingerprint, path, cacheEntry, channel, 0, chunkSize, uploadUrl, httpClient, storageApi, { retrieveServerOffset(uploadUrl, path) }) {
cache.remove(fingerprint)
Expand All @@ -132,7 +134,10 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
if(Clock.System.now() > entry.expiresAt) {
Storage.logger.d { "Upload url for $path expired. Creating new one" }
cache.remove(fingerprint)
return createUpload(channel, source, path, size, false)
return createUpload(channel, source, path, size) {
upsert = entry.upsert
contentType = ContentType.parse(entry.contentType)
}
}
val offset = retrieveServerOffset(entry.url, path)
if(offset < size) {
Expand All @@ -156,10 +161,10 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va

private fun accessTokenOrApiKey() = storageApi.supabaseClient.pluginManager.getPluginOrNull(Auth)?.currentAccessTokenOrNull() ?: storageApi.supabaseClient.supabaseKey

private fun createMetadata(path: String): Map<String, String> = buildMap {
private fun createMetadata(path: String, contentType: ContentType? = null): Map<String, String> = buildMap {
put("bucketName", storageApi.bucketId)
put("objectName", path)
put("contentType", ContentType.defaultForFilePath(path).toString())
put("contentType", contentType?.toString() ?: ContentType.defaultForFilePath(path).toString())
}

@OptIn(ExperimentalEncodingApi::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ class UploadViewModel(
file.dataProducer,
file.path ?: file.name,
file.getSize() ?: error("Invalid file"),
path,
true
)
path
) {
upsert = true
}
uploads[upload.fingerprint] = upload
uploadItems.value = uploadItems.value.map {
if(it.fingerprint == upload.fingerprint) UploadState.Loaded(upload.fingerprint, upload.stateFlow.value) else it
Expand Down