Skip to content

Commit

Permalink
Remove case sensitivity from comparisons to be compatible with MinIO sorts (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmgaldi authored Jan 19, 2024
1 parent 2410f7d commit 60e6779
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ FROM
${schema}.sync_control s
INNER JOIN ${schema}.dataset d
ON d.dataset_id = s.dataset_id
ORDER BY CONCAT(CONCAT(d.owner,'/'), s.dataset_id)
ORDER BY UPPER(CONCAT(CONCAT(d.owner,'/'), s.dataset_id))
"""

internal fun Connection.selectAllSyncControl(schema: String): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FROM
vdi.sync_control AS s
INNER JOIN vdi.datasets AS d
USING (dataset_id)
ORDER BY CONCAT(d.owner_id,'/',s.dataset_id)
ORDER BY UPPER(CONCAT(d.owner_id,'/',s.dataset_id))
"""

internal fun Connection.selectAllSyncControl(): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,30 @@ object Metrics {
)
.register()

// Incremented once per reconciler run that aborts with an unhandled error,
// labeled by the target database's name.
val failedReconciliation: Counter = Counter.build()
  .name("dataset_reconciler_failed") // fixed typo: was "dataset_reconiler_failed" (missing 'c')
  .help("Count of failed reconciler runs.")
  .labelNames("target_name")
  .register()

/** Tracks how many datasets the reconciler has removed, partitioned by target database name. */
val reconcilerDatasetDeleted: Counter =
  Counter.build()
    .name("dataset_reconciler_deleted")
    .labelNames("target_name")
    .help("Count of datasets deleted by reconciler.")
    .register()

/** Tracks how many datasets the reconciler has triggered a sync event for, partitioned by target database name. */
val reconcilerDatasetSynced: Counter =
  Counter.build()
    .name("dataset_reconciler_synced")
    .labelNames("target_name")
    .help("Count of datasets synced by reconciler.")
    .register()

// Incremented each time the reconciler encounters a dataset in the object store
// whose file layout it cannot interpret (a MalformedDatasetException), labeled
// by the target database's name.
val malformedDatasetFound: Counter = Counter.build()
  .name("malformed_dataset_found")
  // Reworded for consistency with the sibling "Count of ..." help strings
  // (was "A Malformed dataset was found during reconciliation.").
  .help("Count of malformed datasets found during reconciliation.")
  .labelNames("target_name")
  .register()

val reconcilerTimes: Histogram = Histogram.build()
.name("dataset_reconciler_times")
.help("Dataset reconciler run times.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.UserID
import org.veupathdb.vdi.lib.common.field.toUserIDOrNull
import org.veupathdb.vdi.lib.common.model.*
import org.veupathdb.vdi.lib.s3.datasets.exception.MalformedDatasetException
import org.veupathdb.vdi.lib.s3.datasets.paths.S3DatasetPathFactory
import org.veupathdb.vdi.lib.s3.datasets.paths.S3Paths
import vdi.constants.InstallZipName
Expand Down Expand Up @@ -161,7 +162,7 @@ private fun S3Object.toDatasetFile(pathFactory: S3DatasetPathFactory): DatasetFi
)
this.path.contains(pathFactory.datasetDeleteFlagFile()) -> DatasetDeleteFlagFileImpl(this)
this.path.contains(pathFactory.datasetUploadsDir()) -> DatasetUploadFileImpl(this)
else -> throw IllegalStateException("Unable to create a dataset file from path " + this.path)
else -> throw MalformedDatasetException("Unrecognized file path in S3: " + this.path)
}
return datasetFile
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.veupathdb.vdi.lib.s3.datasets.exception

/**
 * Thrown when a file found in the dataset object store does not match any
 * recognized dataset file path, indicating the dataset's layout in S3/MinIO is
 * malformed (see S3Object.toDatasetFile).
 *
 * Callers (e.g. the reconciler) are expected to catch this, log/record the
 * malformed dataset, and continue processing the remaining datasets.
 *
 * @param message Human-readable description of the malformed path or state.
 * @param cause   Optional underlying exception, when the malformation was
 *                detected while handling another error. Defaults to `null`,
 *                so existing single-argument call sites are unaffected.
 */
class MalformedDatasetException(message: String?, cause: Throwable? = null) : Exception(message, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ReconcilerImpl(private val config: ReconcilerConfig) :
contextData = ThreadContextData( // Add log4j context to reconciler to distinguish targets in logs.
map = mapOf(
Pair(
"reconciler",
"workerID",
target.name
)
), Stack()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import org.veupathdb.vdi.lib.kafka.model.triggers.UpdateMetaTrigger
import org.veupathdb.vdi.lib.kafka.router.KafkaRouter
import org.veupathdb.vdi.lib.s3.datasets.DatasetDirectory
import org.veupathdb.vdi.lib.s3.datasets.DatasetManager
import org.veupathdb.vdi.lib.s3.datasets.exception.MalformedDatasetException
import vdi.component.metrics.Metrics

/**
* Component for synchronizing the dataset object store (the source of truth for datasets) with a target database.
Expand All @@ -30,6 +32,7 @@ class ReconcilerInstance(
} catch (e: Exception) {
// Don't re-throw error, ensure exception is logged and soldier on for future reconciliation.
logger().error("Failure running reconciler for " + targetDB.name, e)
Metrics.failedReconciliation.labels(targetDB.name).inc()
}
}

Expand All @@ -42,11 +45,20 @@ class ReconcilerInstance(

nextTargetDataset = if (targetIterator.hasNext()) targetIterator.next() else null

// Iterate through datasets in S3
// Iterate through datasets in S3.
while (sourceIterator.hasNext()) {

// Pop the next DatasetDirectory instance from the S3 stream.
val sourceDatasetDir: DatasetDirectory = sourceIterator.next()
// Try to read a dataset from S3.
var sourceDatasetDir: DatasetDirectory
try {
// Pop the next DatasetDirectory instance from the S3 stream.
sourceDatasetDir = sourceIterator.next()
} catch (e: MalformedDatasetException) {
// Skip the dataset if it's malformed for some reason. As things settle down, we may want to clean it up in MinIO?
logger().error("Found a malformed dataset in S3. Skipping dataset and continuing on.", e)
Metrics.malformedDatasetFound.labels(targetDB.name).inc()
continue
}

logger().info("Checking dataset ${sourceDatasetDir.ownerID}/${sourceDatasetDir.datasetID} for ${targetDB.name}")

Expand All @@ -60,8 +72,12 @@ class ReconcilerInstance(
// If target dataset stream is "ahead" of source stream, delete
// the datasets from the target stream until we are aligned
// again (or the target stream is consumed).
if (sourceDatasetDir.datasetID.toString() > nextTargetDataset!!.second.datasetID.toString()) {
while (nextTargetDataset != null && sourceDatasetDir.datasetID.toString() > nextTargetDataset!!.second.datasetID.toString()) {
if (sourceDatasetDir.datasetID.toString().compareTo(nextTargetDataset!!.second.datasetID.toString(), true) > 0) {

// Delete datasets until and advance target iterator until streams are aligned.
while (nextTargetDataset != null && sourceDatasetDir.datasetID.toString().compareTo(nextTargetDataset!!.second.datasetID.toString(), true) > 0) {
logger().info("Attempting to delete dataset with owner ${sourceDatasetDir.ownerID} and ID ${sourceDatasetDir.datasetID} " +
"because ${nextTargetDataset!!.second.datasetID} is lexigraphically greater than our ID.")
tryDeleteDataset(targetDB, nextTargetDataset!!.first, nextTargetDataset!!.second.datasetID)
nextTargetDataset = if (targetIterator.hasNext()) targetIterator.next() else null
}
Expand All @@ -77,6 +93,7 @@ class ReconcilerInstance(
// Dataset is in source, but not in target. Send an event.
sendSyncIfRelevant(sourceDatasetDir)
} else {
// Dataset is in source and target. Check dates to see if sync is needed.
if (isOutOfSync(sourceDatasetDir, nextTargetDataset!!.second)) {
sendSyncIfRelevant(sourceDatasetDir)
}
Expand All @@ -101,6 +118,8 @@ class ReconcilerInstance(

private fun tryDeleteDataset(targetDB: ReconcilerTarget, datasetType: VDIDatasetType, datasetID: DatasetID) {
try {
logger().info("Trying to delete dataset $datasetID.")
Metrics.reconcilerDatasetDeleted.labels(targetDB.name).inc()
targetDB.deleteDataset(datasetID = datasetID, datasetType = datasetType)
} catch (e: Exception) {
// Swallow exception and alert if unable to delete. Reconciler can safely recover, but the dataset
Expand Down Expand Up @@ -135,6 +154,7 @@ class ReconcilerInstance(
logger().info("Sending sync event for ${sourceDatasetDir.datasetID}")
// An update-meta event should trigger synchronization of all dataset components.
kafkaRouter.sendUpdateMetaTrigger(UpdateMetaTrigger(sourceDatasetDir.ownerID, sourceDatasetDir.datasetID))
Metrics.reconcilerDatasetSynced.labels(targetDB.name).inc()
}

private fun consumeEntireSourceStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ReconcilerTest {
@DisplayName("Test insert one, delete one at the end")
fun test1() {
val cacheDb = mock<ReconcilerTarget>()
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()

Expand Down Expand Up @@ -82,6 +83,7 @@ class ReconcilerTest {
@DisplayName("Test single dataset out of sync")
fun test2() {
val cacheDb = mock<ReconcilerTarget>()
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter)
Expand Down Expand Up @@ -116,6 +118,7 @@ class ReconcilerTest {
@DisplayName("Test single dataset in sync")
fun test3() {
val cacheDb = mock<ReconcilerTarget>()
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter)
Expand Down Expand Up @@ -145,6 +148,7 @@ class ReconcilerTest {
@DisplayName("Test target DB missing all datasets")
fun test4() {
val cacheDb = mock<ReconcilerTarget>()
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter)
Expand All @@ -167,6 +171,7 @@ class ReconcilerTest {
@DisplayName("Test delete one in the middle")
fun test5() {
val cacheDb = mock<ReconcilerTarget>()
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()

Expand Down Expand Up @@ -225,6 +230,7 @@ class ReconcilerTest {
@DisplayName("Test delete last datasets in target stream, then sync last source")
fun test6() {
val cacheDb = mock<ReconcilerTarget>()
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter)
Expand Down Expand Up @@ -276,6 +282,43 @@ class ReconcilerTest {
assertEquals(listOf("22345678123456781234567812345678", "32345678123456781234567812345678", "42345678123456781234567812345678"), deletedIDs)
}

@Test
@DisplayName("Test case sensitivity")
fun test7() {
// Regression test for MinIO-compatible ordering: the target DB stream holds
// "Vbz2OgjnKsR" while the source (S3) stream yields "eA1WkZhiGbE" first.
// Case-insensitively "eA1W..." sorts before "Vbz2...", so the reconciler must
// NOT conclude the target record is stale and delete it; a case-sensitive
// comparison ('V' < 'e' in ASCII) would wrongly trigger a delete.
val cacheDb = mock<ReconcilerTarget>()
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter)

`when`(cacheDb.type).thenReturn(ReconcilerTargetType.Cache)

// Target DB reports a single sync-control record whose ID starts with an
// upper-case letter.
`when`(cacheDb.streamSortedSyncControlRecords()).thenReturn(
closeableIterator(listOf(
Pair(
VDIDatasetTypeImpl("Stub", "Stub"), VDISyncControlRecord(
datasetID = DatasetID("Vbz2OgjnKsR"),
sharesUpdated = UpdateTime,
dataUpdated = UpdateTime,
metaUpdated = UpdateTime
)
),
).iterator())
)
// S3 contains both datasets in case-insensitive sorted order (MinIO ordering).
doReturn(
listOf(
mockDatasetDirectory(111L, "eA1WkZhiGbE", UpdateTime.plusDays(1)),
mockDatasetDirectory(111L, "Vbz2OgjnKsR", UpdateTime.plusDays(1)),
).stream()
).`when`(datasetManager).streamAllDatasets()
recon.reconcile()
// Collect every deleteDataset(...) invocation recorded on the mocked target;
// argument index 1 is the DatasetID parameter.
val deletedIDs = mockingDetails(cacheDb).invocations
.filter { it.method.name == "deleteDataset" }
.map { it.getArgument<DatasetID>(1).toString() }

// With case-insensitive comparison no dataset should have been deleted.
assertEquals(listOf(), deletedIDs)
}

private fun closeableIterator(iterator: Iterator<Pair<VDIDatasetType, VDISyncControlRecord>>): CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
return object: CloseableIterator<Pair<VDIDatasetType, VDISyncControlRecord>> {
override fun close() {
Expand Down
11 changes: 1 addition & 10 deletions src/main/resources/log4j2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ Configuration:
target: SYSTEM_OUT
PatternLayout:
pattern: "%highlight{%d{yyyy-MM-dd HH:mm:ss.SSS} (wid:%X{workerID}) [rid:%5X{traceId}] %-5level %c - %m%n}"
- name: Reconciler_Appender
target: SYSTEM_OUT
PatternLayout:
pattern: "%highlight{%d{yyyy-MM-dd HH:mm:ss.SSS} [reconciler:%5X{reconciler}] %-5level %c - %m%n}"

Loggers:
Root:
Expand All @@ -32,9 +28,4 @@ Configuration:
level: error
additivity: false
AppenderRef:
- ref: Console_Appender
- name: org.veupathdb.vdi.lib.reconciler
level: debug
additivity: false
AppenderRef:
- ref: Reconciler_Appender
- ref: Console_Appender

0 comments on commit 60e6779

Please sign in to comment.