Skip to content

Commit

Permalink
1416 - added datasetField to term usage count. (#1457)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladysl authored Oct 5, 2023
1 parent 1f78288 commit b482cbe
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.opendatadiscovery.oddplatform.api.contract.api.TermApi;
import org.opendatadiscovery.oddplatform.api.contract.model.CountableSearchFilter;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityList;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldList;
import org.opendatadiscovery.oddplatform.api.contract.model.MultipleFacetType;
import org.opendatadiscovery.oddplatform.api.contract.model.Ownership;
import org.opendatadiscovery.oddplatform.api.contract.model.OwnershipFormData;
Expand All @@ -19,6 +20,7 @@
import org.opendatadiscovery.oddplatform.api.contract.model.TermSearchFacetsData;
import org.opendatadiscovery.oddplatform.api.contract.model.TermSearchFormData;
import org.opendatadiscovery.oddplatform.service.DataEntityService;
import org.opendatadiscovery.oddplatform.service.DatasetFieldService;
import org.opendatadiscovery.oddplatform.service.term.TermOwnershipService;
import org.opendatadiscovery.oddplatform.service.term.TermSearchService;
import org.opendatadiscovery.oddplatform.service.term.TermService;
Expand All @@ -34,6 +36,7 @@ public class TermController implements TermApi {

private final TermService termService;
private final DataEntityService dataEntityService;
private final DatasetFieldService datasetFieldService;
private final TermSearchService termSearchService;
private final TermOwnershipService termOwnershipService;

Expand Down Expand Up @@ -94,6 +97,16 @@ public Mono<ResponseEntity<DataEntityList>> getTermLinkedEntities(final Long ter
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<DatasetFieldList>> getTermLinkedColumns(final Long termId, final Integer page,
final Integer size,
final String query,
final ServerWebExchange exchange) {
return datasetFieldService
.listByTerm(termId, query, page, size)
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<Flux<Tag>>> createTermTagsRelations(final Long termId,
final Mono<TagsFormData> tagsFormData,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.opendatadiscovery.oddplatform.dto;

import java.util.List;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.jooq.JSONB;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataSourcePojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.NamespacePojo;

@EqualsAndHashCode
@ToString
@Data
@Builder
public class DatasetFieldTermsDto {
private Long id;
private String name;
private String internalName;
private String oddrn;
private JSONB type;
private Boolean isKey;
private Boolean isValue;
private String externalDescription;
private String internalDescription;
private Boolean isPrimaryKey;
private Boolean isSortKey;
private String defaultValue;
private List<OwnershipDto> ownership;
private DataSourcePojo dataSource;
private String dataEntityName;
private Long dataEntityId;
private NamespacePojo namespace;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldList;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldTermsDto;

public interface DatasetFieldListMapper {
DatasetFieldList mapPojos(List<DatasetFieldTermsDto> item);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetFieldType;
import org.opendatadiscovery.oddplatform.api.contract.model.DatasetFieldList;
import org.opendatadiscovery.oddplatform.api.contract.model.PageInfo;
import org.opendatadiscovery.oddplatform.api.contract.model.TermSearchDataSetField;
import org.opendatadiscovery.oddplatform.dto.DataSourceDto;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldTermsDto;
import org.opendatadiscovery.oddplatform.utils.JSONSerDeUtils;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class DatasetFieldListMapperImpl implements DatasetFieldListMapper {
private final DataSourceMapper dataSourceMapper;
private final OwnershipMapper ownershipMapper;

@Override
public DatasetFieldList mapPojos(final List<DatasetFieldTermsDto> dataFieldsDto) {
final List<TermSearchDataSetField> entities = dataFieldsDto.stream().map(this::mapPojo).toList();
final PageInfo pageInfo = pageInfo(dataFieldsDto.size());
return new DatasetFieldList(entities, pageInfo);
}

private TermSearchDataSetField mapPojo(final DatasetFieldTermsDto dto) {
return new TermSearchDataSetField()
.id(dto.getId())
.internalName(dto.getInternalName())
.name(dto.getName())
.oddrn(dto.getOddrn())
.defaultValue(dto.getDefaultValue())
.externalDescription(dto.getExternalDescription())
.internalDescription(dto.getInternalDescription())
.isKey(dto.getIsKey())
.isPrimaryKey(dto.getIsPrimaryKey())
.isSortKey(dto.getIsSortKey())
.isValue(dto.getIsValue())
.type(JSONSerDeUtils.deserializeJson(dto.getType().data(), DataSetFieldType.class))
.dataEntityId(dto.getDataEntityId())
.dataEntityName(dto.getDataEntityName())
.ownership(ownershipMapper.mapDtos(dto.getOwnership()))
.dataSource(dataSourceMapper.mapDto(new DataSourceDto(dto.getDataSource(), dto.getNamespace(), null)));
}

private PageInfo pageInfo(final long total) {
return new PageInfo(total, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.opendatadiscovery.oddplatform.repository.mapper;

import lombok.RequiredArgsConstructor;
import org.jooq.Record;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldTermsDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataSourcePojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.NamespacePojo;
import org.opendatadiscovery.oddplatform.repository.util.JooqRecordHelper;
import org.springframework.stereotype.Component;

import static org.opendatadiscovery.oddplatform.model.Tables.DATASET_FIELD;
import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY;
import static org.opendatadiscovery.oddplatform.model.Tables.DATA_SOURCE;
import static org.opendatadiscovery.oddplatform.model.Tables.NAMESPACE;

@Component
@RequiredArgsConstructor
public class DatasetFieldTermsDtoMapper {
public static final String DATASET_FIELD_CTE_NAME = "dataset_field_cte";

private final JooqRecordHelper jooqRecordHelper;
private final DataEntityDtoMapper dataEntityDtoMapper;

public DatasetFieldTermsDto mapRecordToDto(final Record record) {
final Record deRecord = jooqRecordHelper.remapCte(record, DATASET_FIELD_CTE_NAME, DATASET_FIELD);

final DatasetFieldPojo datasetFieldPojo
= jooqRecordHelper.extractRelation(deRecord, DATASET_FIELD, DatasetFieldPojo.class);

final DataEntityPojo dataEntityPojo =
jooqRecordHelper.extractRelation(record, DATA_ENTITY, DataEntityPojo.class);

return DatasetFieldTermsDto.builder()
.id(datasetFieldPojo.getId())
.isKey(datasetFieldPojo.getIsKey())
.name(datasetFieldPojo.getName())
.internalName(datasetFieldPojo.getInternalName())
.oddrn(datasetFieldPojo.getOddrn())
.defaultValue(datasetFieldPojo.getDefaultValue())
.internalDescription(datasetFieldPojo.getInternalDescription())
.externalDescription(datasetFieldPojo.getExternalDescription())
.isValue(datasetFieldPojo.getIsValue())
.type(datasetFieldPojo.getType())
.isSortKey(datasetFieldPojo.getIsSortKey())
.isPrimaryKey(datasetFieldPojo.getIsPrimaryKey())
.dataSource(jooqRecordHelper.extractRelation(record, DATA_SOURCE, DataSourcePojo.class))
.ownership(dataEntityDtoMapper.extractOwnershipRelation(record))
.namespace(jooqRecordHelper.extractRelation(record, NAMESPACE, NamespacePojo.class))
.dataEntityName(dataEntityPojo.getExternalName())
.dataEntityId(dataEntityPojo.getId())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.List;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldTermsDto;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldWithTagsDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import reactor.core.publisher.Flux;
Expand All @@ -16,4 +17,7 @@ public interface ReactiveDatasetFieldRepository extends ReactiveCRUDRepository<D
Flux<DatasetFieldPojo> getLastVersionDatasetFieldsByOddrns(final List<String> oddrns);

Mono<Long> getDataEntityIdByDatasetFieldId(final long datasetFieldId);

Flux<DatasetFieldTermsDto> listByTerm(final long termId, final String query,
final int page, final int size);
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.Name;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.SelectSeekStepN;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldTermsDto;
import org.opendatadiscovery.oddplatform.dto.DatasetFieldWithTagsDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DatasetFieldPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.TagPojo;
import org.opendatadiscovery.oddplatform.model.tables.records.DatasetFieldRecord;
import org.opendatadiscovery.oddplatform.repository.mapper.DatasetFieldTermsDtoMapper;
import org.opendatadiscovery.oddplatform.repository.util.JooqQueryHelper;
import org.opendatadiscovery.oddplatform.repository.util.JooqReactiveOperations;
import org.opendatadiscovery.oddplatform.repository.util.JooqRecordHelper;
Expand All @@ -24,10 +33,16 @@
import static org.jooq.impl.DSL.jsonArrayAgg;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.partitionBy;
import static org.jooq.impl.DSL.select;
import static org.opendatadiscovery.oddplatform.model.Tables.DATASET_FIELD;
import static org.opendatadiscovery.oddplatform.model.Tables.DATASET_FIELD_TO_TERM;
import static org.opendatadiscovery.oddplatform.model.Tables.DATASET_STRUCTURE;
import static org.opendatadiscovery.oddplatform.model.Tables.DATASET_VERSION;
import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY;
import static org.opendatadiscovery.oddplatform.model.Tables.DATA_SOURCE;
import static org.opendatadiscovery.oddplatform.model.Tables.NAMESPACE;
import static org.opendatadiscovery.oddplatform.model.Tables.OWNER;
import static org.opendatadiscovery.oddplatform.model.Tables.OWNERSHIP;
import static org.opendatadiscovery.oddplatform.model.Tables.TAG;
import static org.opendatadiscovery.oddplatform.model.Tables.TAG_TO_DATASET_FIELD;

Expand All @@ -36,14 +51,20 @@
public class ReactiveDatasetFieldRepositoryImpl
extends ReactiveAbstractCRUDRepository<DatasetFieldRecord, DatasetFieldPojo>
implements ReactiveDatasetFieldRepository {
public static final String DATASET_FIELD_CTE_NAME = "dataset_field_cte";
public static final String AGG_OWNER_FIELD = "owner";
public static final String AGG_OWNERSHIP_FIELD = "ownership";

private final JooqRecordHelper jooqRecordHelper;
private final DatasetFieldTermsDtoMapper datasetFieldTermsDtoMapper;

public ReactiveDatasetFieldRepositoryImpl(final JooqReactiveOperations jooqReactiveOperations,
final JooqQueryHelper jooqQueryHelper,
final JooqRecordHelper jooqRecordHelper) {
final JooqRecordHelper jooqRecordHelper,
final DatasetFieldTermsDtoMapper datasetFieldTermsDtoMapper) {
super(jooqReactiveOperations, jooqQueryHelper, DATASET_FIELD, DatasetFieldPojo.class);
this.jooqRecordHelper = jooqRecordHelper;
this.datasetFieldTermsDtoMapper = datasetFieldTermsDtoMapper;
}

@Override
Expand Down Expand Up @@ -73,7 +94,7 @@ public Flux<DatasetFieldPojo> getLastVersionDatasetFieldsByOddrns(final List<Str
final String maxVersion = "max_version";
final Name cteName = name("cte");

final var cte = cteName.as(DSL.select(DATASET_FIELD.fields())
final var cte = cteName.as(select(DATASET_FIELD.fields())
.select(DATASET_VERSION.VERSION.as(version))
.select(DSL.max(DATASET_VERSION.VERSION).over(partitionBy(DATASET_FIELD.ODDRN)).as(maxVersion))
.from(DATASET_FIELD)
Expand All @@ -91,7 +112,7 @@ public Flux<DatasetFieldPojo> getLastVersionDatasetFieldsByOddrns(final List<Str

@Override
public Mono<Long> getDataEntityIdByDatasetFieldId(final long datasetFieldId) {
final var query = DSL.select(DATA_ENTITY.ID)
final var query = select(DATA_ENTITY.ID)
.from(DATASET_FIELD)
.join(DATASET_STRUCTURE).on(DATASET_STRUCTURE.DATASET_FIELD_ID.eq(DATASET_FIELD.ID))
.join(DATASET_VERSION).on(DATASET_STRUCTURE.DATASET_VERSION_ID.eq(DATASET_VERSION.ID))
Expand All @@ -103,7 +124,7 @@ public Mono<Long> getDataEntityIdByDatasetFieldId(final long datasetFieldId) {

@Override
public Mono<DatasetFieldWithTagsDto> getDatasetFieldWithTags(final long datasetFieldId) {
final var query = DSL.select(DATASET_FIELD.fields())
final var query = select(DATASET_FIELD.fields())
.select(jsonArrayAgg(field(TAG.asterisk().toString())).as("tags"))
.from(DATASET_FIELD)
.leftJoin(TAG_TO_DATASET_FIELD).on(DATASET_FIELD.ID.eq(TAG_TO_DATASET_FIELD.DATASET_FIELD_ID))
Expand All @@ -115,6 +136,69 @@ public Mono<DatasetFieldWithTagsDto> getDatasetFieldWithTags(final long datasetF
.map(this::mapRecordToDatasetFieldWithTags);
}

@Override
public Flux<DatasetFieldTermsDto> listByTerm(final long termId, final String query,
final int page, final int size) {
final List<Field<?>> selectFields = new ArrayList<>(Arrays.stream(DATASET_FIELD.fields()).toList());
final List<Condition> conditions = new ArrayList<>();

if (StringUtils.isNotBlank(query)) {
conditions.add(DATASET_FIELD.NAME.contains(query));
}

final SelectSeekStepN<Record> records = select(selectFields)
.from(DATASET_FIELD)
.where(conditions)
.orderBy(List.of(field(DATASET_FIELD.ID).desc()));

final Table<Record> datasetCte = records.asTable(DATASET_FIELD_CTE_NAME);

final List<Field<?>> groupByFields = Stream.of(datasetCte.fields(), NAMESPACE.fields(),
DATA_SOURCE.fields(),
DATA_ENTITY.fields())
.flatMap(Arrays::stream)
.toList();

final List<Field<?>> aggregatedFields = List.of(
jsonArrayAgg(field(OWNER.asterisk().toString())).as(AGG_OWNER_FIELD),
jsonArrayAgg(field(OWNERSHIP.asterisk().toString())).as(AGG_OWNERSHIP_FIELD));

final Table<?> fromTable = DSL.table(DATASET_FIELD_CTE_NAME)
.leftJoin(DATA_ENTITY)
.on(DATA_ENTITY.ODDRN.eq(select(DATASET_VERSION.DATASET_ODDRN)
.from(DATASET_VERSION)
.where(DATASET_VERSION.ID.eq(
select(DSL.max(DATASET_STRUCTURE.DATASET_VERSION_ID))
.from(DATASET_STRUCTURE)
.where(
DATASET_STRUCTURE.DATASET_FIELD_ID.eq(
jooqQueryHelper.getField(datasetCte, DATASET_FIELD.ID)))
.groupBy(DATASET_STRUCTURE.DATASET_FIELD_ID))
)))
.leftJoin(DATA_SOURCE)
.on(DATA_SOURCE.ID.eq(DATA_ENTITY.DATA_SOURCE_ID))
.leftJoin(NAMESPACE).on(NAMESPACE.ID.eq(DATA_ENTITY.NAMESPACE_ID))
.or(NAMESPACE.ID.eq(DATA_SOURCE.NAMESPACE_ID))
.leftJoin(OWNERSHIP).on(OWNERSHIP.DATA_ENTITY_ID.eq(DATA_ENTITY.ID))
.leftJoin(OWNER).on(OWNER.ID.eq(OWNERSHIP.OWNER_ID))
.leftJoin(DATASET_FIELD_TO_TERM)
.on(DATASET_FIELD_TO_TERM.DATASET_FIELD_ID.eq(jooqQueryHelper.getField(datasetCte, DATASET_FIELD.ID)));

final var jooqQuery = DSL.with(DATASET_FIELD_CTE_NAME)
.asMaterialized(records)
.select(groupByFields)
.select(aggregatedFields)
.from(fromTable)
.where(DATASET_FIELD_TO_TERM.TERM_ID.eq(termId))
.groupBy(groupByFields)
.orderBy(List.of(jooqQueryHelper.getField(datasetCte, DATASET_FIELD.ID).desc()))
.limit(size)
.offset((page - 1) * size);

return jooqReactiveOperations.flux(jooqQuery)
.map(datasetFieldTermsDtoMapper::mapRecordToDto);
}

@NotNull
private DatasetFieldWithTagsDto mapRecordToDatasetFieldWithTags(final Record datasetFieldRecord) {
final DatasetFieldPojo pojo = datasetFieldRecord.into(DatasetFieldPojo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,15 @@ public Mono<Page<TermDto>> findByState(final FacetStateDto state, final int page
.select(jsonArrayAgg(field(TERM_OWNERSHIP.asterisk().toString())).as(AGG_OWNERSHIPS_FIELD))
.select(jsonArrayAgg(field(OWNER.asterisk().toString())).as(AGG_OWNERS_FIELD))
.select(jsonArrayAgg(field(TITLE.asterisk().toString())).as(AGG_TITLES_FIELD))
.select(DSL.countDistinct(DATA_ENTITY_TO_TERM.DATA_ENTITY_ID).as(ENTITIES_COUNT))
.select(DSL.countDistinct(DATA_ENTITY_TO_TERM.DATA_ENTITY_ID)
.plus(DSL.countDistinct(DATASET_FIELD_TO_TERM.DATASET_FIELD_ID)).as(ENTITIES_COUNT))
.from(termCTE.getName())
.join(NAMESPACE).on(NAMESPACE.ID.eq(termCTE.field(TERM.NAMESPACE_ID)))
.leftJoin(TERM_OWNERSHIP).on(TERM_OWNERSHIP.TERM_ID.eq(termCTE.field(TERM.ID)))
.leftJoin(OWNER).on(OWNER.ID.eq(TERM_OWNERSHIP.OWNER_ID))
.leftJoin(TITLE).on(TITLE.ID.eq(TERM_OWNERSHIP.TITLE_ID))
.leftJoin(DATA_ENTITY_TO_TERM).on(DATA_ENTITY_TO_TERM.TERM_ID.eq(termCTE.field(TERM.ID)))
.leftJoin(DATASET_FIELD_TO_TERM).on(DATASET_FIELD_TO_TERM.TERM_ID.eq(termCTE.field(TERM.ID)))
.groupBy(groupByFields);

return jooqReactiveOperations.flux(query)
Expand Down
Loading

0 comments on commit b482cbe

Please sign in to comment.