Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement matching stats for SQL #3610

Open
wants to merge 34 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2a04364
Implement MatchingStats for SQL mode
jnsrnhld Oct 16, 2024
f5955d5
Submit matching stats collection tasks in parallel
jnsrnhld Oct 16, 2024
467abcc
Track time while calculating matching stats
jnsrnhld Oct 17, 2024
63a5832
Deactivate mappings and filter job
jnsrnhld Oct 17, 2024
2e130ba
Draft for fast matching stats calculation of only-equal-condition trees
jnsrnhld Oct 25, 2024
cecd791
Revert "Deactivate mappings and filter job"
jnsrnhld Oct 25, 2024
7e21052
Revert changes to MetadataCollectionTest
jnsrnhld Oct 25, 2024
52d097e
Revert changes to ConceptTreeNode
jnsrnhld Oct 25, 2024
7f7523c
Simplify condition match methods
jnsrnhld Oct 26, 2024
a87950c
Use SqlMatchingStats::empty for more fluent API
jnsrnhld Oct 27, 2024
de11866
Remove parallel calls
jnsrnhld Oct 27, 2024
4797e82
Use parallel in setAndAggregate
jnsrnhld Oct 29, 2024
a8bf45c
Simplify SQL matching stats calc
jnsrnhld Oct 30, 2024
9bf3c17
Add missing getter and setter
jnsrnhld Oct 30, 2024
1c02de1
Merge branch 'develop' into sql/matching-stats-perf
jnsrnhld Oct 30, 2024
944aca7
Cleanup
jnsrnhld Oct 31, 2024
7e60026
Remove kh-content.csv
jnsrnhld Oct 31, 2024
c23e4b6
More cleanup
jnsrnhld Oct 31, 2024
c850c4e
Use ListenableFutures
jnsrnhld Oct 31, 2024
5be35d6
Revert changes in UpdateMatchingStatsMessage
jnsrnhld Oct 31, 2024
0988c8c
Draft
jnsrnhld Nov 5, 2024
d0b02b7
Refactoring
jnsrnhld Nov 6, 2024
20fc92e
Revert changes
jnsrnhld Nov 6, 2024
dba40f9
Merge branch 'develop' into sql/matching-stats-perf
jnsrnhld Nov 6, 2024
2f2a4ae
Revert "Revert changes"
jnsrnhld Nov 6, 2024
edd2f73
Fix
jnsrnhld Nov 6, 2024
944fc14
cleanup CTCondition.getAuxillaryColumns
awildturtok Nov 6, 2024
6f4cd6a
ignore auxiliary columns
awildturtok Nov 6, 2024
98f5902
dont min/max when ungrouped
awildturtok Nov 11, 2024
9ab227e
Fix least/greatest usage when there is only 1 validity date
jnsrnhld Nov 12, 2024
e794f67
Revert "Fix least/greatest usage when there is only 1 validity date"
jnsrnhld Nov 12, 2024
546b0e4
Revert "dont min/max when ungrouped"
jnsrnhld Nov 13, 2024
4c724a3
Only use least/greatest for multiple validity dates
jnsrnhld Nov 13, 2024
073f10e
Merge branch 'develop' into sql/matching-stats-perf
jnsrnhld Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.bakdata.conquery.mode.cluster;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.MatchingStats;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.CBlock;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptElementId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.models.messages.namespaces.specific.UpdateElementMatchingStats;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.util.progressreporter.ProgressReporter;
import com.google.common.base.Functions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Shard-side {@link Job} that (re-)computes {@link MatchingStats} for a set of concepts on a single {@link Worker}.
 * <p>
 * For each concept, an asynchronous sub-task scans all CBlocks in the worker's local storage, aggregates per-element
 * matching entries, and sends the result back via an {@link UpdateElementMatchingStats} message. The job then waits
 * for all sub-tasks to finish, periodically logging how many are still outstanding.
 */
@Slf4j
@RequiredArgsConstructor
public class WorkerUpdateMatchingStatsJob extends Job {
	// Worker whose local storage (CBlocks/Buckets) is scanned; also the target for result messages and the
	// provider of the executor the per-concept sub-tasks run on.
	private final Worker worker;
	// Concepts to compute matching stats for; exactly one async sub-task is spawned per concept.
	private final Collection<ConceptId> concepts;

	/**
	 * Spawns one {@link CompletableFuture} per concept on the worker's jobs executor and blocks until all complete.
	 * <p>
	 * NOTE(review): only {@link TimeoutException} is caught in the wait loop — if a sub-task fails, the resulting
	 * {@code ExecutionException} propagates out of this method (declared {@code throws Exception}). Presumably
	 * intentional, so the job framework surfaces the failure; confirm against {@link Job} semantics.
	 */
	@Override
	public void execute() throws Exception {
		// Nothing to do on an empty worker: no CBlocks means no events to count.
		if (worker.getStorage().getAllCBlocks().findAny().isEmpty()) {
			log.debug("Worker {} is empty, skipping.", worker);
			return;
		}

		// Progress is reported per finished concept, from inside each sub-task.
		final ProgressReporter progressReporter = getProgressReporter();
		progressReporter.setMax(concepts.size());

		log.info("BEGIN update Matching stats for {} Concepts", concepts.size());

		// Keyed by concept so stragglers can be named in the trace logging below.
		final Map<? extends ConceptId, CompletableFuture<Void>>
				subJobs =
				concepts.stream()
						.collect(Collectors.toMap(Functions.identity(),
												  concept -> CompletableFuture.runAsync(() -> {
													  final Concept<?> resolved = concept.resolve();
													  // Pre-size to the concept's element count to avoid rehashing.
													  final Map<ConceptElementId<?>, MatchingStats.Entry> matchingStats = new HashMap<>(resolved.countElements());

													  calculateConceptMatches(resolved, matchingStats, worker);
													  // Ship this concept's stats back immediately; results are not batched.
													  worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats));

													  progressReporter.report(1);
												  }, worker.getJobsExecutorService())
						));


		log.debug("All jobs submitted. Waiting for completion.");


		final CompletableFuture<Void> all = CompletableFuture.allOf(subJobs.values().toArray(CompletableFuture[]::new));

		// Poll with a 1-minute timeout purely to emit liveness logging while long-running stats are computed.
		do {
			try {
				all.get(1, TimeUnit.MINUTES);
			}
			catch (TimeoutException exception) {
				// Count unfinished matching stats jobs.
				if (log.isDebugEnabled()) {
					final long unfinished = subJobs.values().stream().filter(Predicate.not(CompletableFuture::isDone)).count();
					log.debug("{} still waiting for {} tasks", worker.getInfo().getDataset(), unfinished);
				}

				// When trace, also log the unfinished jobs.
				if (log.isTraceEnabled()) {
					subJobs.forEach((concept, future) -> {
						if (future.isDone()) {
							return;
						}

						log.trace("Still waiting for `{}`", concept);

					});
				}
			}
		} while (!all.isDone());

		log.debug("DONE collecting matching stats for {}", worker.getInfo().getDataset());

	}

	@Override
	public String getLabel() {
		return String.format("Calculate Matching Stats for %s", worker.getInfo().getDataset());
	}

	/**
	 * Scans all CBlocks of this worker and accumulates a {@link MatchingStats.Entry} per concept element into
	 * {@code results}.
	 * <p>
	 * For non-tree concepts, or events without a resolved child path, the event is counted on the concept root itself.
	 * For tree concepts, the event is counted on the most specific matching node and every one of its ancestors.
	 * Failures are handled best-effort per CBlock: an exception is logged and the remaining CBlocks are still processed.
	 * <p>
	 * NOTE(review): this iterates every CBlock once per concept (callers invoke it per concept), i.e.
	 * O(#concepts x #CBlocks) overall — potential performance concern for large shards; confirm whether acceptable.
	 */
	private static void calculateConceptMatches(Concept<?> concept, Map<ConceptElementId<?>, MatchingStats.Entry> results, Worker worker) {
		log.debug("BEGIN calculating for `{}`", concept.getId());

		for (CBlock cBlock : worker.getStorage().getAllCBlocks().toList()) {

			// Only CBlocks belonging to this concept's connectors are relevant.
			if (!cBlock.getConnector().getConcept().equals(concept.getId())) {
				continue;
			}

			try {
				final Bucket bucket = cBlock.getBucket().resolve();
				final Table table = bucket.getTable().resolve();

				for (String entity : bucket.entities()) {

					final int entityEnd = bucket.getEntityEnd(entity);

					// Walk all events of this entity within the bucket's [start, end) range.
					for (int event = bucket.getEntityStart(entity); event < entityEnd; event++) {

						// Path of local ids from the tree root to the most specific matching child, or null if unresolved.
						final int[] localIds = cBlock.getPathToMostSpecificChild(event);


						// Non-tree concept or no child path: attribute the event to the concept root only.
						if (!(concept instanceof TreeConcept) || localIds == null) {
							results.computeIfAbsent(concept.getId(), (ignored) -> new MatchingStats.Entry()).addEvent(table, bucket, event, entity);
							continue;
						}

						// Event resolved to "not contained" in the tree: skip it entirely.
						if (Connector.isNotContained(localIds)) {
							continue;
						}

						ConceptTreeNode<?> element = ((TreeConcept) concept).getElementByLocalIdPath(localIds);

						// Count the event on the matched node and propagate up through all ancestors.
						while (element != null) {
							results.computeIfAbsent(((ConceptElement<?>) element).getId(), (ignored) -> new MatchingStats.Entry())
								   .addEvent(table, bucket, event, entity);
							element = element.getParent();
						}
					}
				}

			}
			catch (Exception e) {
				// Best-effort: keep processing remaining CBlocks even if one fails.
				log.error("Failed to collect the matching stats for {}", cBlock, e);
			}
		}

		log.trace("DONE calculating for `{}`", concept.getId());
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {
private final SqlDialectFactory dialectFactory;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry<LocalNamespace> datasetRegistry, Environment environment) {
public LocalNamespace createNamespace(
NamespaceStorage namespaceStorage,
MetaStorage metaStorage,
DatasetRegistry<LocalNamespace> datasetRegistry,
Environment environment) {

NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry, environment);

Expand All @@ -60,6 +64,10 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto
return new LocalNamespace(
namespaceData.getPreprocessMapper(),
namespaceStorage,
sqlConnectorConfig,
databaseConfig,
sqlDialect,
sqlExecutionService,
executionManager,
dslContextWrapper,
sqlStorageHandler,
Expand Down
Loading
Loading