Skip to content

Commit

Permalink
1484 - Implementation of Data Quality Dashboard. Created API for Dash…
Browse files Browse the repository at this point in the history
…board (#1502)

Co-authored-by: Andrey Nenashev <[email protected]>
  • Loading branch information
Vladysl and AndreyNenashev authored Nov 8, 2023
1 parent 0bf14e5 commit 7c705f5
Show file tree
Hide file tree
Showing 84 changed files with 2,491 additions and 1,088 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
spring-webflux = '6.0.9'
reactor-extra = '3.5.1'
micrometer-registry-prometheus = '1.9.0'
ingestion-contract-server = '0.1.24'
ingestion-contract-server = '0.1.27'
oddrn-generator-java = '0.1.20'
odd-integration-manifests = '0.0.4'
apache-collections = '4.4'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opendatadiscovery.oddplatform.controller;

import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.api.contract.api.DataQualityRunsApi;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityResults;
import org.opendatadiscovery.oddplatform.service.DataQualityRunsService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class DataQualityRunsController implements DataQualityRunsApi {
private final DataQualityRunsService service;

@Override
public Mono<ResponseEntity<DataQualityResults>> getDataQualityTestsRuns(
final ServerWebExchange exchange) {
return service.getDataQualityTestsRuns()
.map(ResponseEntity::ok);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opendatadiscovery.oddplatform.dto;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;

import static java.util.function.Function.identity;

@Getter
public enum DataQualityCategory {
ASSERTION("Assertion Tests"),
VOLUME_ANOMALY("Volume Anomalies"),
FRESHNESS_ANOMALY("Freshness Anomalies"),
COLUMN_VALUES_ANOMALY("Column Values Anomalies"),
SCHEMA_CHANGE("Schema Changes"),
UNKNOWN("Unknown category");

private final String description;

DataQualityCategory(final String description) {
this.description = description;
}

private static final Map<String, DataQualityCategory> DICT = Arrays
.stream(values())
.collect(Collectors.toMap(DataQualityCategory::name, identity()));

public static DataQualityCategory resolveByName(final String name) {
return DICT.getOrDefault(name, UNKNOWN);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import org.jooq.Record3;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityCategoryResults;

public interface DataQualityCategoryMapper {
String TASK_RUNS_COUNT = "TASK_RUNS_COUNT";
String TASK_RUN_CATEGORY = "TASK_RUN_CATEGORY";

List<DataQualityCategoryResults> mapToDto(List<Record3<String, String, Integer>> items);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jooq.Record3;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunStatus;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityCategoryResults;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityRunStatusCount;
import org.opendatadiscovery.oddplatform.dto.DataQualityCategory;
import org.springframework.stereotype.Component;

import static org.opendatadiscovery.oddplatform.model.tables.DataEntityTaskLastRun.DATA_ENTITY_TASK_LAST_RUN;

@Component
public class DataQualityCategoryMapperImpl implements DataQualityCategoryMapper {
private static final String TASK_RUNS_COUNT = "TASK_RUNS_COUNT";
private static final String TASK_RUN_CATEGORY = "TASK_RUN_CATEGORY";

@Override
public List<DataQualityCategoryResults> mapToDto(final List<Record3<String, String, Integer>> items) {
final Map<DataQualityCategory, DataQualityCategoryResults> categoryResults =
Arrays.stream(DataQualityCategory.values())
.collect(Collectors.toMap(Function.identity(),
value ->
new DataQualityCategoryResults()
.category(value.getDescription())
.results(new ArrayList<>())));

items.forEach(row -> categoryResults.get(DataQualityCategory
.resolveByName(row.get(TASK_RUN_CATEGORY, String.class)))
.getResults().add(new DataQualityRunStatusCount()
.status(DataEntityRunStatus
.fromValue(row.getValue(DATA_ENTITY_TASK_LAST_RUN.STATUS)))
.count(Long.valueOf(row.get(TASK_RUNS_COUNT, Integer.class)))));

return addMissingStatuses(categoryResults.values()
.stream()
.toList());
}

private List<DataQualityCategoryResults> addMissingStatuses(final List<DataQualityCategoryResults> resultsList) {
for (final DataQualityCategoryResults dataQualityCategoryResults : resultsList) {
final Set<DataEntityRunStatus> existedElements = dataQualityCategoryResults.getResults()
.stream().map(DataQualityRunStatusCount::getStatus)
.collect(Collectors.toSet());

Arrays.stream(DataEntityRunStatus.values())
.filter(value -> !existedElements.contains(value))
.forEach(value -> dataQualityCategoryResults.getResults()
.add(new DataQualityRunStatusCount()
.status(value)
.count(0L)));
}

return resultsList;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opendatadiscovery.oddplatform.mapper;

import com.fasterxml.jackson.core.type.TypeReference;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
Expand Down Expand Up @@ -104,9 +103,6 @@ public FacetStateDto mapForm(final SearchFormData formData) {
@Override
public FacetStateDto mapForm(final TermSearchFormData formData) {
final TermSearchFormDataFilters filters = formData.getFilters();
if (filters == null) {
return FacetStateDto.empty();
}

final Map<FacetType, List<SearchFilterDto>> state = TERM_FORM_MAPPINGS.entrySet().stream()
.map(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private Metric mapMetric(final MetricType metricType,
final List<MetricPointPojo> points,
final List<MetricSeriesDto> series,
final List<MetricLabelValueDto> labelValues) {
final Metric metric = new Metric();
final Metric metric = new Metric(new MetricPoint());
final List<MetricLabel> metricLabels = labelValues.stream()
.filter(dto -> metricLabelValues.contains(dto.labelValue().getId()))
.map(dto -> new MetricLabel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private List<Metric> mapMetrics(final MetricType type,
final Metric existingMetric = metricByLabels.get();
enrichMetric(existingMetric, type, metric);
} else {
final Metric newMetric = new Metric();
final Metric newMetric = new Metric(new MetricPoint());
newMetric.setLabels(metricLabels);
enrichMetric(newMetric, type, metric);
result.add(newMetric);
Expand All @@ -94,9 +94,6 @@ private List<Metric> mapMetrics(final MetricType type,
private void enrichMetric(final Metric metric,
final MetricType metricType,
final PrometheusMetric prometheusMetric) {
if (metric.getMetricPoint() == null) {
metric.setMetricPoint(new MetricPoint());
}
metric.getMetricPoint().setTimestamp(getTimestampValue(prometheusMetric.getValue().get(0)));
switch (metricType) {
case GAUGE -> enrichGaugeValue(metric, prometheusMetric);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import org.jooq.Record2;
import org.opendatadiscovery.oddplatform.api.contract.model.TablesDashboard;

public interface TablesDashboardMapper {
String TABLE_STATUS = "STATUS";
String GOOD_HEALTH = "GOOD_HEALTH";
String ERROR_HEALTH = "ERROR";
String WARNING_HEALTH = "WARNING";
String MONITORED_TABLES = "MONITORED_TABLES";
String NOT_MONITORED_TABLES = "NOT_MONITORED_TABLES";
String COUNT = "COUNT";

TablesDashboard mapToDto(final List<Record2<Integer, String>> tableHealth,
final List<Record2<Integer, String>> monitoredTablesStatus);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.List;
import org.jooq.Record2;
import org.opendatadiscovery.oddplatform.api.contract.model.MonitoredTablesDashboard;
import org.opendatadiscovery.oddplatform.api.contract.model.TablesDashboard;
import org.opendatadiscovery.oddplatform.api.contract.model.TablesHealthDashboard;
import org.springframework.stereotype.Component;

@Component
public class TablesDashboardMapperImpl implements TablesDashboardMapper {
@Override
public TablesDashboard mapToDto(final List<Record2<Integer, String>> tableHealth,
final List<Record2<Integer, String>> monitoredTablesStatus) {
final TablesHealthDashboard tablesHealthDashboard = new TablesHealthDashboard();

tableHealth.forEach(row -> {
switch (row.get(TABLE_STATUS, String.class)) {
case GOOD_HEALTH -> tablesHealthDashboard.setHealthyTables(row.get(COUNT, Integer.class));
case ERROR_HEALTH -> tablesHealthDashboard.setErrorTables(row.get(COUNT, Integer.class));
case WARNING_HEALTH -> tablesHealthDashboard.setWarningTables(row.get(COUNT, Integer.class));
}
});

final MonitoredTablesDashboard monitoredTablesDashboard = new MonitoredTablesDashboard();

monitoredTablesStatus.forEach(row -> {
switch (row.get(TABLE_STATUS, String.class)) {
case MONITORED_TABLES -> monitoredTablesDashboard.setMonitoredTables(row.get(COUNT, Integer.class));
case NOT_MONITORED_TABLES ->
monitoredTablesDashboard.setNotMonitoredTables(row.get(COUNT, Integer.class));
}
});

return new TablesDashboard()
.tablesHealth(tablesHealthDashboard)
.monitoredTables(monitoredTablesDashboard);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import org.jooq.Record2;
import org.jooq.Record3;
import reactor.core.publisher.Flux;

public interface ReactiveDataQualityRunsRepository {

Flux<Record3<String, String, Integer>> getLatestDataQualityRunsResults();

Flux<Record2<Integer, String>> getLatestTablesHealth();

Flux<Record2<Integer, String>> getMonitoredTables();
}
Loading

0 comments on commit 7c705f5

Please sign in to comment.