Skip to content

Commit

Permalink
feat(businessAttribute): generate platform events on association/remo…
Browse files Browse the repository at this point in the history
…val of businessattributes with schemaField
  • Loading branch information
deepgarg-visa committed Dec 26, 2024
1 parent 756b199 commit 05673f1
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.metadata.timeline.data.dataset.schema;

import com.google.common.collect.ImmutableMap;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import com.linkedin.metadata.timeline.data.SemanticChangeType;
import lombok.Builder;

public class SchemaFieldBusinessAttributeChangeEvent extends ChangeEvent {
@Builder(builderMethodName = "schemaFieldBusinessAttributeChangeEventBuilder")
public SchemaFieldBusinessAttributeChangeEvent(
String entityUrn,
ChangeCategory category,
ChangeOperation operation,
String modifier,
AuditStamp auditStamp,
SemanticChangeType semVerChange,
String description,
Urn parentUrn,
Urn businessAttributeUrn,
Urn datasetUrn) {
super(

Check warning on line 25 in metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java#L25

Added line #L25 was not covered by tests
entityUrn,
category,
operation,
modifier,
ImmutableMap.of(
"parentUrn", parentUrn.toString(),
"businessAttributeUrn", businessAttributeUrn.toString(),
"datasetUrn", datasetUrn.toString()),

Check warning on line 33 in metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java#L30-L33

Added lines #L30 - L33 were not covered by tests
auditStamp,
semVerChange,
description);
}

Check warning on line 37 in metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/data/dataset/schema/SchemaFieldBusinessAttributeChangeEvent.java#L37

Added line #L37 was not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.linkedin.metadata.timeline.eventgenerator;

import com.linkedin.businessattribute.BusinessAttributeAssociation;
import com.linkedin.businessattribute.BusinessAttributes;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import com.linkedin.metadata.timeline.data.SemanticChangeType;
import com.linkedin.metadata.timeline.data.dataset.schema.SchemaFieldBusinessAttributeChangeEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BusinessAttributesChangeEventGenerator

Check warning on line 19 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L18-L19

Added lines #L18 - L19 were not covered by tests
extends EntityChangeEventGenerator<BusinessAttributes> {

private static final String BUSINESS_ATTRIBUTE_ADDED_FORMAT =
"BusinessAttribute '%s' added to entity '%s'.";
private static final String BUSINESS_ATTRIBUTE_REMOVED_FORMAT =
"BusinessAttribute '%s' removed from entity '%s'.";

@Override
public List<ChangeEvent> getChangeEvents(
@Nonnull Urn urn,
@Nonnull String entityName,
@Nonnull String aspectName,
@Nonnull Aspect<BusinessAttributes> from,
@Nonnull Aspect<BusinessAttributes> to,
@Nonnull AuditStamp auditStamp) {
log.debug(

Check warning on line 35 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L35

Added line #L35 was not covered by tests
"Calling BusinessAttributesChangeEventGenerator for entity {} and aspect {}",
entityName,
aspectName);
return computeDiff(urn, entityName, aspectName, from.getValue(), to.getValue(), auditStamp);

Check warning on line 39 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L39

Added line #L39 was not covered by tests
}

private List<ChangeEvent> computeDiff(
Urn urn,
String entityName,
String aspectName,
BusinessAttributes previousValue,
BusinessAttributes newValue,
AuditStamp auditStamp) {
List<ChangeEvent> changeEvents = new ArrayList<>();

Check warning on line 49 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L49

Added line #L49 was not covered by tests

BusinessAttributeAssociation previousAssociation =
previousValue != null ? previousValue.getBusinessAttribute() : null;
BusinessAttributeAssociation newAssociation =
newValue != null ? newValue.getBusinessAttribute() : null;

if (Objects.nonNull(previousAssociation) && Objects.isNull(newAssociation)) {
changeEvents.add(
createChangeEvent(

Check warning on line 58 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L57-L58

Added lines #L57 - L58 were not covered by tests
previousAssociation,
urn,
ChangeOperation.REMOVE,
BUSINESS_ATTRIBUTE_REMOVED_FORMAT,
auditStamp));

} else if (Objects.isNull(previousAssociation) && Objects.nonNull(newAssociation)) {
changeEvents.add(
createChangeEvent(

Check warning on line 67 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L66-L67

Added lines #L66 - L67 were not covered by tests
newAssociation,
urn,
ChangeOperation.ADD,
BUSINESS_ATTRIBUTE_ADDED_FORMAT,
auditStamp));
}
return changeEvents;

Check warning on line 74 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L74

Added line #L74 was not covered by tests
}

private ChangeEvent createChangeEvent(
BusinessAttributeAssociation businessAttributeAssociation,
Urn entityUrn,
ChangeOperation changeOperation,
String format,
AuditStamp auditStamp) {
return SchemaFieldBusinessAttributeChangeEvent.schemaFieldBusinessAttributeChangeEventBuilder()
.entityUrn(entityUrn.toString())
.category(ChangeCategory.BUSINESS_ATTRIBUTE)
.operation(changeOperation)
.modifier(businessAttributeAssociation.getBusinessAttributeUrn().toString())
.auditStamp(auditStamp)
.semVerChange(SemanticChangeType.MINOR)
.description(
String.format(
format, businessAttributeAssociation.getBusinessAttributeUrn().getId(), entityUrn))
.parentUrn(entityUrn)
.businessAttributeUrn(businessAttributeAssociation.getBusinessAttributeUrn())
.datasetUrn(entityUrn.getIdAsUrn())
.build();

Check warning on line 96 in metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java

View check run for this annotation

Codecov / codecov/patch

metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/BusinessAttributesChangeEventGenerator.java#L83-L96

Added lines #L83 - L96 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.linkedin.metadata.kafka.hook.event;

import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
Expand All @@ -27,6 +25,7 @@
import com.linkedin.platform.event.v1.Parameters;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -65,6 +64,7 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
Constants.ASSERTION_RUN_EVENT_ASPECT_NAME,
Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
Constants.BUSINESS_ATTRIBUTE_INFO_ASPECT_NAME,
Constants.BUSINESS_ATTRIBUTE_ASPECT,

// Entity Lifecycle Event
Constants.DATASET_KEY_ASPECT_NAME,
Expand All @@ -83,13 +83,12 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
private static final Set<String> SUPPORTED_OPERATIONS =
ImmutableSet.of("CREATE", "UPSERT", "DELETE");

private static final Set<String> ENTITY_EXCLUSIONS = ImmutableSet.of(SCHEMA_FIELD_ENTITY_NAME);

private final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry;
private final OperationContext systemOperationContext;
private final SystemEntityClient systemEntityClient;
private final Boolean isEnabled;
@Getter private final String consumerGroupSuffix;
private final List<String> entityExclusions;

@Autowired
public EntityChangeEventGeneratorHook(
Expand All @@ -98,13 +97,16 @@ public EntityChangeEventGeneratorHook(
final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry,
@Nonnull final SystemEntityClient entityClient,
@Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled,
@Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix) {
@Nonnull @Value("${entityChangeEvents.consumerGroupSuffix}") String consumerGroupSuffix,
@Nonnull @Value("#{'${entityChangeEvents.entityExclusions}'.split(',')}")
List<String> entityExclusions) {
this.systemOperationContext = systemOperationContext;
this.entityChangeEventGeneratorRegistry =
Objects.requireNonNull(entityChangeEventGeneratorRegistry);
this.systemEntityClient = Objects.requireNonNull(entityClient);
this.isEnabled = isEnabled;
this.consumerGroupSuffix = consumerGroupSuffix;
this.entityExclusions = entityExclusions;
}

@VisibleForTesting
Expand All @@ -113,7 +115,13 @@ public EntityChangeEventGeneratorHook(
@Nonnull final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry,
@Nonnull final SystemEntityClient entityClient,
@Nonnull Boolean isEnabled) {
this(systemOperationContext, entityChangeEventGeneratorRegistry, entityClient, isEnabled, "");
this(
systemOperationContext,
entityChangeEventGeneratorRegistry,
entityClient,
isEnabled,
"",
Collections.emptyList());
}

@Override
Expand Down Expand Up @@ -202,7 +210,7 @@ private <T extends RecordTemplate> List<ChangeEvent> generateChangeEvents(
private boolean isEligibleForProcessing(final MetadataChangeLog log) {
return SUPPORTED_OPERATIONS.contains(log.getChangeType().toString())
&& SUPPORTED_ASPECT_NAMES.contains(log.getAspectName())
&& !ENTITY_EXCLUSIONS.contains(log.getEntityType());
&& !entityExclusions.contains(log.getEntityType());
}

private void emitPlatformEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ featureFlags:
entityChangeEvents:
enabled: ${ENABLE_ENTITY_CHANGE_EVENTS_HOOK:true}
consumerGroupSuffix: ${ECE_CONSUMER_GROUP_SUFFIX:}
entityExclusions: ${ECE_ENTITY_EXCLUSIONS:schemaField} # provides a comma separated list of entities to exclude from the ECE hook

views:
enabled: ${VIEWS_ENABLED:true}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.timeline.eventgenerator.AssertionRunEventChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributeAssociationChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributeInfoChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.BusinessAttributesChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DataProcessInstanceRunEventChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DatasetPropertiesChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DeprecationChangeEventGenerator;
Expand Down Expand Up @@ -59,6 +60,7 @@ protected EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry(
BUSINESS_ATTRIBUTE_INFO_ASPECT_NAME, new BusinessAttributeInfoChangeEventGenerator());
registry.register(
BUSINESS_ATTRIBUTE_ASSOCIATION, new BusinessAttributeAssociationChangeEventGenerator());
registry.register(BUSINESS_ATTRIBUTE_ASPECT, new BusinessAttributesChangeEventGenerator());

Check warning on line 63 in metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeline/eventgenerator/EntityChangeEventGeneratorRegistryFactory.java

View check run for this annotation

Codecov / codecov/patch

metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeline/eventgenerator/EntityChangeEventGeneratorRegistryFactory.java#L63

Added line #L63 was not covered by tests

// Entity Lifecycle Differs
registry.register(DATASET_KEY_ASPECT_NAME, new EntityKeyChangeEventGenerator<>());
Expand Down

0 comments on commit 05673f1

Please sign in to comment.