Skip to content

Commit

Permalink
Merge pull request #326 from VEuPathDB/issue-259
Browse files Browse the repository at this point in the history
Admin Reconciler Endpoint
  • Loading branch information
Foxcapades authored Aug 9, 2024
2 parents 0de5d96 + 67aee95 commit 293423f
Show file tree
Hide file tree
Showing 30 changed files with 323 additions and 139 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ services:
RECONCILER_FULL_ENABLED: ${RECONCILER_FULL_ENABLED}
RECONCILER_FULL_RUN_INTERVAL: ${RECONCILER_FULL_RUN_INTERVAL}
RECONCILER_SLIM_RUN_INTERVAL: ${RECONCILER_SLIM_RUN_INTERVAL}
RECONCILER_DELETES_ENABLED: ${RECONCILER_DELETES_ENABLED}

#
# Wildcard Plugin Variables
Expand Down
5 changes: 5 additions & 0 deletions docs/env-vars.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ VDI system
| RECONCILER_SLIM_RUN_INTERVAL
| Duration
| Interval at which the slim reconciliation process will run.

|
| RECONCILER_DELETES_ENABLED
| boolean
| Whether the reconciler should perform delete operations.
|===


Expand Down
9 changes: 7 additions & 2 deletions docs/vdi-api.html

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ object AppDatabaseRegistry {

operator fun get(key: String): AppDBRegistryEntry? = dataSources[key]

operator fun iterator() =
operator fun iterator() = asSequence().iterator()

fun size() = dataSources.size

fun asSequence() =
dataSources.entries
.asSequence()
.map { (key, value) -> key to value }
.iterator()

fun size() = dataSources.size

fun require(key: String): AppDBRegistryEntry =
get(key) ?: throw IllegalStateException("required AppDB connection $key was not registered with AppDatabases")
Expand Down Expand Up @@ -80,7 +81,7 @@ object AppDatabaseRegistry {
TNS: {}
Pool Size: {}
Port: {}
DBNamme: {}
DBName: {}
User/Schema: {}""",
env.name,
env.name,
Expand Down
10 changes: 10 additions & 0 deletions lib/env/src/main/kotlin/vdi/component/env/EnvKey.kt
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,16 @@ object EnvKey {
* Required: no
*/
const val SlimRunInterval = "RECONCILER_SLIM_RUN_INTERVAL"

/**
* Whether the reconciler should perform delete operations. If set to
* `false`, the reconciler will only log when a delete would have taken
* place.
*
* Type: Boolean
* Required: no
*/
const val DeletesEnabled = "RECONCILER_DELETES_ENABLED"
}

object ReconciliationTriggerHandler {
Expand Down
38 changes: 38 additions & 0 deletions lib/reconciler/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
plugins {
kotlin("jvm")
}

dependencies {
implementation(platform(project(":platform")))


implementation(project(":lib:app-db"))
implementation(project(":lib:cache-db"))
implementation(project(":lib:env"))
implementation(project(":lib:plugin-client"))
implementation(project(":lib:kafka"))
implementation(project(":lib:metrics"))
implementation(project(":lib:module-core"))
implementation(project(":lib:plugin-mapping"))
implementation(project(":lib:s3"))

implementation("org.veupathdb.vdi:vdi-component-common")

implementation("org.veupathdb.lib.s3:s34k-minio")

implementation("org.apache.logging.log4j:log4j-api-kotlin")

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")

testImplementation(kotlin("test"))
testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.2")
testImplementation("org.mockito:mockito-core:5.2.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.9.2")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}

tasks.test {
useJUnitPlatform()

testLogging.showStandardStreams = true
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package vdi.daemon.reconciler
package vdi.lib.reconciler

import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package vdi.daemon.reconciler
package vdi.lib.reconciler

import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
Expand Down
111 changes: 111 additions & 0 deletions lib/reconciler/src/main/kotlin/vdi/lib/reconciler/Reconciler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package vdi.lib.reconciler

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.apache.logging.log4j.kotlin.CoroutineThreadContext
import org.apache.logging.log4j.kotlin.ThreadContextData
import org.apache.logging.log4j.kotlin.logger
import org.veupathdb.lib.s3.s34k.S3Api
import vdi.component.db.app.AppDatabaseRegistry
import vdi.component.kafka.router.KafkaRouter
import vdi.component.kafka.router.KafkaRouterFactory
import vdi.component.metrics.Metrics
import vdi.component.modules.AbortCB
import vdi.component.s3.DatasetManager

object Reconciler {
private val logger = logger().delegate

private val config = ReconcilerConfig()

private val active = Mutex()

private var initialized = false

private lateinit var datasetManager: DatasetManager

private lateinit var kafkaRouter: KafkaRouter

fun initialize(abortCB: AbortCB) {
// We lock while initializing to avoid double init.
if (active.isLocked)
return

// If we've already successfully initialized, nothing to do.
if (initialized)
return

// Initialize
runBlocking {
active.withLock {
try {
val s3 = S3Api.newClient(config.s3Config)
val bucket = s3.buckets[config.s3Bucket]
?: throw IllegalStateException("S3 bucket ${config.s3Bucket} does not exist!")

datasetManager = DatasetManager(bucket)
} catch (e: Throwable) {
logger.error("failed to init dataset manager", e)
abortCB(e.message)
}

try {
kafkaRouter = KafkaRouterFactory(config.kafkaRouterConfig).newKafkaRouter()
} catch (e: Throwable) {
logger.error("failed to init kafka router", e)
abortCB(e.message)
}
}
}
}

suspend fun runFull(): Boolean {
if (active.isLocked)
return false

val targets = AppDatabaseRegistry.asSequence()
.map { (project, _) -> AppDBTarget(project, project) }
.toMutableList<ReconcilerTarget>()
targets.add(CacheDBTarget())

val timer = Metrics.Reconciler.Full.reconcilerTimes.startTimer()

logger.info("running full reconciler for ${targets.size} targets")

coroutineScope {
targets.forEach {
launch(CoroutineThreadContext(ThreadContextData(mapOf("workerID" to workerName(false, it))))) {
ReconcilerInstance(it, datasetManager, kafkaRouter, false, config.deletesEnabled).reconcile()
}
}
}

timer.observeDuration()

return true
}

suspend fun runSlim(): Boolean {
if (active.isLocked)
return false

val timer = Metrics.Reconciler.Slim.executionTime.startTimer()

val target = CacheDBTarget()

coroutineScope {
launch(CoroutineThreadContext(ThreadContextData(mapOf("workerID" to workerName(true, target))))) {
ReconcilerInstance(target, datasetManager, kafkaRouter, true, false).reconcile()
}
}

timer.observeDuration()

return true
}

private fun workerName(slim: Boolean, tgt: ReconcilerTarget) = if (slim) "slim-${tgt.name}" else "full-${tgt.name}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package vdi.lib.reconciler

import org.veupathdb.lib.s3.s34k.S3Config
import org.veupathdb.lib.s3.s34k.fields.BucketName
import org.veupathdb.vdi.lib.common.env.Environment
import org.veupathdb.vdi.lib.common.env.optBool
import org.veupathdb.vdi.lib.common.env.require
import vdi.component.env.EnvKey
import vdi.component.kafka.router.KafkaRouterConfig
import vdi.component.s3.util.S3Config

internal data class ReconcilerConfig(
val kafkaRouterConfig: KafkaRouterConfig,
val s3Config: S3Config,
val s3Bucket: BucketName,
val deletesEnabled: Boolean,
) {
constructor() : this(System.getenv())

constructor(env: Environment) : this(
kafkaRouterConfig = KafkaRouterConfig(env, "reconciler"),
s3Config = S3Config(env),
s3Bucket = BucketName(env.require(EnvKey.S3.BucketName)),
deletesEnabled = env.optBool(EnvKey.Reconciler.DeletesEnabled) ?: DefaultDeletesEnabled,
)

companion object {
const val DefaultDeletesEnabled = true
}
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package vdi.daemon.reconciler
package vdi.lib.reconciler

import org.apache.logging.log4j.kotlin.logger
import org.veupathdb.vdi.lib.common.field.DatasetID
Expand Down Expand Up @@ -27,15 +27,15 @@ internal class ReconcilerInstance(
private val datasetManager: DatasetManager,
private val kafkaRouter: KafkaRouter,
private val slim: Boolean,
private val deleteDryMode: Boolean = false
private val deletesEnabled: Boolean
) {
private val log = logger().delegate

private var nextTargetDataset: VDIReconcilerTargetRecord? = null

val name = targetDB.name

suspend fun reconcile() {
internal suspend fun reconcile() {
try {
tryReconcile()

Expand Down Expand Up @@ -200,7 +200,7 @@ internal class ReconcilerInstance(

try {
Metrics.Reconciler.Full.reconcilerDatasetDeleted.labels(targetDB.name).inc()
if (!deleteDryMode) {
if (deletesEnabled) {
log.info("trying to delete dataset {}/{}", record.ownerID, record.datasetID)
targetDB.deleteDataset(record)
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package vdi.daemon.reconciler
package vdi.lib.reconciler

import org.veupathdb.vdi.lib.common.model.VDIReconcilerTargetRecord
import org.veupathdb.vdi.lib.common.util.CloseableIterator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package vdi.daemon.reconciler
package vdi.lib.reconciler

internal enum class ReconcilerTargetType { Cache, Install }
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package vdi.daemon.reconciler
package vdi.lib.reconciler

internal class UnsupportedTypeException(message: String) : Exception(message)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@file:Suppress("NOTHING_TO_INLINE")

package vdi.daemon.reconciler
package vdi.lib.reconciler

import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.DisplayName
Expand Down Expand Up @@ -31,7 +31,7 @@ class ReconcilerTest {
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()

val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, true)

`when`(cacheDb.streamSortedSyncControlRecords()).thenReturn(
closeableIterator(
Expand Down Expand Up @@ -64,7 +64,7 @@ class ReconcilerTest {
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, false)

`when`(cacheDb.streamSortedSyncControlRecords())
.thenReturn(closeableIterator(listOf(makeTargetRecord(111, "12345678123456781234567812345678")).iterator()))
Expand All @@ -86,7 +86,7 @@ class ReconcilerTest {
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, false)

`when`(cacheDb.streamSortedSyncControlRecords())
.thenReturn(closeableIterator(listOf(makeTargetRecord(111, "12345678123456781234567812345678")).iterator()))
Expand All @@ -104,7 +104,7 @@ class ReconcilerTest {
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, false)

`when`(cacheDb.streamSortedSyncControlRecords()).thenReturn(
closeableIterator(emptyList<VDIReconcilerTargetRecord>().iterator())
Expand All @@ -127,7 +127,7 @@ class ReconcilerTest {
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, true)

`when`(cacheDb.streamSortedSyncControlRecords()).thenReturn(
closeableIterator(listOf(
Expand Down Expand Up @@ -160,7 +160,7 @@ class ReconcilerTest {
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, true)

`when`(cacheDb.type).thenReturn(ReconcilerTargetType.Cache)

Expand Down Expand Up @@ -194,7 +194,7 @@ class ReconcilerTest {
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, false)

`when`(cacheDb.type).thenReturn(ReconcilerTargetType.Cache)

Expand Down Expand Up @@ -226,7 +226,7 @@ class ReconcilerTest {
`when`(cacheDb.name).thenReturn("CacheDB")
val datasetManager = mock<DatasetManager>()
val kafkaRouter = mock<KafkaRouter>()
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false)
val recon = ReconcilerInstance(cacheDb, datasetManager, kafkaRouter, false, false)

`when`(cacheDb.type).thenReturn(ReconcilerTargetType.Cache)

Expand Down
Loading

0 comments on commit 293423f

Please sign in to comment.