diff --git a/components/metrics/src/main/kotlin/vdi/component/metrics/Metrics.kt b/components/metrics/src/main/kotlin/vdi/component/metrics/Metrics.kt
index 97cfce3c..ba135787 100644
--- a/components/metrics/src/main/kotlin/vdi/component/metrics/Metrics.kt
+++ b/components/metrics/src/main/kotlin/vdi/component/metrics/Metrics.kt
@@ -1,6 +1,7 @@
 package vdi.component.metrics

 import io.prometheus.client.Counter
+import io.prometheus.client.Gauge
 import io.prometheus.client.Histogram

 object Metrics {
@@ -170,4 +171,9 @@ object Metrics {
       900.0, // 15 minutes
     )
     .register()
+
+  val uploadQueueSize: Gauge = Gauge.build()
+    .name("dataset_upload_queue_size")
+    .help("Number of dataset uploads currently queued to be processed.")
+    .register()
 }
\ No newline at end of file
diff --git a/modules/import-trigger-handler/src/main/kotlin/vdi/module/handler/imports/triggers/ImportTriggerHandlerImpl.kt b/modules/import-trigger-handler/src/main/kotlin/vdi/module/handler/imports/triggers/ImportTriggerHandlerImpl.kt
index 2ad10fe5..96c296c0 100644
--- a/modules/import-trigger-handler/src/main/kotlin/vdi/module/handler/imports/triggers/ImportTriggerHandlerImpl.kt
+++ b/modules/import-trigger-handler/src/main/kotlin/vdi/module/handler/imports/triggers/ImportTriggerHandlerImpl.kt
@@ -233,7 +233,7 @@ internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandler
     dataFiles.forEach { dataFile -> sizes[dataFile.name] = dataFile.fileSize() }

     TempFiles.withTempFile { tempFile ->
-      Zip.compress(tempFile, dataFiles, Zip.Level(0u))
+      Zip.compress(tempFile, dataFiles)
       dd.putDataFile(InstallZipName) { tempFile.inputStream() }
     }
diff --git a/modules/rest-service/build.gradle.kts b/modules/rest-service/build.gradle.kts
index 75750051..dd7fc9d0 100644
--- a/modules/rest-service/build.gradle.kts
+++ b/modules/rest-service/build.gradle.kts
@@ -55,6 +55,7 @@ dependencies {
   implementation(project(":components:plugin-mapping"))
   implementation(project(":components:pruner"))
   implementation(project(":components:s3"))
+  implementation(project(":components:metrics"))

   implementation("org.veupathdb.vdi:vdi-component-json")
   implementation("org.veupathdb.vdi:vdi-component-common")
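The new gauge above is driven from the WorkPool introduced in create-dataset.kt below: each upload task increments it when a worker picks it up and decrements it in a finally block, so failed uploads still release the count. A minimal standalone sketch of the pattern (demo names only; not part of this changeset, and assuming the Prometheus simpleclient shown above is on the classpath):

import io.prometheus.client.Gauge
import java.util.concurrent.Executors

// Demo object; the real change uses Metrics.uploadQueueSize and the
// WorkPool defined in create-dataset.kt.
object UploadGaugeDemo {
  private val active: Gauge = Gauge.build()
    .name("demo_upload_queue_size")
    .help("Number of uploads currently being processed.")
    .register()

  private val pool = Executors.newFixedThreadPool(10)

  fun submit(task: () -> Unit) {
    pool.submit {
      active.inc()   // task picked up by a worker thread
      try {
        task()
      } finally {
        active.dec() // decrement even when the task throws
      }
    }
  }
}

Note that incrementing before submit(), rather than inside the task, would also count work still waiting in the executor's queue, which matches the metric's "currently queued" help text more literally.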
diff --git a/modules/rest-service/src/main/kotlin/org/veupathdb/service/vdi/service/datasets/create-dataset.kt b/modules/rest-service/src/main/kotlin/org/veupathdb/service/vdi/service/datasets/create-dataset.kt
index 9ca098b5..e13a1f64 100644
--- a/modules/rest-service/src/main/kotlin/org/veupathdb/service/vdi/service/datasets/create-dataset.kt
+++ b/modules/rest-service/src/main/kotlin/org/veupathdb/service/vdi/service/datasets/create-dataset.kt
@@ -25,14 +25,18 @@ import org.veupathdb.vdi.lib.db.cache.model.DatasetImpl
 import org.veupathdb.vdi.lib.db.cache.model.DatasetImportStatus
 import org.veupathdb.vdi.lib.db.cache.model.DatasetMetaImpl
 import org.veupathdb.vdi.lib.handler.mapping.PluginHandlers
+import vdi.component.metrics.Metrics
 import java.net.URL
 import java.nio.file.Path
 import java.time.OffsetDateTime
+import java.util.concurrent.Executors
 import kotlin.io.path.*
 import kotlin.math.max

 private val log = LoggerFactory.getLogger("create-dataset.kt")

+private val WorkPool = Executors.newFixedThreadPool(10)
+
 @OptIn(ExperimentalPathApi::class)
 fun createDataset(
   userID: UserID,
@@ -93,31 +97,80 @@ fun createDataset(
     ))
   }

+  // TODO: Post release!
+  // The following call represents a 'unified' path for handling uploads
+  // whether they are direct or via URL. This leaves us open to proxy
+  // timeouts in the case of URL uploads if the file transfer between the
+  // VDI service and the remote server takes too long.
+  //
+  // To handle this, the 'unified' path will need to be split into two
+  // paths:
+  // - Direct upload path
+  // - URL upload path
+  //
+  // For the direct upload path, the upload file will need to be copied to
+  // a new temp directory (as the rest service thread will delete the
+  // original upload file). Then the new thread can be forked and the
+  // file uploaded to MinIO.
+  //
+  // For the URL upload path, the URL will need to be validated before
+  // starting the new thread. Then the new thread will be forked and the
+  // target file downloaded into a temp directory in that thread before
+  // being uploaded to MinIO (also in that forked thread).
+  val (tempDirectory, uploadFile) = entity.getDatasetFile()
+
+  WorkPool.submit {
+    Metrics.uploadQueueSize.inc()
+    try {
+      uploadFiles(userID, datasetID, tempDirectory, uploadFile, datasetMeta)
+    } finally {
+      Metrics.uploadQueueSize.dec()
+      tempDirectory.deleteRecursively()
+    }
+  }
+}
+
+@OptIn(ExperimentalPathApi::class)
+private fun uploadFiles(
+  userID: UserID,
+  datasetID: DatasetID,
+  tempDirectory: Path,
+  uploadFile: Path,
+  datasetMeta: VDIDatasetMeta,
+) {
   // Get a handle on the temp file that will be uploaded to the S3 store (MinIO)
   TempFiles.withTempDirectory { directory ->
     TempFiles.withTempPath { archive ->
-      val paths = entity.getDatasetFile()
-
       try {
         log.debug("Verifying file sizes for dataset {}/{} to ensure the user quota is not exceeded.", userID, datasetID)
-        verifyFileSize(paths.second, userID)
+        verifyFileSize(uploadFile, userID)

         log.debug("Repacking input file for dataset {}/{}.", userID, datasetID)
-        val sizes = paths.second.repack(into = archive, using = directory)
+        val sizes = uploadFile.repack(into = archive, using = directory)

-        log.debug("uploading raw user data to S3 for new dataset {} by user {}", datasetID, userID)
+        log.debug("uploading raw user data to S3 for new dataset {}/{}", userID, datasetID)
         DatasetStore.putUserUpload(userID, datasetID, archive::inputStream)

         CacheDB.withTransaction { it.insertUploadFiles(datasetID, sizes) }
+      } catch (e: Throwable) {
+        log.error("user dataset upload to minio failed: ", e)
+        CacheDB.withTransaction { it.updateImportControl(datasetID, DatasetImportStatus.Failed) }
+        throw e
       } finally {
-        paths.second.deleteIfExists()
-        paths.first?.deleteRecursively()
+        uploadFile.deleteIfExists()
+        tempDirectory.deleteRecursively()
       }
     }
   }

-  log.debug("uploading dataset metadata to S3 for new dataset {} by user {}", datasetID, userID)
-  DatasetStore.putDatasetMeta(userID, datasetID, datasetMeta)
+  try {
+    log.debug("uploading dataset metadata to S3 for new dataset {}/{}", userID, datasetID)
+    DatasetStore.putDatasetMeta(userID, datasetID, datasetMeta)
+  } catch (e: Throwable) {
+    log.error("user dataset meta file upload to minio failed: ", e)
+    CacheDB.withTransaction { it.updateImportControl(datasetID, DatasetImportStatus.Failed) }
+    throw e
+  }
 }

 private fun verifyFileSize(file: Path, userID: UserID) {
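For illustration, one possible shape of the split described in the TODO above. All helper names here are hypothetical, and a real implementation would presumably reuse the service's TempFiles and WorkPool utilities rather than the raw JDK calls in this sketch:

import java.net.URI
import java.net.URL
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.copyTo
import kotlin.io.path.outputStream

// Direct path: copy the servlet-owned upload into a temp directory we own
// (the rest thread deletes its copy when the request completes), then hand
// the staged pair to the worker pool for the MinIO upload.
fun stageDirectUpload(upload: Path): Pair<Path, Path> {
  val tempDir = Files.createTempDirectory("vdi-upload")
  val staged  = upload.copyTo(tempDir.resolve(upload.fileName), overwrite = true)
  return tempDir to staged
}

// URL path: parse and validate on the request thread so bad input fails fast...
fun validateUrl(rawUrl: String): URL = URI.create(rawUrl).toURL()

// ...then download inside the forked worker thread so a slow remote server
// cannot hold the HTTP request open long enough to trip a proxy timeout.
fun stageUrlUpload(url: URL): Pair<Path, Path> {
  val tempDir = Files.createTempDirectory("vdi-upload")
  val target  = tempDir.resolve("upload")
  url.openStream().use { input ->
    target.outputStream().use { output -> input.copyTo(output) }
  }
  return tempDir to target
}

Either way, the contract stays the same as getDatasetFile() below: the caller owns the returned temp directory and deletes it recursively once the upload to MinIO finishes.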
@@ -195,6 +248,8 @@ private fun Path.repack(into: Path, using: Path): Map<String, Long> {
  * @return A map of upload files and their sizes.
  */
 private fun Path.repackZip(into: Path, using: Path): Map<String, Long> {
+  log.trace("repacking zip file {} into {}", this, into)
+
   // Map of file names to sizes that will be stored in the postgres database.
   val files = HashMap<String, Long>(12)

@@ -235,7 +290,7 @@ private fun Path.repackZip(into: Path, using: Path): Map<String, Long> {
   log.info("Compressing file from {} into {}", unpacked, into)

   // recompress the files as a tgz file
-  Zip.compress(into, unpacked, Zip.Level(0u))
+  Zip.compress(into, unpacked)

   return files
 }
@@ -253,6 +308,8 @@ private fun Path.repackZip(into: Path, using: Path): Map<String, Long> {
  * @return A map of upload files and their sizes.
  */
 private fun Path.repackTar(into: Path, using: Path): Map<String, Long> {
+  log.trace("repacking tar {} into {}", this, into)
+
   // Output map of files to sizes that will be written to the postgres DB.
   val sizes = HashMap<String, Long>(12)

@@ -272,17 +329,35 @@
 }

 private fun Path.repackRaw(into: Path): Map<String, Long> {
+  log.trace("repacking raw file {} into {}", this, into)
   val sizes = HashMap<String, Long>(1)
   Zip.compress(into, listOf(this))
   sizes[this.name] = this.fileSize()
   return sizes
 }

+/**
+ * Resolves the upload dataset file and places it in a new temp directory.
+ *
+ * If the file was uploaded directly, that upload file will be copied into a new
+ * temp directory to avoid the rest-server deleting the upload on request
+ * completion.
+ *
+ * If the request provided a URL to a target file, that file will be downloaded
+ * into a new temp directory.
+ *
+ * @return A [Pair] containing the temp directory path first and the temp file
+ * path second.
+ */
 @OptIn(ExperimentalPathApi::class)
-private fun DatasetPostRequest.getDatasetFile(): Pair<Path?, Path> =
+private fun DatasetPostRequest.getDatasetFile(): Pair<Path, Path> =
   if (file != null) {
     // If the user uploaded a file, then use that
-    null to file.toPath()
+    val (tempDir, tempFile) = TempFiles.makeTempPath(file.name)
+
+    file.copyTo(tempFile.toFile(), true)
+
+    tempDir to tempFile
   } else {
     // If the user gave us a URL then we have to download the contents of that
     // URL to a local file to be uploaded. This is done to catch errors with