Skip to content

Commit

Permalink
RDB Loader: mitigate exhausted input error (close #203)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Nov 30, 2020
1 parent 5e3955d commit b1f50b0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import cats.Id
import cats.data._
import cats.implicits._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.core.circe.implicits.{ schemaCriterionDecoder => _, _ }
import com.snowplowanalytics.iglu.core.{SelfDescribingData, SchemaCriterion}

import com.snowplowanalytics.iglu.client.Client

import io.circe._
import io.circe.Decoder._
import io.circe.generic.semiauto._
Expand Down Expand Up @@ -251,6 +252,11 @@ object StorageTarget {
* @return circe AST
*/
private def validate(client: Client[Id, Json])(json: SelfDescribingData[Json]): Either[ParseError, SelfDescribingData[Json]] = {
client.check(json).value.leftMap(e => ParseError(e.show)).leftMap(error => ParseError(s"${json.schema.toSchemaUri} ${error.message}")).as(json)
def attempt = client.check(json)
attempt
.recoverWith { case error if isInputError(error) => attempt }
.value
.leftMap(error => ParseError(s"${json.schema.toSchemaUri} ${error.show}"))
.as(json)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ package com.snowplowanalytics.snowplow.rdbloader

import java.util.UUID

import scala.concurrent.duration.{TimeUnit, MILLISECONDS, NANOSECONDS}

import io.circe._

import cats.Id
import cats.effect.Clock

import scala.concurrent.duration.{MILLISECONDS, NANOSECONDS, TimeUnit}
import com.snowplowanalytics.iglu.client.ClientError
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryError

import com.snowplowanalytics.snowplow.scalatracker.UUIDProvider

package object common {
Expand All @@ -45,4 +50,19 @@ package object common {
case Left(message) => Left(DecodingFailure(message, hCursor.history))
}
}

def isInputError(clientError: ClientError): Boolean =
clientError match {
case ClientError.ValidationError(_) =>
false
case ClientError.ResolutionError(map) =>
map.values.toList.flatMap(_.errors.toList).exists {
case RegistryError.RepoFailure(message) =>
message.contains("exhausted input")
case RegistryError.ClientFailure(message) =>
message.contains("exhausted input")
case RegistryError.NotFound =>
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl

import cats.effect.{ Sync, Clock }
import cats.effect.{Sync, Clock}

import io.circe.Json

import com.snowplowanalytics.iglu.client.Client

import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList

import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError

import com.snowplowanalytics.snowplow.rdbloader.LoaderError
import com.snowplowanalytics.snowplow.rdbloader.common._
import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure
Expand All @@ -32,10 +35,18 @@ object Iglu {
def apply[F[_]](implicit ev: Iglu[F]): Iglu[F] = ev

def igluInterpreter[F[_]: Sync: Clock](client: Client[F, Json]): Iglu[F] = new Iglu[F] {
def getSchemas(vendor: String, name: String, model: Int): F[Either[LoaderError, SchemaList]] =
Flattening.getOrdered(client.resolver, vendor, name, model).leftMap { resolutionError =>
val message = s"Cannot get schemas for iglu:$vendor/$name/jsonschema/$model-*-*\n$resolutionError"
LoaderError.DiscoveryError(DiscoveryFailure.IgluError(message))
}.value
def getSchemas(vendor: String, name: String, model: Int): F[Either[LoaderError, SchemaList]] = {
val attempt = Flattening.getOrdered(client.resolver, vendor, name, model)
attempt
.recoverWith {
case LoaderIgluError.SchemaListNotFound(_, error) if isInputError(error) =>
attempt
}
.leftMap { resolutionError =>
val message = s"Cannot get schemas for iglu:$vendor/$name/jsonschema/$model-*-*\n$resolutionError"
LoaderError.DiscoveryError(DiscoveryFailure.IgluError(message))
}
.value
}
}
}

0 comments on commit b1f50b0

Please sign in to comment.