From 9d67d24b0c693e4c22d44d8d1de73f00caae80d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?=
Date: Fri, 2 Feb 2024 11:38:55 +0100
Subject: [PATCH] Add micro changes to micro

---
 build.sbt                                     |  26 ++
 http4s/src/main/resources/collector.conf      | 245 ++++++++++++++++++
 .../main/resources/default-iglu-resolver.json |  28 ++
 http4s/src/main/resources/enrich.conf         |  83 ++++++
 .../main/resources/simplelogger.properties    |   2 +
 .../Cli.scala                                 |  21 ++
 .../Collector.scala                           |  47 ++++
 .../Enrich.scala                              |  58 +++++
 .../InMemoryStore.scala                       |  26 ++
 .../Main.scala                                |  14 +
 .../Micro.scala                               |  35 +++
 .../MicroStream.scala                         |  21 ++
 .../Processing.scala                          |  49 ++++
 .../ResourceUtils.scala                       |  26 ++
 project/Dependencies.scala                    |  10 +
 15 files changed, 691 insertions(+)
 create mode 100644 http4s/src/main/resources/collector.conf
 create mode 100644 http4s/src/main/resources/default-iglu-resolver.json
 create mode 100644 http4s/src/main/resources/enrich.conf
 create mode 100644 http4s/src/main/resources/simplelogger.properties
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Cli.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Collector.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Enrich.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/InMemoryStore.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Micro.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroStream.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Processing.scala
 create mode 100644 http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/ResourceUtils.scala

diff --git a/build.sbt b/build.sbt
index e2b80da..1532b92 100644
--- a/build.sbt
+++ b/build.sbt
@@ -34,6 +34,19 @@ lazy val dependencies = Seq(
   )
 )
 
+lazy val dependenciesHttp4s = Seq(
+  libraryDependencies ++= Seq(
+    Dependencies.Http4s.snowplowStreamCollector,
+    Dependencies.Http4s.snowplowCommonEnrich,
+    Dependencies.Http4s.decline,
+    Dependencies.Http4s.analyticsSdk,
+    Dependencies.circeJawn,
+    Dependencies.circeGeneric,
+    Dependencies.specs2,
+    Dependencies.badRows
+  )
+)
+
 lazy val exclusions = Seq(
   excludeDependencies ++= Dependencies.exclusions
 )
@@ -56,6 +69,14 @@ lazy val commonSettings =
   dynVerSettings ++
   Settings.dynverOptions ++
   Settings.assemblyOptions
+
+lazy val commonSettingsHttp4s =
+  dependenciesHttp4s ++
+  buildSettings ++
+  buildInfoSettings ++
+  dynVerSettings ++
+  Settings.dynverOptions ++
+  Settings.assemblyOptions
 
 lazy val dockerCommon = Seq(
   Docker / maintainer := "Snowplow Analytics Ltd. ",
@@ -99,6 +120,11 @@ lazy val micro = project
   .settings(commonSettings ++ microSettings)
   .enablePlugins(BuildInfoPlugin, DockerPlugin, JavaAppPackaging)
 
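+// Http4s-based variant of Micro: the same micro settings, but built from the
+// http4s/ directory against the http4s collector and enrich dependencies.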
+lazy val microHttp4s = project
+  .in(file("http4s"))
+  .settings(commonSettingsHttp4s ++ microSettings)
+  .enablePlugins(BuildInfoPlugin, DockerPlugin, JavaAppPackaging)
+
 lazy val microDistroless = project
   .in(file("distroless/micro"))
   .settings(commonSettings ++ microSettingsDistroless)
diff --git a/http4s/src/main/resources/collector.conf b/http4s/src/main/resources/collector.conf
new file mode 100644
index 0000000..6d9df63
--- /dev/null
+++ b/http4s/src/main/resources/collector.conf
@@ -0,0 +1,245 @@
+# 'collector' contains configuration options for the main Scala collector.
+collector {
+
+  license {
+    accept = true
+  }
+
+  # The collector runs as a web service specified on the following interface and port.
+  interface = "0.0.0.0"
+  port = "9090"
+
+  # optional SSL/TLS configuration
+  ssl {
+    enable = false
+    # whether to redirect HTTP to HTTPS
+    redirect = false
+    port = 9543
+  }
+
+  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
+  # The expected values are:
+  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
+  # - r/tp2 for redirects
+  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
+  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
+  # downstream of the collector.
+  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
+  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
+  # valid paths starting with a leading slash.
+  # Pass in an empty map to avoid mapping.
+  paths {
+    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
+    # "/com.acme/redirect" = "/r/tp2"
+    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
+  }
+
+  # Configure the P3P policy header.
+  p3p {
+    policyRef = "/w3c/p3p.xml"
+    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
+  }
+
+  # Cross domain policy configuration.
+  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
+  # route.
+  crossDomain {
+    enabled = false
+    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
+    domains = [ "*" ]
+    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
+    secure = true
+  }
+
+  # The collector returns a cookie to clients for user identification
+  # with the following domain and expiration.
+  cookie {
+    enabled = true
+    expiration = "365 days"
+    # Network cookie name
+    name = "micro"
+    # The domain is optional and will make the cookie accessible to other
+    # applications on the domain. Comment out these lines to tie cookies to
+    # the collector's full domain.
+    # The domain is determined by matching the domains from the Origin header of the request
+    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
+    # if configured.
+    # If you specify a main domain, all subdomains on it will be matched.
+    # If you specify a subdomain, only that subdomain will be matched.
+    # Examples:
+    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
+    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
+    domains = [
+      # "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
+      # "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
+      # ... more domains
+    ]
+    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
+    # cookie domains configured above. (For example, if there is no Origin header.)
+    # fallback-domain = "{{fallbackDomain}}"
+    secure = false
+    httpOnly = false
+    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
+    # `Lax` or `None` to limit the cookie sent context.
+    # Strict: the cookie will only be sent along with "same-site" requests.
+    # Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
+    # None: the cookie will be sent with same-site and cross-site requests.
+    # sameSite = "{{cookieSameSite}}"
+  }
+
+  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
+  # completely bypassing the processing of an incoming request carrying this cookie; the collector
+  # will simply reply with a 200 saying "do not track".
+  # The cookie name and value must match the configuration below, where the names of the cookies must
+  # match entirely and the value could be a regular expression.
+  doNotTrackCookie {
+    enabled = false
+    name = "foo"
+    value = "bar"
+  }
+
+  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
+  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
+  # fallbackNetworkId is used instead of generating a new random one.
+  cookieBounce {
+    enabled = false
+    # The name of the request parameter which will be used on redirects checking that third-party
+    # cookies work.
+    name = "n3pc"
+    # Network user id to fallback to when third-party cookies are blocked.
+    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
+    # Optionally, specify the name of the header containing the originating protocol for use in the
+    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
+    # The value of this header must be http or https. For example, if behind an AWS Classic ELB:
+    # forwardedProtocolHeader = "X-Forwarded-Proto"
+  }
+
+  # When enabled, the redirect prefix `r/` will be enabled and its query parameters resolved.
+  # Otherwise a request prefixed with `r/` will be dropped with `404 Not Found`.
+  # Custom redirects configured in `paths` can still be used.
+  # enableDefaultRedirect = true
+
+  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
+  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
+  # specified, the default value is `${SP_NUID}`.
+  redirectMacro {
+    enabled = false
+    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
+    placeholder = "[TOKEN]"
+  }
+
+  # Customize response handling for requests for the root path ("/").
+  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
+  rootResponse {
+    enabled = false
+    statusCode = 302
+    # Optional, defaults to empty map
+    headers = {
+      Location = "https://127.0.0.1/",
+      X-Custom = "something"
+    }
+    # Optional, defaults to empty string
+    body = "302, redirecting"
+  }
+
+  # Configuration related to CORS preflight requests
+  cors {
+    # The Access-Control-Max-Age response header indicates how long the results of a preflight
+    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
+    accessControlMaxAge = 5 seconds
+  }
+
+
+  monitoring.metrics.statsd {
+    enabled = false
+    # StatsD metric reporting protocol configuration
+    hostname = localhost
+    port = 8125
+    # Required, how frequently to report metrics
+    period = "10 seconds"
+    # Optional, override the default metric prefix
+    # "prefix": "snowplow.collector"
+  }
+
+  streams {
+    good {
+      name = "good-sink"
+      buffer {
+        byteLimit = 100000
+        recordLimit = 40
+        timeLimit = 1000
+      }
+    }
+
+    bad {
+      name = "bad-sink"
+      buffer {
+        byteLimit = 100000
+        recordLimit = 40
+        timeLimit = 1000
+      }
+    }
+
+    # Whether to use the incoming event's ip as the partition key for the good stream/topic
+    # Note: Nsq does not make use of partition key.
+    useIpAddressAsPartitionKey = false
+  }
+
+  enableDefaultRedirect = false
+  redirectDomains = []
+  enableStartupChecks = true
+  terminationDeadline = 10.seconds
+  preTerminationPeriod = 0.seconds
+  preTerminationUnhealthy = false
+  experimental {
+    warmup {
+      enable = false
+      numRequests = 2000
+      maxConnections = 2000
+      maxCycles = 1
+    }
+  }
+}
+
+# Akka has a variety of possible configuration options defined at
+# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
+akka {
+  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+
+  # akka-http is the server the Stream collector uses and has configurable options defined at
+  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
+  http.server {
+    # To obtain the hostname in the collector, the 'remote-address' header
+    # should be set. By default, this is disabled, and enabling it
+    # adds the 'Remote-Address' header to every request automatically.
+    remote-address-header = on
+
+    raw-request-uri-header = on
+
+    # Define the maximum URI length (the default is 2048)
+    parsing {
+      max-uri-length = 32768
+      uri-parsing-mode = relaxed
+    }
+  }
+
+  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
+  # Extension) to enable secure communication.
+  # To override the default settings set the following section as per
+  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
+  # ssl-config {
+  #   debug = {
+  #     ssl = true
+  #   }
+  #   keyManager = {
+  #     stores = [
+  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
+  #     ]
+  #   }
+  #   loose {
+  #     disableHostnameVerification = false
+  #   }
+  # }
+}
diff --git a/http4s/src/main/resources/default-iglu-resolver.json b/http4s/src/main/resources/default-iglu-resolver.json
new file mode 100644
index 0000000..784261b
--- /dev/null
+++ b/http4s/src/main/resources/default-iglu-resolver.json
@@ -0,0 +1,28 @@
+{
+  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
+  "data": {
+    "cacheSize": 500,
+    "repositories": [
+      {
+        "name": "Iglu Central",
+        "priority": 1,
+        "vendorPrefixes": [],
+        "connection": {
+          "http": {
+            "uri": "http://iglucentral.com"
+          }
+        }
+      },
+      {
+        "name": "Iglu Central - Mirror 01",
+        "priority": 1,
+        "vendorPrefixes": [],
+        "connection": {
+          "http": {
+            "uri": "http://mirror01.iglucentral.com"
+          }
+        }
+      }
+    ]
+  }
+}
diff --git a/http4s/src/main/resources/enrich.conf b/http4s/src/main/resources/enrich.conf
new file mode 100644
index 0000000..69142e5
--- /dev/null
+++ b/http4s/src/main/resources/enrich.conf
@@ -0,0 +1,83 @@
+{
+  "license": {
+    "accept": true
+  }
+
+  # NSQ is configured as input and outputs only to satisfy Enrich's config parsing; NSQ is never actually used by Micro.
+  "input": {
+    "type": "Nsq"
+    "topic": "collector-payloads"
+    "lookupHost": "127.0.0.1"
+    "lookupPort": 4161
+    "channel": "collector-payloads-channel"
+    "maxBufferQueueSize": 3000
+    "checkpointBackoff": {
+      "minBackoff": 100 milliseconds
+      "maxBackoff": 10 seconds
+      "maxRetries": 10
+    }
+  }
+
+  "output": {
+    "good": {
+      "type": "Nsq"
+      "topic": "enriched"
+      "nsqdHost": "127.0.0.1"
+      "nsqdPort": 4150
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+        "maxRetries": 10
+      }
+    }
+    "bad": {
+      "type": "Nsq"
+      "topic": "bad"
+      "nsqdHost": "127.0.0.1"
+      "nsqdPort": 4150
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+        "maxRetries": 10
+      }
+    }
+  }
+
+  "concurrency" : {
+    "enrich": 256
+    "sink": 1
+  }
+
+  "remoteAdapters" : {
+    "connectionTimeout": 10 seconds,
+    "readTimeout": 45 seconds,
+    "maxConnections": 10,
+    "configs" : []
+  }
+
+  "monitoring": {
+    "metrics": {
+      "cloudwatch": false
+    }
+  }
+
+  "telemetry": {
+    "disable": true
+    "interval": 15 minutes
+    "method": POST
+    "collectorUri": collector-g.snowplowanalytics.com
+    "collectorPort": 443
+    "secure": true
+  }
+
+  "featureFlags" : {
+    "acceptInvalid": false
+    "legacyEnrichmentOrder": false
+    "tryBase64Decoding": false
+  }
+
+  "blobStorage": {
+    "gcs": false
+    "s3": false
+  }
+}
\ No newline at end of file
diff --git a/http4s/src/main/resources/simplelogger.properties b/http4s/src/main/resources/simplelogger.properties
new file mode 100644
index 0000000..6e50731
--- /dev/null
+++ b/http4s/src/main/resources/simplelogger.properties
@@ -0,0 +1,2 @@
+org.slf4j.simpleLogger.showThreadName=false
+org.slf4j.simpleLogger.levelInBrackets=true
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Cli.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Cli.scala
new file mode 100644
index 0000000..903a635
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Cli.scala
@@ -0,0 +1,21 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.implicits._
+import com.monovore.decline.Opts
+
+import java.nio.file.Path
+
+object Cli {
+
+  final case class Config(collector: Option[Path],
+                          enrich: Option[Path],
+                          iglu: Option[Path],
+                          tsv: Boolean)
+
+  private val collector = Opts.option[Path]("collector-config", "Path to HOCON collector configuration (optional)", "c", "collector-config.hocon").orNone
+  private val enrich = Opts.option[Path]("enrich-config", "Path to HOCON enrich configuration (optional)", "e", "enrich-config.hocon").orNone
+  private val iglu = Opts.option[Path]("iglu", "Path to HOCON Iglu configuration (optional)", "i", "iglu.hocon").orNone
+  private val outputEnrichedTsv = Opts.flag("output-tsv", "Print events in TSV format to standard output", "t").orFalse
+
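+  // Each option may be omitted; Micro then falls back to the corresponding
+  // default configuration bundled on the classpath (see ResourceUtils.resolveConfig).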
+ "input": { + "type": "Nsq" + "topic": "collector-payloads" + "lookupHost": "127.0.0.1" + "lookupPort": 4161 + "channel": "collector-payloads-channel" + "maxBufferQueueSize": 3000 + "checkpointBackoff": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + + "output": { + "good": { + "type": "Nsq" + "topic": "enriched" + "nsqdHost": "127.0.0.1" + "nsqdPort": 4150 + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + "bad": { + "type": "Nsq" + "topic": "enriched" + "nsqdHost": "127.0.0.1" + "nsqdPort": 4150 + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + } + + "concurrency" : { + "enrich": 256 + "sink": 1 + } + + "remoteAdapters" : { + "connectionTimeout": 10 seconds, + "readTimeout": 45 seconds, + "maxConnections": 10, + "configs" : [] + } + + "monitoring": { + "metrics": { + "cloudwatch": false + } + } + + "telemetry": { + "disable": true + "interval": 15 minutes + "method": POST + "collectorUri": collector-g.snowplowanalytics.com + "collectorPort": 443 + "secure": true + } + + "featureFlags" : { + "acceptInvalid": false + "legacyEnrichmentOrder": false + "tryBase64Decoding": false + } + + "blobStorage": { + "gcs": false + "s3": false + } +} \ No newline at end of file diff --git a/http4s/src/main/resources/simplelogger.properties b/http4s/src/main/resources/simplelogger.properties new file mode 100644 index 0000000..6e50731 --- /dev/null +++ b/http4s/src/main/resources/simplelogger.properties @@ -0,0 +1,2 @@ +org.slf4j.simpleLogger.showThreadName=false +org.slf4j.simpleLogger.levelInBrackets=true diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Cli.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Cli.scala new file mode 100644 index 0000000..903a635 --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Cli.scala @@ -0,0 +1,21 @@ +package com.snowplowanalytics.snowplow.micro + +import cats.implicits._ +import com.monovore.decline.Opts + +import java.nio.file.Path + +object Cli { + + final case class Config(collector: Option[Path], + enrich: Option[Path], + iglu: Option[Path], + tsv: Boolean) + + private val collector = Opts.option[Path]("collector-config", "Path to HOCON collector configuration (optional)", "c", "collector-config.hocon").orNone + private val enrich = Opts.option[Path]("enrich-config", "Path to HOCON enrich configuration (optional)", "e", "enrich-config.hocon").orNone + private val iglu = Opts.option[Path]("iglu", "Path to HOCON Iglu configuration (optional)", "i", "iglu.hocon").orNone + private val outputEnrichedTsv = Opts.flag("output-tsv", "Print events in TSV format to standard output", "t").orFalse + + val config: Opts[Config] = (collector, enrich, iglu, outputEnrichedTsv).mapN(Config.apply) +} \ No newline at end of file diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Collector.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Collector.scala new file mode 100644 index 0000000..f73cb9d --- /dev/null +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Collector.scala @@ -0,0 +1,47 @@ +package com.snowplowanalytics.snowplow.micro + +import cats.effect._ +import cats.implicits.toTraverseOps +import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.collector.core.{AppInfo, Sink, Telemetry, Run => RunCollector} +import 
+  private def mkSink(stream: MicroStream): Sink[IO] = new Sink[IO] {
+    override val maxBytes: Int = Int.MaxValue
+
+    override def isHealthy: IO[Boolean] = IO.pure(true)
+
+    override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = {
+      events.traverse(payload => stream.sink(payload)).void
+    }
+  }
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Enrich.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Enrich.scala
new file mode 100644
index 0000000..b4860ed
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Enrich.scala
@@ -0,0 +1,58 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.effect.{IO, Resource, ResourceIO}
+import cats.implicits.toTraverseOps
+import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig => EnrichConfig}
+import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, ByteSink, RunEnrich}
+import com.snowplowanalytics.snowplow.micro.ResourceUtils.resolveConfig
+
+import java.nio.file.{Path, Paths}
+
+object Enrich {
+
+  def run(cliConfig: Cli.Config,
+          streams: Micro.Streams): ResourceIO[Unit] = {
+    for {
+      config <- resolveEnrichConfig(cliConfig)
+      _ <- RunEnrich.run[IO, Array[Byte]](
+        name = buildinfo.BuildInfo.name,
+        version = buildinfo.BuildInfo.version,
+        cfg = config,
+        mkSource = (_, _) => streams.raw.read,
+        mkSinkGood = _ => Resource.pure(mkAttributedSink(streams.enriched)),
+        mkSinkPii = _ => Resource.pure(mkAttributedSink(streams.pii)),
+        mkSinkBad = _ => Resource.pure(mkByteSink(streams.bad)),
+        checkpoint = _ => IO.unit,
+        mkClients = _ => List.empty,
+        getPayload = identity,
+        maxRecordSize = Int.MaxValue,
+        cloud = None,
+        getRegion = None
+      ).background
+    } yield ()
+  }
+
+  private def resolveEnrichConfig(cliConfig: Cli.Config): Resource[IO, EnrichConfig] = {
+    for {
+      appConfig <- resolveConfig(cliConfig.enrich, "enrich.conf")
+      igluConfig <- resolveConfig(cliConfig.iglu, "default-iglu-resolver.json")
+      enrichmentsConfig <- readEnrichments()
+    } yield {
+      EnrichConfig(Right(appConfig), Right(igluConfig), Right(enrichmentsConfig))
+    }
+  }
+
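+  // Enrichments are read from an optional /enrichments directory on the
+  // classpath; when it is absent we fall back to the working directory, which
+  // normally contains no enrichment configs.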
+  private def readEnrichments(): Resource[IO, Path] = {
+    Option(getClass.getResource("/enrichments")) match {
+      case Some(definedEnrichments) => Resource.pure(Paths.get(definedEnrichments.toURI))
+      case None => Resource.pure(Paths.get("."))
+    }
+  }
+
+  private def mkAttributedSink(microStream: MicroStream): AttributedByteSink[IO] = records =>
+    records.traverse(record => microStream.sink(record.data)).void
+
+  private def mkByteSink(microStream: MicroStream): ByteSink[IO] = records =>
+    records.traverse(record => microStream.sink(record)).void
+
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/InMemoryStore.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/InMemoryStore.scala
new file mode 100644
index 0000000..d667502
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/InMemoryStore.scala
@@ -0,0 +1,26 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.effect.{IO, Ref}
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.badrows.BadRow
+
+class InMemoryStore(good: Ref[IO, List[Event]],
+                    bad: Ref[IO, List[BadRow]]) {
+
+  def addGood(event: Event): IO[Unit] = {
+    good.update(_ :+ event)
+  }
+
+  def addBad(badrow: BadRow): IO[Unit] = {
+    bad.update(_ :+ badrow)
+  }
+
+}
+
+object InMemoryStore {
+  def mk(): IO[InMemoryStore] =
+    for {
+      good <- Ref.of[IO, List[Event]](List.empty)
+      bad <- Ref.of[IO, List[BadRow]](List.empty)
+    } yield new InMemoryStore(good, bad)
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
new file mode 100644
index 0000000..5c6392a
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
@@ -0,0 +1,14 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.effect.{ExitCode, IO}
+import com.monovore.decline.Opts
+import com.monovore.decline.effect.CommandIOApp
+
+object Main
+  extends CommandIOApp(
+    name = buildinfo.BuildInfo.name,
+    header = "Something",
+    version = buildinfo.BuildInfo.version) {
+
+  override def main: Opts[IO[ExitCode]] = Micro.run()
+}
\ No newline at end of file
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Micro.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Micro.scala
new file mode 100644
index 0000000..a5ebccd
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Micro.scala
@@ -0,0 +1,35 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.effect.{ExitCode, IO, Resource}
+import com.monovore.decline.Opts
+
+object Micro {
+
+  final case class Streams(raw: MicroStream,
+                           enriched: MicroStream,
+                           bad: MicroStream,
+                           pii: MicroStream)
+
+  def run(): Opts[IO[ExitCode]] = {
+    Cli.config.map { cliConfig =>
+      val resource = for {
+        streams <- mkStreams()
+        inMemoryStore <- Resource.eval(InMemoryStore.mk())
+        _ <- Collector.run(cliConfig, streams)
+        _ <- Enrich.run(cliConfig, streams)
+        _ <- Processing.run(streams, inMemoryStore).background
+      } yield ExitCode.Success
+
+      resource.use { exitCode => IO.never.as(exitCode) }
+    }
+  }
+
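+  // Four independent in-memory queues stand in for the raw, enriched, bad and
+  // pii streams that a full pipeline would host on a message broker.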
+  private def mkStreams(): Resource[IO, Streams] = {
+    for {
+      raw <- MicroStream.inMemory
+      enriched <- MicroStream.inMemory
+      bad <- MicroStream.inMemory
+      pii <- MicroStream.inMemory
+    } yield Streams(raw, enriched, bad, pii)
+  }
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroStream.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroStream.scala
new file mode 100644
index 0000000..546ca31
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroStream.scala
@@ -0,0 +1,21 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.effect.std.Queue
+import cats.effect.{IO, Resource}
+
+trait MicroStream {
+  def sink(data: Array[Byte]): IO[Unit]
+  def read: fs2.Stream[IO, Array[Byte]]
+}
+
+object MicroStream {
+  def inMemory: Resource[IO, MicroStream] = {
+    Resource.eval(Queue.unbounded[IO, Array[Byte]]).map { queue =>
+      new MicroStream {
+        override def sink(data: Array[Byte]): IO[Unit] = queue.offer(data)
+        override def read: fs2.Stream[IO, Array[Byte]] =
+          fs2.Stream.fromQueueUnterminated(queue)
+      }
+    }
+  }
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Processing.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Processing.scala
new file mode 100644
index 0000000..2d4f8bd
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/Processing.scala
@@ -0,0 +1,49 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.effect.IO
+import com.snowplowanalytics.iglu.core.SelfDescribingData
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.badrows.BadRow
+import com.snowplowanalytics.snowplow.micro.Micro.Streams
+import io.circe.parser.decode
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+
+object Processing {
+
+  def run(streams: Streams,
+          store: InMemoryStore): IO[Unit] = {
+    processingEnrichedOutput(streams, store)
+      .merge(processingBadOutput(streams, store))
+      .compile
+      .drain
+  }
+
+  private def processingEnrichedOutput(streams: Streams,
+                                       store: InMemoryStore): fs2.Stream[IO, Unit] = {
+    streams.enriched.read
+      .map(parseGood)
+      .evalTap(store.addGood)
+      .evalMap(event => IO(println(s"ENRICHED - [${event.toTsv}]")))
+  }
+
+  private def processingBadOutput(streams: Streams,
+                                  store: InMemoryStore): fs2.Stream[IO, Unit] = {
+    streams.bad.read
+      .map(parseBadRow)
+      .evalTap(store.addBad)
+      .evalMap(badRow => IO(println(s"BAD - [${badRow.compact}]")))
+  }
+
+  private def parseGood(bytes: Array[Byte]): Event = {
+    Event.parseBytes(ByteBuffer.wrap(bytes))
+      .getOrElse(throw new RuntimeException("Could not parse enriched event"))
+  }
+
+  private def parseBadRow(bytes: Array[Byte]): BadRow = {
+    decode[SelfDescribingData[BadRow]](new String(bytes, StandardCharsets.UTF_8))
+      .getOrElse(throw new RuntimeException("Could not parse bad row"))
+      .data
+  }
+}
diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/ResourceUtils.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/ResourceUtils.scala
new file mode 100644
index 0000000..006d73c
--- /dev/null
+++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.micro/ResourceUtils.scala
@@ -0,0 +1,26 @@
+package com.snowplowanalytics.snowplow.micro
+
+import cats.effect.{IO, Resource}
+import fs2.io.file.Files
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
+import scala.io.Source
+
+object ResourceUtils {
+
+  def resolveConfig(custom: Option[Path], fallbackResource: String): Resource[IO, Path] =
+    custom match {
+      case Some(definedCustomPath) => Resource.pure(definedCustomPath)
+      case None => loadResourceToTemporaryPath(fallbackResource)
+    }
+
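+  // Both the collector and enrich expect configuration as files on disk, so a
+  // bundled default is first copied from the classpath into a temporary file.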
+  private def loadResourceToTemporaryPath(resource: String): Resource[IO, Path] = {
+    for {
+      source <- Resource.make(IO(Source.fromResource(resource)))(source => IO(source.close()))
+      tempConfigFile <- Files[IO].tempFile
+      _ <- Resource.eval(fs2.Stream.emits(source.mkString.getBytes(StandardCharsets.UTF_8))
+        .through(Files[IO].writeAll(tempConfigFile)).compile.drain)
+    } yield tempConfigFile.toNioPath
+  }
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 84433ab..0162e4e 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -19,6 +19,8 @@ object Dependencies {
     val snowplowStreamCollector = "2.8.1"
     val snowplowCommonEnrich = "3.8.0"
 
+    val decline = "2.4.1"
+
     // circe
     val circe = "0.14.2"
@@ -37,6 +39,14 @@ object Dependencies {
     val snowplowStreamCollector = "com.snowplowanalytics" %% "snowplow-stream-collector-core" % V.snowplowStreamCollector
     val snowplowCommonEnrich = "com.snowplowanalytics" %% "snowplow-common-enrich" % V.snowplowCommonEnrich
 
+    object Http4s {
+
+      val decline = "com.monovore" %% "decline-effect" % V.decline
+      val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % "3.2.0"
+      val snowplowStreamCollector = "com.snowplowanalytics" %% "snowplow-stream-collector-http4s-core" % "3.1.0-2-4e8aecfc-20240202-1202"
+      val snowplowCommonEnrich = "com.snowplowanalytics" %% "snowplow-enrich-common-fs2" % "4.0.0-2-9664e317-SNAPSHOT"
+    }
+
     // circe
     val circeJawn = "io.circe" %% "circe-jawn" % V.circe
     val circeGeneric = "io.circe" %% "circe-generic" % V.circe