Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1484 - Implementation of Data Quality Dashboard. Created API for Dashboard #1502

Merged
merged 14 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
spring-webflux = '6.0.9'
reactor-extra = '3.5.1'
micrometer-registry-prometheus = '1.9.0'
ingestion-contract-server = '0.1.24'
ingestion-contract-server = '0.1.27'
oddrn-generator-java = '0.1.20'
odd-integration-manifests = '0.0.4'
apache-collections = '4.4'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opendatadiscovery.oddplatform.controller;

import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.api.contract.api.DataQualityRunsApi;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityResults;
import org.opendatadiscovery.oddplatform.service.DataQualityRunsService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class DataQualityRunsController implements DataQualityRunsApi {
private final DataQualityRunsService service;

@Override
public Mono<ResponseEntity<DataQualityResults>> getDataQualityTestsRuns(
final ServerWebExchange exchange) {
return service.getDataQualityTestsRuns()
.map(ResponseEntity::ok);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opendatadiscovery.oddplatform.dto;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;

import static java.util.function.Function.identity;

@Getter
public enum DataQualityCategory {
ASSERTION("Assertion Tests"),
VOLUME_ANOMALY("Volume Anomalies"),
FRESHNESS_ANOMALY("Freshness Anomalies"),
COLUMN_VALUES_ANOMALY("Column Values Anomalies"),
SCHEMA_CHANGE("Schema Changes"),
UNKNOWN("Unknown category");

private final String description;

DataQualityCategory(final String description) {
this.description = description;
}

private static final Map<String, DataQualityCategory> DICT = Arrays
.stream(values())
.collect(Collectors.toMap(DataQualityCategory::name, identity()));

public static DataQualityCategory resolveByName(final String name) {
return DICT.getOrDefault(name, UNKNOWN);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import org.jooq.Record3;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityCategoryResults;

public interface DataQualityCategoryMapper {
String TASK_RUNS_COUNT = "TASK_RUNS_COUNT";
String TASK_RUN_CATEGORY = "TASK_RUN_CATEGORY";

List<DataQualityCategoryResults> mapToDto(List<Record3<String, String, Integer>> items);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jooq.Record3;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunStatus;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityCategoryResults;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityRunStatusCount;
import org.opendatadiscovery.oddplatform.dto.DataQualityCategory;
import org.springframework.stereotype.Component;

import static org.opendatadiscovery.oddplatform.model.tables.DataEntityTaskLastRun.DATA_ENTITY_TASK_LAST_RUN;

@Component
public class DataQualityCategoryMapperImpl implements DataQualityCategoryMapper {
private static final String TASK_RUNS_COUNT = "TASK_RUNS_COUNT";
private static final String TASK_RUN_CATEGORY = "TASK_RUN_CATEGORY";

@Override
public List<DataQualityCategoryResults> mapToDto(final List<Record3<String, String, Integer>> items) {
final Map<DataQualityCategory, DataQualityCategoryResults> categoryResults =
Arrays.stream(DataQualityCategory.values())
.collect(Collectors.toMap(Function.identity(),
value ->
new DataQualityCategoryResults()
.category(value.getDescription())
.results(new ArrayList<>())));

items.forEach(row -> categoryResults.get(DataQualityCategory
.resolveByName(row.get(TASK_RUN_CATEGORY, String.class)))
.getResults().add(new DataQualityRunStatusCount()
.status(DataEntityRunStatus
.fromValue(row.getValue(DATA_ENTITY_TASK_LAST_RUN.STATUS)))
.count(Long.valueOf(row.get(TASK_RUNS_COUNT, Integer.class)))));

return addMissingStatuses(categoryResults.values()
.stream()
.toList());
}

private List<DataQualityCategoryResults> addMissingStatuses(final List<DataQualityCategoryResults> resultsList) {
for (final DataQualityCategoryResults dataQualityCategoryResults : resultsList) {
final Set<DataEntityRunStatus> existedElements = dataQualityCategoryResults.getResults()
.stream().map(DataQualityRunStatusCount::getStatus)
.collect(Collectors.toSet());

Arrays.stream(DataEntityRunStatus.values())
.filter(value -> !existedElements.contains(value))
.forEach(value -> dataQualityCategoryResults.getResults()
.add(new DataQualityRunStatusCount()
.status(value)
.count(0L)));
}

return resultsList;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opendatadiscovery.oddplatform.mapper;

import com.fasterxml.jackson.core.type.TypeReference;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
Expand Down Expand Up @@ -104,9 +103,6 @@ public FacetStateDto mapForm(final SearchFormData formData) {
@Override
public FacetStateDto mapForm(final TermSearchFormData formData) {
final TermSearchFormDataFilters filters = formData.getFilters();
if (filters == null) {
return FacetStateDto.empty();
}

final Map<FacetType, List<SearchFilterDto>> state = TERM_FORM_MAPPINGS.entrySet().stream()
.map(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private Metric mapMetric(final MetricType metricType,
final List<MetricPointPojo> points,
final List<MetricSeriesDto> series,
final List<MetricLabelValueDto> labelValues) {
final Metric metric = new Metric();
final Metric metric = new Metric(new MetricPoint());
final List<MetricLabel> metricLabels = labelValues.stream()
.filter(dto -> metricLabelValues.contains(dto.labelValue().getId()))
.map(dto -> new MetricLabel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private List<Metric> mapMetrics(final MetricType type,
final Metric existingMetric = metricByLabels.get();
enrichMetric(existingMetric, type, metric);
} else {
final Metric newMetric = new Metric();
final Metric newMetric = new Metric(new MetricPoint());
newMetric.setLabels(metricLabels);
enrichMetric(newMetric, type, metric);
result.add(newMetric);
Expand All @@ -94,9 +94,6 @@ private List<Metric> mapMetrics(final MetricType type,
private void enrichMetric(final Metric metric,
final MetricType metricType,
final PrometheusMetric prometheusMetric) {
if (metric.getMetricPoint() == null) {
metric.setMetricPoint(new MetricPoint());
}
metric.getMetricPoint().setTimestamp(getTimestampValue(prometheusMetric.getValue().get(0)));
switch (metricType) {
case GAUGE -> enrichGaugeValue(metric, prometheusMetric);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import org.jooq.Record2;
import org.opendatadiscovery.oddplatform.api.contract.model.TablesDashboard;

public interface TablesDashboardMapper {
String TABLE_STATUS = "STATUS";
String GOOD_HEALTH = "GOOD_HEALTH";
String ERROR_HEALTH = "ERROR";
String WARNING_HEALTH = "WARNING";
String MONITORED_TABLES = "MONITORED_TABLES";
String NOT_MONITORED_TABLES = "NOT_MONITORED_TABLES";
String COUNT = "COUNT";

TablesDashboard mapToDto(final List<Record2<Integer, String>> tableHealth,
final List<Record2<Integer, String>> monitoredTablesStatus);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import org.jooq.Record2;
import org.opendatadiscovery.oddplatform.api.contract.model.MonitoredTablesDashboard;
import org.opendatadiscovery.oddplatform.api.contract.model.TablesDashboard;
import org.opendatadiscovery.oddplatform.api.contract.model.TablesHealthDashboard;
import org.springframework.stereotype.Component;

@Component
public class TablesDashboardMapperImpl implements TablesDashboardMapper {
@Override
public TablesDashboard mapToDto(final List<Record2<Integer, String>> tableHealth,
final List<Record2<Integer, String>> monitoredTablesStatus) {
final TablesHealthDashboard tablesHealthDashboard = new TablesHealthDashboard();

tableHealth.forEach(row -> {
switch (row.get(TABLE_STATUS, String.class)) {
case GOOD_HEALTH -> tablesHealthDashboard.setHealthyTables(row.get(COUNT, Integer.class));
case ERROR_HEALTH -> tablesHealthDashboard.setErrorTables(row.get(COUNT, Integer.class));
case WARNING_HEALTH -> tablesHealthDashboard.setWarningTables(row.get(COUNT, Integer.class));
}
});

final MonitoredTablesDashboard monitoredTablesDashboard = new MonitoredTablesDashboard();

monitoredTablesStatus.forEach(row -> {
switch (row.get(TABLE_STATUS, String.class)) {
case MONITORED_TABLES -> monitoredTablesDashboard.setMonitoredTables(row.get(COUNT, Integer.class));
case NOT_MONITORED_TABLES ->
monitoredTablesDashboard.setNotMonitoredTables(row.get(COUNT, Integer.class));
}
});

return new TablesDashboard()
.tablesHealth(tablesHealthDashboard)
.monitoredTables(monitoredTablesDashboard);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import org.jooq.Record2;
import org.jooq.Record3;
import reactor.core.publisher.Flux;

public interface ReactiveDataQualityRunsRepository {

Flux<Record3<String, String, Integer>> getLatestDataQualityRunsResults();

Flux<Record2<Integer, String>> getLatestTablesHealth();

Flux<Record2<Integer, String>> getMonitoredTables();
}
Loading
Loading