Skip to content

Commit

Permalink
1576 - implementation of ingest relationships API and GET Relationshi…
Browse files Browse the repository at this point in the history
…ps API
  • Loading branch information
Vladysl committed Jan 22, 2024
1 parent dfca26e commit 48611e3
Show file tree
Hide file tree
Showing 22 changed files with 102 additions and 104 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
spring-webflux = '6.0.9'
reactor-extra = '3.5.1'
micrometer-registry-prometheus = '1.9.0'
ingestion-contract-server = '0.1.34'
ingestion-contract-server = '0.1.35'
oddrn-generator-java = '0.1.21'
odd-integration-manifests = '0.0.6'
apache-collections = '4.4'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import org.opendatadiscovery.oddplatform.api.contract.api.DataSetApi;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetStructure;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetVersionDiffList;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetRelationshipList;
import org.opendatadiscovery.oddplatform.api.contract.model.ERDRelationshipDetailsList;
import org.opendatadiscovery.oddplatform.api.contract.model.GraphRelationshipDetailsList;
import org.opendatadiscovery.oddplatform.api.contract.model.RelationshipList;
import org.opendatadiscovery.oddplatform.service.DatasetVersionService;
import org.opendatadiscovery.oddplatform.service.ERDRelationshipsService;
import org.opendatadiscovery.oddplatform.service.GraphRelationshipsService;
Expand Down Expand Up @@ -55,8 +55,8 @@ public Mono<ResponseEntity<DataSetVersionDiffList>> getDataSetStructureDiff(fina
}

@Override
public Mono<ResponseEntity<RelationshipList>> getDataSetRelationships(final Long dataEntityId,
final ServerWebExchange exchange) {
public Mono<ResponseEntity<DatasetRelationshipList>> getDataSetRelationships(final Long dataEntityId,
final ServerWebExchange exchange) {
return relationshipsService.getRelationsByDatasetId(dataEntityId)
.map(ResponseEntity::ok);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSourceList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DatasetStatisticsList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.MetricSetList;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.RelationshipList;
import org.opendatadiscovery.oddplatform.service.DataEntityGroupService;
import org.opendatadiscovery.oddplatform.service.DataSourceIngestionService;
import org.opendatadiscovery.oddplatform.service.RelationshipsIngestionService;
import org.opendatadiscovery.oddplatform.service.ingestion.IngestionService;
import org.opendatadiscovery.oddplatform.service.ingestion.metric.IngestionMetricsService;
import org.springframework.http.HttpStatus;
Expand All @@ -32,6 +34,7 @@ public class IngestionController implements IngestionApi {
private final IngestionService ingestionService;
private final DataEntityGroupService dataEntityGroupService;
private final DataSourceIngestionService dataSourceIngestionService;
private final RelationshipsIngestionService relationshipsIngestionService;
private final IngestionMetricsService ingestionMetricsService;

@Override
Expand All @@ -44,6 +47,16 @@ public Mono<ResponseEntity<Void>> postDataEntityList(final Mono<DataEntityList>
.thenReturn(ResponseEntity.ok().build());
}

@Override
public Mono<ResponseEntity<Void>> postRelationshipList(final Mono<RelationshipList> relationshipList,
final ServerWebExchange exchange) {
return relationshipList
.filter(items -> CollectionUtils.isNotEmpty(items.getItems()))
.switchIfEmpty(Mono.error(() -> new BadUserRequestException("Ingestion payload is empty")))
.flatMap(relationshipsIngestionService::ingestRelationships)
.thenReturn(ResponseEntity.ok().build());
}

@Override
public Mono<ResponseEntity<Void>> createDataSource(final Mono<DataSourceList> dataSourceList,
final ServerWebExchange exchange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
import org.opendatadiscovery.oddplatform.dto.DataEntityTypeDto;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataSetFieldEnumValue;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.ErdRelationshipPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.GraphRelationshipPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.RelationshipPojo;

@Data
@Builder
Expand Down Expand Up @@ -43,9 +40,7 @@ public class DataEntityIngestionDto {
public record DataSetIngestionDto(String parentDatasetOddrn,
List<DatasetFieldIngestionDto> fieldList,
String structureHash,
Long rowsCount,
Map<RelationshipPojo, List<ErdRelationshipPojo>> erdRelationDto,
Map<RelationshipPojo, GraphRelationshipPojo> graphRelationDto) {
Long rowsCount) {
}

public record DataTransformerIngestionDto(List<String> sourceList, List<String> targetList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetRelationship;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetRelationshipList;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetRelationshipType;
import org.opendatadiscovery.oddplatform.api.contract.model.PageInfo;
import org.opendatadiscovery.oddplatform.api.contract.model.RelationshipList;
import org.opendatadiscovery.oddplatform.dto.RelationshipDto;

@Mapper(config = MapperConfig.class,
Expand All @@ -15,8 +15,8 @@
DataEntityMapper.class,
})
public abstract class RelationshipMapper {
public RelationshipList mapListToRelationshipList(final List<RelationshipDto> relationshipDtos) {
return new RelationshipList()
public DatasetRelationshipList mapListToRelationshipList(final List<RelationshipDto> relationshipDtos) {
return new DatasetRelationshipList()
.items(mapToRelationshipList(relationshipDtos))
.pageInfo(new PageInfo().total((long) relationshipDtos.size()).hasNext(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import org.opendatadiscovery.oddplatform.model.tables.pojos.RelationshipPojo;

public interface DatasetERDRelationIngestionMapper {
Map<RelationshipPojo, List<ErdRelationshipPojo>> mapERDRelations(List<Relationship> erdRelationship);
Map<RelationshipPojo, List<ErdRelationshipPojo>> mapERDRelations(final List<Relationship> erdRelationship,
final Long dataSourceId);

List<ErdRelationshipPojo> mapERDRelation(ERDRelationship field);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ public class DatasetERDRelationIngestionMapperImpl implements DatasetERDRelation
private final DatasetRelationIngestionMapper relationIngestionMapper;

@Override
public Map<RelationshipPojo, List<ErdRelationshipPojo>> mapERDRelations(final List<Relationship> relationships) {
public Map<RelationshipPojo, List<ErdRelationshipPojo>> mapERDRelations(final List<Relationship> relationships,
final Long dataSourceId) {
if (CollectionUtils.isEmpty(relationships)) {
return Map.of();
}

return relationships.stream()
.filter(item -> item.getErdRelationship() != null)
.collect(Collectors.toMap(relationIngestionMapper::mapToPojo,
.collect(Collectors.toMap(item -> relationIngestionMapper.mapToPojo(item, dataSourceId),
relationship -> new ArrayList<>(mapERDRelation(relationship.getErdRelationship())), (a, b) -> b));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import org.opendatadiscovery.oddplatform.model.tables.pojos.RelationshipPojo;

public interface DatasetGraphRelationIngestionMapper {
Map<RelationshipPojo, GraphRelationshipPojo> mapGraphRelations(List<Relationship> relationships);
Map<RelationshipPojo, GraphRelationshipPojo> mapGraphRelations(final List<Relationship> relationships,
final Long dataSourceId);

GraphRelationshipPojo mapGraphRelation(GraphRelationship field);
GraphRelationshipPojo mapGraphRelation(final GraphRelationship field);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ public class DatasetGraphRelationIngestionMapperImpl implements DatasetGraphRela
private final DatasetRelationIngestionMapper relationIngestionMapper;

@Override
public Map<RelationshipPojo, GraphRelationshipPojo> mapGraphRelations(final List<Relationship> relationships) {
public Map<RelationshipPojo, GraphRelationshipPojo> mapGraphRelations(final List<Relationship> relationships,
final Long dataSourceId) {
if (CollectionUtils.isEmpty(relationships)) {
return Map.of();
}

return relationships.stream()
.filter(item -> item.getGraphRelationship() != null)
.collect(Collectors.toMap(relationIngestionMapper::mapToPojo,
.collect(Collectors.toMap(item -> relationIngestionMapper.mapToPojo(item, dataSourceId),
relationship -> mapGraphRelation(relationship.getGraphRelationship()), (a, b) -> b));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
import org.opendatadiscovery.oddplatform.model.tables.pojos.RelationshipPojo;

public interface DatasetRelationIngestionMapper {
RelationshipPojo mapToPojo(Relationship relationship);
RelationshipPojo mapToPojo(final Relationship relationship, final Long dataSourceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
@RequiredArgsConstructor
public class DatasetRelationIngestionMapperImpl implements DatasetRelationIngestionMapper {
@Override
public RelationshipPojo mapToPojo(final Relationship relationship) {
public RelationshipPojo mapToPojo(final Relationship relationship, final Long dataSourceId) {
if (relationship == null) {
return null;
}

final RelationshipPojo relationshipPojo = new RelationshipPojo();

relationshipPojo.setName(relationship.getName());
relationshipPojo.setDataSourceId(dataSourceId);
relationshipPojo.setRelationshipOddrn(relationship.getOddrn());
relationshipPojo.setIsManualyCreated(false);
relationshipPojo.setSourceDatasetOddrn(relationship.getSourceDatasetOddrn());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@
import org.opendatadiscovery.oddplatform.ingestion.contract.model.DataTransformerRun;
import org.opendatadiscovery.oddplatform.ingestion.contract.model.Tag;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.ErdRelationshipPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.GraphRelationshipPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.RelationshipPojo;
import org.opendatadiscovery.oddplatform.service.ingestion.DatasetVersionHashCalculator;
import org.opendatadiscovery.oddplatform.service.ingestion.util.DateTimeUtil;
import org.opendatadiscovery.oddplatform.utils.JSONSerDeUtils;
Expand Down Expand Up @@ -82,8 +79,6 @@
@Slf4j
public class IngestionMapperImpl implements IngestionMapper {
private final DatasetFieldIngestionMapper datasetFieldIngestionMapper;
private final DatasetERDRelationIngestionMapper erdRelationIngestionMapper;
private final DatasetGraphRelationIngestionMapper graphRelationIngestionMapper;
private final DatasetVersionHashCalculator datasetVersionHashCalculator;

private static final List<Pair<Predicate<DataEntity>, DataEntityClassDto>> ENTITY_CLASS_DISCRIMINATOR = List.of(
Expand Down Expand Up @@ -263,20 +258,13 @@ private DataSetIngestionDto createDatasetIngestionDto(final DataEntity dataEntit
""", dataEntity.getOddrn());
}

final Map<RelationshipPojo, List<ErdRelationshipPojo>> erdMap =
erdRelationIngestionMapper.mapERDRelations(dataset.getRelationshipsList());
final Map<RelationshipPojo, GraphRelationshipPojo> grapMap =
graphRelationIngestionMapper.mapGraphRelations(dataset.getRelationshipsList());

final String structureHash = datasetVersionHashCalculator.calculateStructureHash(dataset.getFieldList());

return new DataSetIngestionDto(
dataset.getParentOddrn(),
datasetFieldIngestionMapper.mapFields(dataset.getFieldList()),
structureHash,
dataset.getRowsNumber(),
erdMap,
grapMap
dataset.getRowsNumber()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public interface ReactiveRelationshipsRepository extends ReactiveCRUDRepository<

Flux<RelationshipPojo> getRelationshipByDatasetOddrns(final Set<String> oddrns);

Flux<RelationshipPojo> getRelationshipByDataSourceId(final Long dataSourceId);

Flux<RelationshipDto> getRelationsByDatasetId(final Long dataEntityId);

Mono<RelationshipDto> getRelationshipById(Long relationshipId);
Mono<RelationshipDto> getRelationshipById(final Long relationshipId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.jooq.impl.DSL.select;
import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY;
import static org.opendatadiscovery.oddplatform.model.Tables.ERD_RELATIONSHIP;
import static org.opendatadiscovery.oddplatform.model.Tables.RELATIONSHIP;
Expand Down Expand Up @@ -53,6 +54,14 @@ public Flux<RelationshipPojo> getRelationshipByDatasetOddrns(final Set<String> o
.map(r -> r.into(RelationshipPojo.class));
}

@Override
public Flux<RelationshipPojo> getRelationshipByDataSourceId(final Long dataSourceId) {
return jooqReactiveOperations.flux(select(RELATIONSHIP)
.from(RELATIONSHIP)
.where(RELATIONSHIP.DATA_SOURCE_ID.eq(dataSourceId)))
.map(r -> r.into(RelationshipPojo.class));
}

@Override
public Flux<RelationshipDto> getRelationsByDatasetId(final Long dataEntityId) {
final SelectConditionStep<Record1<String>> dataEntitySelect = DSL.select(DATA_ENTITY.ODDRN)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opendatadiscovery.oddplatform.service;

import org.opendatadiscovery.oddplatform.ingestion.contract.model.RelationshipList;
import reactor.core.publisher.Mono;

public interface RelationshipsIngestionService {
Mono<Void> ingestRelationships(final RelationshipList item);
}
Loading

0 comments on commit 48611e3

Please sign in to comment.