Skip to content

Commit

Permalink
fix(entity-service): no-op batches
Browse files Browse the repository at this point in the history
Refactor the entityService function to filter out no-op upserts, i.e. upserts which would result in no change to the row. Updating the last-observed timestamp is still allowed.
  • Loading branch information
david-leifker committed Dec 5, 2024
1 parent 3c388a5 commit ff09983
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class Constants {
// App sources
public static final String UI_SOURCE = "ui";
public static final String SYSTEM_UPDATE_SOURCE = "systemUpdate";
public static final String METADATA_TESTS_SOURCE = "metadataTests";

/** Entities */
public static final String CORP_USER_ENTITY_NAME = "corpuser";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -972,10 +972,13 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
*/
if (overwrite || databaseAspect == null) {
result =
ingestAspectToLocalDB(txContext, writeItem, databaseSystemAspect)
.toBuilder()
.request(writeItem)
.build();
Optional.ofNullable(
ingestAspectToLocalDB(
txContext, writeItem, databaseSystemAspect))
.map(
optResult ->
optResult.toBuilder().request(writeItem).build())
.orElse(null);

} else {
RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate();
Expand All @@ -996,49 +999,56 @@ This condition is specifically for an older conditional write ingestAspectIfNotP

return result;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());

// commit upserts prior to retention or kafka send, if supported by impl
if (txContext != null) {
txContext.commitAndContinue();
}
long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop());
if (took > DB_TIMER_LOG_THRESHOLD_MS) {
log.info("Ingestion of aspects batch to database took {} ms", took);
}

// Retention optimization and tx
if (retentionService != null) {
List<RetentionService.RetentionContext> retentionBatch =
upsertResults.stream()
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
.get(result.getUrn().toString())
.containsKey(result.getRequest().getAspectName()))
.filter(
result -> {
RecordTemplate oldAspect = result.getOldValue();
RecordTemplate newAspect = result.getNewValue();
// Apply retention policies if there was an update to existing aspect
// value
return oldAspect != newAspect
&& oldAspect != null
&& retentionService != null;
})
.map(
result ->
RetentionService.RetentionContext.builder()
.urn(result.getUrn())
.aspectName(result.getRequest().getAspectName())
.maxVersion(Optional.of(result.getMaxVersion()))
.build())
.collect(Collectors.toList());
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);
if (!upsertResults.isEmpty()) {
// commit upserts prior to retention or kafka send, if supported by impl
if (txContext != null) {
txContext.commitAndContinue();
}
log.info(
"Ingestion of aspects batch to database took {} in {} ms",
upsertResults.size(),
TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop()));

// Retention optimization and tx
if (retentionService != null) {
List<RetentionService.RetentionContext> retentionBatch =
upsertResults.stream()
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
.get(result.getUrn().toString())
.containsKey(result.getRequest().getAspectName()))
.filter(
result -> {
RecordTemplate oldAspect = result.getOldValue();
RecordTemplate newAspect = result.getNewValue();
// Apply retention policies if there was an update to existing
// aspect
// value
return oldAspect != newAspect
&& oldAspect != null
&& retentionService != null;
})
.map(
result ->
RetentionService.RetentionContext.builder()
.urn(result.getUrn())
.aspectName(result.getRequest().getAspectName())
.maxVersion(Optional.of(result.getMaxVersion()))
.build())
.collect(Collectors.toList());
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);
} else {
log.warn("Retention service is missing!");
}
} else {
log.warn("Retention service is missing!");
log.warn("Empty transaction detected. {}", inputBatch);
}

return upsertResults;
Expand Down Expand Up @@ -2506,7 +2516,7 @@ private Map<EntityAspectIdentifier, EnvelopedAspect> getEnvelopedAspects(
* @param databaseAspect The aspect as it exists in the database.
* @return result object
*/
@Nonnull
@Nullable
private UpdateAspectResult ingestAspectToLocalDB(
@Nullable TransactionContext txContext,
@Nonnull final ChangeMCP writeItem,
Expand All @@ -2520,6 +2530,9 @@ private UpdateAspectResult ingestAspectToLocalDB(
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);

// 2. Compare the latest existing and new.
final RecordTemplate databaseValue =
databaseAspect == null ? null : databaseAspect.getRecordTemplate();

final EntityAspect.EntitySystemAspect previousBatchAspect =
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
final RecordTemplate previousValue =
Expand All @@ -2528,7 +2541,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
// 3. If there is no difference between existing and new, we just update
// the lastObserved in system metadata. RunId should stay as the original runId
if (previousValue != null
&& DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
&& DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {

SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
Expand Down Expand Up @@ -2564,45 +2577,49 @@ private UpdateAspectResult ingestAspectToLocalDB(
}

// 4. Save the newValue as the latest version
log.debug(
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
txContext,
writeItem.getUrn().toString(),
writeItem.getAspectName(),
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
previousBatchAspect == null
? null
: previousBatchAspect.getEntityAspect().getCreatedFor(),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
newValueStr,
writeItem.getAuditStamp().getActor().toString(),
writeItem.getAuditStamp().hasImpersonator()
? writeItem.getAuditStamp().getImpersonator().toString()
: null,
new Timestamp(writeItem.getAuditStamp().getTime()),
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
writeItem.getNextAspectVersion());

// metrics
aspectDao.incrementWriteMetrics(
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);

return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(writeItem.getRecordTemplate())
.oldSystemMetadata(
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
.newSystemMetadata(writeItem.getSystemMetadata())
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(versionOfOld)
.build();
if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
log.debug(
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
txContext,
writeItem.getUrn().toString(),
writeItem.getAspectName(),
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
previousBatchAspect == null
? null
: previousBatchAspect.getEntityAspect().getCreatedFor(),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
newValueStr,
writeItem.getAuditStamp().getActor().toString(),
writeItem.getAuditStamp().hasImpersonator()
? writeItem.getAuditStamp().getImpersonator().toString()
: null,
new Timestamp(writeItem.getAuditStamp().getTime()),
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
writeItem.getNextAspectVersion());

// metrics
aspectDao.incrementWriteMetrics(
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);

return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(writeItem.getRecordTemplate())
.oldSystemMetadata(
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
.newSystemMetadata(writeItem.getSystemMetadata())
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(versionOfOld)
.build();
}

return null;
}

private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.linkedin.metadata.entity;

import static com.linkedin.metadata.Constants.APP_SOURCE;
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.METADATA_TESTS_SOURCE;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
Expand All @@ -19,6 +21,7 @@
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringMap;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.AspectGenerationUtils;
Expand Down Expand Up @@ -61,6 +64,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Triple;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -534,8 +538,8 @@ public void testBatchPatchWithTrailingNoOp() throws Exception {
opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
assertEquals(
envelopedAspect.getSystemMetadata().getVersion(),
"2",
"Expected version 2. 1 - Initial, + 1 batch operation (1 add, 1 remove)");
"3",
"Expected version 3. 1 - Initial, + 1 add, 1 remove");
assertEquals(
new GlobalTags(envelopedAspect.getValue().data())
.getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
Expand Down Expand Up @@ -649,14 +653,103 @@ public void testBatchPatchAdd() throws Exception {
EnvelopedAspect envelopedAspect =
_entityServiceImpl.getLatestEnvelopedAspect(
opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");
assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "4", "Expected version 4");
assertEquals(
new GlobalTags(envelopedAspect.getValue().data())
.getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
Set.of(tag1, tag2, tag3),
"Expected all tags");
}

/**
 * Ingests a base {@code globalTags} aspect and then a single batch that contains the same tag ADD
 * patch twice, and checks the resulting aspect version and tag set.
 *
 * <p>Asserted outcome: exactly one URN is listed for the entity type, the latest aspect reports
 * system metadata version {@code "3"}, and the stored tags are the four initial tags plus the
 * patched tag (i.e. the duplicated patch does not add the tag twice).
 *
 * <p>NOTE(review): the dataset URN reuses the name {@code testBatchPatchAdd}; this presumably
 * relies on storage being reset between test methods -- confirm before changing test ordering.
 *
 * @throws Exception if URN parsing or aspect ingestion fails
 */
@Test
public void testBatchPatchAddDuplicate() throws Exception {
  Urn entityUrn =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)");

  // Tags present on the entity before any patch is applied.
  List<TagAssociation> initialTags =
      List.of(
              TagUrn.createFromString("urn:li:tag:__default_large_table"),
              TagUrn.createFromString("urn:li:tag:__default_low_queries"),
              TagUrn.createFromString("urn:li:tag:__default_low_changes"),
              TagUrn.createFromString("urn:li:tag:!10TB+ tables"))
          .stream()
          .map(tag -> new TagAssociation().setTag(tag))
          .collect(Collectors.toList());

  // The tag that the (duplicated) patch adds.
  TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+");

  SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();

  // Patch metadata: observed one tick after the base aspect and tagged with the
  // metadata-tests app source.
  SystemMetadata patchSystemMetadata = new SystemMetadata();
  patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1);
  patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE)));

  // Full-aspect write used to establish the base state.
  ChangeItemImpl initialAspectTag1 =
      ChangeItemImpl.builder()
          .urn(entityUrn)
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags)))
          .systemMetadata(systemMetadata.copy())
          .auditStamp(TEST_AUDIT_STAMP)
          .build(TestOperationContexts.emptyAspectRetriever(null));

  // JSON patch that adds tag2 to the globalTags aspect.
  PatchItemImpl patchAdd2 =
      PatchItemImpl.builder()
          .urn(entityUrn)
          .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME))
          .aspectName(GLOBAL_TAGS_ASPECT_NAME)
          .aspectSpec(
              _testEntityRegistry
                  .getEntitySpec(DATASET_ENTITY_NAME)
                  .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME))
          .patch(
              GenericJsonPatch.builder()
                  .arrayPrimaryKeys(Map.of("properties", List.of("tag")))
                  .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2)))
                  .build()
                  .getJsonPatch())
          .systemMetadata(patchSystemMetadata)
          .auditStamp(AuditStampUtils.createDefaultAuditStamp())
          .build(_testEntityRegistry);

  // establish base entity
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(initialAspectTag1))
          .build(),
      false,
      true);

  // Submit the same patch item twice within one batch.
  _entityServiceImpl.ingestAspects(
      opContext,
      AspectsBatchImpl.builder()
          .retrieverContext(opContext.getRetrieverContext().get())
          .items(List.of(patchAdd2, patchAdd2)) // duplicate
          .build(),
      false,
      true);

  // List aspects urns
  ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1);

  assertEquals(batch.getStart().intValue(), 0);
  assertEquals(batch.getCount().intValue(), 1);
  assertEquals(batch.getTotal().intValue(), 1);
  assertEquals(batch.getEntities().size(), 1);
  // TestNG's assertEquals takes (actual, expected); keep that order so a failure message
  // reports the listed URN as "found" and the test URN as "expected".
  assertEquals(batch.getEntities().get(0).toString(), entityUrn.toString());

  EnvelopedAspect envelopedAspect =
      _entityServiceImpl.getLatestEnvelopedAspect(
          opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
  assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");

  // The stored tag set must be exactly the initial tags plus tag2.
  assertEquals(
      new GlobalTags(envelopedAspect.getValue().data())
          .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
      Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2))
          .collect(Collectors.toSet()),
      "Expected all tags");
}

@Test
public void dataGeneratorThreadingTest() {
DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);
Expand Down

0 comments on commit ff09983

Please sign in to comment.