Skip to content

Commit

Permalink
rollback internal pg transaction on file download fail (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
Foxcapades authored Nov 13, 2024
1 parent 296c8f7 commit 2d0326b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ interface CacheDB {
fun openTransaction(): vdi.component.db.cache.CacheDBTransaction
}

inline fun vdi.component.db.cache.CacheDB.withTransaction(fn: (vdi.component.db.cache.CacheDBTransaction) -> Unit) =
inline fun <T> vdi.component.db.cache.CacheDB.withTransaction(fn: (vdi.component.db.cache.CacheDBTransaction) -> T) =
openTransaction().use {
try {
fn(it)
it.commit()
fn(it).apply { it.commit() }
} catch (e: Throwable) {
it.rollback()
throw e
Expand Down
2 changes: 1 addition & 1 deletion lib/reconciler/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies {

testImplementation(kotlin("test"))
testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.2")
testImplementation("org.mockito:mockito-core:5.2.0")
testImplementation("org.mockito:mockito-core:5.4.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.9.2")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fun createDataset(
throw BadRequestException("unrecognized target project")
}

CacheDB().withTransaction {
val (tempDirectory, uploadFile) = CacheDB().withTransaction {
it.tryInsertDataset(DatasetImpl(
datasetID = datasetID,
typeName = datasetMeta.type.name,
Expand Down Expand Up @@ -92,32 +92,32 @@ fun createDataset(
dataUpdated = OriginTimestamp,
metaUpdated = OriginTimestamp
))
}

// TODO: Post release!
// The following call represents a 'unified' path for handling uploads
// whether they are direct or via URL. This leaves us open to proxy
// timeouts in the case of URL uploads if the file transfer between the
// VDI service and the remote server takes too long.
//
// To handle this, the 'unified' path will need to be split into two
// paths:
// - Direct upload path
// - URL upload path
//
// For the direct upload path, the upload file will need to be copied to
// a new temp directory (as the rest service thread will delete the
// original upload file). Then the new thread can be forked and the
// file uploaded to MinIO.
//
// For the URL upload path, the URL will need to be validated before
// starting the new thread. Then the new thread will be forked and the
// target file downloaded into a temp directory in that thread before
// being uploaded to MinIO (also in that forked thread).
val (tempDirectory, uploadFile) = try { entity.fetchDatasetFile() }
catch (e: Throwable) {
CacheDB().withTransaction { it.updateImportControl(datasetID, DatasetImportStatus.Failed) }
throw e
// TODO: Post release!
// The following call represents a 'unified' path for handling uploads
// whether they are direct or via URL. This leaves us open to proxy
// timeouts in the case of URL uploads if the file transfer between the
// VDI service and the remote server takes too long.
//
// To handle this, the 'unified' path will need to be split into two
// paths:
// - Direct upload path
// - URL upload path
//
// For the direct upload path, the upload file will need to be copied to
// a new temp directory (as the rest service thread will delete the
// original upload file). Then the new thread can be forked and the
// file uploaded to MinIO.
//
// For the URL upload path, the URL will need to be validated before
// starting the new thread. Then the new thread will be forked and the
// target file downloaded into a temp directory in that thread before
// being uploaded to MinIO (also in that forked thread).
try { entity.fetchDatasetFile() }
catch (e: Throwable) {
CacheDB().withTransaction { it.updateImportControl(datasetID, DatasetImportStatus.Failed) }
throw e
}
}

Metrics.Upload.queueSize.inc()
Expand Down

0 comments on commit 2d0326b

Please sign in to comment.