feat(models): update mlflow-related mappers (#12263)
Co-authored-by: Shirshanka Das <[email protected]>
Co-authored-by: RyanHolstien <[email protected]>
3 people authored Jan 13, 2025
1 parent e34b2e4 commit ddd0d21
Showing 35 changed files with 2,046 additions and 243 deletions.
File: GmsGraphQLEngine.java
@@ -56,6 +56,7 @@
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataQualityContract;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
@@ -346,6 +347,7 @@
import com.linkedin.datahub.graphql.types.datajob.DataJobType;
import com.linkedin.datahub.graphql.types.dataplatform.DataPlatformType;
import com.linkedin.datahub.graphql.types.dataplatforminstance.DataPlatformInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.DataProcessInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceRunEventMapper;
import com.linkedin.datahub.graphql.types.dataproduct.DataProductType;
import com.linkedin.datahub.graphql.types.dataset.DatasetType;
@@ -530,6 +532,7 @@ public class GmsGraphQLEngine
private final FormType formType;
private final IncidentType incidentType;
private final RestrictedType restrictedType;
private final DataProcessInstanceType dataProcessInstanceType;

private final int graphQLQueryComplexityLimit;
private final int graphQLQueryDepthLimit;
@@ -649,6 +652,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.formType = new FormType(entityClient);
this.incidentType = new IncidentType(entityClient);
this.restrictedType = new RestrictedType(entityClient, restrictedService);
this.dataProcessInstanceType = new DataProcessInstanceType(entityClient, featureFlags);

this.graphQLQueryComplexityLimit = args.graphQLQueryComplexityLimit;
this.graphQLQueryDepthLimit = args.graphQLQueryDepthLimit;
@@ -699,7 +703,8 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
formType,
incidentType,
restrictedType,
-            businessAttributeType));
+            businessAttributeType,
+            dataProcessInstanceType));
this.loadableTypes = new ArrayList<>(entityTypes);
// Extend loadable types with types from the plugins
// This allows us to offer search and browse capabilities out of the box for
@@ -1024,6 +1029,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("tag", getResolver(tagType))
.dataFetcher("dataFlow", getResolver(dataFlowType))
.dataFetcher("dataJob", getResolver(dataJobType))
.dataFetcher("dataProcessInstance", getResolver(dataProcessInstanceType))
.dataFetcher("glossaryTerm", getResolver(glossaryTermType))
.dataFetcher("glossaryNode", getResolver(glossaryNodeType))
.dataFetcher("domain", getResolver((domainType)))
@@ -3058,6 +3064,35 @@ private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder b
"DataProcessInstance",
typeWiring ->
typeWiring
.dataFetcher(
"dataPlatformInstance",
new LoadableTypeResolver<>(
dataPlatformInstanceType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getDataPlatformInstance() != null
? dataProcessInstance.getDataPlatformInstance().getUrn()
: null;
}))
.dataFetcher(
"platform",
new LoadableTypeResolver<>(
dataPlatformType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getPlatform() != null
? dataProcessInstance.getPlatform().getUrn()
: null;
}))
.dataFetcher("parentContainers", new ParentContainersResolver(entityClient))
.dataFetcher(
"container",
new LoadableTypeResolver<>(
containerType,
(env) -> {
final DataProcessInstance dpi = env.getSource();
return dpi.getContainer() != null ? dpi.getContainer().getUrn() : null;
}))
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher(
"lineage",
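Note (not part of this commit): taken together, the wiring above exposes DataProcessInstance as a top-level query and resolves its platform, dataPlatformInstance, container, parentContainers, relationships, and lineage fields. A sketch of a query a client might now issue, assuming the standard urn argument used by the sibling entity queries; the urn value and field selection are illustrative:

    public class DataProcessInstanceQuerySketch {
      public static void main(String[] args) {
        // Hypothetical GraphQL document string exercising the new resolvers.
        String query =
            "query {\n"
                + "  dataProcessInstance(urn: \"urn:li:dataProcessInstance:example-run\") {\n"
                + "    urn\n"
                + "    platform { urn }\n"
                + "    dataPlatformInstance { urn }\n"
                + "    container { urn }\n"
                + "  }\n"
                + "}";
        System.out.println(query);
      }
    }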
File: DataPlatformInstance aspect mapper (com.linkedin.datahub.graphql.types.common.mappers)
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
@@ -28,6 +29,11 @@ public DataPlatformInstance apply(
result.setType(EntityType.DATA_PLATFORM_INSTANCE);
result.setUrn(input.getInstance().toString());
}
result.setPlatform(
DataPlatform.builder()
.setUrn(input.getPlatform().toString())
.setType(EntityType.DATA_PLATFORM)
.build());
return result;
}
}
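Note (not part of this commit): the hunk above changes the mapper to always attach a stub DataPlatform built from the aspect's platform urn, so downstream resolvers can hydrate the full platform lazily. A minimal sketch of that identity-only stub pattern, with an illustrative urn:

    import com.linkedin.datahub.graphql.generated.DataPlatform;
    import com.linkedin.datahub.graphql.generated.EntityType;

    public class PlatformStubSketch {
      public static void main(String[] args) {
        // Urn and type are enough for GraphQL to resolve the remaining fields.
        DataPlatform stub =
            DataPlatform.builder()
                .setUrn("urn:li:dataPlatform:mlflow")
                .setType(EntityType.DATA_PLATFORM)
                .build();
        System.out.println(stub.getUrn());
      }
    }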
File: TimeStampToAuditStampMapper.java (new file)
@@ -0,0 +1,24 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.common.TimeStamp;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import javax.annotation.Nullable;

public class TimeStampToAuditStampMapper {

public static final TimeStampToAuditStampMapper INSTANCE = new TimeStampToAuditStampMapper();

public static AuditStamp map(
@Nullable final QueryContext context, @Nullable final TimeStamp input) {
if (input == null) {
return null;
}
final AuditStamp result = new AuditStamp();
result.setTime(input.getTime());
if (input.hasActor()) {
result.setActor(input.getActor().toString());
}
return result;
}
}
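Note (not part of this commit): a quick usage sketch of the new mapper with illustrative values. It is null-safe on both arguments and only copies the actor when the source TimeStamp carries one:

    import com.linkedin.common.TimeStamp;
    import com.linkedin.common.urn.UrnUtils;
    import com.linkedin.datahub.graphql.generated.AuditStamp;
    import com.linkedin.datahub.graphql.types.common.mappers.TimeStampToAuditStampMapper;

    public class TimeStampMapperSketch {
      public static void main(String[] args) {
        // Time but no actor: the mapped AuditStamp has a null actor.
        TimeStamp ts = new TimeStamp().setTime(1736726400000L);
        AuditStamp mapped = TimeStampToAuditStampMapper.map(null, ts);
        System.out.println(mapped.getTime()); // 1736726400000

        // When present, the actor is copied as its stringified urn.
        ts.setActor(UrnUtils.getUrn("urn:li:corpuser:datahub"));
        System.out.println(TimeStampToAuditStampMapper.map(null, ts).getActor());

        // Null input maps to null rather than throwing.
        System.out.println(TimeStampToAuditStampMapper.map(null, null)); // null
      }
    }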
File: urn-to-entity mapper (maps Urn to a partial generated Entity)
@@ -18,6 +18,7 @@
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataProduct;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.Domain;
@@ -225,6 +226,11 @@ public Entity apply(@Nullable QueryContext context, Urn input) {
((BusinessAttribute) partialEntity).setUrn(input.toString());
((BusinessAttribute) partialEntity).setType(EntityType.BUSINESS_ATTRIBUTE);
}
if (input.getEntityType().equals(DATA_PROCESS_INSTANCE_ENTITY_NAME)) {
partialEntity = new DataProcessInstance();
((DataProcessInstance) partialEntity).setUrn(input.toString());
((DataProcessInstance) partialEntity).setType(EntityType.DATA_PROCESS_INSTANCE);
}
return partialEntity;
}
}
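Note (not part of this commit): the new branch above dispatches on Urn#getEntityType(), which for a DPI urn is the string matched by DATA_PROCESS_INSTANCE_ENTITY_NAME. A tiny sketch with an illustrative urn:

    import com.linkedin.common.urn.Urn;
    import com.linkedin.common.urn.UrnUtils;

    public class UrnDispatchSketch {
      public static void main(String[] args) {
        Urn urn = UrnUtils.getUrn("urn:li:dataProcessInstance:example-run");
        // The mapper compares this value against DATA_PROCESS_INSTANCE_ENTITY_NAME.
        System.out.println(urn.getEntityType()); // dataProcessInstance
      }
    }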
File: DataProcessInstanceType.java (new file)
@@ -0,0 +1,102 @@
package com.linkedin.datahub.graphql.types.dataprocessinst;

import static com.linkedin.metadata.Constants.*;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import graphql.execution.DataFetcherResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class DataProcessInstanceType
implements com.linkedin.datahub.graphql.types.EntityType<DataProcessInstance, String> {

public static final Set<String> ASPECTS_TO_FETCH =
ImmutableSet.of(
DATA_PROCESS_INSTANCE_KEY_ASPECT_NAME,
DATA_PLATFORM_INSTANCE_ASPECT_NAME,
DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME,
DATA_PROCESS_INSTANCE_INPUT_ASPECT_NAME,
DATA_PROCESS_INSTANCE_OUTPUT_ASPECT_NAME,
DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
TEST_RESULTS_ASPECT_NAME,
DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME,
ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME,
SUB_TYPES_ASPECT_NAME,
CONTAINER_ASPECT_NAME);

private final EntityClient _entityClient;
private final FeatureFlags _featureFlags;

@Override
public EntityType type() {
return EntityType.DATA_PROCESS_INSTANCE;
}

@Override
public Function<Entity, String> getKeyProvider() {
return Entity::getUrn;
}

@Override
public Class<DataProcessInstance> objectClass() {
return DataProcessInstance.class;
}

@Override
public List<DataFetcherResult<DataProcessInstance>> batchLoad(
@Nonnull List<String> urns, @Nonnull QueryContext context) throws Exception {
final List<Urn> dataProcessInstanceUrns =
urns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());

try {
Map<Urn, EntityResponse> entities = new HashMap<>();
if (_featureFlags.isDataProcessInstanceEntityEnabled()) {
entities =
_entityClient.batchGetV2(
context.getOperationContext(),
DATA_PROCESS_INSTANCE_ENTITY_NAME,
new HashSet<>(dataProcessInstanceUrns),
ASPECTS_TO_FETCH);
}

final List<EntityResponse> gmsResults = new ArrayList<>();
for (Urn urn : dataProcessInstanceUrns) {
if (_featureFlags.isDataProcessInstanceEntityEnabled()) {
gmsResults.add(entities.getOrDefault(urn, null));
}
}

return gmsResults.stream()
.map(
gmsResult ->
gmsResult == null
? null
: DataFetcherResult.<DataProcessInstance>newResult()
.data(DataProcessInstanceMapper.map(context, gmsResult))
.build())
.collect(Collectors.toList());

} catch (Exception e) {
throw new RuntimeException("Failed to load Data Process Instance entity", e);
}
}
}
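Note (not part of this commit): one subtlety in batchLoad above is that when the dataProcessInstanceEntityEnabled feature flag is off, nothing is added to gmsResults, so callers receive an empty list (not a list of nulls) and the entity client is never hit. A minimal Mockito-style sketch of that behavior; the test harness and names are assumptions:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verifyNoInteractions;
    import static org.mockito.Mockito.when;

    import com.linkedin.datahub.graphql.QueryContext;
    import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
    import com.linkedin.datahub.graphql.types.dataprocessinst.DataProcessInstanceType;
    import com.linkedin.entity.client.EntityClient;
    import java.util.List;

    public class DataProcessInstanceTypeFlagSketch {
      public static void main(String[] args) throws Exception {
        EntityClient entityClient = mock(EntityClient.class);
        FeatureFlags flags = mock(FeatureFlags.class);
        when(flags.isDataProcessInstanceEntityEnabled()).thenReturn(false);

        DataProcessInstanceType type = new DataProcessInstanceType(entityClient, flags);

        // Flag off: batchGetV2 is never invoked and the result is simply empty.
        System.out.println(
            type.batchLoad(List.of("urn:li:dataProcessInstance:run-1"), mock(QueryContext.class))
                .isEmpty()); // true
        verifyNoInteractions(entityClient);
      }
    }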
(Diffs for the remaining 30 changed files are not shown here.)
