diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 3ee8f12..2f3d186 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -28,9 +28,9 @@ jobs:
       - name: Publish collector locally
         run: |
-          git clone --branch 2.8.1 --depth 1 https://github.com/snowplow/stream-collector.git
+          git clone --branch 3.1.0_cross_scala_2.12 --depth 1 https://github.com/snowplow/stream-collector.git
           cd stream-collector
-          sbt publishLocal
+          sbt +publishLocal
       - name: Run sbt
         run: sbt test
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 93fc68b..faa6a6d 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -18,9 +18,9 @@ jobs:
       - name: Publish collector locally
         run: |
-          git clone --branch 2.8.1 --depth 1 https://github.com/snowplow/stream-collector.git
+          git clone --branch 3.1.0_cross_scala_2.12 --depth 1 https://github.com/snowplow/stream-collector.git
           cd stream-collector
-          sbt publishLocal
+          sbt +publishLocal
       - name: Run sbt
         run: sbt test
diff --git a/build.sbt b/build.sbt
index e2b80da..2dc7cc9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,9 @@
  * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved.
  */
 import com.typesafe.sbt.packager.MappingsHelper.directory
-import com.typesafe.sbt.packager.docker._
+import com.typesafe.sbt.packager.docker.*
+
+import scala.collection.Seq
 
 lazy val buildSettings = Seq(
   name := "snowplow-micro",
@@ -27,22 +29,23 @@ lazy val dependencies = Seq(
   libraryDependencies ++= Seq(
     Dependencies.snowplowStreamCollector,
     Dependencies.snowplowCommonEnrich,
+    Dependencies.decline,
+    Dependencies.http4sCirce,
     Dependencies.circeJawn,
     Dependencies.circeGeneric,
     Dependencies.specs2,
+    Dependencies.specs2CE,
     Dependencies.badRows
   )
 )
 
-lazy val exclusions = Seq(
-  excludeDependencies ++= Dependencies.exclusions
-)
-
 lazy val buildInfoSettings = Seq(
-  buildInfoKeys := Seq[BuildInfoKey](organization, name, version, scalaVersion),
-  buildInfoPackage := "buildinfo"
+  buildInfoKeys := Seq[BuildInfoKey](name, moduleName, dockerAlias, version, "shortName" -> "micro-ssc"),
+  buildInfoPackage := "com.snowplowanalytics.snowplow.micro",
+  buildInfoOptions += BuildInfoOption.Traits("com.snowplowanalytics.snowplow.collector.core.AppInfo")
 )
+
 lazy val dynVerSettings = Seq(
   ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix
   ThisBuild / dynverSeparator := "-"     // to be compatible with docker
@@ -50,7 +53,6 @@ lazy val dynVerSettings = Seq(
 
 lazy val commonSettings =
   dependencies ++
-  exclusions ++
  buildSettings ++
  buildInfoSettings ++
  dynVerSettings ++
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 84433ab..657f649 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -16,33 +16,36 @@ object Dependencies {
 
   object V {
     // Snowplow
-    val snowplowStreamCollector = "2.8.1"
-    val snowplowCommonEnrich = "3.8.0"
+    val snowplowStreamCollector = "3.1.1-rc1"
+    val snowplowCommonEnrich = "4.0.1"
+
+    val http4sCirce = "0.23.23"
+    val decline = "2.4.1"
+
     // circe
     val circe = "0.14.2"
     // specs2
     val specs2 = "4.12.2"
+    val specs2CE = "1.5.0"
     // force versions of transitive dependencies
     val badRows = "2.2.0"
   }
 
-  val exclusions = Seq(
-    "org.apache.tomcat.embed" % "tomcat-embed-core"
-  )
-
-  // Snowplow stream collector
-  val snowplowStreamCollector = "com.snowplowanalytics" %% "snowplow-stream-collector-core" % V.snowplowStreamCollector
-  val snowplowCommonEnrich = "com.snowplowanalytics" %% "snowplow-common-enrich" % V.snowplowCommonEnrich
+  val snowplowStreamCollector = "com.snowplowanalytics" %% "snowplow-stream-collector-http4s-core" % V.snowplowStreamCollector
+  val snowplowCommonEnrich = "com.snowplowanalytics" %% "snowplow-common-enrich" % V.snowplowCommonEnrich
+
+  val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4sCirce
+  val decline = "com.monovore" %% "decline-effect" % V.decline
 
   // circe
   val circeJawn = "io.circe" %% "circe-jawn" % V.circe
   val circeGeneric = "io.circe" %% "circe-generic" % V.circe
 
   // specs2
-  val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
+  val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
+  val specs2CE = "org.typelevel" %% "cats-effect-testing-specs2" % V.specs2CE % Test
 
   // transitive
   val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badRows
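The `BuildInfoOption.Traits` setting makes sbt-buildinfo's generated `BuildInfo` object extend the collector core's `AppInfo` trait, which is why the new `Main` below can pass `BuildInfo.dockerAlias` and `BuildInfo.version` straight to code that expects an `AppInfo`. A rough, hypothetical sketch of what the plugin generates under these settings (field values are placeholders; the real output depends on the plugin version):

```scala
// Hypothetical shape of the generated object, assuming AppInfo declares these fields.
package com.snowplowanalytics.snowplow.micro

import com.snowplowanalytics.snowplow.collector.core.AppInfo

case object BuildInfo extends AppInfo {
  val name: String        = "snowplow-micro"        // from `name`
  val moduleName: String  = "snowplow-micro"        // from `moduleName`
  val dockerAlias: String = "snowplow/snowplow-micro:latest" // from `dockerAlias`
  val version: String     = "2.0.0"                 // from `version`
  val shortName: String   = "micro-ssc"             // from the literal key
}
```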
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
deleted file mode 100644
index 3d062bd..0000000
--- a/src/main/resources/application.conf
+++ /dev/null
@@ -1,249 +0,0 @@
-# 'collector' contains configuration options for the main Scala collector.
-collector {
-  # The collector runs as a web service specified on the following interface and port.
-  interface = "0.0.0.0"
-  port = "9090"
-
-  # optional SSL/TLS configuration
-  ssl {
-    enable = false
-    # whether to redirect HTTP to HTTPS
-    redirect = false
-    port = 9543
-  }
-
-  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
-  # The expected values are:
-  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
-  # - r/tp2 for redirects
-  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
-  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
-  # downstream of the collector.
-  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
-  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
-  # valid paths starting with a leading slash.
-  # Pass in an empty map to avoid mapping.
-  paths {
-    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
-    # "/com.acme/redirect" = "/r/tp2"
-    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
-  }
-
-  # Configure the P3P policy header.
-  p3p {
-    policyRef = "/w3c/p3p.xml"
-    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
-  }
-
-  # Cross domain policy configuration.
-  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
-  # route.
-  crossDomain {
-    enabled = false
-    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
-    domains = [ "*" ]
-    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
-    secure = true
-  }
-
-  # The collector returns a cookie to clients for user identification
-  # with the following domain and expiration.
-  cookie {
-    enabled = true
-    expiration = "365 days"
-    # Network cookie name
-    name = "micro"
-    # The domain is optional and will make the cookie accessible to other
-    # applications on the domain. Comment out these lines to tie cookies to
-    # the collector's full domain.
-    # The domain is determined by matching the domains from the Origin header of the request
-    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
-    # if configured.
-    # If you specify a main domain, all subdomains on it will be matched.
-    # If you specify a subdomain, only that subdomain will be matched.
-    # Examples:
-    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
-    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
-    domains = [
-      # "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
-      # "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
-      # ... more domains
-    ]
-    # ... more domains
-    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
-    # cookie domains configured above. (For example, if there is no Origin header.)
-    # fallback-domain = "{{fallbackDomain}}"
-    secure = false
-    httpOnly = false
-    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
-    # `Lax` or `None` to limit the cookie sent context.
-    #   Strict: the cookie will only be sent along with "same-site" requests.
-    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
-    #   None: the cookie will be sent with same-site and cross-site requests.
-    # sameSite = "{{cookieSameSite}}"
-  }
-
-  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
-  # completely bypassing the processing of an incoming request carrying this cookie, the collector
-  # will simply reply by a 200 saying "do not track".
-  # The cookie name and value must match the configuration below, where the names of the cookies must
-  # match entirely and the value could be a regular expression.
-  doNotTrackCookie {
-    enabled = false
-    name = "foo"
-    value = "bar"
-  }
-
-  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
-  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
-  # fallbackNetworkId is used instead of generating a new random one.
-  cookieBounce {
-    enabled = false
-    # The name of the request parameter which will be used on redirects checking that third-party
-    # cookies work.
-    name = "n3pc"
-    # Network user id to fallback to when third-party cookies are blocked.
-    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
-    # Optionally, specify the name of the header containing the originating protocol for use in the
-    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
-    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
-    # forwardedProtocolHeader = "X-Forwarded-Proto"
-  }
-
-  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
-  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
-  # Custom redirects configured in `paths` can still be used.
-  # enableDefaultRedirect = true
-
-  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
-  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
-  # specified, the default value is `${SP_NUID}`.
-  redirectMacro {
-    enabled = false
-    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
-    placeholder = "[TOKEN]"
-  }
-
-  # Customize response handling for requests for the root path ("/").
-  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
-  rootResponse {
-    enabled = false
-    statusCode = 302
-    # Optional, defaults to empty map
-    headers = {
-      Location = "https://127.0.0.1/",
-      X-Custom = "something"
-    }
-    # Optional, defaults to empty string
-    body = "302, redirecting"
-  }
-
-  # Configuration related to CORS preflight requests
-  cors {
-    # The Access-Control-Max-Age response header indicates how long the results of a preflight
-    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
-    accessControlMaxAge = 5 seconds
-  }
-
-
-  monitoring.metrics.statsd {
-    enabled = false
-    # StatsD metric reporting protocol configuration
-    hostname = localhost
-    port = 8125
-    # Required, how frequently to report metrics
-    period = "10 seconds"
-    # Optional, override the default metric prefix
-    # "prefix": "snowplow.collector"
-  }
-
-  streams {
-    # Events which have successfully been collected will be stored in the good stream/topic
-    good = ""
-
-    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
-    bad = ""
-
-    # Whether to use the incoming event's ip as the partition key for the good stream/topic
-    # Note: Nsq does not make use of partition key.
-    useIpAddressAsPartitionKey = false
-
-    # Enable the chosen sink by uncommenting the appropriate configuration
-    sink {
-      # Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
-      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
-      # "enabled" which should be set to "stdout".
-      enabled = stdout
-
-    }
-
-    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
-    # Note: Buffering is not supported by NSQ.
-    # The buffer is emptied whenever:
-    # - the number of stored records reaches record-limit or
-    # - the combined size of the stored records reaches byte-limit or
-    # - the time in milliseconds since the buffer was last emptied reaches time-limit
-    buffer {
-      byteLimit = 100000
-      recordLimit = 40
-      timeLimit = 1000
-    }
-  }
-
-  enableDefaultRedirect = false
-  redirectDomains = []
-  enableStartupChecks = true
-  terminationDeadline = 10.seconds
-  preTerminationPeriod = 0.seconds
-  preTerminationUnhealthy = false
-  experimental {
-    warmup {
-      enable = false
-      numRequests = 2000
-      maxConnections = 2000
-      maxCycles = 1
-    }
-  }
-}
-
-# Akka has a variety of possible configuration options defined at
-# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
-akka {
-  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-
-  # akka-http is the server the Stream collector uses and has configurable options defined at
-  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
-  http.server {
-    # To obtain the hostname in the collector, the 'remote-address' header
-    # should be set. By default, this is disabled, and enabling it
-    # adds the 'Remote-Address' header to every request automatically.
-    remote-address-header = on
-
-    raw-request-uri-header = on
-
-    # Define the maximum request length (the default is 2048)
-    parsing {
-      max-uri-length = 32768
-      uri-parsing-mode = relaxed
-    }
-  }
-
-  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
-  # Extension) to enable secure communication.
-  # To override the default settings set the following section as per
-  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
-  # ssl-config {
-  #   debug = {
-  #     ssl = true
-  #   }
-  #   keyManager = {
-  #     stores = [
-  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
-  #     ]
-  #   }
-  #   loose {
-  #     disableHostnameVerification = false
-  #   }
-  # }
-}
diff --git a/src/main/resources/collector-micro.conf b/src/main/resources/collector-micro.conf
new file mode 100644
index 0000000..3a76fea
--- /dev/null
+++ b/src/main/resources/collector-micro.conf
@@ -0,0 +1,18 @@
+collector {
+  license {
+    accept = true
+  }
+  interface = "0.0.0.0"
+  port = 8080
+
+  streams {
+    good = "good"
+    bad = "bad"
+    buffer {
+      byteLimit = 3145728
+      recordLimit = 500
+      timeLimit = 5000
+    }
+    sink {}
+  }
+}
\ No newline at end of file
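The 249-line Akka-era `application.conf` is replaced by the minimal `collector-micro.conf` above: Micro now ships only the settings it overrides and leaves the rest to the collector core's reference configuration. A minimal sketch of the layering idea (mirroring what `Configuration.loadCollectorConfig` does further down, with illustrative names only):

```scala
// Sketch: a user-supplied HOCON file is layered over Micro's bundled defaults.
// `layeredConfig` is an illustrative helper, not part of this diff.
import com.typesafe.config.{Config, ConfigFactory}
import java.io.File

def layeredConfig(userFile: Option[File]): Config = {
  val user     = userFile.fold(ConfigFactory.empty())(ConfigFactory.parseFile)
  val defaults = ConfigFactory.parseResources("collector-micro.conf")
  // User settings win; anything left unset falls back to Micro's defaults,
  // and ConfigFactory.load fills in the library's reference.conf underneath.
  ConfigFactory.load(user.withFallback(defaults))
}
```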
diff --git a/src/main/resources/default-iglu-resolver.json b/src/main/resources/default-iglu-resolver.conf
similarity index 100%
rename from src/main/resources/default-iglu-resolver.json
rename to src/main/resources/default-iglu-resolver.conf
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala
index d62bd6b..325aaa6 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/decode/ValueConverter.scala
@@ -39,7 +39,7 @@ object ValueConverter {
     ofFunc(_ => f(_).asRight)
 
   implicit def valueDecoderCase[A](implicit decoder: ValueDecoder[A]): Aux[Option[String], A] =
-    ofFunc(key => x => decoder.parse(key, x.getOrElse("")))
+    ofFunc(key => x => decoder.parse(key, x.getOrElse(""), None))
 
   implicit def floatDoubleCase: Aux[Option[Float], Option[Double]] =
     simple(_.map(_.toDouble))
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/CirceSupport.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/CirceSupport.scala
deleted file mode 100644
index a0e313a..0000000
--- a/src/main/scala/com.snowplowanalytics.snowplow.micro/CirceSupport.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.micro
-
-import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller}
-import akka.http.scaladsl.model.{ContentType, ContentTypeRange, HttpEntity}
-import akka.http.scaladsl.model.MediaType
-import akka.http.scaladsl.model.MediaTypes.`application/json`
-import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
-import akka.util.ByteString
-
-import io.circe.{Decoder, Encoder, Json, Printer, jawn}
-
-import scala.collection.immutable.Seq
-
-import org.joda.time.DateTime
-
-import org.apache.http.NameValuePair
-
-/** Add support for unmarshalling HTTP JSON requests
-  * and marshalling HTTP JSON responses, using circe library.
-  * More information about marshalling can be found here
-  * https://doc.akka.io/docs/akka-http/current/common/marshalling.html.
-  *
-  * This code mostly comes from https://github.com/hseeberger/akka-http-json.
-  */
-private[micro] object CirceSupport {
-
-  // To encode the datetime in a CollectorPayload
-  implicit val dateTimeEncoder: Encoder[DateTime] =
-    Encoder[String].contramap(_.toString)
-
-  // To encode the querystring in a CollectorPayload
-  implicit val nameValuePairEncoder: Encoder[NameValuePair] =
-    Encoder[String].contramap(kv => s"${kv.getName()}=${kv.getValue()}")
-
-  def unmarshallerContentTypes: Seq[ContentTypeRange] =
-    mediaTypes.map(ContentTypeRange.apply)
-
-  def mediaTypes: Seq[MediaType.WithFixedCharset] =
-    List(`application/json`)
-
-  /** `Json` => HTTP entity
-    * @return marshaller for JSON value
-    */
-  implicit final def jsonMarshaller(
-    implicit printer: Printer = Printer.noSpaces
-  ): ToEntityMarshaller[Json] =
-    Marshaller.oneOf(mediaTypes: _*) { mediaType =>
-      Marshaller.withFixedContentType(ContentType(mediaType)) { json =>
-        HttpEntity(
-          mediaType,
-          ByteString(
-            printer.printToByteBuffer(json, mediaType.charset.nioCharset())))
-      }
-    }
-
-  /** `A` => HTTP entity
-    * @tparam A type to encode
-    * @return marshaller for any `A` value
-    */
-  implicit final def marshaller[A: Encoder](
-    implicit printer: Printer = Printer.noSpaces
-  ): ToEntityMarshaller[A] =
-    jsonMarshaller(printer).compose(Encoder[A].apply)
-
-  /** HTTP entity => `Json`
-    * @return unmarshaller for `Json`
-    */
-  implicit final val jsonUnmarshaller: FromEntityUnmarshaller[Json] =
-    Unmarshaller.byteStringUnmarshaller
-      .forContentTypes(unmarshallerContentTypes: _*)
-      .map {
-        case ByteString.empty => throw Unmarshaller.NoContentException
-        case data =>
-          jawn.parseByteBuffer(data.asByteBuffer).fold(throw _, identity)
-      }
-
-  /** HTTP entity => `A`
-    * @tparam A type to decode
-    * @return unmarshaller for `A`
-    */
-  implicit def unmarshaller[A: Decoder]: FromEntityUnmarshaller[A] = {
-    def decode(json: Json) = Decoder[A].decodeJson(json).fold(throw _, identity)
-    jsonUnmarshaller.map(decode)
-  }
-}
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala
deleted file mode 100644
index 2ef2ba4..0000000
--- a/src/main/scala/com.snowplowanalytics.snowplow.micro/ConfigHelper.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.micro
-
-import cats.Id
-import cats.implicits._
-import com.snowplowanalytics.iglu.client.IgluCirceClient
-import com.snowplowanalytics.iglu.client.resolver.Resolver
-import com.snowplowanalytics.iglu.client.resolver.registries.Registry
-import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
-import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
-import com.snowplowanalytics.snowplow.collectors.scalastream.model.{CollectorConfig, SinkConfig}
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
-import com.snowplowanalytics.snowplow.enrich.common.utils.JsonUtils
-import com.snowplowanalytics.snowplow.micro.IdImplicits._
-import com.typesafe.config.{Config, ConfigFactory}
-import io.circe.Json
-import io.circe.parser.parse
-import io.circe.syntax._
-import pureconfig.generic.auto._
-import pureconfig.generic.{FieldCoproductHint, ProductHint}
-import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource}
-
-import java.io.File
-import java.net.URI
-import java.nio.file.{Path, Paths}
-import java.security.{KeyStore, SecureRandom}
-import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
-import scala.io.Source
-
-/** Contain functions to parse the command line arguments,
-  * to parse the configuration for the collector, Akka HTTP and Iglu
-  * and to instantiate Iglu client.
-  */
-private[micro] object ConfigHelper {
-  object EnvironmentVariables {
-    val igluRegistryUrl = "MICRO_IGLU_REGISTRY_URL"
-    val igluApiKey = "MICRO_IGLU_API_KEY"
-    val sslCertificatePassword = "MICRO_SSL_CERT_PASSWORD"
-  }
-
-  implicit def hint[T] =
-    ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))
-
-  // Copied from Enrich - necessary for parsing enrichment configs
-  implicit val sinkConfigHint = new FieldCoproductHint[SinkConfig]("enabled")
-  type EitherS[A] = Either[String, A]
-
-  case class MicroConfig(
-    collectorConfig: CollectorConfig,
-    igluResolver: Resolver[Id],
-    igluClient: IgluCirceClient[Id],
-    enrichmentConfigs: List[EnrichmentConf],
-    akkaConfig: Config,
-    sslContext: Option[SSLContext],
-    outputEnrichedTsv: Boolean
-  )
-
-  /** Parse the command line arguments and the configuration files. */
-  def parseConfig(args: Array[String]): MicroConfig = {
-    case class CommandLineOptions(
-      collectorConfigFile: Option[File] = None,
-      igluConfigFile: Option[File] = None,
-      outputEnrichedTsv: Boolean = false
-    )
-
-    def formatEnvironmentVariables(descriptions: (String, String)*): String = {
-      val longest = descriptions.map(_._1.length).max
-      descriptions.map {
-        case (envVar, desc) => s"  $envVar${" " * (longest - envVar.length)}  $desc"
-      }.mkString("\n")
-    }
-
-    val parser = new scopt.OptionParser[CommandLineOptions](buildinfo.BuildInfo.name) {
-      head(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version)
-      help("help")
-      version("version")
-      opt[Option[File]]("collector-config")
-        .optional()
-        .valueName("<filename>")
-        .text("Configuration file for collector")
-        .action((f: Option[File], c: CommandLineOptions) => c.copy(collectorConfigFile = f))
-        .validate(f =>
-          f match {
-            case Some(file) =>
-              if (file.exists) success
-              else failure(s"Configuration file $f does not exist")
-            case None => success
-          }
-        )
-      opt[Option[File]]("iglu")
-        .optional()
-        .valueName("<filename>")
-        .text("Configuration file for Iglu Client")
-        .action((f: Option[File], c: CommandLineOptions) => c.copy(igluConfigFile = f))
-        .validate(f =>
-          f match {
-            case Some(file) =>
-              if (file.exists) success
-              else failure(s"Configuration file $f does not exist")
-            case None => success
-          }
-        )
-      opt[Unit]('t', "output-tsv")
-        .optional()
-        .text("Print events in TSV format to standard output")
-        .action((_, c: CommandLineOptions) => c.copy(outputEnrichedTsv = true))
-      note(
-        "\nSupported environment variables:\n\n" + formatEnvironmentVariables(
-          EnvironmentVariables.igluRegistryUrl ->
-            "The URL for an additional custom Iglu registry",
-          EnvironmentVariables.igluApiKey ->
-            s"An optional API key for an Iglu registry defined with ${EnvironmentVariables.igluRegistryUrl}",
-          EnvironmentVariables.sslCertificatePassword ->
-            "The password for the optional SSL/TLS certificate in /config/ssl-certificate.p12. Enables HTTPS"
-        )
-      )
-    }
-
-    val config = parser.parse(args, CommandLineOptions()) getOrElse {
-      throw new RuntimeException("Problem while parsing arguments") // should never be called
-    }
-
-    val resolved = config.collectorConfigFile match {
-      case Some(f) => ConfigFactory.parseFile(f).resolve()
-      case None => ConfigFactory.empty()
-    }
-
-    val collectorConfig = ConfigFactory.load(resolved.withFallback(ConfigFactory.load()))
-
-    val resolverSource = config.igluConfigFile match {
-      case Some(f) => Source.fromFile(f)
-      case None => Source.fromResource("default-iglu-resolver.json")
-    }
-
-    val extraRegistry = sys.env.get(EnvironmentVariables.igluRegistryUrl).map { registry =>
-      val uri = URI.create(registry)
-      Registry.Http(
-        Registry.Config(s"Custom ($registry)", 0, List.empty),
-        Registry.HttpConnection(uri, sys.env.get(EnvironmentVariables.igluApiKey))
-      )
-    }
-
-    val (resolver, igluClient) = getIgluClientFromSource(resolverSource, extraRegistry) match {
-      case Right(ok) => ok
-      case Left(e) =>
-        throw new IllegalArgumentException(s"Error while reading Iglu config file: $e.")
-    }
-
-    val enrichmentConfigs = Option(getClass.getResource("/enrichments")).map { dir =>
-      getEnrichmentRegistryFromPath(Paths.get(dir.toURI), igluClient) match {
-        case Right(ok) => ok
-        case Left(e) =>
-          throw new IllegalArgumentException(s"Error while reading enrichment config file(s): $e.")
-      }
-    }.getOrElse(List.empty)
-
-    val sslContext = sys.env.get(EnvironmentVariables.sslCertificatePassword).map { password =>
-      // Adapted from https://doc.akka.io/docs/akka-http/current/server-side/server-https-support.html.
-      // We could use SSLContext.getDefault instead of all of this, but then we would need to
-      // force the user to add arcane -D flags when running Micro, which is not the best experience.
-      val keystore = KeyStore.getInstance("PKCS12")
-      val certificateFile = getClass.getClassLoader.getResourceAsStream("ssl-certificate.p12")
-      keystore.load(certificateFile, password.toCharArray)
-
-      val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
-      keyManagerFactory.init(keystore, password.toCharArray)
-
-      val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
-      trustManagerFactory.init(keystore)
-
-      val context = SSLContext.getInstance("TLS")
-      context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
-
-      context
-    }
-
-    MicroConfig(
-      ConfigSource.fromConfig(collectorConfig.getConfig("collector")).loadOrThrow[CollectorConfig],
-      resolver,
-      igluClient,
-      enrichmentConfigs,
-      collectorConfig,
-      sslContext,
-      config.outputEnrichedTsv
-    )
-  }
-
-  /** Instantiate an Iglu client from its configuration file. */
-  def getIgluClientFromSource(igluConfigSource: Source, extraRegistry: Option[Registry]): Either[String, (Resolver[Id], IgluCirceClient[Id])] =
-    for {
-      text <- Either.catchNonFatal(igluConfigSource.mkString).leftMap(_.getMessage)
-      json <- parse(text).leftMap(_.show)
-      config <- Resolver.parseConfig(json).leftMap(_.show)
-      resolver <- Resolver.fromConfig[Id](config).leftMap(_.show).value
-      completeResolver = resolver.copy(repos = resolver.repos ++ extraRegistry)
-    } yield (completeResolver, IgluCirceClient.fromResolver[Id](completeResolver, config.cacheSize))
-
-  def getEnrichmentRegistryFromPath(path: Path, igluClient: IgluCirceClient[Id]) = {
-    val schemaKey = SchemaKey(
-      "com.snowplowanalytics.snowplow",
-      "enrichments",
-      "jsonschema",
-      SchemaVer.Full(1, 0, 0)
-    )
-    // Loosely adapted from Enrich#localEnrichmentConfigsExtractor
-    val directory = Option(path.toFile.listFiles).fold(List.empty[File])(_.toList)
-    val configs = directory
-      .filter(_.getName.endsWith(".json"))
-      .map(scala.io.Source.fromFile(_).mkString)
-      .map(JsonUtils.extractJson).sequence[EitherS, Json]
-      .map(jsonConfigs => SelfDescribingData[Json](schemaKey, Json.fromValues(jsonConfigs)).asJson)
-      .flatMap { jsonConfig =>
-        EnrichmentRegistry.parse(jsonConfig, igluClient, localMode = false)
-          .leftMap(_.toList.mkString("; ")).toEither
-      }
-    val scripts = directory
-      .filter(_.getName.endsWith(".js"))
-      .map(scala.io.Source.fromFile(_).mkString)
-      .map(EnrichmentConf.JavascriptScriptConf(schemaKey, _))
-    configs.map(scripts ::: _)
-  }
-}
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala
new file mode 100644
index 0000000..4696b42
--- /dev/null
+++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala
@@ -0,0 +1,213 @@
+/**
+ * Copyright (c) 2013-present Snowplow Analytics Ltd.
+ * All rights reserved.
+ *
+ * This software is made available by Snowplow Analytics, Ltd.,
+ * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
+ * located at https://docs.snowplow.io/limited-use-license-1.0
+ * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
+ * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
+ */
+package com.snowplowanalytics.snowplow.micro
+
+import cats.data.EitherT
+import cats.effect.IO
+import cats.implicits._
+import com.monovore.decline.Opts
+import com.snowplowanalytics.iglu.client.IgluCirceClient
+import com.snowplowanalytics.iglu.client.resolver.Resolver
+import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
+import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry}
+import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
+import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
+import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig}
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
+import com.typesafe.config.{ConfigFactory, Config => TypesafeConfig}
+import fs2.io.file.{Files, Path => FS2Path}
+import io.circe.config.syntax.CirceConfigOps
+import io.circe.syntax.EncoderOps
+import io.circe.{Decoder, Json}
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+
+import java.net.URI
+import java.nio.file.{Path, Paths}
+
+object Configuration {
+
+  object Cli {
+    final case class Config(collector: Option[Path], iglu: Option[Path], outputEnrichedTsv: Boolean)
+
+    private val collector = Opts.option[Path]("collector-config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
+    private val iglu = Opts.option[Path]("iglu", "Configuration file for Iglu Client", "i", "iglu.json").orNone
+    private val outputEnrichedTsv = Opts.flag("output-tsv", "Print events in TSV format to standard output", "t").orFalse
+
+    val config: Opts[Config] = (collector, iglu, outputEnrichedTsv).mapN(Config.apply)
+  }
+
+
+  object EnvironmentVariables {
+    val igluRegistryUrl = "MICRO_IGLU_REGISTRY_URL"
+    val igluApiKey = "MICRO_IGLU_API_KEY"
+    val sslCertificatePassword = "MICRO_SSL_CERT_PASSWORD"
+  }
+
+  final case class DummySinkConfig()
+  type SinkConfig = DummySinkConfig
+  implicit val dec: Decoder[DummySinkConfig] = Decoder.instance(_ => Right(DummySinkConfig()))
+
+  final case class MicroConfig(collector: CollectorConfig[SinkConfig],
+                               iglu: IgluResources,
+                               enrichmentsConfig: List[EnrichmentConf],
+                               outputEnrichedTsv: Boolean)
+
+  final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO])
+
+  implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO]
+
+  def load(): Opts[EitherT[IO, String, MicroConfig]] = {
+    Cli.config.map { cliConfig =>
+      for {
+        collectorConfig <- loadCollectorConfig(cliConfig.collector)
+        igluResources <- loadIgluResources(cliConfig.iglu)
+        enrichmentsConfig <- loadEnrichmentConfig(igluResources.client)
+      } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, cliConfig.outputEnrichedTsv)
+    }
+  }
+
+  def loadCollectorConfig(path: Option[Path]): EitherT[IO, String, CollectorConfig[SinkConfig]] = {
+    val resolveOrder = (config: TypesafeConfig) =>
+      namespaced(ConfigFactory.load(namespaced(config.withFallback(namespaced(ConfigFactory.parseResources("collector-micro.conf"))))))
+
+    loadConfig[CollectorConfig[SinkConfig]](path, resolveOrder)
+  }
+
+  def loadIgluResources(path: Option[Path]): EitherT[IO, String, IgluResources] = {
+    val resolveOrder = (config: TypesafeConfig) =>
+      config.withFallback(ConfigFactory.parseResources("default-iglu-resolver.conf"))
+
+    loadConfig[ResolverConfig](path, resolveOrder)
+      .flatMap(buildIgluResources)
+  }
+
+  def loadEnrichmentConfig(igluClient: IgluCirceClient[IO]): EitherT[IO, String, List[EnrichmentConf]] = {
+    Option(getClass.getResource("/enrichments")) match {
+      case Some(definedEnrichments) =>
+        val path = Paths.get(definedEnrichments.toURI)
+        for {
+          asJson <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".json")
+          asHocon <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".hocon")
+          asJSScripts <- loadJSScripts(path)
+        } yield asJson ::: asHocon ::: asJSScripts
+      case None =>
+        EitherT.rightT[IO, String](List.empty)
+    }
+  }
+
+  private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] =
+    for {
+      resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show)
+      completeResolver = resolver.copy(repos = resolver.repos ++ readIgluExtraRegistry())
+      client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize))
+    } yield IgluResources(resolver, client)
+
+
+  private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path,
+                                   igluClient: IgluCirceClient[IO],
+                                   fileType: String): EitherT[IO, String, List[EnrichmentConf]] = {
+    listAvailableEnrichments(enrichmentsDirectory, fileType)
+      .flatMap(loadEnrichmentsAsJsons)
+      .map(asSDD)
+      .flatMap(parseEnrichments(igluClient))
+  }
+
+  private def loadJSScripts(enrichmentsDirectory: Path): EitherT[IO, String, List[EnrichmentConf]] = EitherT.right {
+    listFiles(enrichmentsDirectory, fileType = ".js")
+      .flatMap { scripts =>
+        scripts.traverse(buildJSConfig)
+      }
+  }
+
+  private def buildJSConfig(script: FS2Path): IO[EnrichmentConf.JavascriptScriptConf] = {
+    Files[IO]
+      .readUtf8Lines(script)
+      .compile
+      .toList
+      .map(lines => EnrichmentConf.JavascriptScriptConf(null, lines.mkString("\n")))
+  }
+
+  private def listAvailableEnrichments(enrichmentsDirectory: Path, fileType: String) = {
+    listFiles(enrichmentsDirectory, fileType)
+      .flatTap(files => logger.info(s"Files found in $enrichmentsDirectory: ${files.mkString(", ")}"))
+      .attemptT
+      .leftMap(e => show"Cannot list ${enrichmentsDirectory.toAbsolutePath.toString} directory with JSON: ${e.getMessage}")
+  }
+
+  private def listFiles(path: Path, fileType: String): IO[List[FS2Path]] = {
+    Files[IO].list(fs2.io.file.Path.fromNioPath(path))
+      .filter(path => path.toString.endsWith(fileType))
+      .compile
+      .toList
+  }
+
+  private def loadEnrichmentsAsJsons(enrichments: List[FS2Path]): EitherT[IO, String, List[Json]] = {
+    enrichments.traverse { enrichmentPath =>
+      loadConfig[Json](Some(enrichmentPath.toNioPath), identity)
+    }
+  }
+
+  private def asSDD(jsons: List[Json]): SelfDescribingData[Json] = {
+    val schema = SchemaKey("com.snowplowanalytics.snowplow", "enrichments", "jsonschema", SchemaVer.Full(1, 0, 0))
+    SelfDescribingData(schema, Json.arr(jsons: _*))
+  }
+
+  private def parseEnrichments(igluClient: IgluCirceClient[IO])(sdd: SelfDescribingData[Json]): EitherT[IO, String, List[EnrichmentConf]] =
+    EitherT {
+      EnrichmentRegistry
+        .parse[IO](sdd.asJson, igluClient, localMode = false, registryLookup = JavaNetRegistryLookup.ioLookupInstance[IO])
+        .map(_.toEither)
+    }.leftMap { x =>
+      show"Cannot decode enrichments - ${x.mkString_(", ")}"
+    }
+
+  private def readIgluExtraRegistry(): Option[Registry.Http] = {
+    sys.env.get(EnvironmentVariables.igluRegistryUrl).map { registry =>
+      val uri = URI.create(registry)
+      Registry.Http(
+        Registry.Config(s"Custom ($registry)", 0, List.empty),
+        Registry.HttpConnection(uri, sys.env.get(EnvironmentVariables.igluApiKey))
+      )
+    }
+  }
+
+  private def loadConfig[A: Decoder](path: Option[Path],
+                                     load: TypesafeConfig => TypesafeConfig): EitherT[IO, String, A] = EitherT {
+    IO {
+      for {
+        config <- Either.catchNonFatal(handleInputPath(path)).leftMap(_.getMessage)
+        config <- Either.catchNonFatal(config.resolve()).leftMap(_.getMessage)
+        config <- Either.catchNonFatal(load(config)).leftMap(_.getMessage)
+        parsed <- config.as[A].leftMap(_.show)
+      } yield parsed
+    }
+  }
+
+  private def handleInputPath(path: Option[Path]): TypesafeConfig = {
+    path match {
+      case Some(definedPath) => ConfigFactory.parseFile(definedPath.toFile)
+      case None => ConfigFactory.empty()
+    }
+  }
+
+  private def namespaced(config: TypesafeConfig): TypesafeConfig = {
+    val namespace = "collector"
+    if (config.hasPath(namespace))
+      config.getConfig(namespace).withFallback(config.withoutPath(namespace))
+    else
+      config
+  }
+
+  implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show))
+
+}
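`Configuration.load()` returns `Opts[EitherT[IO, String, MicroConfig]]`: decline parses the CLI, and each loading step short-circuits on the first `String` error. A minimal sketch of how a caller might consume that shape (assuming the `Configuration` object above; error handling is simplified and the printed fields are illustrative):

```scala
// Sketch: running the EitherT pipeline produced by Configuration.load().
import cats.effect.{ExitCode, IO}
import com.monovore.decline.Opts

def program: Opts[IO[ExitCode]] =
  Configuration.load().map { loaded =>
    // `loaded.value` materialises the EitherT into IO[Either[String, MicroConfig]].
    loaded.value.flatMap {
      case Right(config) =>
        IO.println(s"Output TSV enabled: ${config.outputEnrichedTsv}").as(ExitCode.Success)
      case Left(error) =>
        IO.println(s"Failed to load configuration: $error").as(ExitCode.Error)
    }
  }
```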
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala
deleted file mode 100644
index cdcc992..0000000
--- a/src/main/scala/com.snowplowanalytics.snowplow.micro/IdImplicits.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-
-package com.snowplowanalytics.snowplow.micro
-
-import cats.Id
-import cats.effect.Clock
-import java.util.concurrent.TimeUnit
-
-object IdImplicits {
-
-  implicit val clockProvider: Clock[Id] = new Clock[Id] {
-    final def realTime(unit: TimeUnit): Id[Long] =
-      unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
-    final def monotonic(unit: TimeUnit): Id[Long] =
-      unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS)
-  }
-
-}
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala
deleted file mode 100644
index c3fbff1..0000000
--- a/src/main/scala/com.snowplowanalytics.snowplow.micro/IgluService.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-
-package com.snowplowanalytics.snowplow.micro
-
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.Route
-import akka.http.scaladsl.model.StatusCodes.NotFound
-import cats.Id
-import io.circe.generic.auto._
-
-import com.snowplowanalytics.iglu.client.resolver.Resolver
-import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey}
-
-import IdImplicits._
-import CirceSupport._
-
-class IgluService(resolver: Resolver[Id]) {
-
-  def get(vendor: String, name: String, versionStr: String): Route =
-    SchemaVer.parseFull(versionStr) match {
-      case Right(version) =>
-        val key = SchemaKey(vendor, name, "jsonschema", version)
-        resolver.lookupSchema(key) match {
-          case Right(json) => complete(json)
-          case Left(error) => complete(NotFound, error)
-        }
-      case Left(_) => reject
-    }
-
-}
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
index e1b374b..0e8f570 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Main.scala
@@ -12,96 +12,15 @@
  */
 package com.snowplowanalytics.snowplow.micro
 
-import java.io.File
-
-import org.slf4j.LoggerFactory
-
-import scala.sys.process._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.{ConnectionContext, Http}
-
-import cats.Id
-
-import com.snowplowanalytics.snowplow.collectors.scalastream.model.CollectorSinks
-
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
-import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ShiftExecution}
-
-import com.snowplowanalytics.snowplow.micro.ConfigHelper.MicroConfig
-import com.snowplowanalytics.snowplow.micro.IdImplicits._
-
-/** Read the configuration and instantiate Snowplow Micro,
-  * which acts as a `Collector` and has an in-memory sink
-  * holding the valid and invalid events.
-  * It offers an HTTP endpoint to query this sink.
-  */
-object Main {
-  lazy val logger = LoggerFactory.getLogger(getClass())
-
-  def main(args: Array[String]): Unit = {
-    val config = ConfigHelper.parseConfig(args)
-    run(config)
-  }
-
-  def setupEnrichments(configs: List[EnrichmentConf]): EnrichmentRegistry[Id] = {
-    configs.flatMap(_.filesToCache).foreach { case (uri, location) =>
-      logger.info(s"Downloading ${uri}...")
-      uri.toURL #> new File(location) !!
-    }
-
-    val enrichmentRegistry = EnrichmentRegistry.build[Id](configs, BlockerF.noop, ShiftExecution.noop).value match {
-      case Right(ok) => ok
-      case Left(e) =>
-        throw new IllegalArgumentException(s"Error while enabling enrichments: $e.")
-    }
-
-    val loadedEnrichments = enrichmentRegistry.productIterator.toList.collect {
-      case Some(e: Enrichment) => e.getClass.getSimpleName
-    }
-    if (loadedEnrichments.nonEmpty) {
-      logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}")
-    } else {
-      logger.info(s"No enrichments enabled")
-    }
-
-    enrichmentRegistry
-  }
-
-  /** Create the in-memory sink,
-    * get the endpoints for both the collector and to query Snowplow Micro,
-    * and start the HTTP server.
-    */
-  def run(config: MicroConfig): Unit = {
-    implicit val system = ActorSystem.create("snowplow-micro", config.akkaConfig)
-    implicit val executionContext = system.dispatcher
-
-    val enrichmentRegistry = setupEnrichments(config.enrichmentConfigs)
-    val sinks = CollectorSinks(
-      MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv),
-      MemorySink(config.igluClient, enrichmentRegistry, config.outputEnrichedTsv)
-    )
-    val igluService = new IgluService(config.igluResolver)
-
-    val routes = Routing.getMicroRoutes(config.collectorConfig, sinks, igluService)
-    logger.info("UI available at /micro/ui")
-
-    Http()
-      .newServerAt(config.collectorConfig.interface, config.collectorConfig.port)
-      .bind(routes)
-      .foreach { binding =>
-        logger.info(s"REST interface bound to ${binding.localAddress}")
-      }
-
-    config.sslContext.foreach { sslContext =>
-      Http()
-        .newServerAt(config.collectorConfig.interface, config.collectorConfig.ssl.port)
-        .enableHttps(ConnectionContext.httpsServer(sslContext))
-        .bind(routes)
-        .foreach { binding =>
-          logger.info(s"HTTPS REST interface bound to ${binding.localAddress}")
-        }
-    }
-  }
+import cats.effect.{ExitCode, IO}
+import com.monovore.decline.Opts
+import com.monovore.decline.effect.CommandIOApp
+
+object Main
+  extends CommandIOApp(
+    name = s"docker run ${BuildInfo.dockerAlias}",
+    header = "MICRO",
+    version = BuildInfo.version
+  ) {
+  override def main: Opts[IO[ExitCode]] = Run.run()
 }
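The new `Main` delegates everything to decline-effect's `CommandIOApp`: argument parsing, `--help`/`--version`, and the `IO` runtime are all handled by the library, which is why the Akka `ActorSystem` bootstrap above disappears. A self-contained toy example in the same style (not Micro's actual CLI):

```scala
// Toy CommandIOApp showing how decline's Opts drive the IO entry point.
import cats.effect.{ExitCode, IO}
import com.monovore.decline.Opts
import com.monovore.decline.effect.CommandIOApp

object HelloApp extends CommandIOApp(name = "hello", header = "Toy decline-effect app") {
  // A single optional flag; decline generates help text and validation for free.
  private val who: Opts[String] =
    Opts.option[String]("who", "Whom to greet").withDefault("world")

  override def main: Opts[IO[ExitCode]] =
    who.map(name => IO.println(s"Hello, $name!").as(ExitCode.Success))
}
```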
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala
index a78efd5..ecd44de 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala
@@ -12,48 +12,53 @@
  */
 package com.snowplowanalytics.snowplow.micro
 
+import cats.data.{EitherT, Validated}
+import cats.effect.IO
 import cats.implicits._
-import cats.Id
-import cats.data.Validated
-import io.circe.syntax._
-import org.joda.time.DateTime
-import org.slf4j.LoggerFactory
 import com.snowplowanalytics.iglu.client.IgluCirceClient
+import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
 import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, EventConverter}
+import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations}
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Processor}
-import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink
+import com.snowplowanalytics.snowplow.collector.core.Sink
+import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
 import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent}
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry}
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentManager, EnrichmentRegistry}
 import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader
 import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils
-import IdImplicits._
-import com.snowplowanalytics.snowplow.badrows.BadRow.{EnrichmentFailures, SchemaViolations, TrackerProtocolViolations}
-import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
+import io.circe.syntax._
+import org.joda.time.DateTime
+import org.slf4j.LoggerFactory
 
 /** Sink of the collector that Snowplow Micro is.
-  * Contains the functions that are called for each tracking event sent
-  * to the collector endpoint.
-  * The events are received as `CollectorPayload`s serialized with Thrift.
-  * For each event it tries to validate it using Common Enrich,
-  * and then stores the results in-memory in [[ValidationCache]].
-  */
-private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enrichmentRegistry: EnrichmentRegistry[Id], outputEnrichedTsv: Boolean) extends Sink {
-  val MaxBytes = Int.MaxValue
-  private val processor = Processor(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version)
+ * Contains the functions that are called for each tracking event sent
+ * to the collector endpoint.
+ * The events are received as `CollectorPayload`s serialized with Thrift.
+ * For each event it tries to validate it using Common Enrich,
+ * and then stores the results in-memory in [[ValidationCache]].
+ */
+final class MemorySink(igluClient: IgluCirceClient[IO],
+                       registryLookup: RegistryLookup[IO],
+                       enrichmentRegistry: EnrichmentRegistry[IO],
+                       outputEnrichedTsv: Boolean,
+                       processor: Processor,
+                       adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] {
+  override val maxBytes = Int.MaxValue
   private lazy val logger = LoggerFactory.getLogger("EventLog")
 
-  /** Function of the [[Sink]] called for all the events received by a collector. */
-  override def storeRawEvents(events: List[Array[Byte]], key: String) = {
-    events.foreach(bytes => processThriftBytes(bytes, igluClient, enrichmentRegistry, processor))
+  override def isHealthy: IO[Boolean] = IO.pure(true)
+
+  override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = {
+    events.traverse(bytes => processThriftBytes(bytes)).void
   }
 
-  private def formatEvent(event: GoodEvent) =
+  private def formatEvent(event: GoodEvent): String =
     s"id:${event.event.event_id}" +
-    event.event.app_id.fold("")(i => s" app_id:$i") +
-    event.eventType.fold("")(t => s" type:$t") +
-    event.schema.fold("")(s => s" ($s)")
+      event.event.app_id.fold("")(i => s" app_id:$i") +
+      event.eventType.fold("")(t => s" type:$t") +
+      event.schema.fold("")(s => s" ($s)")
 
-  private def formatBadRow(badRow: BadRow) = badRow match {
+  private def formatBadRow(badRow: BadRow): String = badRow match {
     case TrackerProtocolViolations(_, Failure.TrackerProtocolViolations(_, _, _, messages), _) =>
       messages.map(_.asJson).toList.mkString
     case SchemaViolations(_, Failure.SchemaViolations(_, messages), _) =>
@@ -64,73 +69,76 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri
   }
 
   /** Deserialize Thrift bytes into `CollectorPayload`s,
-    * validate them and store the result in [[ValidationCache]].
-    * A `CollectorPayload` can contain several events.
-    */
-  private[micro] def processThriftBytes(
-    thriftBytes: Array[Byte],
-    igluClient: IgluCirceClient[Id],
-    enrichmentRegistry: EnrichmentRegistry[Id],
-    processor: Processor
-  ): Unit =
+   * validate them and store the result in [[ValidationCache]].
+   * A `CollectorPayload` can contain several events.
+   */
+  private[micro] def processThriftBytes(thriftBytes: Array[Byte]): IO[Unit] =
     ThriftLoader.toCollectorPayload(thriftBytes, processor) match {
       case Validated.Valid(maybePayload) =>
         maybePayload match {
           case Some(collectorPayload) =>
-            new AdapterRegistry().toRawEvents(collectorPayload, igluClient, processor) match {
+            adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup).flatMap {
               case Validated.Valid(rawEvents) =>
-                val (goodEvents, badEvents) = rawEvents.toList.foldRight((Nil, Nil): (List[GoodEvent], List[BadEvent])) {
-                  case (rawEvent, (good, bad)) =>
-                    validateEvent(rawEvent, igluClient, enrichmentRegistry, processor) match {
+                val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) {
+                  case ((good, bad), rawEvent) =>
+                    validateEvent(rawEvent).value.map {
                       case Right(goodEvent) =>
                         logger.info(s"GOOD ${formatEvent(goodEvent)}")
                         (goodEvent :: good, bad)
                       case Left((errors, badRow)) =>
                         val badEvent =
                           BadEvent(
                             Some(collectorPayload),
                             Some(rawEvent),
                             errors
                           )
                         logger.warn(s"BAD ${formatBadRow(badRow)}")
                         (good, badEvent :: bad)
                     }
                 }
-                ValidationCache.addToGood(goodEvents)
-                ValidationCache.addToBad(badEvents)
-                if (outputEnrichedTsv) {
-                  goodEvents.foreach { event =>
-                    println(event.event.toTsv)
-                  }
+                partitionEvents.map {
+                  case (goodEvents, badEvents) =>
+                    ValidationCache.addToGood(goodEvents)
+                    ValidationCache.addToBad(badEvents)
+                    if (outputEnrichedTsv) {
+                      goodEvents.foreach { event =>
+                        println(event.event.toTsv)
+                      }
+                    } else ()
                 }
               case Validated.Invalid(badRow) =>
                 val bad = BadEvent(Some(collectorPayload), None, List("Error while extracting event(s) from collector payload and validating it/them.", badRow.compact))
                 logger.warn(s"BAD ${bad.errors.head}")
-                ValidationCache.addToBad(List(bad))
+                IO(ValidationCache.addToBad(List(bad)))
             }
           case None =>
             val bad = BadEvent(None, None, List("No payload."))
             logger.warn(s"BAD ${bad.errors.head}")
-            ValidationCache.addToBad(List(bad))
+            IO(ValidationCache.addToBad(List(bad)))
         }
       case Validated.Invalid(badRows) =>
        val bad = BadEvent(None, None, List("Can't deserialize Thrift bytes.") ++ badRows.toList.map(_.compact))
        logger.warn(s"BAD ${bad.errors.head}")
-       ValidationCache.addToBad(List(bad))
+       IO(ValidationCache.addToBad(List(bad)))
     }
 
   /** Validate the raw event using Common Enrich logic, and extract the event type if any,
-    * the schema if any, and the schemas of the contexts attached to the event if any.
-    * @return [[GoodEvent]] with the extracted event type, schema and contexts,
-    *         or error if the event couldn't be validated.
-    */
-  private[micro] def validateEvent(
-    rawEvent: RawEvent,
-    igluClient: IgluCirceClient[Id],
-    enrichmentRegistry: EnrichmentRegistry[Id],
-    processor: Processor
-  ): Either[(List[String], BadRow), GoodEvent] =
-    EnrichmentManager.enrichEvent[Id](enrichmentRegistry, igluClient, processor, DateTime.now(), rawEvent, EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), ())
+   * the schema if any, and the schemas of the contexts attached to the event if any.
+   * @return [[GoodEvent]] with the extracted event type, schema and contexts,
+   *         or error if the event couldn't be validated.
+   */
+  private[micro] def validateEvent(rawEvent: RawEvent): EitherT[IO, (List[String], BadRow), GoodEvent] =
+    EnrichmentManager.enrichEvent[IO](
+      enrichmentRegistry,
+      igluClient,
+      processor,
+      DateTime.now(),
+      rawEvent,
+      EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false),
+      IO.unit,
+      registryLookup,
+      AtomicFields.from(Map.empty)
+    )
       .subflatMap { enriched =>
         EventConverter.fromEnriched(enriched)
           .leftMap { failure =>
@@ -138,18 +146,17 @@ private[micro] final case class MemorySink(igluClient: IgluCirceClient[Id], enri
           }
           .toEither
       }
-      .value.bimap(
-        badRow => (List("Error while validating the event.", badRow.compact), badRow),
-        enriched => GoodEvent(rawEvent, enriched.event, getEnrichedSchema(enriched), getEnrichedContexts(enriched), enriched)
-      )
+      .bimap(
+        badRow => (List("Error while validating the event.", badRow.compact), badRow),
+        enriched => GoodEvent(rawEvent, enriched.event, getEnrichedSchema(enriched), getEnrichedContexts(enriched), enriched)
+      )
+
 
-  private[micro] def getEnrichedSchema(enriched: Event): Option[String] =
+  private def getEnrichedSchema(enriched: Event): Option[String] =
     List(enriched.event_vendor, enriched.event_name, enriched.event_format, enriched.event_version)
       .sequence
       .map(_.mkString("iglu:", "/", ""))
 
-  private[micro] def getEnrichedContexts(enriched: Event): List[String] =
+  private def getEnrichedContexts(enriched: Event): List[String] =
     enriched.contexts.data.map(_.schema.toSchemaUri)
-
-  override def shutdown(): Unit = ()
 }
"iglu:com.google.analytics.measurement-protocol/user/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/session/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/traffic_source/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/system_info/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/link/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/app/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_action/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/content_experiment/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/hit/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/promotion_action/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_custom_dimension/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_custom_metric/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression_list/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression_custom_dimension/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/product_impression_custom_metric/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/promotion/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/custom_dimension/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/custom_metric/jsonschema/1-0-0", + "iglu:com.google.analytics.measurement-protocol/content_group/jsonschema/1-0-0" + ), + HubspotSchemas( + "iglu:com.hubspot/contact_creation/jsonschema/1-0-0", + "iglu:com.hubspot/contact_deletion/jsonschema/1-0-0", + "iglu:com.hubspot/contact_change/jsonschema/1-0-0", + "iglu:com.hubspot/company_creation/jsonschema/1-0-0", + "iglu:com.hubspot/company_deletion/jsonschema/1-0-0", + "iglu:com.hubspot/company_change/jsonschema/1-0-0", + "iglu:com.hubspot/deal_creation/jsonschema/1-0-0", + "iglu:com.hubspot/deal_deletion/jsonschema/1-0-0", + "iglu:com.hubspot/deal_change/jsonschema/1-0-0" + ), + MailchimpSchemas( + "iglu:com.mailchimp/subscribe/jsonschema/1-0-0", + "iglu:com.mailchimp/unsubscribe/jsonschema/1-0-0", + "iglu:com.mailchimp/campaign_sending_status/jsonschema/1-0-0", + "iglu:com.mailchimp/cleaned_email/jsonschema/1-0-0", + "iglu:com.mailchimp/email_address_change/jsonschema/1-0-0", + "iglu:com.mailchimp/profile_update/jsonschema/1-0-0" + ), + MailgunSchemas( + "iglu:com.mailgun/message_bounced/jsonschema/1-0-0", + "iglu:com.mailgun/message_clicked/jsonschema/1-0-0", + "iglu:com.mailgun/message_complained/jsonschema/1-0-0", + "iglu:com.mailgun/message_delivered/jsonschema/1-0-0", + "iglu:com.mailgun/message_dropped/jsonschema/1-0-0", + "iglu:com.mailgun/message_opened/jsonschema/1-0-0", + "iglu:com.mailgun/recipient_unsubscribed/jsonschema/1-0-0" + ), + MandrillSchemas( + "iglu:com.mandrill/message_bounced/jsonschema/1-0-1", + "iglu:com.mandrill/message_clicked/jsonschema/1-0-1", + "iglu:com.mandrill/message_delayed/jsonschema/1-0-1", + "iglu:com.mandrill/message_marked_as_spam/jsonschema/1-0-1", + "iglu:com.mandrill/message_opened/jsonschema/1-0-1", + "iglu:com.mandrill/message_rejected/jsonschema/1-0-0", + "iglu:com.mandrill/message_sent/jsonschema/1-0-0", + "iglu:com.mandrill/message_soft_bounced/jsonschema/1-0-1", + 
"iglu:com.mandrill/recipient_unsubscribed/jsonschema/1-0-1" + ), + MarketoSchemas("iglu:com.marketo/event/jsonschema/2-0-0"), + OlarkSchemas( + "iglu:com.olark/transcript/jsonschema/1-0-0", + "iglu:com.olark/offline_message/jsonschema/1-0-0" + ), + PagerdutySchemas( + "iglu:com.pagerduty/incident/jsonschema/1-0-0" + ), + PingdomSchemas( + "iglu:com.pingdom/incident_assign/jsonschema/1-0-0", + "iglu:com.pingdom/incident_notify_user/jsonschema/1-0-0", + "iglu:com.pingdom/incident_notify_of_close/jsonschema/1-0-0" + ), + SendgridSchemas( + "iglu:com.sendgrid/processed/jsonschema/3-0-0", + "iglu:com.sendgrid/dropped/jsonschema/3-0-0", + "iglu:com.sendgrid/delivered/jsonschema/3-0-0", + "iglu:com.sendgrid/deferred/jsonschema/3-0-0", + "iglu:com.sendgrid/bounce/jsonschema/3-0-0", + "iglu:com.sendgrid/open/jsonschema/3-0-0", + "iglu:com.sendgrid/click/jsonschema/3-0-0", + "iglu:com.sendgrid/spamreport/jsonschema/3-0-0", + "iglu:com.sendgrid/unsubscribe/jsonschema/3-0-0", + "iglu:com.sendgrid/group_unsubscribe/jsonschema/3-0-0", + "iglu:com.sendgrid/group_resubscribe/jsonschema/3-0-0" + ), + StatusGatorSchemas( + "iglu:com.statusgator/status_change/jsonschema/1-0-0" + ), + UnbounceSchemas( + "iglu:com.unbounce/form_post/jsonschema/1-0-0" + ), + UrbanAirshipSchemas( + "iglu:com.urbanairship.connect/CLOSE/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/CUSTOM/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/FIRST_OPEN/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/IN_APP_MESSAGE_DISPLAY/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/IN_APP_MESSAGE_EXPIRATION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/IN_APP_MESSAGE_RESOLUTION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/LOCATION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/OPEN/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/PUSH_BODY/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/REGION/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/RICH_DELETE/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/RICH_DELIVERY/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/RICH_HEAD/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/SEND/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/TAG_CHANGE/jsonschema/1-0-0", + "iglu:com.urbanairship.connect/UNINSTALL/jsonschema/1-0-0" + ), + VeroSchemas( + "iglu:com.getvero/bounced/jsonschema/1-0-0", + "iglu:com.getvero/clicked/jsonschema/1-0-0", + "iglu:com.getvero/delivered/jsonschema/1-0-0", + "iglu:com.getvero/opened/jsonschema/1-0-0", + "iglu:com.getvero/sent/jsonschema/1-0-0", + "iglu:com.getvero/unsubscribed/jsonschema/1-0-0", + "iglu:com.getvero/created/jsonschema/1-0-0", + "iglu:com.getvero/updated/jsonschema/1-0-0" + ) + ) + + def create(): AdapterRegistry[IO] = { + new AdapterRegistry[IO](Map.empty, adaptersSchemas) + } +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala index 54811b3..9a2d2d8 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Routing.scala @@ -1,121 +1,109 @@ -/* - * Copyright (c) 2019-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. 
- * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ package com.snowplowanalytics.snowplow.micro -import akka.http.scaladsl.server.{Route, RouteResult} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.model.headers.{`Access-Control-Allow-Methods`, `Access-Control-Allow-Headers`} -import akka.http.scaladsl.model.{HttpMethods, StatusCodes} +import cats.effect.IO +import com.snowplowanalytics.iglu.client.ClientError.ResolutionError +import com.snowplowanalytics.iglu.client.resolver.Resolver +import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent +import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload +import com.snowplowanalytics.snowplow.micro.Routing._ +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.syntax.EncoderOps +import io.circe.{Decoder, Encoder} +import org.apache.http.NameValuePair +import org.http4s.circe.CirceEntityDecoder._ +import org.http4s.circe.CirceEntityEncoder._ +import org.http4s.dsl.Http4sDsl +import org.http4s.{HttpRoutes, Response, StaticFile} +import org.joda.time.DateTime -import io.circe.generic.auto._ +final class Routing(igluResolver: Resolver[IO]) + (implicit lookup: RegistryLookup[IO]) extends Http4sDsl[IO] { -import com.snowplowanalytics.snowplow.collectors.scalastream.model.{CollectorConfig, CollectorSinks} -import com.snowplowanalytics.snowplow.collectors.scalastream.{CollectorRoute, CollectorService, HealthService} - -import scala.concurrent.ExecutionContext - -import CirceSupport._ - -/** Contain definitions of the routes (endpoints) for Snowplow Micro. - * Make the link between Snowplow Micro endpoints and the functions called. - * Snowplow Micro has 2 types of endpoints: - * - to receive tracking events; - * - to query the validated events. - * - * More information about an Akka HTTP routes can be found here: - * https://doc.akka.io/docs/akka-http/current/routing-dsl/routes.html. - */ -private[micro] object Routing { - - /** Create `Route` for Snowplow Micro, with the endpoints of the collector to receive tracking events - * and the endpoints to query the validated events. 
- */ - def getMicroRoutes( - collectorConf: CollectorConfig, - collectorSinks: CollectorSinks, - igluService: IgluService - )(implicit ec: ExecutionContext): Route = { - val c = new CollectorService(collectorConf, collectorSinks, buildinfo.BuildInfo.name, buildinfo.BuildInfo.version) - - val health = new HealthService.Settable { - toHealthy() - } - val collectorRoutes = new CollectorRoute { - override def collectorService = c - override def healthService = health - }.collectorRoute - - withCors(c) { - pathPrefix("micro") { - (get | post) { - path("all") { - complete(ValidationCache.getSummary()) - } ~ path("reset") { - ValidationCache.reset() - complete(ValidationCache.getSummary()) - } - } ~ get { - path("good") { - complete(ValidationCache.filterGood(FiltersGood(None, None, None, None))) - } ~ path("bad") { - complete(ValidationCache.filterBad(FiltersBad(None, None, None))) - } ~ pathPrefix("ui") { - pathEndOrSingleSlash { - getFromResource("ui/index.html") - } ~ path("errors") { - getFromResource("ui/errors.html") - } ~ getFromResourceDirectory("ui") ~ getFromResource("ui/404.html") + val value: HttpRoutes[IO] = HttpRoutes.of[IO] { + case request@method -> "micro" /: path => + (method, path.segments.head.encoded) match { + case (POST | GET, "all") => + Ok(ValidationCache.getSummary()) + case (POST | GET, "reset") => + ValidationCache.reset() + Ok(ValidationCache.getSummary()) + case (GET, "good") => + Ok(ValidationCache.filterGood(FiltersGood(None, None, None, None))) + case (POST, "good") => + request.as[FiltersGood].flatMap { filters => + Ok(ValidationCache.filterGood(filters).asJson) } - } ~ post { - path("good") { - entity(as[FiltersGood]) { filters => - complete(ValidationCache.filterGood(filters)) - } - } ~ path("bad") { - entity(as[FiltersBad]) { filters => - complete(ValidationCache.filterBad(filters)) - } + case (GET, "bad") => + Ok(ValidationCache.filterBad(FiltersBad(None, None, None))) + case (POST, "bad") => + request.as[FiltersBad].flatMap { filters => + Ok(ValidationCache.filterBad(filters)) } - } ~ options { - complete(StatusCodes.OK) - } ~ pathPrefix("iglu") { - path(Segment / Segment / "jsonschema" / Segment) { - igluService.get(_, _, _) - } ~ { - complete(StatusCodes.NotFound, "Schema lookup should be in format iglu/{vendor}/{schemaName}/jsonschema/{model}-{revision}-{addition}") + case (GET, "iglu") => + path match { + case Path.empty / "iglu" / vendor / name / "jsonschema" / versionVar => + lookupSchema(vendor, name, versionVar) + case _ => + NotFound("Schema lookup should be in format iglu/{vendor}/{schemaName}/jsonschema/{model}-{revision}-{addition}") } - } ~ { - complete(StatusCodes.NotFound, "Path for micro has to be one of: /all /good /bad /reset /iglu") - } + case (GET, "ui") => + handleUIPath(path) + case _ => + NotFound("Path for micro has to be one of: /all /good /bad /reset /iglu") } - } ~ collectorRoutes } - /** Wrap a Route with CORS header handling. 
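
The `value` routes above replace Akka's `~`-combinator DSL with plain pattern matching: `"micro" /: path` peels the leading segment and the remainder is dispatched on the (method, head-segment) pair. A minimal http4s 0.23 sketch of the same style, with illustrative endpoints rather than the PR's:

    import cats.effect.IO
    import org.http4s.HttpRoutes
    import org.http4s.dsl.io._

    // Equivalent in spirit to pathPrefix("micro") { ... } under Akka HTTP.
    val exampleRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] {
      case GET -> Root / "micro" / "ping" =>
        Ok("pong")
      case req @ POST -> Root / "micro" / "echo" =>
        req.as[String].flatMap(Ok(_))
    }
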
-  /** Wrap a Route with CORS header handling.
-    *
-    * Reuses the implementation used by the stream collector
-    */
-  private def withCors(c: CollectorService)(route: Route)(implicit ec: ExecutionContext): Route =
-    extractRequest { request => requestContext =>
-      route(requestContext).map {
-        case RouteResult.Complete(response) =>
-          val r = response.withHeaders(List(
-            `Access-Control-Allow-Methods`(List(HttpMethods.POST, HttpMethods.GET, HttpMethods.OPTIONS)),
-            c.accessControlAllowOriginHeader(request),
-            `Access-Control-Allow-Headers`("Content-Type")
-          ))
-          RouteResult.Complete(r)
-        case other => other
-      }
+  private def handleUIPath(path: Path): IO[Response[IO]] = {
+    path match {
+      case Path.empty / "ui" | Path.empty / "ui" / "/" =>
+        resource("ui/index.html")
+      case Path.empty / "ui" / "errors" =>
+        resource("ui/errors.html")
+      case other =>
+        resource(other.renderString)
+    }
+  }
+
+  private def resource(path: String): IO[Response[IO]] = {
+    StaticFile.fromResource[IO](path)
+      .getOrElseF(NotFound())
+  }
+
+  private def lookupSchema(vendor: String, name: String, versionVar: String): IO[Response[IO]] = {
+    SchemaVer.parseFull(versionVar) match {
+      case Right(version) =>
+        val key = SchemaKey(vendor, name, "jsonschema", version)
+        igluResolver.lookupSchema(key).flatMap {
+          case Right(json) => Ok(json)
+          case Left(error) => NotFound(error)
+        }
+      case Left(_) => NotFound("Schema lookup should be in format iglu/{vendor}/{schemaName}/jsonschema/{model}-{revision}-{addition}")
+    }
+  }
 }
+
+object Routing {
+
+  implicit val dateTimeEncoder: Encoder[DateTime] =
+    Encoder[String].contramap(_.toString)
+
+  implicit val nameValuePairEncoder: Encoder[NameValuePair] =
+    Encoder[String].contramap(kv => s"${kv.getName}=${kv.getValue}")
+
+  implicit val vs: Encoder[ValidationSummary] = deriveEncoder
+  implicit val ge: Encoder[GoodEvent] = deriveEncoder
+  implicit val rwe: Encoder[RawEvent] = deriveEncoder
+  implicit val cp: Encoder[CollectorPayload] = deriveEncoder
+  implicit val cpa: Encoder[CollectorPayload.Api] = deriveEncoder
+  implicit val cps: Encoder[CollectorPayload.Source] = deriveEncoder
+  implicit val cpc: Encoder[CollectorPayload.Context] = deriveEncoder
+  implicit val e: Encoder[Event] = deriveEncoder
+  implicit val be: Encoder[BadEvent] = deriveEncoder
+  implicit val re: Encoder[ResolutionError] = deriveEncoder
+
+  implicit val fg: Decoder[FiltersGood] = deriveDecoder
+  implicit val fb: Decoder[FiltersBad] = deriveDecoder
+}
\ No newline at end of file
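
Note that `object Routing` derives every codec explicitly with circe's semiauto, replacing the deleted `io.circe.generic.auto._` import; each encoder is materialized once at a named definition site instead of implicitly at every call site. The pattern in isolation (the `Filters` type here is hypothetical):

    import io.circe.{Decoder, Encoder}
    import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}

    // Hypothetical payload; derivation happens once, here, not at each use site.
    final case class Filters(eventType: Option[String], limit: Option[Int])

    object Filters {
      implicit val encoder: Encoder[Filters] = deriveEncoder
      implicit val decoder: Decoder[Filters] = deriveDecoder
    }
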
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala
new file mode 100644
index 0000000..1483dd2
--- /dev/null
+++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala
@@ -0,0 +1,163 @@
+/**
+  * Copyright (c) 2013-present Snowplow Analytics Ltd.
+  * All rights reserved.
+  *
+  * This software is made available by Snowplow Analytics, Ltd.,
+  * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
+  * located at https://docs.snowplow.io/limited-use-license-1.0
+  * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
+  * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
+  */
+package com.snowplowanalytics.snowplow.micro
+
+import cats.data.EitherT
+import cats.effect.{ExitCode, IO, Resource}
+import cats.implicits._
+import com.monovore.decline.Opts
+import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup
+import com.snowplowanalytics.snowplow.badrows.Processor
+import com.snowplowanalytics.snowplow.collector.core._
+import com.snowplowanalytics.snowplow.collector.core.model.Sinks
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
+import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
+import com.snowplowanalytics.snowplow.micro.Configuration.MicroConfig
+import org.http4s.ember.client.EmberClientBuilder
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+
+import java.io.File
+import java.security.{KeyStore, SecureRandom}
+import java.util.concurrent.Executors
+import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
+import scala.concurrent.ExecutionContext
+import scala.sys.process._
+
+object Run {
+
+  implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO]
+
+  def run(): Opts[IO[ExitCode]] = {
+    Configuration.load().map { configuration =>
+      handleAppErrors {
+        configuration
+          .semiflatMap { validMicroConfig =>
+            buildEnvironment(validMicroConfig)
+              .use(_ => IO.never)
+              .as(ExitCode.Success)
+          }
+      }
+    }
+  }
+
+  private def buildEnvironment(config: MicroConfig): Resource[IO, Unit] = {
+    for {
+      _ <- Resource.eval(setupSSLContext())
+      enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig)
+      badProcessor = Processor(BuildInfo.name, BuildInfo.version)
+      adapterRegistry = MicroAdapterRegistry.create()
+      lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
+      sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry)
+      collectorService = new Service[IO](
+        config.collector,
+        Sinks(sink, sink),
+        BuildInfo
+      )
+      collectorRoutes = new Routes[IO](
+        config.collector.enableDefaultRedirect,
+        config.collector.rootResponse.enabled,
+        config.collector.crossDomain.enabled,
+        collectorService
+      ).value
+
+      miniRoutes = new Routing(config.iglu.resolver)(lookup).value
+      allRoutes = miniRoutes <+> collectorRoutes
+      _ <- HttpServer.build[IO](
+        allRoutes,
+        if (config.collector.ssl.enable) config.collector.ssl.port else config.collector.port,
+        config.collector.ssl.enable,
+        config.collector.hsts,
+        config.collector.networking,
+        config.collector.monitoring.metrics
+      )
+    } yield ()
+  }
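
In `buildEnvironment` above, Micro's own routes take precedence because `<+>` on `HttpRoutes` tries the left-hand routes first and only falls through to the collector routes when they do not match. A sketch of that composition in isolation (the routes here are illustrative, not the PR's):

    import cats.effect.IO
    import cats.syntax.all._
    import org.http4s.HttpRoutes
    import org.http4s.dsl.io._

    val microRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] {
      case GET -> Root / "micro" / "all" => Ok("summary")
    }

    val collectorRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] {
      case GET -> Root / "health" => Ok("OK")
    }

    // HttpRoutes is a Kleisli into OptionT, so <+> (SemigroupK) yields the
    // first route set that produces Some(response) for a given request.
    val allRoutes: HttpRoutes[IO] = microRoutes <+> collectorRoutes
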
+
+  private def setupSSLContext(): IO[Unit] = IO {
+    sys.env.get(Configuration.EnvironmentVariables.sslCertificatePassword).foreach { password =>
+      // Adapted from https://doc.akka.io/docs/akka-http/current/server-side/server-https-support.html.
+      // We could use SSLContext.getDefault instead of all of this, but then we would need to
+      // force the user to add arcane -D flags when running Micro, which is not the best experience.
+      val keystore = KeyStore.getInstance("PKCS12")
+      val certificateFile = getClass.getClassLoader.getResourceAsStream("ssl-certificate.p12")
+      keystore.load(certificateFile, password.toCharArray)
+
+      val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
+      keyManagerFactory.init(keystore, password.toCharArray)
+
+      val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
+      trustManagerFactory.init(keystore)
+
+      val context = SSLContext.getInstance("TLS")
+      context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
+
+      SSLContext.setDefault(context)
+    }
+  }
+
+  private def buildEnrichmentRegistry(configs: List[EnrichmentConf]): Resource[IO, EnrichmentRegistry[IO]] = {
+    for {
+      _ <- Resource.eval(downloadAssets(configs))
+      shift <- ShiftExecution.ofSingleThread[IO]
+      httpClient <- EmberClientBuilder.default[IO].build.map(HttpClient.fromHttp4sClient[IO])
+      blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool)
+      enrichmentRegistry <- Resource.eval(EnrichmentRegistry.build[IO](configs, shift, httpClient, blockingEC)
+        .leftMap(error => new IllegalArgumentException(s"can't build EnrichmentRegistry: $error"))
+        .value.rethrow)
+      _ <- Resource.eval {
+        val loadedEnrichments = enrichmentRegistry.productIterator.toList.collect {
+          case Some(e: Enrichment) => e.getClass.getSimpleName
+        }
+        if (loadedEnrichments.nonEmpty) {
+          logger.info(s"Enabled enrichments: ${loadedEnrichments.mkString(", ")}")
+        } else {
+          logger.info(s"No enrichments enabled")
+        }
+      }
+    } yield enrichmentRegistry
+  }
+
+  private def downloadAssets(configs: List[EnrichmentConf]): IO[Unit] = {
+    configs
+      .flatMap(_.filesToCache)
+      .traverse_ { case (uri, location) =>
+        logger.info(s"Downloading $uri...") *> IO(uri.toURL #> new File(location) !!)
+      }
+  }
+
+  private def handleAppErrors(appOutput: EitherT[IO, String, ExitCode]): IO[ExitCode] = {
+    appOutput
+      .leftSemiflatMap { error =>
+        logger.error(error).as(ExitCode.Error)
+      }
+      .merge
+      .handleErrorWith { exception =>
+        logger.error(exception)("Exiting") >>
+          prettyLogException(exception).as(ExitCode.Error)
+      }
+  }
+
+  private def prettyLogException(e: Throwable): IO[Unit] = {
+
+    def logCause(e: Throwable): IO[Unit] =
+      Option(e.getCause) match {
+        case Some(e) => logger.error(s"caused by: ${e.getMessage}") >> logCause(e)
+        case None => IO.unit
+      }
+
+    logger.error(e.getMessage) >> logCause(e)
+  }
+
+}
diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala
index 58b1b1f..a6eb260 100644
--- a/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala
+++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/ConfigHelperSpec.scala
@@ -12,12 +12,16 @@
  */
 package com.snowplowanalytics.snowplow.micro

+import cats.effect.testing.specs2.CatsEffect
 import org.specs2.mutable.Specification

-class ConfigHelperSpec extends Specification {
+class ConfigHelperSpec extends Specification with CatsEffect {
   "ConfigHelper" >> {
     "will produce a valid parsed collector config if `--collector-config` is not present" >> {
-      ConfigHelper.parseConfig(Array()) must not(throwA[Exception])
+      Configuration.loadCollectorConfig(None).value.map {
+        case Right(_) => ok
+        case Left(_) => ko
+      }
     }
   }
 }
diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala
index 18c707c..dfaa100 100644
--- a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala
+++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala
@@ -12,136 +12,163 @@
  */
 package com.snowplowanalytics.snowplow.micro

-import org.specs2.mutable.Specification
-
-import cats.Id
-
+import cats.effect.testing.specs2.CatsResource
+import cats.effect.{IO, Resource}
 import com.snowplowanalytics.iglu.client.IgluCirceClient
 import com.snowplowanalytics.iglu.client.resolver.Resolver
-import com.snowplowanalytics.iglu.client.resolver.registries.Registry
-
+import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry}
+import com.snowplowanalytics.snowplow.badrows.Processor
 import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+import org.specs2.mutable.SpecificationLike

-import com.snowplowanalytics.snowplow.badrows.Processor
+class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike {

-class MemorySinkSpec extends Specification {
   import events._

-  private val igluClient = IgluCirceClient.fromResolver[Id](Resolver(List(Registry.IgluCentral), None), 500)
-  private val enrichmentRegistry = new EnrichmentRegistry[Id]()
-  private val processor = Processor(buildinfo.BuildInfo.name, buildinfo.BuildInfo.version)
-  private val sink = MemorySink(igluClient, enrichmentRegistry, outputEnrichedTsv = false)
+  override val resource: Resource[IO, MemorySink] = Resource.eval(createSink())

   sequential

   "processThriftBytes" >> {
-    "should add a BadEvent to the cache if the array of bytes is not a valid Thrift payload" >> {
+    "should add a BadEvent to the cache if the array of bytes is not a valid Thrift payload" >> withResource { sink =>
       ValidationCache.reset()
       val bytes = Array(1, 3, 5, 7).map(_.toByte)
-      sink.processThriftBytes(bytes, igluClient, enrichmentRegistry, processor)
-      ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Can't deserialize Thrift bytes")) => ok }
-      ValidationCache.filterGood().size must_== 0
+      sink.processThriftBytes(bytes).map { _ =>
+        ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Can't deserialize Thrift bytes")) => ok }
+        ValidationCache.filterGood().size must beEqualTo(0)
+      }
     }

-    "should add a BadEvent to the cache if RawEvent(s) can't be extracted from the CollectorPayload" >> {
+    "should add a BadEvent to the cache if RawEvent(s) can't be extracted from the CollectorPayload" >> withResource { sink =>
       ValidationCache.reset()
       val bytes = buildThriftBytesBadCollectorPayload()
-      sink.processThriftBytes(bytes, igluClient, enrichmentRegistry, processor)
-      ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while extracting event(s) from collector payload")) => ok }
-      ValidationCache.filterGood().size must_== 0
+      sink.processThriftBytes(bytes).map { _ =>
+        ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while extracting event(s) from collector payload")) => ok }
+        ValidationCache.filterGood().size must beEqualTo(0)
+      }
     }

-    "should add a GoodEvent and a BadEvent to the cache for a CollectorPayload containing both" >> {
+    "should add a GoodEvent and a BadEvent to the cache for a CollectorPayload containing both" >> withResource { sink =>
       ValidationCache.reset()
       val bytes = buildThriftBytes1Good1Bad()
-      sink.processThriftBytes(bytes, igluClient, enrichmentRegistry, processor)
-      ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while validating the event")) => ok }
-      ValidationCache.filterGood().size must_== 1
+      sink.processThriftBytes(bytes).map { _ =>
+        ValidationCache.filterBad() must beLike { case List(badEvent) if badEvent.errors.exists(_.contains("Error while validating the event")) => ok }
+        ValidationCache.filterGood().size must beEqualTo(1)
+      }
     }
   }

   "validateEvent" >> {
-    "should fail if the timestamp is not valid" >> {
+    "should fail if the timestamp is not valid" >> withResource { sink =>
       val raw = buildRawEvent()
       val withoutTimestamp = raw.copy(context = raw.context.copy(timestamp = None))
       val expected = "Error while validating the event"
-      sink.validateEvent(withoutTimestamp, igluClient, enrichmentRegistry, processor) must beLeft.like {
-        case (errors, _) if errors.exists(_.contains(expected)) => ok
-        case errs => ko(s"$errs doesn't contain [$expected]")
+      sink.validateEvent(withoutTimestamp).value.map { result =>
+        result must beLeft.like {
+          case (errors, _) if errors.exists(_.contains(expected)) => ok
+          case errs => ko(s"$errs doesn't contain [$expected]")
+        }
       }
     }

-    "should fail if the event type parameter is not set" >> {
+    "should fail if the event type parameter is not set" >> withResource { sink =>
       val raw = buildRawEvent()
       val withoutEvent = raw.copy(parameters = raw.parameters - "e")
       val expected = "Error while validating the event"
-      sink.validateEvent(withoutEvent, igluClient, enrichmentRegistry, processor) must beLeft.like {
-        case (errors, _) if errors.exists(_.contains(expected)) => ok
-        case errs => ko(s"$errs doesn't contain [$expected]")
+      sink.validateEvent(withoutEvent).value.map { result =>
+        result must beLeft.like {
+          case (errors, _) if errors.exists(_.contains(expected)) => ok
+          case errs => ko(s"$errs doesn't contain [$expected]")
+        }
       }
     }

-    "should fail for an invalid unstructured event" >> {
+    "should fail for an invalid unstructured event" >> withResource { sink =>
       val raw = buildRawEvent(Some(buildUnstruct(sdjInvalid)))
       val expected = "Error while validating the event"
-      sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like {
-        case (errors, _) if errors.exists(_.contains(expected)) => ok
-        case errs => ko(s"$errs doesn't contain [$expected]")
+      sink.validateEvent(raw).value.map { result =>
+        result must beLeft.like {
+          case (errors, _) if errors.exists(_.contains(expected)) => ok
+          case errs => ko(s"$errs doesn't contain [$expected]")
+        }
       }
     }

-    "should fail if the event has an invalid context" >> {
+    "should fail if the event has an invalid context" >> withResource { sink =>
       val raw = buildRawEvent(None, Some(buildContexts(List(sdjInvalid))))
       val expected = "Error while validating the event"
-      sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like {
-        case (errors, _) if errors.exists(_.contains(expected)) => ok
-        case errs => ko(s"$errs doesn't contain [$expected]")
+      sink.validateEvent(raw).value.map { result =>
+        result must beLeft.like {
+          case (errors, _) if errors.exists(_.contains(expected)) => ok
+          case errs => ko(s"$errs doesn't contain [$expected]")
+        }
       }
     }

-    "should fail for a unstructured event with an unknown schema" >> {
+    "should fail for an unstructured event with an unknown schema" >> withResource { sink =>
       val raw = buildRawEvent(Some(buildUnstruct(sdjDoesNotExist)))
       val expected = "Error while validating the event"
-      sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like {
-        case (errors, _) if errors.exists(_.contains(expected)) => ok
-        case errs => ko(s"$errs doesn't contain [$expected]")
+      sink.validateEvent(raw).value.map { result =>
+        result must beLeft.like {
+          case (errors, _) if errors.exists(_.contains(expected)) => ok
+          case errs => ko(s"$errs doesn't contain [$expected]")
+        }
       }
     }

-    "should fail if the event has a context with an unknown schema" >> {
+    "should fail if the event has a context with an unknown schema" >> withResource { sink =>
       val raw = buildRawEvent(None, Some(buildContexts(List(sdjDoesNotExist))))
       val expected = "Error while validating the event"
-      sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beLeft.like {
-        case (errors, _) if errors.exists(_.contains(expected)) => ok
-        case errs => ko(s"$errs doesn't contain [$expected]")
+      sink.validateEvent(raw).value.map { result =>
+        result must beLeft.like {
+          case (errors, _) if errors.exists(_.contains(expected)) => ok
+          case errs => ko(s"$errs doesn't contain [$expected]")
+        }
       }
     }

-    "extract the type of an event" >> {
+    "should extract the type of an event" >> withResource { sink =>
       val raw = buildRawEvent()
       val expected = "page_ping"
-      sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beRight.like {
-        case GoodEvent(_, typE, _, _, _) if typE == Some(expected) => ok
-        case GoodEvent(_, typE, _, _, _) => ko(s"extracted type $typE isn't $expected")
+      sink.validateEvent(raw).value.map { result =>
+        result must beRight.like {
+          case GoodEvent(_, typE, _, _, _) if typE == Some(expected) => ok
+          case GoodEvent(_, typE, _, _, _) => ko(s"extracted type $typE isn't $expected")
+        }
       }
     }

-    "should extract the schema of an unstructured event" >> {
+    "should extract the schema of an unstructured event" >> withResource { sink =>
       val raw = buildRawEvent(Some(buildUnstruct(sdjLinkClick)))
       val expected = schemaLinkClick
-      sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beRight.like {
-        case GoodEvent(_, _, schema, _, _) if schema == Some(expected) => ok
-        case GoodEvent(_, _, schema, _, _) => ko(s"extracted schema $schema isn't $expected")
+      sink.validateEvent(raw).value.map { result =>
+        result must beRight.like {
+          case GoodEvent(_, _, schema, _, _) if schema == Some(expected) => ok
+          case GoodEvent(_, _, schema, _, _) => ko(s"extracted schema $schema isn't $expected")
+        }
       }
     }

-    "should extract the contexts of an event" >> {
+    "should extract the contexts of an event" >> withResource { sink =>
       val raw = buildRawEvent(None, Some(buildContexts(List(sdjLinkClick, sdjMobileContext))))
       val expected = List(schemaLinkClick, schemaMobileContext)
-      sink.validateEvent(raw, igluClient, enrichmentRegistry, processor) must beRight.like {
-        case GoodEvent(_, _, _, contexts, _) if contexts == expected => ok
-        case GoodEvent(_, _, _, contexts, _) => ko(s"extracted contexts $contexts isn't $expected")
+      sink.validateEvent(raw).value.map { result =>
+        result must beRight.like {
+          case GoodEvent(_, _, _, contexts, _) if contexts == expected => ok
+          case GoodEvent(_, _, _, contexts, _) => ko(s"extracted contexts $contexts isn't $expected")
+        }
       }
     }
   }
+
+  private def createSink(): IO[MemorySink] = {
+    for {
+      igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500)
+      enrichmentRegistry = new EnrichmentRegistry[IO]()
+      processor = Processor(BuildInfo.name, BuildInfo.version)
+      adapterRegistry = MicroAdapterRegistry.create()
+      lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
+    } yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry)
+  }
+
 }
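
The reworked spec acquires the `MemorySink` once as a cats-effect `Resource`, and each example returns an `IO` of a specs2 result that `CatsResource` executes. The same shape in miniature, with a hypothetical `Ref`-based fixture standing in for `MemorySink`:

    import cats.effect.{IO, Ref, Resource}
    import cats.effect.testing.specs2.CatsResource
    import org.specs2.mutable.SpecificationLike

    // Hypothetical shared fixture in place of MemorySink.
    class CounterSpec extends CatsResource[IO, Ref[IO, Int]] with SpecificationLike {

      override val resource: Resource[IO, Ref[IO, Int]] =
        Resource.eval(Ref.of[IO, Int](0))

      "a shared resource" >> {
        "is passed to each example via withResource" >> withResource { counter =>
          counter.update(_ + 1) >> counter.get.map(_ must beEqualTo(1))
        }
      }
    }
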