Skip to content

Commit

Permalink
Enhanced Event API
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed Aug 25, 2023
1 parent 1e52394 commit 1a27a13
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 20 deletions.
4 changes: 2 additions & 2 deletions roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class AvroFeed(private val path: Path, useCache: Boolean = false) : AssetFeed {
if (now < timeframe) continue

if (now != last) {
if (actions.isNotEmpty()) channel.send(Event(actions, last))
channel.sendNotEmpty(Event(actions, last))
last = now
actions = ArrayList<PriceAction>(actions.size)
}
Expand All @@ -152,7 +152,7 @@ class AvroFeed(private val path: Path, useCache: Boolean = false) : AssetFeed {
val action = recToPriceAction(rec, serializer)
actions.add(action)
}
if (actions.isNotEmpty()) channel.send(Event(actions, last))
channel.sendNotEmpty(Event(actions, last))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class QuestDBFeed(private val tableName: String, dbPath: Path = Config.home / "q

val time = record.getTimestamp(1)
if (time != last) {
if (last != Long.MIN_VALUE) channel.send(Event(actions, ofEpochMicro(last)))
channel.sendNotEmpty(Event(actions, ofEpochMicro(last)))
last = time
actions = mutableListOf()
}
Expand All @@ -125,7 +125,7 @@ class QuestDBFeed(private val tableName: String, dbPath: Path = Config.home / "q
val price = handler.getPriceAction(asset, record)
actions.add(price)
}
if (actions.isNotEmpty()) channel.send(Event(actions, ofEpochMicro(last)))
channel.sendNotEmpty(Event(actions, ofEpochMicro(last)))
}
}

Expand Down
18 changes: 9 additions & 9 deletions roboquant/src/main/kotlin/org/roboquant/Roboquant.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,20 @@ data class Roboquant(

/**
* Start a new run using the provided [feed] as data. If no [timeframe] is provided all the events in the feed
* will be processed. You can provide a custom [name] that will help to later identify this run. If none is
* provided, the default [name] "run" will be used.
* will be processed. You can provide a custom [name] that will help to later identify this run.
* If none is provided, the default [name] "run-${timeframe}" will be used that can help to identify the run later.
* Additionally, you can provide a [warmup] period in which no metrics will be logged or orders placed.
*
* By default, at the beginning of a run, all components (besides the logger) will be [reset] and typically discard
* their state. If you don't want this behavior set [reset] to false.
* By default, at the beginning of a run, all components (besides the logger) will be [reset] and as a result
* discard their state. If you don't want this behavior set [reset] to false.
*
* This is the synchronous (blocking) method of run that is convenient to use. However, if you want to execute runs
* in parallel, use the [runAsync] method.
*/
fun run(
feed: Feed,
timeframe: Timeframe = feed.timeframe,
name: String = "run",
name: String = "run-${timeframe.toPrettyString()}",
warmup: TimeSpan = TimeSpan.ZERO,
reset: Boolean = true
) = runBlocking {
Expand All @@ -136,15 +136,15 @@ data class Roboquant(

/**
* This is the same method as the [run] method but as the name already suggests, asynchronously. This makes it
* better suited for running back-test in parallel. Other than that, it behaves exactly the same as the regular
* run method.
* suited for running back-test in parallel. Other than that, it behaves the same as the regular blocking run
* method.
*
* @see [run]
*/
suspend fun runAsync(
feed: Feed,
timeframe: Timeframe = feed.timeframe,
name: String = "run",
name: String = "run-${timeframe.toPrettyString()}",
warmup: TimeSpan = TimeSpan.ZERO,
reset: Boolean = true
) {
Expand Down Expand Up @@ -220,7 +220,7 @@ data class Roboquant(
*
* This method performs the following steps:
* 1. Cancel existing open orders
* 2. Close open positions by placing [MarketOrder] for the required opposite sizes
* 2. Close open positions by placing [MarketOrder] for the required opposite sizes.
* 3. Run and log the metrics
*/
fun closePositions(time: Instant? = null, runName: String = "close") {
Expand Down
8 changes: 5 additions & 3 deletions roboquant/src/main/kotlin/org/roboquant/feeds/Action.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import kotlin.math.absoluteValue
* on those types of actions they are interested in.
*
* # Example
* event.actions.filterByType<PriceBar>(). ...
* event.actions.filterIsInstance<PriceBar>(). ...
*
*/
interface Action
Expand Down Expand Up @@ -63,7 +63,8 @@ interface PriceAction : Action {
fun getPriceAmount(type: String = "DEFAULT") = Amount(asset.currency, getPrice(type))

/**
* Volume for the price action. If not implemented, it should return [Double.NaN]
* Volume for the price action.
* If not supported, it returns [Double.NaN].
*
* Volume in the context of a PriceAction can mean different things. For example, is can be trade volume but also
* the total order-book volume, depending on the type of PriceAction.
Expand Down Expand Up @@ -154,7 +155,7 @@ class PriceBar(
}

/**
* Get the price for this price bar, default is the closing price.
* Get the price for this price bar.
*
* The supported types are: CLOSE, OPEN, LOW, HIGH, TYPICAL, with the default type being "CLOSE".
*
Expand All @@ -165,6 +166,7 @@ class PriceBar(
*/
override fun getPrice(type: String): Double {
return when (type) {
"DEFAULT" -> ohlcv[3]
"CLOSE" -> ohlcv[3]
"OPEN" -> ohlcv[0]
"LOW" -> ohlcv[2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class AggregatorFeed(
expiration = time.expirationTime()
} else if (time >= expiration) {
val newEvent = Event(history.values.toList(), expiration)
if (newEvent.actions.isNotEmpty()) channel.send(newEvent)
channel.sendNotEmpty(newEvent)
history.clear()
expiration += aggregationPeriod
}
Expand All @@ -120,9 +120,9 @@ class AggregatorFeed(
} finally {

// Send remaining
if (remaining && history.isNotEmpty() && expiration != null) {
if (remaining && expiration != null) {
val newEvent = Event(history.values.toList(), expiration)
channel.send(newEvent)
channel.sendNotEmpty(newEvent)
}
if (job.isActive) job.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class AggregatorLiveFeed(
newEvent
}

if (newEvent.actions.isNotEmpty()) channel.send(newEvent)
channel.sendNotEmpty(newEvent)
}

/**
Expand Down
11 changes: 11 additions & 0 deletions roboquant/src/main/kotlin/org/roboquant/feeds/Event.kt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ class Event(val actions: List<Action>, val time: Instant) : Comparable<Event> {
* their chronological order.
*/
override fun compareTo(other: Event): Int = time.compareTo(other.time)

/**
* Return true if this is event has at least one action, false otherwise
*/
fun isNotEmpty(): Boolean = actions.isNotEmpty()


/**
* Return true if this event has no actions, false otherwise
*/
fun isEmpty(): Boolean = actions.isEmpty()
}

/**
Expand Down
9 changes: 9 additions & 0 deletions roboquant/src/main/kotlin/org/roboquant/feeds/EventChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ open class EventChannel(val capacity: Int = 10, val timeframe: Timeframe = Timef
}
}

/**
* Send an [event] on this channel if it is not an empty event. If the time of event is before the timeframe
* if this channel, it will be silently ignored. And if the event is after the timeframe, the channel
* will be [closed].
*/
suspend fun sendNotEmpty(event: Event) {
if (event.isNotEmpty()) send(event)
}

/**
* Receive an event from the channel. This will throw a [ClosedReceiveChannelException] if the channel
* is [closed].
Expand Down

0 comments on commit 1a27a13

Please sign in to comment.