From bd2f630006ed901c30c33c51528f7d90e59e447f Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 20 Nov 2023 16:37:21 +0100 Subject: [PATCH 1/4] threading api --- .../server/api/helper/IndexThreadPool.kt | 107 ++++++++++++++++++ .../engine/server/api/rest/handlers/Index.kt | 28 ++++- 2 files changed, 132 insertions(+), 3 deletions(-) create mode 100644 vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt new file mode 100644 index 00000000..bc56af4e --- /dev/null +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt @@ -0,0 +1,107 @@ +package org.vitrivr.engine.server.api.helper + +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging +import java.time.Duration +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.temporal.Temporal +import java.util.* + + +private val logger: KLogger = KotlinLogging.logger {} + +class IndexThreadPool { + companion object { + val MAX_DURATION = Duration.ofHours(1); + private val threads = mutableMapOf>() + + fun addThreadAndStart(threadId: String, thread: Thread): String { + val id = addThread(threadId, thread); + val sid = startThread(id); + return sid; + } + + fun addThread(threadId: String, thread: Thread): String { + val started = LocalDateTime.now(); + threads[threadId] = Pair(thread, started); + return threadId; + } + + fun startThread(uuid: String): String { + val thread = threads[uuid]; + if (thread != null) { + val (t, s) = thread; + val started = LocalDateTime.now(); + t.start(); + threads[uuid] = Pair(t, started); + return uuid; + } + return ""; + } + + fun addThread(thread: Thread): String { + val myUuid = UUID.randomUUID().toString(); + return addThread(myUuid, thread); + } + + + fun getThreadState(uuid: String): Thread.State? { + val thread = threads[uuid]; + if (thread != null) { + val (t, started) = thread; + return t.state + } + return null; + } + + fun getAllThreadIds(): List { + return threads.keys.toList(); + } + + + fun terminateThread(uuid: String) { + val thread = threads[uuid]; + if (thread != null) { + val (t, started) = thread; + t.interrupt(); + threads.remove(uuid); + } + } + + fun cleanThreadPool(): List { + + val ids = terminateThreads(ThreadState.DEAD); + val idx = terminateThreads(ThreadState.DEPRECATED); + return ids + idx; + } + + enum class ThreadState { + ALL, DEAD, DEPRECATED + } + + fun terminateThreads(terminate: ThreadState): List { + val ids = mutableListOf(); + threads.forEach { (uuid, thread) -> + when (terminate) { + ThreadState.ALL -> { + ids.add(uuid); + } + ThreadState.DEAD -> { + if (!thread.first.isAlive) { + ids.add(uuid); } + } + ThreadState.DEPRECATED -> { + if (Duration.between(thread.second, LocalDateTime.now()).compareTo(MAX_DURATION) > 0) { + ids.add(uuid); + } + } + } + } + ids.forEach() { id -> + terminateThread(id); + } + return ids; + } + } +} \ No newline at end of file diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt index d04e3a29..0fdd6d39 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt @@ -1,15 +1,22 @@ package org.vitrivr.engine.server.api.rest.handlers +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging import io.javalin.http.Context import io.javalin.openapi.* import io.javalin.util.FileUtil +import org.slf4j.event.LoggingEvent import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer import org.vitrivr.engine.core.model.metamodel.Schema +import org.vitrivr.engine.server.api.helper.IndexThreadPool import org.vitrivr.engine.server.api.rest.model.ErrorStatus import org.vitrivr.engine.server.api.rest.model.ErrorStatusException import java.nio.file.Path import kotlin.io.path.deleteIfExists + +private val logger: KLogger = KotlinLogging.logger {} + /** * * @author Ralph Gasser @@ -53,13 +60,28 @@ fun executeIngest(ctx: Context, schema: Schema) { val stream = filestream.stream() val pipelineBuilder = pipelineName?.let { schema.getPipelineBuilder(it) } ?: throw ErrorStatusException(400, "Invalid request: Pipeline '$pipelineName' does not exist.") + // ASYNC UUID val pipeline = pipelineBuilder.getApiPipeline(stream) - val server = ExecutionServer().extract(pipeline) + val id = IndexThreadPool.addThreadAndStart( + threadId, Thread { + try { + + ExecutionServer().extract(pipeline) + logger.debug { "Thread ${Thread.currentThread().id} finished" } + + } catch (e: Exception) { + throw ErrorStatusException(400, "Invalid request: ${e.message}") + } + } + ) + val ids = IndexThreadPool.cleanThreadPool() + ctx.json(mapOf("id" to id, "cleaned ids" to ids)) + } catch (e: Exception) { throw ErrorStatusException(400, "Invalid request: ${e.message}") } finally { - filestream.forEach() { - file -> file.deleteIfExists() + filestream.forEach() { file -> + file.deleteIfExists() } basepath.deleteIfExists() } From 53733c633c26e94869a8979c2d159ed036447f9c Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 20 Nov 2023 16:37:21 +0100 Subject: [PATCH 2/4] threading api --- .../server/api/helper/IndexThreadPool.kt | 107 ++++++++++++++++++ .../engine/server/api/rest/handlers/Index.kt | 28 ++++- 2 files changed, 132 insertions(+), 3 deletions(-) create mode 100644 vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt new file mode 100644 index 00000000..bc56af4e --- /dev/null +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt @@ -0,0 +1,107 @@ +package org.vitrivr.engine.server.api.helper + +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging +import java.time.Duration +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.temporal.Temporal +import java.util.* + + +private val logger: KLogger = KotlinLogging.logger {} + +class IndexThreadPool { + companion object { + val MAX_DURATION = Duration.ofHours(1); + private val threads = mutableMapOf>() + + fun addThreadAndStart(threadId: String, thread: Thread): String { + val id = addThread(threadId, thread); + val sid = startThread(id); + return sid; + } + + fun addThread(threadId: String, thread: Thread): String { + val started = LocalDateTime.now(); + threads[threadId] = Pair(thread, started); + return threadId; + } + + fun startThread(uuid: String): String { + val thread = threads[uuid]; + if (thread != null) { + val (t, s) = thread; + val started = LocalDateTime.now(); + t.start(); + threads[uuid] = Pair(t, started); + return uuid; + } + return ""; + } + + fun addThread(thread: Thread): String { + val myUuid = UUID.randomUUID().toString(); + return addThread(myUuid, thread); + } + + + fun getThreadState(uuid: String): Thread.State? { + val thread = threads[uuid]; + if (thread != null) { + val (t, started) = thread; + return t.state + } + return null; + } + + fun getAllThreadIds(): List { + return threads.keys.toList(); + } + + + fun terminateThread(uuid: String) { + val thread = threads[uuid]; + if (thread != null) { + val (t, started) = thread; + t.interrupt(); + threads.remove(uuid); + } + } + + fun cleanThreadPool(): List { + + val ids = terminateThreads(ThreadState.DEAD); + val idx = terminateThreads(ThreadState.DEPRECATED); + return ids + idx; + } + + enum class ThreadState { + ALL, DEAD, DEPRECATED + } + + fun terminateThreads(terminate: ThreadState): List { + val ids = mutableListOf(); + threads.forEach { (uuid, thread) -> + when (terminate) { + ThreadState.ALL -> { + ids.add(uuid); + } + ThreadState.DEAD -> { + if (!thread.first.isAlive) { + ids.add(uuid); } + } + ThreadState.DEPRECATED -> { + if (Duration.between(thread.second, LocalDateTime.now()).compareTo(MAX_DURATION) > 0) { + ids.add(uuid); + } + } + } + } + ids.forEach() { id -> + terminateThread(id); + } + return ids; + } + } +} \ No newline at end of file diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt index d04e3a29..0fdd6d39 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt @@ -1,15 +1,22 @@ package org.vitrivr.engine.server.api.rest.handlers +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging import io.javalin.http.Context import io.javalin.openapi.* import io.javalin.util.FileUtil +import org.slf4j.event.LoggingEvent import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer import org.vitrivr.engine.core.model.metamodel.Schema +import org.vitrivr.engine.server.api.helper.IndexThreadPool import org.vitrivr.engine.server.api.rest.model.ErrorStatus import org.vitrivr.engine.server.api.rest.model.ErrorStatusException import java.nio.file.Path import kotlin.io.path.deleteIfExists + +private val logger: KLogger = KotlinLogging.logger {} + /** * * @author Ralph Gasser @@ -53,13 +60,28 @@ fun executeIngest(ctx: Context, schema: Schema) { val stream = filestream.stream() val pipelineBuilder = pipelineName?.let { schema.getPipelineBuilder(it) } ?: throw ErrorStatusException(400, "Invalid request: Pipeline '$pipelineName' does not exist.") + // ASYNC UUID val pipeline = pipelineBuilder.getApiPipeline(stream) - val server = ExecutionServer().extract(pipeline) + val id = IndexThreadPool.addThreadAndStart( + threadId, Thread { + try { + + ExecutionServer().extract(pipeline) + logger.debug { "Thread ${Thread.currentThread().id} finished" } + + } catch (e: Exception) { + throw ErrorStatusException(400, "Invalid request: ${e.message}") + } + } + ) + val ids = IndexThreadPool.cleanThreadPool() + ctx.json(mapOf("id" to id, "cleaned ids" to ids)) + } catch (e: Exception) { throw ErrorStatusException(400, "Invalid request: ${e.message}") } finally { - filestream.forEach() { - file -> file.deleteIfExists() + filestream.forEach() { file -> + file.deleteIfExists() } basepath.deleteIfExists() } From 3bd8db961016af42efae8ac687827279d87d3991 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 27 Nov 2023 14:53:41 +0100 Subject: [PATCH 3/4] Changes Execution Server to an singleton for each schema. Extraction jobs were submitted over an blocking queue. e.g. `schema.getExecutionServer().enqueueIndexJob(pipeline)` --- .../pipeline/execution/ExecutionServer.kt | 66 +++++++++++++++---- .../engine/core/model/metamodel/Schema.kt | 6 ++ .../kotlin/org/vitrivr/engine/server/Main.kt | 3 +- .../server/api/cli/commands/SchemaCommand.kt | 2 +- .../engine/server/api/rest/handlers/Index.kt | 25 ++----- .../src/main/resources/log4j2.xml | 8 +-- 6 files changed, 71 insertions(+), 39 deletions(-) diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt index 13618242..ed3945e6 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt @@ -5,13 +5,18 @@ import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.takeWhile +import org.vitrivr.engine.core.config.pipeline.Pipeline +import org.vitrivr.engine.core.config.pipeline.PipelineBuilder +import org.vitrivr.engine.core.model.metamodel.Schema import org.vitrivr.engine.core.operators.Operator import org.vitrivr.engine.core.operators.ingest.AbstractSegmenter import org.vitrivr.engine.core.operators.ingest.Extractor -import org.vitrivr.engine.core.config.pipeline.Pipeline -import org.vitrivr.engine.core.config.pipeline.PipelineBuilder +import java.util.UUID +import java.util.concurrent.BlockingQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.LinkedBlockingDeque + private val logger: KLogger = KotlinLogging.logger {} @@ -20,35 +25,70 @@ private val logger: KLogger = KotlinLogging.logger {} * @author Ralph Gasser * @version 1.0.0 */ -class ExecutionServer { +class ExecutionServer private constructor(schema: Schema){ + + companion object { + + @Volatile private var instances: MutableMap = mutableMapOf() + + fun getInstance(schema: Schema) = + instances[schema] ?: synchronized(this) { // synchronized to avoid concurrency problem + instances[schema] ?: ExecutionServer(schema).also { instances[schema] = it } + } + } + + /** The [ExecutorService] used to execution [] */ private val executor: ExecutorService = Executors.newCachedThreadPool() /** The [CoroutineDispatcher] used for execution. */ private val dispatcher: CoroutineDispatcher = this.executor.asCoroutineDispatcher() - private lateinit var operators: List> + var indexJobQueue: BlockingQueue> = LinkedBlockingDeque() + + init { + this.run() + } + + fun isPending(uuid: UUID): Int { + return this.indexJobQueue.indexOf(this.indexJobQueue.find { it.second == uuid }) + } + + fun enqueueIndexJob(pipeline: Pipeline): UUID { + val uuid = UUID.randomUUID() + return this.enqueueIndexJob(pipeline, uuid) + } + + fun enqueueIndexJob(pipeline: Pipeline, uuid: UUID): UUID{ + this.indexJobQueue.add(Pair(pipeline, uuid)) + return uuid; + } /** * Executes an extraction job using a [List] of [Extractor]s. * * @param extractors The [List] of [Extractor]s to execute. */ - fun extract(pipeline: Pipeline) = runBlocking { + private fun extract(pipeline: Pipeline) = runBlocking { val scope = CoroutineScope(this@ExecutionServer.dispatcher) val jobs = pipeline.getLeaves().map { e -> scope.launch { e.toFlow(scope).takeWhile { it != AbstractSegmenter.TerminalRetrievable }.collect() } } jobs.forEach { it.join() } } - fun addOperatorPipeline(operatorPipeline: PipelineBuilder){ - this.operators = operatorPipeline.getPipeline().getLeaves(); - } - - fun execute() = runBlocking { - val scope = CoroutineScope(this@ExecutionServer.dispatcher) - val jobs = this@ExecutionServer.operators.map { e -> scope.launch { e.toFlow(scope).takeWhile() { it != AbstractSegmenter.TerminalRetrievable }.collect() } } - jobs.forEach { it.join() } + private fun run() { + Thread { + val running = true + while (running) { + val pipeline = indexJobQueue.take() + try { + this.extract(pipeline.first) + logger.debug { "Extraction with pipeline '${pipeline.second}' finished." } + } catch (e: Exception) { + logger.error { "Error while executing extraction job: ${e.message}" } + } + } + }.start() } /** diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt index e99aeae9..c8f1553d 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/model/metamodel/Schema.kt @@ -5,6 +5,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging import org.vitrivr.engine.core.config.IndexConfig import org.vitrivr.engine.core.config.pipeline.Pipeline import org.vitrivr.engine.core.config.pipeline.PipelineBuilder +import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer import org.vitrivr.engine.core.context.IndexContext import org.vitrivr.engine.core.context.QueryContext import org.vitrivr.engine.core.database.Connection @@ -42,6 +43,9 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab /** The [List] of [Exporter]s contained in this [Schema]. */ private val exporters: MutableList = mutableListOf() + /** The [List] of [Pipeline]s contained in this [Schema]. */ + private val executionServer: ExecutionServer = ExecutionServer.getInstance(this) + /** The [List] of [Pipeline]s contained in this [Schema]. */ private val extractionPipelines: MutableMap = mutableMapOf() @@ -117,6 +121,8 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab fun getPipelineBuilder(key: String): PipelineBuilder = this.extractionPipelines[key] ?: throw IllegalArgumentException("No pipeline with key '$key' found in schema '$name'.") + fun getExecutionServer(): ExecutionServer = this.executionServer + /** * Closes this [Schema] and the associated database [Connection]. */ diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt index ca421280..a760fc6c 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/Main.kt @@ -39,7 +39,6 @@ fun main(args: Array) { } /* Initialize retrieval runtime. */ - val executionServer = ExecutionServer() val runtime = RetrievalRuntime() /* Prepare Javalin endpoint. */ @@ -85,7 +84,7 @@ fun main(args: Array) { /* Prepare CLI endpoint. */ val cli = Cli(manager) for (schema in manager.listSchemas()) { - cli.register(SchemaCommand(schema, executionServer)) + cli.register(SchemaCommand(schema, schema.getExecutionServer())) } /* Start the Javalin and CLI. */ diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt index ed275863..0be44406 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/cli/commands/SchemaCommand.kt @@ -99,7 +99,7 @@ class SchemaCommand(private val schema: Schema, private val server: ExecutionSer val config = IndexConfig.read(Paths.get(IndexConfig.DEFAULT_PIPELINE_PATH)) ?: return val pipelineBuilder = PipelineBuilder.forConfig(this.schema, config) val pipeline = pipelineBuilder.getPipeline() - this.server.extract(pipeline) + schema.getExecutionServer().enqueueIndexJob(pipeline) } } } \ No newline at end of file diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt index 0fdd6d39..b9d06e7c 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt @@ -5,13 +5,11 @@ import io.github.oshai.kotlinlogging.KotlinLogging import io.javalin.http.Context import io.javalin.openapi.* import io.javalin.util.FileUtil -import org.slf4j.event.LoggingEvent -import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer import org.vitrivr.engine.core.model.metamodel.Schema -import org.vitrivr.engine.server.api.helper.IndexThreadPool import org.vitrivr.engine.server.api.rest.model.ErrorStatus import org.vitrivr.engine.server.api.rest.model.ErrorStatusException import java.nio.file.Path +import java.util.* import kotlin.io.path.deleteIfExists @@ -49,8 +47,9 @@ fun executeIngest(ctx: Context, schema: Schema) { } val filestream: MutableList = mutableListOf() // folder with threadId to avoid deleting files from other threads - val threadId = Thread.currentThread().hashCode().toString() + Thread.currentThread().id.toString() - val basepath = Path.of("upload/$threadId/") + val uuid = UUID.randomUUID(); + val basepath = Path.of("upload/$uuid/") + try { ctx.uploadedFiles("data").forEach { uploadedFile -> val path = Path.of("$basepath/${uploadedFile.filename()}") @@ -60,22 +59,10 @@ fun executeIngest(ctx: Context, schema: Schema) { val stream = filestream.stream() val pipelineBuilder = pipelineName?.let { schema.getPipelineBuilder(it) } ?: throw ErrorStatusException(400, "Invalid request: Pipeline '$pipelineName' does not exist.") - // ASYNC UUID val pipeline = pipelineBuilder.getApiPipeline(stream) - val id = IndexThreadPool.addThreadAndStart( - threadId, Thread { - try { - - ExecutionServer().extract(pipeline) - logger.debug { "Thread ${Thread.currentThread().id} finished" } - } catch (e: Exception) { - throw ErrorStatusException(400, "Invalid request: ${e.message}") - } - } - ) - val ids = IndexThreadPool.cleanThreadPool() - ctx.json(mapOf("id" to id, "cleaned ids" to ids)) + schema.getExecutionServer().enqueueIndexJob(pipeline, uuid) + ctx.json(mapOf("id" to uuid)) } catch (e: Exception) { throw ErrorStatusException(400, "Invalid request: ${e.message}") diff --git a/vitrivr-engine-server/src/main/resources/log4j2.xml b/vitrivr-engine-server/src/main/resources/log4j2.xml index c63723df..40763013 100644 --- a/vitrivr-engine-server/src/main/resources/log4j2.xml +++ b/vitrivr-engine-server/src/main/resources/log4j2.xml @@ -33,15 +33,15 @@ Valid values for this attribute are "off", "trace", "debug", "info", "warn", "er - + - + - + - + From e0d3ccbac51257941e5c9c7657e125d4e9b20c1e Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 27 Nov 2023 15:12:38 +0100 Subject: [PATCH 4/4] Added status endpoint --- .../pipeline/execution/ExecutionServer.kt | 2 + .../server/api/helper/IndexThreadPool.kt | 107 ------------------ .../vitrivr/engine/server/api/rest/Routes.kt | 8 +- .../engine/server/api/rest/handlers/Index.kt | 40 +++++++ 4 files changed, 46 insertions(+), 111 deletions(-) delete mode 100644 vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt index ed3945e6..99846e8b 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/config/pipeline/execution/ExecutionServer.kt @@ -87,6 +87,8 @@ class ExecutionServer private constructor(schema: Schema){ } catch (e: Exception) { logger.error { "Error while executing extraction job: ${e.message}" } } + // wait + Thread.sleep(10000) } }.start() } diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt deleted file mode 100644 index bc56af4e..00000000 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/helper/IndexThreadPool.kt +++ /dev/null @@ -1,107 +0,0 @@ -package org.vitrivr.engine.server.api.helper - -import io.github.oshai.kotlinlogging.KLogger -import io.github.oshai.kotlinlogging.KotlinLogging -import java.time.Duration -import java.time.LocalDate -import java.time.LocalDateTime -import java.time.temporal.Temporal -import java.util.* - - -private val logger: KLogger = KotlinLogging.logger {} - -class IndexThreadPool { - companion object { - val MAX_DURATION = Duration.ofHours(1); - private val threads = mutableMapOf>() - - fun addThreadAndStart(threadId: String, thread: Thread): String { - val id = addThread(threadId, thread); - val sid = startThread(id); - return sid; - } - - fun addThread(threadId: String, thread: Thread): String { - val started = LocalDateTime.now(); - threads[threadId] = Pair(thread, started); - return threadId; - } - - fun startThread(uuid: String): String { - val thread = threads[uuid]; - if (thread != null) { - val (t, s) = thread; - val started = LocalDateTime.now(); - t.start(); - threads[uuid] = Pair(t, started); - return uuid; - } - return ""; - } - - fun addThread(thread: Thread): String { - val myUuid = UUID.randomUUID().toString(); - return addThread(myUuid, thread); - } - - - fun getThreadState(uuid: String): Thread.State? { - val thread = threads[uuid]; - if (thread != null) { - val (t, started) = thread; - return t.state - } - return null; - } - - fun getAllThreadIds(): List { - return threads.keys.toList(); - } - - - fun terminateThread(uuid: String) { - val thread = threads[uuid]; - if (thread != null) { - val (t, started) = thread; - t.interrupt(); - threads.remove(uuid); - } - } - - fun cleanThreadPool(): List { - - val ids = terminateThreads(ThreadState.DEAD); - val idx = terminateThreads(ThreadState.DEPRECATED); - return ids + idx; - } - - enum class ThreadState { - ALL, DEAD, DEPRECATED - } - - fun terminateThreads(terminate: ThreadState): List { - val ids = mutableListOf(); - threads.forEach { (uuid, thread) -> - when (terminate) { - ThreadState.ALL -> { - ids.add(uuid); - } - ThreadState.DEAD -> { - if (!thread.first.isAlive) { - ids.add(uuid); } - } - ThreadState.DEPRECATED -> { - if (Duration.between(thread.second, LocalDateTime.now()).compareTo(MAX_DURATION) > 0) { - ids.add(uuid); - } - } - } - } - ids.forEach() { id -> - terminateThread(id); - } - return ids; - } - } -} \ No newline at end of file diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt index cfac3b42..fc6494b3 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/Routes.kt @@ -3,10 +3,7 @@ package org.vitrivr.engine.server.api.rest import io.javalin.apibuilder.ApiBuilder.* import org.vitrivr.engine.core.model.metamodel.SchemaManager import org.vitrivr.engine.query.execution.RetrievalRuntime -import org.vitrivr.engine.server.api.rest.handlers.executeIngest -import org.vitrivr.engine.server.api.rest.handlers.executeQuery -import org.vitrivr.engine.server.api.rest.handlers.fetchExportData -import org.vitrivr.engine.server.api.rest.handlers.listSchemas +import org.vitrivr.engine.server.api.rest.handlers.* import org.vitrivr.engine.server.config.ApiConfig @@ -36,6 +33,9 @@ fun configureApiRoutes(config: ApiConfig, manager: SchemaManager, retrievalRunti path(schema.name) { if (config.index) { post("index") { ctx -> executeIngest(ctx, schema) } + path("index") { + get("{id}") { ctx -> executeIngestStatus(ctx, schema) } + } } if (config.retrieval) { diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt index b9d06e7c..8e4d3447 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/Index.kt @@ -18,6 +18,7 @@ private val logger: KLogger = KotlinLogging.logger {} /** * * @author Ralph Gasser + * @author Raphael * @version 1.0 */ @OpenApi( @@ -72,4 +73,43 @@ fun executeIngest(ctx: Context, schema: Schema) { } basepath.deleteIfExists() } +} + +/** + * + * @author Raphael + * @version 1.0 + */ +@OpenApi( + path = "/api/{schema}/index/{id}", + methods = [HttpMethod.GET], + summary = "Indexes an item, adding it to the defined schema.", + operationId = "postExecuteIngest", + tags = ["Ingest"], + pathParams = [ + OpenApiParam( + "schema", + type = String::class, + description = "The name of the schema to execute a query for.", + required = true + ), OpenApiParam( + "id", + type = String::class, + description = "The id querying the state.", + required = true + ) + ], + responses = [ + OpenApiResponse("200", [OpenApiContent(Any::class)]), + OpenApiResponse("400", [OpenApiContent(ErrorStatus::class)]) + ] +) +fun executeIngestStatus(ctx: Context, schema: Schema) { + val id = try { + UUID.fromString(ctx.pathParam("id")) + } catch (e: Exception) { + throw ErrorStatusException(400, "Invalid request: ${e.message}") + } + val status = schema.getExecutionServer().isPending(id) + ctx.json(mapOf("status" to status)) } \ No newline at end of file