From c4ada540c4b1460377dd47e8dd6234314c0c7d4c Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:02:58 -0600 Subject: [PATCH] fix(entity-service): no-op batches (#12047) --- .../java/com/linkedin/metadata/Constants.java | 1 + .../metadata/entity/EntityServiceImpl.java | 188 ++++++++++-------- .../entity/EbeanEntityServiceTest.java | 99 ++++++++- 3 files changed, 201 insertions(+), 87 deletions(-) diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index 077e0e2b666be..9c608187342e8 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -51,6 +51,7 @@ public class Constants { // App sources public static final String UI_SOURCE = "ui"; public static final String SYSTEM_UPDATE_SOURCE = "systemUpdate"; + public static final String METADATA_TESTS_SOURCE = "metadataTests"; /** Entities */ public static final String CORP_USER_ENTITY_NAME = "corpuser"; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index a0a55cf505cf3..bf3481205fb5a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -855,6 +855,7 @@ private List ingestAspectsToLocalDB( if (inputBatch.containsDuplicateAspects()) { log.warn(String.format("Batch contains duplicates: %s", inputBatch)); + MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc(); } return aspectDao @@ -928,6 +929,7 @@ private List ingestAspectsToLocalDB( // No changes, return if (changeMCPs.isEmpty()) { + MetricUtils.counter(EntityServiceImpl.class, "batch_empty").inc(); return Collections.emptyList(); } @@ -935,6 +937,7 @@ private List ingestAspectsToLocalDB( ValidationExceptionCollection exceptions = AspectsBatch.validatePreCommit(changeMCPs, opContext.getRetrieverContext().get()); if (!exceptions.isEmpty()) { + MetricUtils.counter(EntityServiceImpl.class, "batch_validation_exception").inc(); throw new ValidationException(collectMetrics(exceptions).toString()); } @@ -972,10 +975,13 @@ This condition is specifically for an older conditional write ingestAspectIfNotP */ if (overwrite || databaseAspect == null) { result = - ingestAspectToLocalDB(txContext, writeItem, databaseSystemAspect) - .toBuilder() - .request(writeItem) - .build(); + Optional.ofNullable( + ingestAspectToLocalDB( + txContext, writeItem, databaseSystemAspect)) + .map( + optResult -> + optResult.toBuilder().request(writeItem).build()) + .orElse(null); } else { RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate(); @@ -996,49 +1002,56 @@ This condition is specifically for an older conditional write ingestAspectIfNotP return result; }) + .filter(Objects::nonNull) .collect(Collectors.toList()); - // commit upserts prior to retention or kafka send, if supported by impl - if (txContext != null) { - txContext.commitAndContinue(); - } - long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop()); - if (took > DB_TIMER_LOG_THRESHOLD_MS) { - log.info("Ingestion of aspects batch to database took {} ms", took); - } + if (!upsertResults.isEmpty()) { + // commit upserts prior to retention or kafka send, if supported by impl + if (txContext != null) { + txContext.commitAndContinue(); + } + long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop()); + if (took > DB_TIMER_LOG_THRESHOLD_MS) { + log.info("Ingestion of aspects batch to database took {} ms", took); + } - // Retention optimization and tx - if (retentionService != null) { - List retentionBatch = - upsertResults.stream() - // Only consider retention when there was a previous version - .filter( - result -> - batchAspects.containsKey(result.getUrn().toString()) - && batchAspects - .get(result.getUrn().toString()) - .containsKey(result.getRequest().getAspectName())) - .filter( - result -> { - RecordTemplate oldAspect = result.getOldValue(); - RecordTemplate newAspect = result.getNewValue(); - // Apply retention policies if there was an update to existing aspect - // value - return oldAspect != newAspect - && oldAspect != null - && retentionService != null; - }) - .map( - result -> - RetentionService.RetentionContext.builder() - .urn(result.getUrn()) - .aspectName(result.getRequest().getAspectName()) - .maxVersion(Optional.of(result.getMaxVersion())) - .build()) - .collect(Collectors.toList()); - retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch); + // Retention optimization and tx + if (retentionService != null) { + List retentionBatch = + upsertResults.stream() + // Only consider retention when there was a previous version + .filter( + result -> + batchAspects.containsKey(result.getUrn().toString()) + && batchAspects + .get(result.getUrn().toString()) + .containsKey(result.getRequest().getAspectName())) + .filter( + result -> { + RecordTemplate oldAspect = result.getOldValue(); + RecordTemplate newAspect = result.getNewValue(); + // Apply retention policies if there was an update to existing + // aspect + // value + return oldAspect != newAspect + && oldAspect != null + && retentionService != null; + }) + .map( + result -> + RetentionService.RetentionContext.builder() + .urn(result.getUrn()) + .aspectName(result.getRequest().getAspectName()) + .maxVersion(Optional.of(result.getMaxVersion())) + .build()) + .collect(Collectors.toList()); + retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch); + } else { + log.warn("Retention service is missing!"); + } } else { - log.warn("Retention service is missing!"); + MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc(); + log.warn("Empty transaction detected. {}", inputBatch); } return upsertResults; @@ -2506,7 +2519,7 @@ private Map getEnvelopedAspects( * @param databaseAspect The aspect as it exists in the database. * @return result object */ - @Nonnull + @Nullable private UpdateAspectResult ingestAspectToLocalDB( @Nullable TransactionContext txContext, @Nonnull final ChangeMCP writeItem, @@ -2520,6 +2533,9 @@ private UpdateAspectResult ingestAspectToLocalDB( .setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL); // 2. Compare the latest existing and new. + final RecordTemplate databaseValue = + databaseAspect == null ? null : databaseAspect.getRecordTemplate(); + final EntityAspect.EntitySystemAspect previousBatchAspect = (EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect(); final RecordTemplate previousValue = @@ -2528,7 +2544,7 @@ private UpdateAspectResult ingestAspectToLocalDB( // 3. If there is no difference between existing and new, we just update // the lastObserved in system metadata. RunId should stay as the original runId if (previousValue != null - && DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) { + && DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) { SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata(); latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved()); @@ -2564,45 +2580,49 @@ private UpdateAspectResult ingestAspectToLocalDB( } // 4. Save the newValue as the latest version - log.debug( - "Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn()); - String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate()); - long versionOfOld = - aspectDao.saveLatestAspect( - txContext, - writeItem.getUrn().toString(), - writeItem.getAspectName(), - previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue), - previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(), - previousBatchAspect == null - ? null - : previousBatchAspect.getEntityAspect().getCreatedFor(), - previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(), - previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(), - newValueStr, - writeItem.getAuditStamp().getActor().toString(), - writeItem.getAuditStamp().hasImpersonator() - ? writeItem.getAuditStamp().getImpersonator().toString() - : null, - new Timestamp(writeItem.getAuditStamp().getTime()), - EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()), - writeItem.getNextAspectVersion()); - - // metrics - aspectDao.incrementWriteMetrics( - writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length); - - return UpdateAspectResult.builder() - .urn(writeItem.getUrn()) - .oldValue(previousValue) - .newValue(writeItem.getRecordTemplate()) - .oldSystemMetadata( - previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata()) - .newSystemMetadata(writeItem.getSystemMetadata()) - .operation(MetadataAuditOperation.UPDATE) - .auditStamp(writeItem.getAuditStamp()) - .maxVersion(versionOfOld) - .build(); + if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) { + log.debug( + "Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn()); + String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate()); + long versionOfOld = + aspectDao.saveLatestAspect( + txContext, + writeItem.getUrn().toString(), + writeItem.getAspectName(), + previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue), + previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(), + previousBatchAspect == null + ? null + : previousBatchAspect.getEntityAspect().getCreatedFor(), + previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(), + previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(), + newValueStr, + writeItem.getAuditStamp().getActor().toString(), + writeItem.getAuditStamp().hasImpersonator() + ? writeItem.getAuditStamp().getImpersonator().toString() + : null, + new Timestamp(writeItem.getAuditStamp().getTime()), + EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()), + writeItem.getNextAspectVersion()); + + // metrics + aspectDao.incrementWriteMetrics( + writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length); + + return UpdateAspectResult.builder() + .urn(writeItem.getUrn()) + .oldValue(previousValue) + .newValue(writeItem.getRecordTemplate()) + .oldSystemMetadata( + previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata()) + .newSystemMetadata(writeItem.getSystemMetadata()) + .operation(MetadataAuditOperation.UPDATE) + .auditStamp(writeItem.getAuditStamp()) + .maxVersion(versionOfOld) + .build(); + } + + return null; } private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index f2ed2fddba765..a1000fd02abfe 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -1,8 +1,10 @@ package com.linkedin.metadata.entity; +import static com.linkedin.metadata.Constants.APP_SOURCE; import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME; import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME; +import static com.linkedin.metadata.Constants.METADATA_TESTS_SOURCE; import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; @@ -19,6 +21,7 @@ import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringMap; import com.linkedin.entity.EnvelopedAspect; import com.linkedin.identity.CorpUserInfo; import com.linkedin.metadata.AspectGenerationUtils; @@ -61,6 +64,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Triple; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -534,8 +538,8 @@ public void testBatchPatchWithTrailingNoOp() throws Exception { opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); assertEquals( envelopedAspect.getSystemMetadata().getVersion(), - "2", - "Expected version 2. 1 - Initial, + 1 batch operation (1 add, 1 remove)"); + "3", + "Expected version 3. 1 - Initial, + 1 add, 1 remove"); assertEquals( new GlobalTags(envelopedAspect.getValue().data()) .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), @@ -649,7 +653,7 @@ public void testBatchPatchAdd() throws Exception { EnvelopedAspect envelopedAspect = _entityServiceImpl.getLatestEnvelopedAspect( opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); - assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3"); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "4", "Expected version 4"); assertEquals( new GlobalTags(envelopedAspect.getValue().data()) .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), @@ -657,6 +661,95 @@ public void testBatchPatchAdd() throws Exception { "Expected all tags"); } + @Test + public void testBatchPatchAddDuplicate() throws Exception { + Urn entityUrn = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)"); + List initialTags = + List.of( + TagUrn.createFromString("urn:li:tag:__default_large_table"), + TagUrn.createFromString("urn:li:tag:__default_low_queries"), + TagUrn.createFromString("urn:li:tag:__default_low_changes"), + TagUrn.createFromString("urn:li:tag:!10TB+ tables")) + .stream() + .map(tag -> new TagAssociation().setTag(tag)) + .collect(Collectors.toList()); + TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+"); + + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + + SystemMetadata patchSystemMetadata = new SystemMetadata(); + patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1); + patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE))); + + ChangeItemImpl initialAspectTag1 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags))) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + + PatchItemImpl patchAdd2 = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2))) + .build() + .getJsonPatch()) + .systemMetadata(patchSystemMetadata) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + // establish base entity + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(initialAspectTag1)) + .build(), + false, + true); + + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(patchAdd2, patchAdd2)) // duplicate + .build(), + false, + true); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3"); + assertEquals( + new GlobalTags(envelopedAspect.getValue().data()) + .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), + Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2)) + .collect(Collectors.toSet()), + "Expected all tags"); + } + @Test public void dataGeneratorThreadingTest() { DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);