Skip to content

Commit

Permalink
Fix the sort to use owner ID (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmgaldi authored Jan 22, 2024
1 parent 8b58fd5 commit 059441a
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package org.veupathdb.vdi.lib.db.app
import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.ProjectID
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import org.veupathdb.vdi.lib.db.app.model.*
Expand Down Expand Up @@ -84,7 +84,7 @@ interface AppDBAccessor {
* @return Stream of dataset control records sorted by user ID and then dataset ID. The stream
* must be closed to release the db connection.
*/
fun streamAllSyncControlRecords(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>>
fun streamAllSyncControlRecords(): CloseableIterator<VDIReconcilerTargetRecord>

/**
* Retrieves all datasets that have an install message record for the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.slf4j.LoggerFactory
import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.ProjectID
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import org.veupathdb.vdi.lib.db.app.model.*
Expand Down Expand Up @@ -55,7 +55,7 @@ internal class AppDBAccessorImpl(
return con.use { it.selectDatasetProjectLinks(schema, datasetID) }
}

override fun streamAllSyncControlRecords(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
override fun streamAllSyncControlRecords(): CloseableIterator<VDIReconcilerTargetRecord> {
log.debug("Streaming all sync control records")
return con.selectAllSyncControl(schema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.slf4j.LoggerFactory
import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.ProjectID
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import org.veupathdb.vdi.lib.db.app.model.*
Expand Down Expand Up @@ -198,7 +198,7 @@ class AppDBTransactionImpl(
return connection.selectDatasetProjectLinks(schema, datasetID)
}

override fun streamAllSyncControlRecords(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
override fun streamAllSyncControlRecords(): CloseableIterator<VDIReconcilerTargetRecord> {
return connection.selectAllSyncControl(schema)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.veupathdb.vdi.lib.db.app.sql

import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.model.VDIDatasetTypeImpl
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import java.sql.Connection
Expand All @@ -20,39 +21,41 @@ SELECT
, s.dataset_id
, d.type_name
, d.type_version
, d.owner
FROM
${schema}.sync_control s
INNER JOIN ${schema}.dataset d
ON d.dataset_id = s.dataset_id
ORDER BY UPPER(CONCAT(CONCAT(d.owner,'/'), s.dataset_id))
"""

internal fun Connection.selectAllSyncControl(schema: String): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
internal fun Connection.selectAllSyncControl(schema: String): CloseableIterator<VDIReconcilerTargetRecord> {
val ps = prepareStatement(sql(schema))
val rs = ps.executeQuery()
return RecordIterator(rs, this, ps)
}

class RecordIterator(val rs: ResultSet,
private val con: Connection,
private val ps: PreparedStatement): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
private val ps: PreparedStatement): CloseableIterator<VDIReconcilerTargetRecord> {

override fun hasNext(): Boolean {
return rs.next();
}

override fun next(): Pair<VDIDatasetType, VDISyncControlRecord> {
return Pair(
VDIDatasetTypeImpl(
override fun next(): VDIReconcilerTargetRecord {
return VDIReconcilerTargetRecord(
type=VDIDatasetTypeImpl(
name = rs.getString("type_name"),
version = rs.getString("type_version")
), VDISyncControlRecord(
),
owner = UserID(rs.getLong("owner")),
syncControlRecord = VDISyncControlRecord(
datasetID = DatasetID(rs.getString("dataset_id")),
sharesUpdated = rs.getObject("shares_update_time", OffsetDateTime::class.java),
dataUpdated = rs.getObject("data_update_time", OffsetDateTime::class.java),
metaUpdated = rs.getObject("meta_update_time", OffsetDateTime::class.java)
)
)
))
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.veupathdb.vdi.lib.common.env.*
import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import org.veupathdb.vdi.lib.db.cache.model.*
Expand Down Expand Up @@ -173,7 +174,7 @@ object CacheDB {
* @return Stream of dataset control records sorted by user ID and then dataset ID. The stream
* must be closed to release the db connection.
*/
fun selectAllSyncControlRecords(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
fun selectAllSyncControlRecords(): CloseableIterator<VDIReconcilerTargetRecord> {
log.debug("selecting all sync control records")
return connection.selectAllSyncControl()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.veupathdb.vdi.lib.db.cache.sql.select

import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.model.VDIDatasetTypeImpl
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import java.sql.Connection
Expand All @@ -19,31 +21,36 @@ SELECT
, s.dataset_id
, d.type_name
, d.type_version
, d.owner
FROM
vdi.sync_control AS s
INNER JOIN vdi.datasets AS d
USING (dataset_id)
ORDER BY UPPER(CONCAT(d.owner_id,'/',s.dataset_id))
"""

internal fun Connection.selectAllSyncControl(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
internal fun Connection.selectAllSyncControl(): CloseableIterator<VDIReconcilerTargetRecord> {
val ps = prepareStatement(SQL)
val rs = ps.executeQuery()
return RecordIterator(rs, this, ps)
}

class RecordIterator(val rs: ResultSet, val connection: Connection, val preparedStatement: PreparedStatement): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
class RecordIterator(val rs: ResultSet,
val connection: Connection,
val preparedStatement: PreparedStatement): CloseableIterator<VDIReconcilerTargetRecord> {

override fun hasNext(): Boolean {
return rs.next();
}

override fun next(): Pair<VDIDatasetType, VDISyncControlRecord> {
return Pair(
VDIDatasetTypeImpl(
override fun next(): VDIReconcilerTargetRecord {
return VDIReconcilerTargetRecord(
type = VDIDatasetTypeImpl(
name = rs.getString("type_name"),
version = rs.getString("type_version")
), VDISyncControlRecord(
),
owner = UserID(rs.getLong("owner")),
syncControlRecord = VDISyncControlRecord(
datasetID = DatasetID(rs.getString("dataset_id")),
sharesUpdated = rs.getObject("shares_update_time", OffsetDateTime::class.java),
dataUpdated = rs.getObject("data_update_time", OffsetDateTime::class.java),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,15 @@ object Metrics {
.labelNames("target_name")
.register()

val missingInTarget: Counter = Counter.build()
.name("dataset_reconciler_missing_in_target")
.help("Count of datasets the reconciler finds are missing in the target DB.")
.labelNames("target_name")
.register()

val malformedDatasetFound: Counter = Counter.build()
.name("malformed_dataset_found")
.help("A Malformed dataset was found during reconciliation.")
.labelNames("target_name")
.register()

val reconcilerTimes: Histogram = Histogram.build()
Expand Down
1 change: 1 addition & 0 deletions components/s3/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {
dependencies {
implementation(platform(project(":platform")))

implementation(project(":components:metrics"))
implementation(project(":components:constants"))

implementation("org.veupathdb.vdi:vdi-component-common")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package org.veupathdb.vdi.lib.s3.datasets

import org.slf4j.LoggerFactory
import org.veupathdb.lib.s3.s34k.buckets.S3Bucket
import org.veupathdb.lib.s3.s34k.objects.S3Object
import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.field.toUserIDOrNull
import org.veupathdb.vdi.lib.s3.datasets.exception.MalformedDatasetException
import org.veupathdb.vdi.lib.s3.datasets.paths.S3DatasetPathFactory
import org.veupathdb.vdi.lib.s3.datasets.paths.S3PathFactory
import org.veupathdb.vdi.lib.s3.datasets.paths.S3Paths
import vdi.component.metrics.Metrics

import java.util.*
import java.util.stream.Stream
import java.util.stream.StreamSupport
Expand Down Expand Up @@ -75,6 +79,7 @@ class DatasetManager(private val s3Bucket: S3Bucket) {
private class DatasetDirIterator(
private val objectStream: Iterator<S3Object>
) : Iterator<DatasetDirectory> {
private val log = LoggerFactory.getLogger(javaClass)
private var stagedObjects: List<S3Object> = mutableListOf()
private var currentDataset: DatasetDirectory? = initFirstDataset()

Expand Down Expand Up @@ -107,25 +112,35 @@ class DatasetManager(private val s3Bucket: S3Bucket) {
if (initialDatasetID != currDatasetID) {
// We've found a new dataset, construct our currentDataset with objects seen so far and reset objects variable
// to contain the new dataset's object.
currentDataset = EagerlyLoadedDatasetDirectory(
stagedObjects,
initialUserID,
initialDatasetID,
S3DatasetPathFactory(initialUserID, initialDatasetID)
)
try {
currentDataset = EagerlyLoadedDatasetDirectory(
stagedObjects,
initialUserID,
initialDatasetID,
S3DatasetPathFactory(initialUserID, initialDatasetID)
)
} catch (e: MalformedDatasetException) {
Metrics.malformedDatasetFound.inc()
log.warn("Found a malformed dataset with ID $initialDatasetID.")
}
// Set staged objects to contain the object belonging to new dataset.
stagedObjects = mutableListOf(s3Object)
return currentDataset
}
stagedObjects = stagedObjects + s3Object
if (!objectStream.hasNext()) {
// Special handling if there's a single directory in the stream.
currentDataset = EagerlyLoadedDatasetDirectory(
stagedObjects,
initialUserID,
initialDatasetID,
S3DatasetPathFactory(initialUserID, initialDatasetID)
)
try {
// Special handling if there's a single directory in the stream.
currentDataset = EagerlyLoadedDatasetDirectory(
stagedObjects,
initialUserID,
initialDatasetID,
S3DatasetPathFactory(initialUserID, initialDatasetID)
)
} catch (e: MalformedDatasetException) {
Metrics.malformedDatasetFound.inc()
log.warn("Found a malformed dataset with ID $initialDatasetID.")
}
// Stream is exhausted. Indicate as much.
stagedObjects = emptyList()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.veupathdb.vdi.lib.reconciler

import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import org.veupathdb.vdi.lib.db.app.AppDB
import org.veupathdb.vdi.lib.handler.mapping.PluginHandlers
Expand All @@ -15,7 +15,7 @@ class AppDBTarget(

override val type = ReconcilerTargetType.Install

override fun streamSortedSyncControlRecords(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
override fun streamSortedSyncControlRecords(): CloseableIterator<VDIReconcilerTargetRecord> {
return AppDB.accessor(projectID)!!.streamAllSyncControlRecords()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.veupathdb.vdi.lib.reconciler

import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.model.VDIDatasetType
import org.veupathdb.vdi.lib.common.model.VDISyncControlRecord
import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
import org.veupathdb.vdi.lib.db.cache.CacheDB
import org.veupathdb.vdi.lib.db.cache.CacheDBTransaction
Expand All @@ -12,7 +12,7 @@ class CacheDBTarget : ReconcilerTarget {

override val type = ReconcilerTargetType.Cache

override fun streamSortedSyncControlRecords(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
override fun streamSortedSyncControlRecords(): CloseableIterator<VDIReconcilerTargetRecord> {
return CacheDB.selectAllSyncControlRecords()
}

Expand Down
Loading

0 comments on commit 059441a

Please sign in to comment.