Skip to content

Commit

Permalink
refactor: updated a variety of packages
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Oct 16, 2024
1 parent 0250afc commit 6e8c9f3
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import technology.idlab.runner.Runner
/**
* A simple implementation of an orchestrator which only succeeds if all runners succeed without
* intervention.
*
* @property parser The parser which is used to parse the configuration.
*/
class SimpleOrchestrator(
/** Configuration of the pipeline. */
parser: Parser
) : Orchestrator {
class SimpleOrchestrator(parser: Parser) : Orchestrator {
/** Message broker. */
private val broker: Broker<ByteArray>

Expand All @@ -26,14 +25,14 @@ class SimpleOrchestrator(

/** Load all stages into their respective runners. */
init {
val result = mutableListOf<Runner>()
val runners = mutableListOf<Runner>()

for (runner in parser.runners()) {
val stages = parser.stages(runner)
result.add(Runner.from(runner, stages))
runners.add(Runner.from(runner, stages))
}

this.runners = result
this.runners = runners
}

/** Initialize a broker. */
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/src/main/kotlin/parser/Parser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ interface Parser {
/**
* Get a runner by its URI.
*
* @param id The URI of the runner.
* @param uri The URI of the runner.
* @return The runner with the given URI.
*/
fun runner(uri: String): IRRunner
Expand Down
14 changes: 11 additions & 3 deletions orchestrator/src/main/kotlin/process/ProcessManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@ import technology.idlab.log.Log
* throws an error when the process itself logs to the error stream.
*
* @param process The process to manage.
* @property job The job which manages the coroutine scope.
* @property coroutineScope The coroutine scope which manages IO operations for the process.
* @property hooks The exit hooks that must be run when the process exits.
*/
class ProcessManager(
val process: Process,
) {
/** The job used to manage the [coroutineScope]. */
private val job = Job()

/**
* The coroutine scope which manages IO operations for the process. The lifecycle is managed by
* [job].
*/
private val coroutineScope = CoroutineScope(job + Dispatchers.Default)

/**
* Functions that must be executed when the process halts. The integer argument is the exit code
* of the process.
*/
private val hooks = mutableListOf<suspend (Int) -> Unit>()

init {
Expand Down
14 changes: 7 additions & 7 deletions orchestrator/src/main/kotlin/runner/Runner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,12 @@ abstract class Runner(protected val stages: Collection<IRStage>) : BrokerClient<
/** The tasks to run concurrently, in a FIFO order. */
private val tasks = Channel<suspend () -> Unit>(Channel.UNLIMITED)

/** Reference to the broker, set via dependency injection. */
// BrokerClient implementation.
final override lateinit var broker: Broker<ByteArray>

/** The URIs the runner wants to listen to. */
final override val receiving = stages.map { it.readers() }.flatten()

/** The URIs the runners wants to send to. */
final override val sending = stages.map { it.writers() }.flatten()

/**
/*
* All incoming tasks will be handled in a first-in first-out based, in order to guarantee their
* order.
*/
Expand All @@ -58,7 +54,11 @@ abstract class Runner(protected val stages: Collection<IRStage>) : BrokerClient<
}
}

/** Schedule a new task to execute. */
/**
* Schedule a new task to execute in the local [scope].
*
* @param task The task to execute.
*/
fun scheduleTask(task: suspend () -> Unit) {
runBlocking { tasks.send(task) }
}
Expand Down

0 comments on commit 6e8c9f3

Please sign in to comment.