Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement matching stats for SQL #3610

Open
wants to merge 34 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2a04364
Implement MatchingStats for SQL mode
jnsrnhld Oct 16, 2024
f5955d5
Submit matchings stats collection tasks in parallel
jnsrnhld Oct 16, 2024
467abcc
Track time while calculating matching stats
jnsrnhld Oct 17, 2024
63a5832
Deactivate mappings and filter job
jnsrnhld Oct 17, 2024
2e130ba
Draft for fast matching stats calculation of only-equal-condition trees
jnsrnhld Oct 25, 2024
cecd791
Revert "Deactivate mappings and filter job"
jnsrnhld Oct 25, 2024
7e21052
Revert changes to MetadataCollectionTest
jnsrnhld Oct 25, 2024
52d097e
Revert changes to ConceptTreeNode
jnsrnhld Oct 25, 2024
7f7523c
Simplify condition match methods
jnsrnhld Oct 26, 2024
a87950c
Use SqlMatchingStats::empty for more fluent API
jnsrnhld Oct 27, 2024
de11866
Remove parallel calls
jnsrnhld Oct 27, 2024
4797e82
Use parallel in setAndAggregate
jnsrnhld Oct 29, 2024
a8bf45c
Simplify SQL matching stats calc
jnsrnhld Oct 30, 2024
9bf3c17
Add missing getter and setter
jnsrnhld Oct 30, 2024
1c02de1
Merge branch 'develop' into sql/matching-stats-perf
jnsrnhld Oct 30, 2024
944aca7
Cleanup
jnsrnhld Oct 31, 2024
7e60026
Remove kh-content.csv
jnsrnhld Oct 31, 2024
c23e4b6
More cleanup
jnsrnhld Oct 31, 2024
c850c4e
Use ListenableFutures
jnsrnhld Oct 31, 2024
5be35d6
Revert changes in UpdateMatchingStatsMessage
jnsrnhld Oct 31, 2024
0988c8c
Draft
jnsrnhld Nov 5, 2024
d0b02b7
Refactoring
jnsrnhld Nov 6, 2024
20fc92e
Revert changes
jnsrnhld Nov 6, 2024
dba40f9
Merge branch 'develop' into sql/matching-stats-perf
jnsrnhld Nov 6, 2024
2f2a4ae
Revert "Revert changes"
jnsrnhld Nov 6, 2024
edd2f73
Fix
jnsrnhld Nov 6, 2024
944fc14
cleanup CTCondition.getAuxillaryColumns
awildturtok Nov 6, 2024
6f4cd6a
ignore auxillary columns
awildturtok Nov 6, 2024
98f5902
dont min/max when ungrouped
awildturtok Nov 11, 2024
9ab227e
Fix least/greatest usage when there is only 1 validity date
jnsrnhld Nov 12, 2024
e794f67
Revert "Fix least/greatest usage when there is only 1 validity date"
jnsrnhld Nov 12, 2024
546b0e4
Revert "dont min/max when ungrouped"
jnsrnhld Nov 13, 2024
4c724a3
Only use least/greatest for multiple validity dates
jnsrnhld Nov 13, 2024
073f10e
Merge branch 'develop' into sql/matching-stats-perf
jnsrnhld Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {
private final SqlDialectFactory dialectFactory;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry<LocalNamespace> datasetRegistry, Environment environment) {
public LocalNamespace createNamespace(
NamespaceStorage namespaceStorage,
MetaStorage metaStorage,
DatasetRegistry<LocalNamespace> datasetRegistry,
Environment environment) {

NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry, environment);

Expand All @@ -60,6 +64,11 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto
return new LocalNamespace(
namespaceData.getPreprocessMapper(),
namespaceStorage,
config.getQueries().getExecutionPool(),
sqlConnectorConfig,
databaseConfig,
sqlDialect,
sqlExecutionService,
executionManager,
dslContextWrapper,
sqlStorageHandler,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
package com.bakdata.conquery.mode.local;

import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.count;
import static org.jooq.impl.DSL.countDistinct;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.max;
import static org.jooq.impl.DSL.min;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.noCondition;
import static org.jooq.impl.DSL.table;

import java.sql.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.bakdata.conquery.models.common.daterange.CDateRange;
import com.bakdata.conquery.models.config.DatabaseConfig;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.MatchingStats;
import com.bakdata.conquery.models.datasets.concepts.conditions.CTCondition;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.exceptions.ConceptConfigurationException;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.sql.conversion.SharedAliases;
import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext;
import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider;
import com.bakdata.conquery.sql.conversion.model.ColumnDateRange;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import com.bakdata.conquery.util.CalculatedValue;
import com.bakdata.conquery.util.TablePrimaryColumnUtil;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Name;
import org.jooq.Record;
import org.jooq.Select;
import org.jooq.Table;

@Slf4j
public class UpdateMatchingStatsSqlJob extends Job {

private static final Name CONNECTOR_COLUMN = name("connector_column");
private static final Name EVENTS = name("events");
private static final Name ENTITIES = name("entities");
private static final Name DATES = name("dates");

private final DatabaseConfig databaseConfig;
private final SqlExecutionService executionService;
private final DSLContext dslContext;
private final SqlFunctionProvider functionProvider;
private final Set<ConceptId> concepts;
private final ListeningExecutorService executors;

public UpdateMatchingStatsSqlJob(
DatabaseConfig databaseConfig,
SqlExecutionService executionService,
SqlFunctionProvider functionProvider,
Set<ConceptId> concepts,
ListeningExecutorService executors
) {
this.databaseConfig = databaseConfig;
this.executionService = executionService;
this.dslContext = executionService.getDslContext();
this.functionProvider = functionProvider;
this.concepts = concepts;
this.executors = executors;
}

@Override
public String getLabel() {
return "Calculating Matching Stats for %s.".formatted(executionService);
}

@Override
public void execute() throws Exception {

log.debug("BEGIN update Matching stats for {} Concepts.", concepts.size());
final StopWatch stopWatch = new StopWatch();
stopWatch.start();

final List<ListenableFuture<?>> runningQueries = concepts.stream()
.map(ConceptId::resolve)
.filter(UpdateMatchingStatsSqlJob::isTreeConcept)
.map(TreeConcept.class::cast)
.map(treeConcept -> executors.submit(() -> calculateMatchingStats(treeConcept)))
.collect(Collectors.toList());

Futures.whenAllComplete(runningQueries).run(() -> {
jnsrnhld marked this conversation as resolved.
Show resolved Hide resolved
stopWatch.stop();
log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime());
runningQueries.forEach(UpdateMatchingStatsSqlJob::checkForError);
}, executors);
}

@Override
public void cancel() {
super.cancel();
executors.shutdownNow();
}

private static boolean isTreeConcept(final Concept<?> concept) {
if (!(concept instanceof TreeConcept)) {
log.error("Collecting MatchingStats is currently only supported for TreeConcepts.");
return false;
}
return true;
}

private static void checkForError(final Future<?> future) {
try {
future.get();
}
catch (ExecutionException | InterruptedException e) {
log.error("Unknown error while querying SQL matching stats. Cause: \n", e.getCause());
}
}

public void calculateMatchingStats(final TreeConcept treeConcept) {

final Map<Connector, Set<Field<?>>> relevantColumns = collectRelevantColumns(treeConcept);
final Map<Connector, List<ColumnDateRange>> validityDateMap = createColumnDateRanges(treeConcept);

// union of all connectors of the concept
final Select<?> unioned = treeConcept.getConnectors().stream()
.map(connector -> this.createConnectorQuery(connector, relevantColumns, validityDateMap))
.reduce(Select::unionAll)
.orElseThrow(IllegalStateException::new);

// select the minimum of the least start date and the maximum of the greatest end date of all validity dates of all connectors
final List<ColumnDateRange> validityDates = validityDateMap.values().stream().flatMap(List::stream).map(functionProvider::toDualColumn).toList();
final List<Field<Date>> allStarts = validityDates.stream().map(ColumnDateRange::getStart).toList();
final List<Field<Date>> allEnds = validityDates.stream().map(ColumnDateRange::getEnd).toList();
final ColumnDateRange minAndMax = ColumnDateRange.of(min(functionProvider.least(allStarts)), max(functionProvider.greatest((allEnds))));
final Field<String> validityDateExpression = functionProvider.daterangeStringExpression(minAndMax).as(DATES);

// all connectors need the same columns originating from the concept definition - they might have different names in the respective connector tables,
// but as we aliased them already, we can just use the unified aliases in the final query
final List<Field<?>> relevantColumnsAliased = relevantColumns.get(treeConcept.getConnectors().get(0)).stream()
.map(field -> field(field.getUnqualifiedName()))
.collect(Collectors.toList());

final Select<? extends Record> query = dslContext.select(relevantColumnsAliased)
.select(
count(asterisk()).as(EVENTS),
countDistinct(field(ENTITIES)).as(ENTITIES),
validityDateExpression
)
.from(unioned)
.groupBy(relevantColumnsAliased);

final ConceptTreeCache treeCache = new ConceptTreeCache(treeConcept);
executionService.fetchStream(query).forEach(record -> mapRecordToConceptElements(treeConcept, record, relevantColumnsAliased, treeCache));
}

/**
* @return A map from a connector to all relevant columns the connector's concept defines. A relevant column is any column that is used by a
* {@link CTCondition} which is part of any child of a concept, or it's a concept's connector column.
*/
private Map<Connector, Set<Field<?>>> collectRelevantColumns(final TreeConcept treeConcept) {
return treeConcept.getConnectors().stream().collect(Collectors.toMap(
Function.identity(),
connector -> collectRelevantColumns(connector, treeConcept.getChildren())
.stream()
.map(column -> {
final Field<Object> field = field(name(column));
// connector columns are unioned, thus they need the same alias
if (connector.getColumn() != null && connector.getColumn().resolve().getName().equals(column)) {
return field.as(CONNECTOR_COLUMN);
}
// a condition which does not operate on the connector column MUST have the same name in all connector's tables
return field;
})
.collect(Collectors.toSet())
));
}

private Set<String> collectRelevantColumns(final Connector connector, final List<ConceptTreeChild> children) {
final Set<String> relevantColumns = new HashSet<>();

for (ConceptTreeChild child : children) {
if (child.getCondition() == null && child.getChildren().isEmpty()) {
continue;
}

final Set<String> childColumns = new HashSet<>();

// Recursively collect columns from the current child's children, if they exist
if (!child.getChildren().isEmpty()) {
final Set<String> childrenColumns = collectRelevantColumns(connector, child.getChildren());
childColumns.addAll(childrenColumns);
}

// Add columns from the child's condition, if it exists
if (child.getCondition() != null) {
final Set<String> conditionColumns = child.getCondition().getColumns(connector);
childColumns.addAll(conditionColumns);
}

relevantColumns.addAll(childColumns);
}

return relevantColumns;
}

private Map<Connector, List<ColumnDateRange>> createColumnDateRanges(final TreeConcept treeConcept) {
final AtomicInteger counter = new AtomicInteger(0);
return treeConcept.getConnectors().stream().collect(Collectors.toMap(
Function.identity(),
connector -> createColumnDateRanges(connector, counter)
));
}

private List<ColumnDateRange> createColumnDateRanges(final Connector connector, final AtomicInteger counter) {
return connector.getValidityDates().stream()
.map(functionProvider::forValidityDate)
.map(daterange -> daterange.as("%s-%d".formatted(SharedAliases.DATES_COLUMN.getAlias(), counter.incrementAndGet())))
.toList();
}

private Select<?> createConnectorQuery(
final ConceptTreeConnector connector,
final Map<Connector, Set<Field<?>>> relevantColumns,
final Map<Connector, List<ColumnDateRange>> validityDateMap
) {
final Table<Record> connectorTable = table(name(connector.getResolvedTable().getName()));
final Set<Field<?>> connectorColumns = relevantColumns.get(connector);
final Field<Object> primaryKey = TablePrimaryColumnUtil.findPrimaryColumn(connector.getResolvedTable(), databaseConfig).as(ENTITIES);

// we have to select all possible validity dates of all connectors because we have to union multiple connectors
final List<Field<?>> validityDates =
validityDateMap.entrySet().stream()
.flatMap(entry -> entry.getValue().stream().map(columnDateRange -> entry.getKey() == connector
? columnDateRange
: functionProvider.nulled(columnDateRange))
.flatMap(columnDateRange -> columnDateRange.toFields().stream()))
.toList();

// connector might have a condition
final Condition connectorCondition = toJooqCondition(connector, Optional.ofNullable(connector.getCondition()));

return dslContext.select(primaryKey)
.select(connectorColumns)
.select(validityDates)
.from(connectorTable)
.where(connectorCondition);
}

private Condition toJooqCondition(final Connector connector, final Optional<CTCondition> childCondition) {
jnsrnhld marked this conversation as resolved.
Show resolved Hide resolved
final CTConditionContext context = CTConditionContext.create(connector, functionProvider);
return childCondition.or(() -> Optional.ofNullable(connector.getCondition()))
.map(condition -> condition.convertToSqlCondition(context).condition())
.orElse(noCondition());
}

private void mapRecordToConceptElements(
final TreeConcept treeConcept,
final Record record,
final List<Field<?>> relevantColumns,
final ConceptTreeCache treeCache
) {
final CalculatedValue<Map<String, Object>> rowMap = new CalculatedValue<>(record::intoMap);
final MatchingStats.Entry entry = toMatchingStatsEntry(record);

if (treeConcept.getChildren().isEmpty()) {
addEntryToConceptElement(treeConcept, treeConcept.getName(), entry);
return;
}

relevantColumns.stream().map(field -> record.get(field, String.class)).forEach(relevantColumnValue -> {
jnsrnhld marked this conversation as resolved.
Show resolved Hide resolved
try {
final ConceptTreeChild mostSpecificChild = treeCache.findMostSpecificChild(relevantColumnValue, rowMap);

// database value did not match any node of the concept
if (mostSpecificChild == null) {
return;
}

// add stats for most specific child
addEntryToConceptElement(mostSpecificChild, relevantColumnValue, entry);

// add child stats to all parents till concept root
ConceptTreeNode<?> current = mostSpecificChild.getParent();
jnsrnhld marked this conversation as resolved.
Show resolved Hide resolved
while (current != null) {
addEntryToConceptElement(current, relevantColumnValue, entry);
current = current.getParent();
}
}
catch (ConceptConfigurationException e) {
throw new RuntimeException(e);
}
});
}

private MatchingStats.Entry toMatchingStatsEntry(Record record) {
final long events = record.get(EVENTS, Integer.class).longValue();
final long entities = record.get(ENTITIES, Integer.class).longValue();
final CDateRange dateSpan = toDateRange(record.get(DATES, String.class));
return new MatchingStats.Entry(events, entities, dateSpan.getMinValue(), dateSpan.getMaxValue());
}

private CDateRange toDateRange(final String validityDateExpression) {
final List<Integer> dateRange = executionService.getResultSetProcessor().getCDateSetParser().toEpochDayRange(validityDateExpression);
jnsrnhld marked this conversation as resolved.
Show resolved Hide resolved
return !dateRange.isEmpty() ? CDateRange.fromList(dateRange) : CDateRange.all();
}

private static void addEntryToConceptElement(final ConceptTreeNode<?> mostSpecificChild, final String columnKey, final MatchingStats.Entry entry) {
final MatchingStats childMatchingStats;
if (mostSpecificChild.getMatchingStats() == null) {
childMatchingStats = new MatchingStats();
((ConceptElement<?>) mostSpecificChild).setMatchingStats(childMatchingStats);
}
else {
childMatchingStats = mostSpecificChild.getMatchingStats();
}
childMatchingStats.putEntry(columnKey, entry);
jnsrnhld marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.bakdata.conquery.models.config;

import java.util.Map;
import jakarta.validation.Valid;

import com.bakdata.conquery.models.datasets.Dataset;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.dropwizard.util.Duration;
import io.dropwizard.validation.ValidationMethod;
import jakarta.validation.Valid;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down
Loading
Loading