Merge pull request #193 from VEuPathDB/threaded-uploads
Async Dataset Uploads
Foxcapades authored Jan 26, 2024
2 parents 01a169f + 1a54593 · commit b4766b9
Showing 4 changed files with 95 additions and 13 deletions.
…/Metrics.kt (components:metrics)
@@ -1,6 +1,7 @@
package vdi.component.metrics

import io.prometheus.client.Counter
import io.prometheus.client.Gauge
import io.prometheus.client.Histogram

object Metrics {
@@ -170,4 +171,9 @@ object Metrics {
900.0, // 15 minutes
)
.register()

val uploadQueueSize: Gauge = Gauge.build()
.name("dataset_upload_queue_size")
.help("Number of dataset uploads currently queued to be processed.")
.register()
}
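
The new gauge uses the Prometheus Java simpleclient builder API. As a usage reference, here is a minimal sketch of the inc/dec-in-finally pattern that keeps such a gauge accurate even when a task fails; the same pattern appears in create-dataset.kt later in this commit. The metric name here is illustrative, not part of the commit.

    import io.prometheus.client.Gauge

    // Minimal sketch: a gauge tracking in-flight work. Incremented when a
    // task is queued and decremented in a finally block so failed tasks are
    // still counted down. "example_queue_size" is an illustrative name.
    object QueueMetrics {
      val queueSize: Gauge = Gauge.build()
        .name("example_queue_size")
        .help("Number of tasks currently queued.")
        .register()

      fun <T> trackQueued(work: () -> T): T {
        queueSize.inc()
        return try {
          work()
        } finally {
          queueSize.dec()
        }
      }
    }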
…/ImportTriggerHandlerImpl.kt
@@ -233,7 +233,7 @@ internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandler
dataFiles.forEach { dataFile -> sizes[dataFile.name] = dataFile.fileSize() }

TempFiles.withTempFile { tempFile ->
Zip.compress(tempFile, dataFiles, Zip.Level(0u))
Zip.compress(tempFile, dataFiles)
dd.putDataFile(InstallZipName) { tempFile.inputStream() }
}

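Both Zip.compress call sites in this commit drop the explicit Zip.Level(0u) argument (in standard zip terms, level 0 means "store only", no compression). Zip is an internal VDI helper whose signature is not shown in this diff; a defaulted level parameter is one plausible reason the calls can now omit it. The following is a hypothetical sketch using java.util.zip, purely illustrative of that shape, not the actual helper:

    import java.nio.file.Path
    import java.util.zip.ZipEntry
    import java.util.zip.ZipOutputStream
    import kotlin.io.path.inputStream
    import kotlin.io.path.name
    import kotlin.io.path.outputStream

    // Hypothetical sketch only; the real vdi Zip helper is not shown in
    // this diff and may behave differently.
    object ZipSketch {
      @JvmInline
      value class Level(val value: UByte)

      fun compress(target: Path, files: Iterable<Path>, level: Level = Level(6u)) {
        ZipOutputStream(target.outputStream()).use { zip ->
          zip.setLevel(level.value.toInt()) // 0 = store only, 9 = max compression
          for (file in files) {
            zip.putNextEntry(ZipEntry(file.name))
            file.inputStream().use { it.copyTo(zip) }
            zip.closeEntry()
          }
        }
      }
    }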
modules/rest-service/build.gradle.kts (1 addition, 0 deletions)
@@ -55,6 +55,7 @@ dependencies {
implementation(project(":components:plugin-mapping"))
implementation(project(":components:pruner"))
implementation(project(":components:s3"))
implementation(project(":components:metrics"))

implementation("org.veupathdb.vdi:vdi-component-json")
implementation("org.veupathdb.vdi:vdi-component-common")
…/create-dataset.kt
@@ -25,14 +25,18 @@ import org.veupathdb.vdi.lib.db.cache.model.DatasetImpl
import org.veupathdb.vdi.lib.db.cache.model.DatasetImportStatus
import org.veupathdb.vdi.lib.db.cache.model.DatasetMetaImpl
import org.veupathdb.vdi.lib.handler.mapping.PluginHandlers
import vdi.component.metrics.Metrics
import java.net.URL
import java.nio.file.Path
import java.time.OffsetDateTime
import java.util.concurrent.Executors
import kotlin.io.path.*
import kotlin.math.max

private val log = LoggerFactory.getLogger("create-dataset.kt")

private val WorkPool = Executors.newFixedThreadPool(10)

@OptIn(ExperimentalPathApi::class)
fun createDataset(
userID: UserID,
@@ -93,31 +97,80 @@ fun createDataset(
))
}

// TODO: Post release!
// The following call represents a 'unified' path for handling uploads
// whether they are direct or via URL. This leaves us open to proxy
// timeouts in the case of URL uploads if the file transfer between the
// VDI service and the remote server takes too long.
//
// To handle this, the 'unified' path will need to be split into two
// paths:
// - Direct upload path
// - URL upload path
//
// For the direct upload path, the upload file will need to be copied to
// a new temp directory (as the rest service thread will delete the
// original upload file). Then the new thread can be forked and the
// file uploaded to MinIO.
//
// For the URL upload path, the URL will need to be validated before
// starting the new thread. Then the new thread will be forked and the
// target file downloaded into a temp directory in that thread before
// being uploaded to MinIO (also in that forked thread).
val (tempDirectory, uploadFile) = entity.getDatasetFile()

WorkPool.submit {
Metrics.uploadQueueSize.inc()
try {
uploadFiles(userID, datasetID, tempDirectory, uploadFile, datasetMeta)
} finally {
Metrics.uploadQueueSize.dec()
tempDirectory.deleteRecursively()
}
}
}
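
As a companion to the TODO above, the two-path split it describes might look roughly like the following. This is a hypothetical sketch only: validateUrl is an illustrative placeholder, not an actual VDI API, while TempFiles.makeTempPath and WorkPool are used as they appear elsewhere in this diff.

    import java.net.URL
    import java.nio.file.Path
    import kotlin.io.path.*

    @OptIn(ExperimentalPathApi::class)
    fun handleDirectUpload(upload: Path, process: (Path) -> Unit) {
      // Copy first, on the request thread: the REST server deletes the
      // original upload file as soon as the request completes.
      val (tempDir, tempFile) = TempFiles.makeTempPath(upload.name)
      upload.copyTo(tempFile, overwrite = true)
      WorkPool.submit {
        try { process(tempFile) } finally { tempDir.deleteRecursively() }
      }
    }

    @OptIn(ExperimentalPathApi::class)
    fun handleUrlUpload(url: URL, process: (Path) -> Unit) {
      validateUrl(url) // fail fast on the request thread, before forking
      WorkPool.submit {
        val (tempDir, tempFile) = TempFiles.makeTempPath("download")
        try {
          // Download in the worker thread so slow transfers cannot trip a
          // proxy timeout on the request thread.
          url.openStream().use { input ->
            tempFile.outputStream().use { output -> input.copyTo(output) }
          }
          process(tempFile)
        } finally {
          tempDir.deleteRecursively()
        }
      }
    }

    // Illustrative placeholder; the real validation logic is not shown here.
    private fun validateUrl(url: URL) {
      require(url.protocol == "http" || url.protocol == "https") {
        "unsupported URL protocol: ${url.protocol}"
      }
    }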

@OptIn(ExperimentalPathApi::class)
private fun uploadFiles(
userID: UserID,
datasetID: DatasetID,
tempDirectory: Path,
uploadFile: Path,
datasetMeta: VDIDatasetMeta,
) {
// Get a handle on the temp file that will be uploaded to the S3 store (MinIO)
TempFiles.withTempDirectory { directory ->
TempFiles.withTempPath { archive ->
val paths = entity.getDatasetFile()

try {
log.debug("Verifying file sizes for dataset {}/{} to ensure the user quota is not exceeded.", userID, datasetID)
verifyFileSize(paths.second, userID)
verifyFileSize(uploadFile, userID)

log.debug("Repacking input file for dataset {}/{}.", userID, datasetID)
val sizes = paths.second.repack(into = archive, using = directory)
val sizes = uploadFile.repack(into = archive, using = directory)

log.debug("uploading raw user data to S3 for new dataset {} by user {}", datasetID, userID)
log.debug("uploading raw user data to S3 for new dataset {}/{}", userID, datasetID)
DatasetStore.putUserUpload(userID, datasetID, archive::inputStream)

CacheDB.withTransaction { it.insertUploadFiles(datasetID, sizes) }
} catch (e: Throwable) {
log.error("user dataset upload to minio failed: ", e)
CacheDB.withTransaction { it.updateImportControl(datasetID, DatasetImportStatus.Failed) }
throw e
} finally {
paths.second.deleteIfExists()
paths.first?.deleteRecursively()
uploadFile.deleteIfExists()
tempDirectory.deleteRecursively()
}
}
}

log.debug("uploading dataset metadata to S3 for new dataset {} by user {}", datasetID, userID)
DatasetStore.putDatasetMeta(userID, datasetID, datasetMeta)
try {
log.debug("uploading dataset metadata to S3 for new dataset {}/{}", userID, datasetID)
DatasetStore.putDatasetMeta(userID, datasetID, datasetMeta)
} catch (e: Throwable) {
log.error("user dataset meta file upload to minio failed: ", e)
CacheDB.withTransaction { it.updateImportControl(datasetID, DatasetImportStatus.Failed) }
throw e
}
}
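
uploadFiles leans on scoped temp-resource helpers (TempFiles.withTempDirectory and TempFiles.withTempPath) whose implementations are not part of this diff. A minimal sketch of what such a scoped helper typically does, assuming create-then-delete-in-finally semantics:

    import java.nio.file.Files
    import java.nio.file.Path
    import kotlin.io.path.ExperimentalPathApi
    import kotlin.io.path.deleteRecursively

    // Sketch of a scoped temp-directory helper: the directory exists only
    // for the duration of the block and is removed even if the block
    // throws. The real vdi TempFiles implementation may differ.
    @OptIn(ExperimentalPathApi::class)
    fun <T> withTempDirectorySketch(block: (Path) -> T): T {
      val dir = Files.createTempDirectory("vdi-upload-")
      return try {
        block(dir)
      } finally {
        dir.deleteRecursively()
      }
    }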

private fun verifyFileSize(file: Path, userID: UserID) {
@@ -195,6 +248,8 @@ private fun Path.repack(into: Path, using: Path): Map<String, Long> {
* @return A map of upload files and their sizes.
*/
private fun Path.repackZip(into: Path, using: Path): Map<String, Long> {
log.trace("repacking zip file {} into {}", this, into)

// Map of file names to sizes that will be stored in the postgres database.
val files = HashMap<String, Long>(12)

Expand Down Expand Up @@ -235,7 +290,7 @@ private fun Path.repackZip(into: Path, using: Path): Map<String, Long> {

log.info("Compressing file from {} into {}", unpacked, into)
// recompress the files as a tgz file
Zip.compress(into, unpacked, Zip.Level(0u))
Zip.compress(into, unpacked)

return files
}
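
Most of repackZip's body is collapsed in this view. Illustratively, the name-to-size map it returns could be collected with java.util.zip as below; this is a sketch, not the actual elided code:

    import java.nio.file.Path
    import java.util.zip.ZipFile

    // Illustrative only: one way to build the name -> size map that
    // repackZip returns. Note ZipEntry.size is -1 when the uncompressed
    // size is not recorded in the archive.
    fun zipEntrySizes(zip: Path): Map<String, Long> {
      val sizes = HashMap<String, Long>()
      ZipFile(zip.toFile()).use { zf ->
        for (entry in zf.entries()) {
          if (!entry.isDirectory)
            sizes[entry.name] = entry.size
        }
      }
      return sizes
    }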
@@ -253,6 +308,8 @@ private fun Path.repackZip(into: Path, using: Path): Map<String, Long> {
* @return A map of upload files and their sizes.
*/
private fun Path.repackTar(into: Path, using: Path): Map<String, Long> {
log.trace("repacking tar {} into {}", this, into)

// Output map of files to sizes that will be written to the postgres DB.
val sizes = HashMap<String, Long>(12)

@@ -272,17 +329,35 @@ private fun Path.repackTar(into: Path, using: Path): Map<String, Long> {
}

private fun Path.repackRaw(into: Path): Map<String, Long> {
log.trace("repacking raw file {} into {}", this, into)
val sizes = HashMap<String, Long>(1)
Zip.compress(into, listOf(this))
sizes[this.name] = this.fileSize()
return sizes
}
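
The dispatching repack function itself is collapsed above; given the three handlers, a plausible shape is extension-based dispatch. A hypothetical sketch, not the actual elided code:

    import java.nio.file.Path
    import kotlin.io.path.name

    // Hypothetical dispatch; the real repack body (collapsed in this view)
    // may instead sniff file contents rather than trusting extensions.
    private fun Path.repackSketch(into: Path, using: Path): Map<String, Long> =
      when {
        name.endsWith(".zip") -> repackZip(into, using)
        name.endsWith(".tgz") || name.endsWith(".tar.gz") || name.endsWith(".tar") -> repackTar(into, using)
        else -> repackRaw(into)
      }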

/**
* Resolves the upload dataset file and places it in a new temp directory.
*
* If the file was uploaded directly, that upload file will be copied into a new
* temp directory to avoid the rest-server deleting the upload on request
* completion.
*
* If the request provided a URL to a target file, that file will be downloaded
* into a new temp directory.
*
* @return A [Pair] containing the temp directory path first and the temp file
* path second.
*/
@OptIn(ExperimentalPathApi::class)
private fun DatasetPostRequest.getDatasetFile(): Pair<Path?, Path> =
private fun DatasetPostRequest.getDatasetFile(): Pair<Path, Path> =
if (file != null) {
// If the user uploaded a file, then use that
null to file.toPath()
val (tempDir, tempFile) = TempFiles.makeTempPath(file.name)

file.copyTo(tempFile.toFile(), true)

tempDir to tempFile
} else {
// If the user gave us a URL then we have to download the contents of that
// URL to a local file to be uploaded. This is done to catch errors with
… (diff truncated; remainder of create-dataset.kt not shown)
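
Note the return type change from Pair<Path?, Path> to Pair<Path, Path>: the first element is now always a real temp directory, and the caller owns its cleanup. Condensed from createDataset above (metrics bookkeeping omitted):

    // Caller-side contract: delete the returned temp directory when done.
    val (tempDirectory, uploadFile) = entity.getDatasetFile()

    WorkPool.submit {
      try {
        uploadFiles(userID, datasetID, tempDirectory, uploadFile, datasetMeta)
      } finally {
        tempDirectory.deleteRecursively()
      }
    }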
