Skip to content

Commit

Permalink
Remove too long atomic fields (close #889)
Browse files Browse the repository at this point in the history
Co-Authored-By: Benjamin BENOIST <[email protected]>
  • Loading branch information
oguzhanunlu and benjben committed Apr 8, 2024
1 parent c4ab89b commit 9999d5e
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,97 +32,98 @@ object AtomicFields {

final case class AtomicField(
name: String,
enrichedValueExtractor: EnrichedEvent => String
enrichedValueExtractor: EnrichedEvent => String,
nullify: EnrichedEvent => Unit
)
final case class LimitedAtomicField(value: AtomicField, limit: Int)

// format: off
val supportedFields: List[AtomicField] =
List(
AtomicField(name = "app_id", _.app_id ),
AtomicField(name = "platform", _.platform ),
AtomicField(name = "event", _.event ),
AtomicField(name = "event_id", _.event ),
AtomicField(name = "name_tracker", _.name_tracker ),
AtomicField(name = "v_tracker", _.v_tracker ),
AtomicField(name = "v_collector", _.v_collector ),
AtomicField(name = "v_etl", _.v_etl ),
AtomicField(name = "user_id", _.user_id ),
AtomicField(name = "user_ipaddress", _.user_ipaddress ),
AtomicField(name = "user_fingerprint", _.user_fingerprint ),
AtomicField(name = "domain_userid", _.domain_userid ),
AtomicField(name = "network_userid", _.network_userid ),
AtomicField(name = "geo_country", _.geo_country ),
AtomicField(name = "geo_region", _.geo_region ),
AtomicField(name = "geo_city", _.geo_city ),
AtomicField(name = "geo_zipcode", _.geo_zipcode ),
AtomicField(name = "geo_region_name", _.geo_region_name ),
AtomicField(name = "ip_isp", _.ip_isp ),
AtomicField(name = "ip_organization", _.ip_organization ),
AtomicField(name = "ip_domain", _.ip_domain ),
AtomicField(name = "ip_netspeed", _.ip_netspeed ),
AtomicField(name = "page_url", _.page_url ),
AtomicField(name = "page_title", _.page_title ),
AtomicField(name = "page_referrer", _.page_referrer ),
AtomicField(name = "page_urlscheme", _.page_urlscheme ),
AtomicField(name = "page_urlhost", _.page_urlhost ),
AtomicField(name = "page_urlpath", _.page_urlpath ),
AtomicField(name = "page_urlquery", _.page_urlquery ),
AtomicField(name = "page_urlfragment", _.page_urlfragment ),
AtomicField(name = "refr_urlscheme", _.refr_urlscheme ),
AtomicField(name = "refr_urlhost", _.refr_urlhost ),
AtomicField(name = "refr_urlpath", _.refr_urlpath ),
AtomicField(name = "refr_urlquery", _.refr_urlquery ),
AtomicField(name = "refr_urlfragment", _.refr_urlfragment ),
AtomicField(name = "refr_medium", _.refr_medium ),
AtomicField(name = "refr_source", _.refr_source ),
AtomicField(name = "refr_term", _.refr_term ),
AtomicField(name = "mkt_medium", _.mkt_medium ),
AtomicField(name = "mkt_source", _.mkt_source ),
AtomicField(name = "mkt_term", _.mkt_term ),
AtomicField(name = "mkt_content", _.mkt_content ),
AtomicField(name = "mkt_campaign", _.mkt_campaign ),
AtomicField(name = "se_category", _.se_category ),
AtomicField(name = "se_action", _.se_action ),
AtomicField(name = "se_label", _.se_label ),
AtomicField(name = "se_property", _.se_property ),
AtomicField(name = "tr_orderid", _.tr_orderid ),
AtomicField(name = "tr_affiliation", _.tr_affiliation ),
AtomicField(name = "tr_city", _.tr_city ),
AtomicField(name = "tr_state", _.tr_state ),
AtomicField(name = "tr_country", _.tr_country ),
AtomicField(name = "ti_orderid", _.ti_orderid ),
AtomicField(name = "ti_sku", _.ti_sku ),
AtomicField(name = "ti_name", _.ti_name ),
AtomicField(name = "ti_category", _.ti_category ),
AtomicField(name = "useragent", _.useragent ),
AtomicField(name = "br_name", _.br_name ),
AtomicField(name = "br_family", _.br_family ),
AtomicField(name = "br_version", _.br_version ),
AtomicField(name = "br_type", _.br_type ),
AtomicField(name = "br_renderengine", _.br_renderengine ),
AtomicField(name = "br_lang", _.br_lang ),
AtomicField(name = "br_colordepth", _.br_colordepth ),
AtomicField(name = "os_name", _.os_name ),
AtomicField(name = "os_family", _.os_family ),
AtomicField(name = "os_manufacturer", _.os_manufacturer ),
AtomicField(name = "os_timezone", _.os_timezone ),
AtomicField(name = "dvce_type", _.dvce_type ),
AtomicField(name = "doc_charset", _.doc_charset ),
AtomicField(name = "tr_currency", _.tr_currency ),
AtomicField(name = "ti_currency", _.ti_currency ),
AtomicField(name = "base_currency", _.base_currency ),
AtomicField(name = "geo_timezone", _.geo_timezone ),
AtomicField(name = "mkt_clickid", _.mkt_clickid ),
AtomicField(name = "mkt_network", _.mkt_network ),
AtomicField(name = "etl_tags", _.etl_tags ),
AtomicField(name = "refr_domain_userid", _.refr_domain_userid ),
AtomicField(name = "domain_sessionid", _.domain_sessionid ),
AtomicField(name = "event_vendor", _.event_vendor ),
AtomicField(name = "event_name", _.event_name ),
AtomicField(name = "event_format", _.event_format ),
AtomicField(name = "event_version", _.event_version ),
AtomicField(name = "event_fingerprint", _.event_fingerprint ),
AtomicField(name = "app_id", _.app_id, _.app_id = null),
AtomicField(name = "platform", _.platform, _.platform = null),
AtomicField(name = "event", _.event, _.event = null),
AtomicField(name = "event_id", _.event_id, _ => ()), // required in loading
AtomicField(name = "name_tracker", _.name_tracker, _.name_tracker = null),
AtomicField(name = "v_tracker", _.v_tracker, _.v_tracker = null),
AtomicField(name = "v_collector", _.v_collector, _ => ()), // required in loading
AtomicField(name = "v_etl", _.v_etl, _ => ()), // required in loading
AtomicField(name = "user_id", _.user_id, _.user_id = null),
AtomicField(name = "user_ipaddress", _.user_ipaddress, _.user_ipaddress = null),
AtomicField(name = "user_fingerprint", _.user_fingerprint, _.user_fingerprint = null),
AtomicField(name = "domain_userid", _.domain_userid, _.domain_userid = null),
AtomicField(name = "network_userid", _.network_userid, _.network_userid = null),
AtomicField(name = "geo_country", _.geo_country, _.geo_country = null),
AtomicField(name = "geo_region", _.geo_region, _.geo_region = null),
AtomicField(name = "geo_city", _.geo_city, _.geo_city = null),
AtomicField(name = "geo_zipcode", _.geo_zipcode, _.geo_zipcode = null),
AtomicField(name = "geo_region_name", _.geo_region_name, _.geo_region_name = null),
AtomicField(name = "ip_isp", _.ip_isp, _.ip_isp = null),
AtomicField(name = "ip_organization", _.ip_organization, _.ip_organization = null),
AtomicField(name = "ip_domain", _.ip_domain, _.ip_domain = null),
AtomicField(name = "ip_netspeed", _.ip_netspeed, _.ip_netspeed = null),
AtomicField(name = "page_url", _.page_url, _.page_url = null),
AtomicField(name = "page_title", _.page_title, _.page_title = null),
AtomicField(name = "page_referrer", _.page_referrer, _.page_referrer = null),
AtomicField(name = "page_urlscheme", _.page_urlscheme, _.page_urlscheme = null),
AtomicField(name = "page_urlhost", _.page_urlhost, _.page_urlhost = null),
AtomicField(name = "page_urlpath", _.page_urlpath, _.page_urlpath = null),
AtomicField(name = "page_urlquery", _.page_urlquery, _.page_urlquery = null),
AtomicField(name = "page_urlfragment", _.page_urlfragment, _.page_urlfragment = null),
AtomicField(name = "refr_urlscheme", _.refr_urlscheme, _.refr_urlscheme = null),
AtomicField(name = "refr_urlhost", _.refr_urlhost, _.refr_urlhost = null),
AtomicField(name = "refr_urlpath", _.refr_urlpath, _.refr_urlpath = null),
AtomicField(name = "refr_urlquery", _.refr_urlquery, _.refr_urlquery = null),
AtomicField(name = "refr_urlfragment", _.refr_urlfragment, _.refr_urlfragment = null),
AtomicField(name = "refr_medium", _.refr_medium, _.refr_medium = null),
AtomicField(name = "refr_source", _.refr_source, _.refr_source = null),
AtomicField(name = "refr_term", _.refr_term, _.refr_term = null),
AtomicField(name = "mkt_medium", _.mkt_medium, _.mkt_medium = null),
AtomicField(name = "mkt_source", _.mkt_source, _.mkt_source = null),
AtomicField(name = "mkt_term", _.mkt_term, _.mkt_term = null),
AtomicField(name = "mkt_content", _.mkt_content, _.mkt_content = null),
AtomicField(name = "mkt_campaign", _.mkt_campaign, _.mkt_campaign = null),
AtomicField(name = "se_category", _.se_category, _.se_category = null),
AtomicField(name = "se_action", _.se_action, _.se_action = null),
AtomicField(name = "se_label", _.se_label, _.se_label = null),
AtomicField(name = "se_property", _.se_property, _.se_property = null),
AtomicField(name = "tr_orderid", _.tr_orderid, _.tr_orderid = null),
AtomicField(name = "tr_affiliation", _.tr_affiliation, _.tr_affiliation = null),
AtomicField(name = "tr_city", _.tr_city, _.tr_city = null),
AtomicField(name = "tr_state", _.tr_state, _.tr_state = null),
AtomicField(name = "tr_country", _.tr_country, _.tr_country = null),
AtomicField(name = "ti_orderid", _.ti_orderid, _.ti_orderid = null),
AtomicField(name = "ti_sku", _.ti_sku, _.ti_sku = null),
AtomicField(name = "ti_name", _.ti_name, _.ti_name = null),
AtomicField(name = "ti_category", _.ti_category, _.ti_category = null),
AtomicField(name = "useragent", _.useragent, _.useragent = null),
AtomicField(name = "br_name", _.br_name, _.br_name = null),
AtomicField(name = "br_family", _.br_family, _.br_family = null),
AtomicField(name = "br_version", _.br_version, _.br_version = null),
AtomicField(name = "br_type", _.br_type, _.br_type = null),
AtomicField(name = "br_renderengine", _.br_renderengine, _.br_renderengine = null),
AtomicField(name = "br_lang", _.br_lang, _.br_lang = null),
AtomicField(name = "br_colordepth", _.br_colordepth, _.br_colordepth = null),
AtomicField(name = "os_name", _.os_name, _.os_name = null),
AtomicField(name = "os_family", _.os_family, _.os_family = null),
AtomicField(name = "os_manufacturer", _.os_manufacturer, _.os_manufacturer = null),
AtomicField(name = "os_timezone", _.os_timezone, _.os_timezone = null),
AtomicField(name = "dvce_type", _.dvce_type, _.dvce_type = null),
AtomicField(name = "doc_charset", _.doc_charset, _.doc_charset = null),
AtomicField(name = "tr_currency", _.tr_currency, _.tr_currency = null),
AtomicField(name = "ti_currency", _.ti_currency, _.ti_currency = null),
AtomicField(name = "base_currency", _.base_currency, _.base_currency = null),
AtomicField(name = "geo_timezone", _.geo_timezone, _.geo_timezone = null),
AtomicField(name = "mkt_clickid", _.mkt_clickid, _.mkt_clickid = null),
AtomicField(name = "mkt_network", _.mkt_network, _.mkt_network = null),
AtomicField(name = "etl_tags", _.etl_tags, _.etl_tags = null),
AtomicField(name = "refr_domain_userid", _.refr_domain_userid, _.refr_domain_userid = null),
AtomicField(name = "domain_sessionid", _.domain_sessionid, _.domain_sessionid = null),
AtomicField(name = "event_vendor", _.event_vendor, _.event_vendor = null),
AtomicField(name = "event_name", _.event_name, _.event_name = null),
AtomicField(name = "event_format", _.event_format, _.event_format = null),
AtomicField(name = "event_version", _.event_version, _.event_version = null),
AtomicField(name = "event_fingerprint", _.event_fingerprint, _.event_fingerprint = null),
)
// format: on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ object AtomicFieldsLengthValidator {
event: EnrichedEvent,
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
atomicFields: AtomicFields,
emitIncomplete: Boolean
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
atomicFields.value
.map(validateField(event, _).toValidatedNel)
.map(validateField(event, _, emitIncomplete).toValidatedNel)
.combineAll match {
case Invalid(errors) if acceptInvalid =>
handleAcceptableErrors(invalidCount, event, errors) *> Monad[F].pure(Ior.Right(()))
Expand All @@ -51,17 +52,19 @@ object AtomicFieldsLengthValidator {

private def validateField(
event: EnrichedEvent,
atomicField: LimitedAtomicField
atomicField: LimitedAtomicField,
emitIncomplete: Boolean
): Either[ValidatorReport, Unit] = {
val actualValue = atomicField.value.enrichedValueExtractor(event)
if (actualValue != null && actualValue.length > atomicField.limit)
if (actualValue != null && actualValue.length > atomicField.limit) {
if (emitIncomplete) atomicField.value.nullify(event)
ValidatorReport(
s"Field is longer than maximum allowed size ${atomicField.limit}",
Some(atomicField.value.name),
Nil,
Some(actualValue)
).asLeft
else
} else
Right(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ object EnrichmentManager {
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
atomicFields,
emitIncomplete
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
Expand Down Expand Up @@ -230,12 +231,13 @@ object EnrichmentManager {
registryLookup: RegistryLookup[F],
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
atomicFields: AtomicFields,
emitIncomplete: Boolean
): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
for {
validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup)
_ <- AtomicFieldsLengthValidator
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields)
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields, emitIncomplete)
.leftMap { v: Failure => NonEmptyList.one(v) }
} yield validContexts

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2417,6 +2417,27 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
ko(s"[$other] is not a SchemaViolations bad row with 2 IgluError")
}
}

"remove an oversized atomic field if emitIncomplete is set to true" >> {
val enriched = EnrichmentManager
.enrichEvent[IO](
enrichmentReg,
client,
processor,
timestamp,
RawEvent(api, fatBody, None, source, context),
featureFlags = AcceptInvalid.featureFlags.copy(acceptInvalid = false),
IO.unit,
SpecHelpers.registryLookup,
atomicFieldLimits,
emitIncomplete = true
)

enriched.value.map {
case Ior.Both(_: BadRow.SchemaViolations, enriched) if Option(enriched.v_tracker).isEmpty => ok
case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without tracker version")
}
}
}

"setDerivedContexts" should {
Expand Down

0 comments on commit 9999d5e

Please sign in to comment.