Skip to content

Commit

Permalink
The ExecutionServer now exposes methods to execute a query pipeline (…
Browse files Browse the repository at this point in the history
…to be implemented).

Signed-off-by: Ralph Gasser <[email protected]>
  • Loading branch information
ppanopticon committed Nov 30, 2023
1 parent 6d38cd8 commit 5009550
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import org.vitrivr.engine.core.config.IndexConfig
import org.vitrivr.engine.core.config.IndexContextFactory
import org.vitrivr.engine.core.config.operators.*
import org.vitrivr.engine.core.config.pipeline.execution.IndexingPipeline
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.model.content.element.ContentElement
import org.vitrivr.engine.core.model.metamodel.Analyser
Expand All @@ -13,7 +14,6 @@ import org.vitrivr.engine.core.model.retrievable.Retrievable
import org.vitrivr.engine.core.operators.Operator
import org.vitrivr.engine.core.operators.ingest.*
import org.vitrivr.engine.core.util.extension.loadServiceForName
import java.io.ObjectInputFilter
import java.util.stream.Stream

private val logger: KLogger = KotlinLogging.logger {}
Expand All @@ -26,20 +26,20 @@ private val logger: KLogger = KotlinLogging.logger {}
* Pipeline setup
* Enumerator: Source -> Decoder: ContentElement<*> -> Transformer: ContentElement<*> -> Segmenter: Ingested -> Extractor: Ingested -> Exporter: Ingested
*/
class PipelineBuilder(val schema: Schema, val config: IndexConfig) {
class ExtractionPipelineBuilder(val schema: Schema, val config: IndexConfig) {

companion object {
/**
* Creates a new [PipelineBuilder] using the provided [Schema] and [IndexConfig].
* Creates a new [ExtractionPipelineBuilder] using the provided [Schema] and [IndexConfig].
*
* @param schema The [Schema] to create a [PipelineBuilder] for.
* @param config The [IndexConfig] to create a [PipelineBuilder] for.
* @param schema The [Schema] to create a [ExtractionPipelineBuilder] for.
* @param config The [IndexConfig] to create a [ExtractionPipelineBuilder] for.
*/
fun forConfig(schema: Schema, config: IndexConfig) = PipelineBuilder(schema, config)
fun forConfig(schema: Schema, config: IndexConfig) = ExtractionPipelineBuilder(schema, config)
}

/** List of leaf operators held by this [PipelineBuilder]. */
private var pipeline: Pipeline? = null
/** List of leaf operators held by this [ExtractionPipelineBuilder]. */
private var pipeline: IndexingPipeline? = null
private val context: IndexContext

init {
Expand Down Expand Up @@ -233,18 +233,18 @@ class PipelineBuilder(val schema: Schema, val config: IndexConfig) {
}

/**
* Returns the [Operator] pipeline constructed by this [PipelineBuilder].
* Returns the [Operator] pipeline constructed by this [ExtractionPipelineBuilder].
*
* @return The [IndexingPipeline] holding the leaf [Operator]s.
*/
fun getPipeline(): Pipeline {
this.pipeline = Pipeline()
fun getPipeline(): IndexingPipeline {
this.pipeline = IndexingPipeline()
this.parseEnumerator(this.config.enumerator, context)
return this.pipeline!!
}

fun getApiPipeline(inputFiles: Stream<*>): Pipeline {
this.pipeline = Pipeline()
fun getApiPipeline(inputFiles: Stream<*>): IndexingPipeline {
this.pipeline = IndexingPipeline()
this.parseApiEnumerator(this.config.enumerator, context, inputFiles)
return this.pipeline!!
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package org.vitrivr.engine.core.config.pipeline.execution

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import org.vitrivr.engine.core.config.pipeline.Pipeline
import org.vitrivr.engine.core.model.retrievable.Retrieved
import org.vitrivr.engine.core.operators.ingest.AbstractSegmenter
import java.util.*
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -72,12 +73,28 @@ class ExecutionServer {
}

/**
* Executes an extraction [Pipeline] in a blocking fashion, i.e., the call will block until the [Pipeline] has been executed.
* Executes an extraction [IndexingPipeline] in a blocking fashion, i.e., the call will block until the [IndexingPipeline] has been executed.
*
* @param pipeline The [Pipeline] to execute.
* This is mainly for testing purposes!
*
* @param pipeline The [IndexingPipeline] to execute.
*/
fun extract(pipeline: IndexingPipeline) {
    val jobId = UUID.randomUUID()
    val scope = CoroutineScope(this@ExecutionServer.dispatcher) + CoroutineName("index-job-$jobId")
    runBlocking {
        /* Launch one coroutine per leaf operator; each one drains the leaf's flow until the terminal marker is seen. */
        val jobs = pipeline.getLeaves().map { leaf ->
            scope.launch {
                leaf.toFlow(this)
                    .takeWhile { retrievable -> retrievable != AbstractSegmenter.TerminalRetrievable }
                    .collect()
            }
        }

        /* Keep the caller blocked until every launched job has finished. */
        for (job in jobs) {
            job.join()
        }
    }
}

/**
* Executes an [IndexingPipeline] in an asynchronous fashion, i.e., the [IndexingPipeline] is launched as a job and the call returns the identifier of that job.
*
* @param pipeline The [IndexingPipeline] to execute.
* @return [UUID] identifying the job.
*/
fun extractAsync(pipeline: Pipeline): UUID {
fun extractAsync(pipeline: IndexingPipeline): UUID {
val jobId = UUID.randomUUID()
val scope = CoroutineScope(this@ExecutionServer.dispatcher) + CoroutineName("index-job-$jobId")
val job = scope.launch {
Expand All @@ -100,18 +117,28 @@ class ExecutionServer {
}

/**
* Executes an extraction [Pipeline] in a blocking fashion, i.e., the call will block until the [Pipeline] has been executed.
*
* This is mainly for testing purposes!
* Executes a [RetrievalPipeline] in a blocking fashion, i.e., the call will block until the [RetrievalPipeline] has been executed.
* @param pipeline The [RetrievalPipeline] to execute.
* @return The resulting [List] of [Retrieved]
*/
fun query(pipeline: RetrievalPipeline): List<Retrieved> {
// Stub: query execution is not implemented yet. The locals below are prepared but not used so far.
// Random identifier for this query job; it only goes into the coroutine name for now.
val jobId = UUID.randomUUID()
// Scope built from this server's dispatcher plus a per-job coroutine name.
val scope = CoroutineScope(this@ExecutionServer.dispatcher) + CoroutineName("query-job-$jobId")
// Placeholder: always throws NotImplementedError, so `pipeline` is never executed and nothing is returned.
TODO()
}

/**
* Executes a [RetrievalPipeline] in an asynchronous fashion, sending all results to the provided [SendChannel].
*
* @param pipeline The [Pipeline] to execute.
* @param pipeline The [RetrievalPipeline] to execute.
* @param into The [SendChannel] to send the results to.
* @return The [UUID] of the resulting [Job].
*/
fun extract(pipeline: Pipeline) {
fun queryAsync(pipeline: RetrievalPipeline, into: SendChannel<Retrieved>): UUID {
val jobId = UUID.randomUUID()
val scope = CoroutineScope(this@ExecutionServer.dispatcher) + CoroutineName("index-job-$jobId")
runBlocking {
val jobs = pipeline.getLeaves().map { e -> scope.launch { e.toFlow(this).takeWhile { it != AbstractSegmenter.TerminalRetrievable }.collect() } }
jobs.forEach { it.join() }
}
val scope = CoroutineScope(this@ExecutionServer.dispatcher) + CoroutineName("query-job-$jobId")
TODO()
return jobId
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package org.vitrivr.engine.core.config.pipeline
package org.vitrivr.engine.core.config.pipeline.execution

import org.vitrivr.engine.core.model.retrievable.Retrievable
import org.vitrivr.engine.core.operators.Operator
import org.vitrivr.engine.core.source.Source
import java.util.*

open class Pipeline() {
/**
* A pipeline for indexing. It wraps a [List] of [Operator]s.
*
* @author Ralph Gasser
* @version 1.0.0
*/
class IndexingPipeline {
private val leaves: MutableList<Operator<Retrievable>> = mutableListOf()

fun addLeaf(leaf: Operator<Retrievable>) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.vitrivr.engine.core.config.pipeline.execution

import org.vitrivr.engine.core.model.retrievable.Retrieved
import org.vitrivr.engine.core.operators.Operator

/**
 * Pipeline used for retrieval: a thin wrapper around a single (query) [Operator] that returns [Retrieved] objects.
 *
 * TODO: Builder for this must be implemented.
 *
 * @property query The (query) [Operator] wrapped by this [RetrievalPipeline].
 *
 * @author Ralph Gasser
 * @version 1.0.0
 */
data class RetrievalPipeline(
    val query: Operator<Retrieved>
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package org.vitrivr.engine.core.model.metamodel
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import org.vitrivr.engine.core.config.IndexConfig
import org.vitrivr.engine.core.config.pipeline.Pipeline
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import org.vitrivr.engine.core.config.pipeline.ExtractionPipelineBuilder
import org.vitrivr.engine.core.config.pipeline.execution.IndexingPipeline
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.context.QueryContext
import org.vitrivr.engine.core.database.Connection
Expand Down Expand Up @@ -42,8 +42,8 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab
/** The [List] of [Exporter]s contained in this [Schema]. */
private val exporters: MutableList<Schema.Exporter> = mutableListOf()

/** The [List] of [Pipeline]s contained in this [Schema]. */
private val extractionPipelines: MutableMap<String, PipelineBuilder> = mutableMapOf()
/** The [ExtractionPipelineBuilder]s registered with this [Schema], keyed by pipeline name. */
private val extractionPipelines: MutableMap<String, ExtractionPipelineBuilder> = mutableMapOf()

/**
* Adds a new [Field] to this [Schema].
Expand Down Expand Up @@ -79,7 +79,7 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab
* @param resolver The [Resolver] instance.
*/
fun addPipeline(name: String, config: IndexConfig) {
this.extractionPipelines[name] = PipelineBuilder(this, config)
this.extractionPipelines[name] = ExtractionPipelineBuilder(this, config)
}

/**
Expand Down Expand Up @@ -114,7 +114,7 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab
fun getExporter(name: String) = this.exporters.firstOrNull { it.name == name }


fun getPipelineBuilder(key: String): PipelineBuilder = this.extractionPipelines[key]
fun getPipelineBuilder(key: String): ExtractionPipelineBuilder = this.extractionPipelines[key]
?: throw IllegalArgumentException("No pipeline with key '$key' found in schema '$name'.")

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.github.ajalt.clikt.core.NoOpCliktCommand
import com.github.ajalt.clikt.core.subcommands
import com.jakewharton.picnic.table
import org.vitrivr.engine.core.config.IndexConfig
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import org.vitrivr.engine.core.config.pipeline.ExtractionPipelineBuilder
import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer
import org.vitrivr.engine.core.database.Initializer
import org.vitrivr.engine.core.model.metamodel.Schema
Expand Down Expand Up @@ -100,7 +100,7 @@ class SchemaCommand(private val schema: Schema, private val server: ExecutionSer
inner class Extract(private val schema: Schema, private val executor: ExecutionServer) : CliktCommand(name = "extract", help = "Extracts data from a source and stores it in the schema.") {
override fun run() {
val config = IndexConfig.read(Paths.get(IndexConfig.DEFAULT_PIPELINE_PATH)) ?: return
val pipelineBuilder = PipelineBuilder.forConfig(this.schema, config)
val pipelineBuilder = ExtractionPipelineBuilder.forConfig(this.schema, config)
val pipeline = pipelineBuilder.getPipeline()
val uuid = this.executor.extractAsync(pipeline)
println("Started extraction job with UUID $uuid.")
Expand Down

0 comments on commit 5009550

Please sign in to comment.