Skip to content

Commit

Permalink
Merge pull request #161 from boudicca-events/abl/de-eventify
Browse files Browse the repository at this point in the history
Abl/de eventify
  • Loading branch information
kadhonn authored Oct 21, 2023
2 parents 911199f + 182ab30 commit 0523e8b
Show file tree
Hide file tree
Showing 58 changed files with 410 additions and 391 deletions.
2 changes: 1 addition & 1 deletion .run/LocalEventDB.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<option name="MAIN_CLASS_NAME" value="base.boudicca.eventdb.EventDBApplicationKt" />
<module name="boudicca.boudicca.base.eventdb.main" />
<shortenClasspath name="NONE" />
<option name="VM_PARAMETERS" value="-Dboudicca.store.path=boudicca.store" />
<option name="VM_PARAMETERS" value="-Dboudicca.store.path=boudicca.store -Dboudicca.entryKeyNames=name,startDate" />
<extension name="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
Expand Down
26 changes: 26 additions & 0 deletions boudicca.base/enricher-api/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
kotlin("jvm")
kotlin("plugin.allopen")
}

repositories {
mavenCentral()
mavenLocal()
}

dependencies {
api(project(":boudicca.base:semantic-conventions"))
implementation("org.jetbrains.kotlin:kotlin-stdlib")
implementation(project(":boudicca.base:enricher-openapi"))
}

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
vendor.set(JvmVendorSpec.ADOPTIUM)
}
}

tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
kotlinOptions.javaParameters = true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package base.boudicca.api.enricher

import base.boudicca.Event
import events.boudicca.enricher.openapi.ApiClient
import events.boudicca.enricher.openapi.api.EnricherControllerApi
import events.boudicca.enricher.openapi.model.EnrichRequestDTO

class Enricher(enricherUrl: String) {

private val enricherApi: EnricherControllerApi

init {
if (enricherUrl.isBlank()) {
throw IllegalStateException("you need to pass an eventDbUrl!")
}
val apiClient = ApiClient()
apiClient.updateBaseUri(enricherUrl)

enricherApi = EnricherControllerApi(apiClient)
}

fun enrichEvents(events: List<Event>): List<Event> {
return enricherApi.enrich(EnrichRequestDTO().events(events.map { mapToEnricherEvent(it) })).map { toEvent(it) }
}

private fun toEvent(enricherEvent: events.boudicca.enricher.openapi.model.Event): Event {
return Event(enricherEvent.name, enricherEvent.startDate, enricherEvent.data ?: mapOf())
}

private fun mapToEnricherEvent(event: Event): events.boudicca.enricher.openapi.model.Event {
return events.boudicca.enricher.openapi.model.Event()
.name(event.name)
.startDate(event.startDate)
.data(event.data)
}
}
4 changes: 2 additions & 2 deletions boudicca.base/enricher-utils/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ repositories {

dependencies {
implementation("org.json:json:20231013")
implementation(project(":boudicca.base:enricher-openapi"))
implementation(project(":boudicca.base:eventdb-openapi"))
implementation(project(":boudicca.base:enricher-api"))
implementation(project(":boudicca.base:publisher-api"))
}

tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package base.boudicca.enricher_utils

import base.boudicca.Event
import base.boudicca.SemanticKeys
import events.boudicca.enricher.openapi.api.EnricherControllerApi
import events.boudicca.enricher.openapi.model.EnrichRequestDTO
import events.boudicca.enricher.openapi.model.Event
import events.boudicca.openapi.api.EventPublisherResourceApi
import base.boudicca.api.enricher.Enricher
import base.boudicca.api.eventdb.publisher.EventDB

private const val EVENTDB_URL = "http://localhost:8081"
private const val ENRICHER_URL = "http://localhost:8085"

fun main() {

var startTime = System.currentTimeMillis()
val events = base.boudicca.enricher_utils.getEvents()
val events = getEvents()
println("fetch all events took ${System.currentTimeMillis() - startTime}ms")

val filteredEvents = events.filter { it.data?.get(SemanticKeys.COLLECTORNAME) == "linz termine" }
val filteredEvents = events.filter { it.data[SemanticKeys.COLLECTORNAME] == "linz termine" }

startTime = System.currentTimeMillis()
val enrichedEvents = base.boudicca.enricher_utils.enrich(filteredEvents)
val enrichedEvents = enrich(filteredEvents)
println("enrich filtered events took ${System.currentTimeMillis() - startTime}ms")

base.boudicca.enricher_utils.compare(filteredEvents, enrichedEvents)
compare(filteredEvents, enrichedEvents)
}

fun compare(events: List<Event>, enrichedEvents: List<Event>) {
Expand All @@ -31,31 +30,31 @@ fun compare(events: List<Event>, enrichedEvents: List<Event>) {

for (i in events.indices) {
if (events[i] != enrichedEvents[i]) {
base.boudicca.enricher_utils.printDiff(events[i], enrichedEvents[i])
printDiff(events[i], enrichedEvents[i])
}
}
}

fun printDiff(event: Event, enrichedEvent: Event) {
println()
base.boudicca.enricher_utils.printValues("name", event.name, enrichedEvent.name)
printValues("name", event.name, enrichedEvent.name)
if (event.startDate != enrichedEvent.startDate) {
base.boudicca.enricher_utils.printValues(
printValues(
"startDate",
event.startDate.toString(),
enrichedEvent.startDate.toString()
)
}
val oldValues = event.data ?: emptyMap()
val newValues = enrichedEvent.data?.toMutableMap() ?: mutableMapOf()
val oldValues = event.data
val newValues = enrichedEvent.data.toMutableMap()
for (key in oldValues.keys.sorted()) {
if (oldValues[key] != newValues[key]) {
base.boudicca.enricher_utils.printValues(key, oldValues[key], newValues[key])
printValues(key, oldValues[key], newValues[key])
}
newValues.remove(key)
}
for (key in newValues.keys.sorted()) {
base.boudicca.enricher_utils.printValues(key, null, newValues[key])
printValues(key, null, newValues[key])
}
}

Expand All @@ -64,18 +63,9 @@ fun printValues(key: String, oldValue: String?, newValue: String?) {
}

private fun enrich(originalEvents: List<Event>): List<Event> {
val apiClient = events.boudicca.enricher.openapi.ApiClient()
apiClient.updateBaseUri(base.boudicca.enricher_utils.ENRICHER_URL)
val enricherApi = EnricherControllerApi(apiClient)

return enricherApi.enrich(EnrichRequestDTO().events(originalEvents))
return Enricher(ENRICHER_URL).enrichEvents(originalEvents)
}


fun getEvents(): List<Event> {
val apiClient = events.boudicca.openapi.ApiClient()
apiClient.updateBaseUri(base.boudicca.enricher_utils.EVENTDB_URL)
val eventdbResource = EventPublisherResourceApi(apiClient)

return eventdbResource.eventsGet().map { Event().name(it.name).startDate(it.startDate).data(it.data) }
return EventDB(EVENTDB_URL).getAllEvents()
}
8 changes: 4 additions & 4 deletions boudicca.base/eventcollector-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ repositories {
}

dependencies {
implementation("org.jetbrains.kotlin:kotlin-stdlib")
api("org.jetbrains.kotlin:kotlin-stdlib")
implementation("org.apache.velocity:velocity-engine-core:2.3")
implementation("org.apache.velocity.tools:velocity-tools-generic:3.1")
api("ch.qos.logback:logback-classic:1.4.11")
implementation("ch.qos.logback:logback-classic:1.4.11")
api("org.slf4j:slf4j-api:2.0.9")
api(project(":boudicca.base:eventdb-openapi"))
api(project(":boudicca.base:enricher-openapi"))
implementation(project(":boudicca.base:ingest-api"))
implementation(project(":boudicca.base:enricher-api"))
}

java {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package base.boudicca.api.eventcollector

import base.boudicca.Event

interface EventCollector {
fun getName(): String
fun collectEvents(): List<Event>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package base.boudicca.api.eventcollector

import base.boudicca.Event
import base.boudicca.api.eventcollector.collections.Collections

class EventCollectorDebugger {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package base.boudicca.api.eventcollector

import base.boudicca.Event
import base.boudicca.SemanticKeys
import base.boudicca.api.enricher.Enricher
import base.boudicca.api.eventcollector.collections.Collections
import events.boudicca.enricher.openapi.api.EnricherControllerApi
import events.boudicca.enricher.openapi.model.EnrichRequestDTO
import events.boudicca.openapi.ApiClient
import events.boudicca.openapi.ApiException
import events.boudicca.openapi.api.EventIngestionResourceApi
import base.boudicca.api.eventdb.ingest.EventDB
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.function.Function
Expand Down Expand Up @@ -99,7 +96,7 @@ class EventCollectorScheduler(
eventSink.accept(event)
}
}
} catch (e: ApiException) {
} catch (e: RuntimeException) {
LOG.error("could not ingest events, is the core available?", e)
}
} catch (e: Exception) {
Expand All @@ -124,16 +121,16 @@ class EventCollectorScheduler(
if (event.name.isBlank()) {
LOG.warn("event has empty name: $event")
}
for (entry in event.additionalData.entries) {
for (entry in event.data.entries) {
if (entry.value.isBlank()) {
LOG.warn("event contains empty field ${entry.key}: $event")
}
}
if (!event.additionalData.containsKey(SemanticKeys.COLLECTORNAME)) {
if (!event.data.containsKey(SemanticKeys.COLLECTORNAME)) {
return Event(
event.name,
event.startDate,
event.additionalData.toMutableMap().apply { put(SemanticKeys.COLLECTORNAME, collectorName) }
event.data.toMutableMap().apply { put(SemanticKeys.COLLECTORNAME, collectorName) }
)
}
return event
Expand All @@ -157,56 +154,26 @@ fun createBoudiccaEventSink(eventDbUrl: String?): Consumer<Event> {
if (eventDbUrl.isNullOrBlank()) {
throw IllegalStateException("you need to specify the boudicca.eventdb.url property!")
}
val apiClient = ApiClient()
apiClient.updateBaseUri(eventDbUrl)
apiClient.setRequestInterceptor {
it.header(
"Authorization",
"Basic " + Base64.getEncoder()
.encodeToString(
Configuration.getProperty("boudicca.ingest.auth")?.encodeToByteArray()
?: throw IllegalStateException("you need to specify the boudicca.ingest.auth property!")
)
)
}
val ingestionApi = EventIngestionResourceApi(apiClient)
val userAndPassword = Configuration.getProperty("boudicca.ingest.auth")
?: throw IllegalStateException("you need to specify the boudicca.ingest.auth property!")
val user = userAndPassword.split(":")[0]
val password = userAndPassword.split(":")[1]
val eventDb = EventDB(eventDbUrl, user, password)
return Consumer {
ingestionApi.ingestAddPost(mapToApiEvent(it))
eventDb.ingestEvents(listOf(it))
}
}

fun createBoudiccaEnricherFunction(enricherUrl: String?): Function<List<Event>, List<Event>>? {
if (enricherUrl.isNullOrBlank()) {
return null
}
val apiClient = events.boudicca.enricher.openapi.ApiClient()
apiClient.updateBaseUri(enricherUrl)
val enricherApi = EnricherControllerApi(apiClient)
val enricher = Enricher(enricherUrl)
return Function<List<Event>, List<Event>> { events ->
enricherApi.enrich(
EnrichRequestDTO().events(events.map { mapToEnricherEvent(it) })
).map { mapToEventCollectorEvent(it) }
enricher.enrichEvents(events)
}
}

private fun mapToApiEvent(event: Event): events.boudicca.openapi.model.Event {
return events.boudicca.openapi.model.Event()
.name(event.name)
.startDate(event.startDate)
.data(event.additionalData)
}

private fun mapToEnricherEvent(event: Event): events.boudicca.enricher.openapi.model.Event {
return events.boudicca.enricher.openapi.model.Event()
.name(event.name)
.startDate(event.startDate)
.data(event.additionalData)
}

private fun mapToEventCollectorEvent(event: events.boudicca.enricher.openapi.model.Event): Event {
return Event(event.name, event.startDate, event.data ?: emptyMap())
}

fun <T> retry(log: Logger, function: () -> T): T {
var lastException: Throwable? = null
for (i in 1..5) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package base.boudicca.api.eventcollector

import base.boudicca.Event
import org.slf4j.LoggerFactory

abstract class TwoStepEventCollector<T>(private val name: String) : EventCollector {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package base.boudicca.eventdb

import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties(prefix = "boudicca")
data class BoudiccaEventDbProperties(
val store: Store,
val ingest: Ingest,
val entryKeyNames: List<String>?,
)

data class Store(
val path: String?
)

data class Ingest(
val password: String?
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package base.boudicca.eventdb

import io.swagger.v3.oas.annotations.OpenAPIDefinition
import io.swagger.v3.oas.annotations.servers.Server
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.scheduling.annotation.EnableScheduling
Expand All @@ -23,6 +23,7 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurer
)
@SpringBootApplication
@EnableScheduling
@EnableConfigurationProperties(BoudiccaEventDbProperties::class)
class EventDBApplication : WebMvcConfigurer {

@Bean
Expand All @@ -44,10 +45,10 @@ class EventDBApplication : WebMvcConfigurer {

//TODO this really should be done better....
@Bean
fun users(@Value("\${boudicca.ingest.password}") ingestPassword: String): UserDetailsService {
fun users(boudiccaEventDbProperties: BoudiccaEventDbProperties): UserDetailsService {
val ingestUser = User.builder()
.username("ingest")
.password("{noop}" + ingestPassword)
.password("{noop}" + boudiccaEventDbProperties.ingest.password)
.roles("INGEST")
.build()
return InMemoryUserDetailsManager(ingestUser)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ data class Event(
val data: Map<String, String>? = mapOf()
)

data class EventKey(
val name: String,
val startDate: ZonedDateTime,
) {
constructor(event: Event) : this(event.name, event.startDate)
}
typealias EntryKey = Map<String, String>

data class InternalEventProperties(
val timeAdded: Long
Expand Down
Loading

0 comments on commit 0523e8b

Please sign in to comment.