Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade common-streams to 0.8.0-M4 #383

Merged
merged 3 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
12 changes: 7 additions & 5 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
# -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll.
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 1

}

"output": {
Expand Down Expand Up @@ -139,6 +134,13 @@
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
7 changes: 7 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@
# -- indicates an error that needs addressing.
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {
Expand Down
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
"legacyColumns": []
"exitOnMissingIgluSchema": true

"http": {
oguzhanunlu marked this conversation as resolved.
Show resolved Hide resolved
"client": ${snowplow.defaults.http.client}
}

"monitoring": {
"metrics": {
"statsd": ${snowplow.defaults.statsd}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.comcast.ip4s.Port
import scala.concurrent.duration.FiniteDuration
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

Expand All @@ -32,7 +32,8 @@ case class Config[+Source, +Sink](
license: AcceptedLicense,
skipSchemas: List[SchemaCriterion],
legacyColumns: List[SchemaCriterion],
exitOnMissingIgluSchema: Boolean
exitOnMissingIgluSchema: Boolean,
http: Config.Http
)

object Config {
Expand Down Expand Up @@ -91,6 +92,8 @@ object Config {
tooManyColumns: TooManyColumnsRetries
)

case class Http(client: HttpClient.Config)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val sinkWithMaxSize = for {
Expand All @@ -114,6 +117,7 @@ object Config {
implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
implicit val httpDecoder = deriveConfiguredDecoder[Http]

// TODO add bigquery docs
implicit val licenseDecoder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ package com.snowplowanalytics.snowplow.bigquery

import cats.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry
import retry.RetryPolicy

Expand All @@ -21,7 +19,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryRetrying, BigQueryUtils, TableManager, Writer}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook}

case class Environment[F[_]](
appInfo: AppInfo,
Expand Down Expand Up @@ -55,7 +53,7 @@ object Environment {
sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
resolver <- mkResolver[F](config.iglu)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
httpClient <- HttpClient.resource[F](config.main.http.client)
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object Processing {
}

/** Transform the Event into values compatible with the BigQuery sdk */
private def transform[F[_]: Sync: RegistryLookup](
private def transform[F[_]: Async: RegistryLookup](
env: Environment[F],
badProcessor: BadRowProcessor
): Pipe[F, Batched, BatchAfterTransform] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kafka.KafkaSinkConfig
import com.snowplowanalytics.snowplow.sources.kafka.KafkaSourceConfig
import org.http4s.implicits.http4sLiteralsSyntax
Expand Down Expand Up @@ -126,7 +126,8 @@ object KafkaConfigSpec {
license = AcceptedLicense(),
skipSchemas = List.empty,
legacyColumns = List.empty,
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)

private val extendedConfig = Config[KafkaSourceConfig, KafkaSinkConfig](
Expand Down Expand Up @@ -219,6 +220,7 @@ object KafkaConfigSpec {
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get,
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get
),
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import eu.timepit.refined.types.all.PosInt
import org.http4s.implicits.http4sLiteralsSyntax
import org.specs2.Specification

Expand Down Expand Up @@ -67,7 +66,6 @@ object KinesisConfigSpec {
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -124,7 +122,8 @@ object KinesisConfigSpec {
license = AcceptedLicense(),
skipSchemas = List.empty,
legacyColumns = List.empty,
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)

// workerIdentifer coming from "HOSTNAME" env variable set in BuildSettings
Expand All @@ -135,7 +134,6 @@ object KinesisConfigSpec {
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -214,6 +212,7 @@ object KinesisConfigSpec {
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get,
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get
),
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.pubsub.{GcpUserAgent => PubsubUserAgent}
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.pubsub.PubsubSinkConfig
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubSourceConfig
import org.http4s.implicits.http4sLiteralsSyntax
Expand Down Expand Up @@ -121,7 +121,8 @@ object PubsubConfigSpec {
license = AcceptedLicense(),
skipSchemas = List.empty,
legacyColumns = List.empty,
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)

private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
Expand Down Expand Up @@ -207,6 +208,7 @@ object PubsubConfigSpec {
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get,
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get
),
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)
}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Dependencies {
val bigquery = "2.34.2"

// Snowplow
val streams = "0.8.0-M2"
val streams = "0.8.0-M4"
val igluClient = "3.1.0"

// tests
Expand Down
Loading