Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(forms) Clean up form prompts on structured property deletion #12053

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
package com.linkedin.metadata.entity;

import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataList;
import com.linkedin.data.DataMap;
import com.linkedin.data.schema.DataSchema;
import com.linkedin.data.schema.PathSpec;
import com.linkedin.data.schema.grammar.PdlSchemaParser;
import com.linkedin.data.schema.resolver.DefaultDataSchemaResolver;
import com.linkedin.data.template.StringArray;
import com.linkedin.entity.Aspect;
import com.linkedin.form.FormInfo;
import com.linkedin.form.FormPrompt;
import com.linkedin.form.FormPromptArray;
import com.linkedin.form.StructuredPropertyParams;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.schema.SchemaMetadata;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import junit.framework.TestCase;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -359,4 +376,60 @@ public void testSchemaMetadataDelete() {
.get("tags"))
.size());
}

@Test
public void testRemovePromptsFromFormInfo() {
Urn deletedPropertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:1");
Urn existingPropertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:2");
List<FormPrompt> prompts = new ArrayList<>();
prompts.add(
new FormPrompt()
.setId("1")
.setStructuredPropertyParams(
new StructuredPropertyParams().setUrn(deletedPropertyUrn)));
prompts.add(
new FormPrompt()
.setId("2")
.setStructuredPropertyParams(
new StructuredPropertyParams().setUrn(existingPropertyUrn)));
FormInfo formInfo = new FormInfo().setPrompts(new FormPromptArray(prompts));

FormInfo updatedFormInfo =
DeleteEntityUtils.removePromptsFromFormInfoAspect(formInfo, deletedPropertyUrn);

assertEquals(updatedFormInfo.getPrompts().size(), 1);
assertEquals(
updatedFormInfo.getPrompts(),
formInfo.getPrompts().stream()
.filter(prompt -> !prompt.getId().equals("1"))
.collect(Collectors.toList()));
}

@Test
public void testFilterForStructuredPropDeletion() {
Urn deletedPropertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:1");

final CriterionArray criterionArray = new CriterionArray();
criterionArray.add(
new Criterion()
.setField("structuredPropertyPromptUrns")
.setValues(new StringArray(deletedPropertyUrn.toString()))
.setNegated(false)
.setValue("")
.setCondition(Condition.EQUAL));
Filter expectedFilter =
new Filter()
.setOr(
new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(criterionArray)));

assertEquals(
DeleteEntityUtils.getFilterForStructuredPropertyDeletion(deletedPropertyUrn),
expectedFilter);
}

@Test
public void testEntityNamesForStructuredPropDeletion() {
assertEquals(
DeleteEntityUtils.getEntityNamesForStructuredPropertyDeletion(), ImmutableList.of("form"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ record FormPrompt {
/**
* The structured property that is required on this entity
*/
@Searchable = {
"fieldType": "URN",
"fieldName": "structuredPropertyPromptUrns",
}
urn: Urn
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep;
import com.linkedin.metadata.boot.steps.RestoreColumnLineageIndices;
import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices;
import com.linkedin.metadata.boot.steps.RestoreFormInfoIndicesStep;
import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices;
import com.linkedin.metadata.boot.steps.WaitForSystemUpdateStep;
import com.linkedin.metadata.entity.AspectMigrationsDao;
Expand Down Expand Up @@ -110,6 +111,8 @@ protected BootstrapManager createInstance(
final WaitForSystemUpdateStep waitForSystemUpdateStep =
new WaitForSystemUpdateStep(_dataHubUpgradeKafkaListener, _configurationProvider);
final IngestEntityTypesStep ingestEntityTypesStep = new IngestEntityTypesStep(_entityService);
final RestoreFormInfoIndicesStep restoreFormInfoIndicesStep =
new RestoreFormInfoIndicesStep(_entityService);

final List<BootstrapStep> finalSteps =
new ArrayList<>(
Expand All @@ -124,7 +127,8 @@ protected BootstrapManager createInstance(
restoreDbtSiblingsIndices,
indexDataPlatformsStep,
restoreColumnLineageIndices,
ingestEntityTypesStep));
ingestEntityTypesStep,
restoreFormInfoIndicesStep));

return new BootstrapManager(finalSteps);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.linkedin.metadata.boot.steps;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.form.FormInfo;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ListResult;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.query.ExtraInfo;
import io.datahubproject.metadata.context.OperationContext;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RestoreFormInfoIndicesStep extends UpgradeStep {
private static final String VERSION = "1";
private static final String UPGRADE_ID = "restore-form-info-indices";
private static final Integer BATCH_SIZE = 1000;

public RestoreFormInfoIndicesStep(@Nonnull final EntityService<?> entityService) {
super(entityService, VERSION, UPGRADE_ID);
}

@Override
public void upgrade(@Nonnull OperationContext systemOperationContext) throws Exception {
final AuditStamp auditStamp =
new AuditStamp()
.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
.setTime(System.currentTimeMillis());

final int totalFormCount = getAndRestoreFormInfoIndices(systemOperationContext, 0, auditStamp);
int formCount = BATCH_SIZE;
while (formCount < totalFormCount) {
getAndRestoreFormInfoIndices(systemOperationContext, formCount, auditStamp);
formCount += BATCH_SIZE;
}
}

@Nonnull
@Override
public ExecutionMode getExecutionMode() {
return ExecutionMode.ASYNC;
}

private int getAndRestoreFormInfoIndices(
@Nonnull OperationContext systemOperationContext, int start, AuditStamp auditStamp) {
final AspectSpec formInfoAspectSpec =
systemOperationContext
.getEntityRegistry()
.getEntitySpec(Constants.FORM_ENTITY_NAME)
.getAspectSpec(Constants.FORM_INFO_ASPECT_NAME);

final ListResult<RecordTemplate> latestAspects =
entityService.listLatestAspects(
systemOperationContext,
Constants.FORM_ENTITY_NAME,
Constants.FORM_INFO_ASPECT_NAME,
start,
BATCH_SIZE);

if (latestAspects.getTotalCount() == 0
|| latestAspects.getValues() == null
|| latestAspects.getMetadata() == null) {
log.debug("Found 0 formInfo aspects for forms. Skipping migration.");
return 0;
}

if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) {
// Bad result -- we should log that we cannot migrate this batch of formInfos.
log.warn(
"Failed to match formInfo aspects with corresponding urns. Found mismatched length between aspects ({})"
+ "and metadata ({}) for metadata {}",
latestAspects.getValues().size(),
latestAspects.getMetadata().getExtraInfos().size(),
latestAspects.getMetadata());
return latestAspects.getTotalCount();
}

List<Future<?>> futures = new LinkedList<>();
for (int i = 0; i < latestAspects.getValues().size(); i++) {
ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i);
RecordTemplate formInfoRecord = latestAspects.getValues().get(i);
Urn urn = info.getUrn();
FormInfo formInfo = (FormInfo) formInfoRecord;
if (formInfo == null) {
log.warn("Received null formInfo for urn {}", urn);
continue;
}

futures.add(
entityService
.alwaysProduceMCLAsync(
systemOperationContext,
urn,
Constants.FORM_ENTITY_NAME,
Constants.FORM_INFO_ASPECT_NAME,
formInfoAspectSpec,
null,
formInfo,
null,
null,
auditStamp,
ChangeType.RESTATE)
.getFirst());
}

futures.stream()
.filter(Objects::nonNull)
.forEach(
f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

return latestAspects.getTotalCount();
}
}
Loading
Loading