diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java index 6a40a74705..5bf80be772 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java @@ -37,7 +37,11 @@ public class LocalNamespaceHandler implements NamespaceHandler { private final SqlDialectFactory dialectFactory; @Override - public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry datasetRegistry, Environment environment) { + public LocalNamespace createNamespace( + NamespaceStorage namespaceStorage, + MetaStorage metaStorage, + DatasetRegistry datasetRegistry, + Environment environment) { NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry, environment); @@ -60,6 +64,11 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto return new LocalNamespace( namespaceData.getPreprocessMapper(), namespaceStorage, + config.getQueries().getExecutionPool(), + sqlConnectorConfig, + databaseConfig, + sqlDialect, + sqlExecutionService, executionManager, dslContextWrapper, sqlStorageHandler, diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/UpdateMatchingStatsSqlJob.java b/backend/src/main/java/com/bakdata/conquery/mode/local/UpdateMatchingStatsSqlJob.java new file mode 100644 index 0000000000..959d69f980 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/UpdateMatchingStatsSqlJob.java @@ -0,0 +1,368 @@ +package com.bakdata.conquery.mode.local; + +import static org.jooq.impl.DSL.asterisk; +import static org.jooq.impl.DSL.count; +import static org.jooq.impl.DSL.countDistinct; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.max; +import static org.jooq.impl.DSL.min; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.noCondition; +import static org.jooq.impl.DSL.noField; +import static org.jooq.impl.DSL.table; + +import java.sql.Date; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.bakdata.conquery.models.common.daterange.CDateRange; +import com.bakdata.conquery.models.config.DatabaseConfig; +import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; +import com.bakdata.conquery.models.datasets.concepts.Connector; +import com.bakdata.conquery.models.datasets.concepts.MatchingStats; +import com.bakdata.conquery.models.datasets.concepts.conditions.CTCondition; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode; +import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; +import com.bakdata.conquery.models.exceptions.ConceptConfigurationException; +import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId; +import com.bakdata.conquery.models.jobs.Job; +import com.bakdata.conquery.sql.conversion.SharedAliases; +import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; +import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider; +import com.bakdata.conquery.sql.conversion.model.ColumnDateRange; +import com.bakdata.conquery.sql.execution.SqlExecutionService; +import com.bakdata.conquery.util.CalculatedValue; +import com.bakdata.conquery.util.TablePrimaryColumnUtil; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.StopWatch; +import org.jooq.Condition; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Name; +import org.jooq.Record; +import org.jooq.Select; +import org.jooq.SelectJoinStep; +import org.jooq.Table; + +@Slf4j +public class UpdateMatchingStatsSqlJob extends Job { + + private static final Name CONNECTOR_COLUMN = name("connector_column"); + private static final Name EVENTS = name("events"); + private static final Name ENTITIES = name("entities"); + private static final Name DATES = name("dates"); + + private final DatabaseConfig databaseConfig; + private final SqlExecutionService executionService; + private final DSLContext dslContext; + private final SqlFunctionProvider functionProvider; + private final Set concepts; + private final ListeningExecutorService executors; + private ListenableFuture all; + + public UpdateMatchingStatsSqlJob( + DatabaseConfig databaseConfig, + SqlExecutionService executionService, + SqlFunctionProvider functionProvider, + Set concepts, + ExecutorService executors + ) { + this.databaseConfig = databaseConfig; + this.executionService = executionService; + this.dslContext = executionService.getDslContext(); + this.functionProvider = functionProvider; + this.concepts = concepts; + this.executors = MoreExecutors.listeningDecorator(executors); + } + + @Override + public void execute() throws Exception { + + log.debug("BEGIN update Matching stats for {} Concepts.", concepts.size()); + final StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + final List> runningQueries = concepts.stream() + .map(ConceptId::resolve) + .filter(UpdateMatchingStatsSqlJob::isTreeConcept) + .map(TreeConcept.class::cast) + .map(treeConcept -> executors.submit(() -> calculateMatchingStats(treeConcept))) + .collect(Collectors.toList()); + + all = Futures.allAsList(runningQueries); + while (!all.isDone()) { + try { + all.get(1, TimeUnit.MINUTES); + } + catch (TimeoutException exception) { + log.debug("Still waiting for {}", this); + if (log.isTraceEnabled()) { + log.trace("Waiting for {}", executors); + } + } + } + + stopWatch.stop(); + log.debug("DONE collecting matching stats. Elapsed time: {} ms.", stopWatch.getTime()); + } + + @Override + public void cancel() { + if (all != null) { + all.cancel(true); + } + super.cancel(); + } + + @Override + public String getLabel() { + return "Calculating Matching Stats for %s.".formatted(executionService); + } + + private static boolean isTreeConcept(final Concept concept) { + if (!(concept instanceof TreeConcept)) { + log.error("Collecting MatchingStats is currently only supported for TreeConcepts."); + return false; + } + return true; + } + + private static void addEntryToConceptElement(final ConceptTreeNode mostSpecificChild, final String columnKey, final MatchingStats.Entry entry) { + if (mostSpecificChild.getMatchingStats() == null) { + ((ConceptElement) mostSpecificChild).setMatchingStats(new MatchingStats()); + } + + mostSpecificChild.getMatchingStats().putEntry(columnKey, entry); + } + + private void calculateMatchingStats(final TreeConcept treeConcept) { + + final Map>> relevantColumns = collectRelevantColumns(treeConcept); + final Map> validityDateMap = createColumnDateRanges(treeConcept); + + // union of all connectors of the concept + final Select unioned = treeConcept.getConnectors().stream() + .map(connector -> createConnectorQuery(connector, relevantColumns, validityDateMap)) + .reduce(Select::unionAll) + .orElseThrow(IllegalStateException::new); + + // all connectors need the same columns originating from the concept definition - they might have different names in the respective connector tables, + // but as we aliased them already, we can just use the unified aliases in the final query + final List> relevantColumnsAliased = relevantColumns.get(treeConcept.getConnectors().get(0)).stream() + .map(field -> field(field.getUnqualifiedName())) + .collect(Collectors.toList()); + + // if there is no validity date at all, no field is selected + final Field validityDateExpression = toValidityDateExpression(validityDateMap); + + final SelectJoinStep query = dslContext.select(relevantColumnsAliased) + .select( + count(asterisk()).as(EVENTS), + countDistinct(field(ENTITIES)).as(ENTITIES), + validityDateExpression.as(DATES) + ) + .from(unioned); + + // not all dialects accept an empty group by () clause + final Select finalQuery = relevantColumnsAliased.isEmpty() ? query : query.groupBy(relevantColumnsAliased); + + final ConceptTreeCache treeCache = new ConceptTreeCache(treeConcept); + executionService.fetchStream(finalQuery) + .forEach(record -> mapRecordToConceptElements(treeConcept, record, treeCache)); + } + + /** + * @return A map from a connector to all relevant columns the connector's concept defines. A relevant column is any column that is used by a + * {@link CTCondition} which is part of any child of a concept, or it's a concept's connector column. + */ + private Map>> collectRelevantColumns(final TreeConcept treeConcept) { + return treeConcept.getConnectors().stream() + .collect(Collectors.toMap( + Function.identity(), + connector -> collectRelevantColumns(connector, treeConcept) + )); + } + + private Set> collectRelevantColumns(final Connector connector, TreeConcept concept) { + final Set> out = new HashSet<>(); + + if (connector.getColumn() != null) { + out.add(field(name(connector.getColumn().resolve().getName())).as(CONNECTOR_COLUMN)); + } + + for (String name : collectRelevantColumns(concept.getChildren())) { + out.add(field(name(name))); + } + + return out; + } + + private Set collectRelevantColumns(final List children) { + return children.stream().flatMap(child -> collectRelevantColumns(child).stream()).collect(Collectors.toSet()); + } + + private Set collectRelevantColumns(final ConceptTreeChild child) { + final Set childColumns = new HashSet<>(); + // Recursively collect columns from the current child's children, if they exist + if (!child.getChildren().isEmpty()) { + final Set childrenColumns = collectRelevantColumns(child.getChildren()); + childColumns.addAll(childrenColumns); + } + // Add columns from the child's condition, if it exists + if (child.getCondition() != null) { + final Set conditionColumns = child.getCondition().getAuxillaryColumns(); + childColumns.addAll(conditionColumns); + } + return childColumns; + } + + private Map> createColumnDateRanges(final TreeConcept treeConcept) { + final Map> map = new HashMap<>(); + final AtomicInteger counter = new AtomicInteger(0); + for (final ConceptTreeConnector connector : treeConcept.getConnectors()) { + if (connector.getValidityDates().isEmpty()) { + continue; + } + map.put(connector, createColumnDateRanges(connector, counter)); + } + return map; + } + + private List createColumnDateRanges(final Connector connector, final AtomicInteger counter) { + return connector.getValidityDates().stream() + .map(functionProvider::forValidityDate) + .map(daterange -> daterange.as("%s-%d".formatted(SharedAliases.DATES_COLUMN.getAlias(), counter.incrementAndGet()))) + .toList(); + } + + private Select createConnectorQuery( + final ConceptTreeConnector connector, + final Map>> relevantColumns, + final Map> validityDateMap + ) { + final Table connectorTable = table(name(connector.getResolvedTable().getName())); + final Set> connectorColumns = relevantColumns.get(connector); + final Field primaryKey = TablePrimaryColumnUtil.findPrimaryColumn(connector.getResolvedTable(), databaseConfig).as(ENTITIES); + + final List> validityDates = new ArrayList<>(); + + for (Map.Entry> entry : validityDateMap.entrySet()) { + for (ColumnDateRange columnDateRange : entry.getValue()) { + + // we have to select all possible validity dates of all connectors because we have to union multiple connectors + ColumnDateRange dateRange = columnDateRange; + + // Therefore we usually select null + if (entry.getKey() != connector) { + dateRange = functionProvider.nulled(columnDateRange); + } + + validityDates.addAll(dateRange.toFields()); + } + } + + // connector might have a condition + final Condition connectorCondition = connector.getCondition() == null + ? noCondition() + : toJooqCondition(connector, connector.getCondition()); + + return dslContext.select(primaryKey) + .select(connectorColumns) + .select(validityDates) + .from(connectorTable) + .where(connectorCondition); + } + + private Condition toJooqCondition(final Connector connector, CTCondition childCondition) { + final CTConditionContext context = CTConditionContext.create(connector, functionProvider); + return childCondition.convertToSqlCondition(context).condition(); + } + + /** + * Select the minimum of the least start date and the maximum of the greatest end date of all validity dates of all connectors. + */ + private Field toValidityDateExpression(final Map> validityDateMap) { + + if (validityDateMap.isEmpty()) { + return noField(String.class); + } + + final List validityDates = validityDateMap.values().stream().flatMap(List::stream).map(functionProvider::toDualColumn).toList(); + final List> allStarts = validityDates.stream().map(ColumnDateRange::getStart).toList(); + final List> allEnds = validityDates.stream().map(ColumnDateRange::getEnd).toList(); + + final ColumnDateRange minAndMax = ColumnDateRange.of( + min(allStarts.size() > 1 ? functionProvider.least(allStarts) : allStarts.get(0)), + max(allEnds.size() > 1 ? functionProvider.greatest(allEnds) : allEnds.get(0)) + ); + return functionProvider.daterangeStringExpression(minAndMax); + } + + private void mapRecordToConceptElements(final TreeConcept treeConcept, final Record record, final ConceptTreeCache treeCache) { + final CalculatedValue> rowMap = new CalculatedValue<>(record::intoMap); + final MatchingStats.Entry entry = toMatchingStatsEntry(record); + + if (treeConcept.getChildren().isEmpty()) { + addEntryToConceptElement(treeConcept, treeConcept.getName(), entry); + return; + } + + try { + final String columnValue = record.get(CONNECTOR_COLUMN, String.class); + + final ConceptTreeChild mostSpecificChild = treeCache.findMostSpecificChild(columnValue, rowMap); + + // database value did not match any node of the concept + if (mostSpecificChild == null) { + return; + } + + // add child stats to all parents till concept root + ConceptTreeNode current = mostSpecificChild; + while (current != null) { + addEntryToConceptElement(current, columnValue, entry); + current = current.getParent(); + } + } + catch (ConceptConfigurationException e) { + throw new RuntimeException(e); + } + } + + private MatchingStats.Entry toMatchingStatsEntry(Record record) { + final long events = record.get(EVENTS, Integer.class).longValue(); + final long entities = record.get(ENTITIES, Integer.class).longValue(); + final CDateRange dateSpan = toDateRange(record.get(DATES, String.class)); + + return new MatchingStats.Entry(events, entities, dateSpan.getMinValue(), dateSpan.getMaxValue()); + } + + private CDateRange toDateRange(final String validityDateExpression) { + final List dateRange = executionService.getResultSetProcessor().getCDateSetParser().toEpochDayRange(validityDateExpression); + + if (dateRange.isEmpty()) { + return CDateRange.all(); + } + + return CDateRange.fromList(dateRange); + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java index ef66c3ca8a..19d6393526 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java @@ -1,12 +1,12 @@ package com.bakdata.conquery.models.config; import java.util.Map; -import jakarta.validation.Valid; import com.bakdata.conquery.models.datasets.Dataset; import com.fasterxml.jackson.annotation.JsonIgnore; import io.dropwizard.util.Duration; import io.dropwizard.validation.ValidationMethod; +import jakarta.validation.Valid; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/MatchingStats.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/MatchingStats.java index 293d845f7d..fa84124462 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/MatchingStats.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/MatchingStats.java @@ -9,7 +9,6 @@ import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.events.Bucket; -import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.AllArgsConstructor; import lombok.Data; @@ -21,75 +20,76 @@ @Setter public class MatchingStats { - private Map entries = new HashMap<>(); - @JsonIgnore - private transient CDateRange span; - - @JsonIgnore - private transient long numberOfEvents = -1L; - - @JsonIgnore - private transient long numberOfEntities = -1L; - - public long countEvents() { - if (numberOfEvents == -1L) { - synchronized (this) { - if (numberOfEvents == -1L) { - numberOfEvents = entries.values().stream().mapToLong(Entry::getNumberOfEvents).sum(); - } - } - } - return numberOfEvents; - } - - - public long countEntities() { - if (numberOfEntities == -1L) { - synchronized (this) { - if (numberOfEntities == -1L) { - numberOfEntities = entries.values().stream().mapToLong(Entry::getNumberOfEntities).sum(); - } - } - } - return numberOfEntities; - } - - public CDateRange spanEvents() { - if (span == null) { - synchronized (this) { - if (span == null) { - span = entries.values().stream().map(Entry::getSpan).reduce(CDateRange.all(), CDateRange::spanClosed); - } - } - } - return span; - - } - - public void putEntry(WorkerId source, Entry entry) { - synchronized (this) { - entries.put(source, entry); - span = null; - numberOfEntities = -1L; - numberOfEvents = -1L; - } - } - - @Data - @NoArgsConstructor - @AllArgsConstructor - public static class Entry { - private long numberOfEvents; - - @JsonIgnore - private final Set foundEntities = new HashSet<>(); - private long numberOfEntities; + private final Map entries = new HashMap<>(); + + @JsonIgnore + private transient CDateRange span; + + @JsonIgnore + private transient long numberOfEvents = -1L; + + @JsonIgnore + private transient long numberOfEntities = -1L; + + public long countEvents() { + if (numberOfEvents == -1L) { + synchronized (this) { + if (numberOfEvents == -1L) { + numberOfEvents = entries.values().stream().mapToLong(MatchingStats.Entry::getNumberOfEvents).sum(); + } + } + } + return numberOfEvents; + } + + + public long countEntities() { + if (numberOfEntities == -1L) { + synchronized (this) { + if (numberOfEntities == -1L) { + numberOfEntities = entries.values().stream().mapToLong(MatchingStats.Entry::getNumberOfEntities).sum(); + } + } + } + return numberOfEntities; + } + + public CDateRange spanEvents() { + if (span == null) { + synchronized (this) { + if (span == null) { + span = entries.values().stream().map(MatchingStats.Entry::getSpan).reduce(CDateRange.all(), CDateRange::spanClosed); + } + } + } + return span; + + } + + public void putEntry(String source, MatchingStats.Entry entry) { + synchronized (this) { + entries.put(source, entry); + span = null; + numberOfEntities = -1L; + numberOfEvents = -1L; + } + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Entry { + private long numberOfEvents; + + @JsonIgnore + private final Set foundEntities = new HashSet<>(); + private long numberOfEntities; private int minDate = Integer.MAX_VALUE; private int maxDate = Integer.MIN_VALUE; @JsonIgnore public CDateRange getSpan() { - if(minDate == Integer.MAX_VALUE && maxDate == Integer.MIN_VALUE) { + if (minDate == Integer.MAX_VALUE && maxDate == Integer.MIN_VALUE) { return null; } @@ -99,32 +99,32 @@ public CDateRange getSpan() { ); } - public void addEvent(Table table, Bucket bucket, int event, String entityForEvent) { - numberOfEvents++; - if (foundEntities.add(entityForEvent)) { - numberOfEntities++; - } + public void addEvent(Table table, Bucket bucket, int event, String entityForEvent) { + numberOfEvents++; + if (foundEntities.add(entityForEvent)) { + numberOfEntities++; + } - for (Column c : table.getColumns()) { - if (!c.getType().isDateCompatible()) { - continue; - } + for (Column c : table.getColumns()) { + if (!c.getType().isDateCompatible()) { + continue; + } - if (!bucket.has(event, c)) { - continue; - } + if (!bucket.has(event, c)) { + continue; + } - final CDateRange time = bucket.getAsDateRange(event, c); + final CDateRange time = bucket.getAsDateRange(event, c); - if (time.hasUpperBound()){ + if (time.hasUpperBound()) { maxDate = Math.max(time.getMaxValue(), maxDate); } - if (time.hasLowerBound()){ + if (time.hasLowerBound()) { minDate = Math.min(time.getMinValue(), minDate); } - } - } - } + } + } + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/AndCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/AndCondition.java index 86c7dbc772..ed2f58edc1 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/AndCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/AndCondition.java @@ -1,7 +1,11 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode; @@ -9,8 +13,6 @@ import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; import com.bakdata.conquery.sql.conversion.model.filter.WhereCondition; import com.bakdata.conquery.util.CalculatedValue; -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotEmpty; import lombok.Getter; import lombok.Setter; @@ -52,4 +54,13 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { () -> new IllegalStateException("At least one condition is required to convert %s to a SQL condition.".formatted(getClass())) ); } + + @Override + public Set getAuxillaryColumns() { + final Set columns = new HashSet<>(); + for (CTCondition ctCondition : conditions) { + columns.addAll(ctCondition.getAuxillaryColumns()); + } + return columns; + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/CTCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/CTCondition.java index cdcc880ad2..f93e9e63ea 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/CTCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/CTCondition.java @@ -1,6 +1,7 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; import java.util.Map; +import java.util.Set; import com.bakdata.conquery.io.cps.CPSBase; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode; @@ -8,19 +9,24 @@ import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; import com.bakdata.conquery.sql.conversion.model.filter.WhereCondition; import com.bakdata.conquery.util.CalculatedValue; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeInfo; /** * A general condition that serves as a guard for concept tree nodes. */ -@JsonTypeInfo(use=JsonTypeInfo.Id.CUSTOM, property="type") +@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type") @CPSBase public interface CTCondition { - default void init(ConceptTreeNode node) throws ConceptConfigurationException {} - + default void init(ConceptTreeNode node) throws ConceptConfigurationException { + } + boolean matches(String value, CalculatedValue> rowMap) throws ConceptConfigurationException; WhereCondition convertToSqlCondition(CTConditionContext context); + @JsonIgnore + Set getAuxillaryColumns(); + } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/ColumnEqualCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/ColumnEqualCondition.java index 1b98784e95..a5adf79bd4 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/ColumnEqualCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/ColumnEqualCondition.java @@ -2,6 +2,7 @@ import java.util.Map; import java.util.Set; +import jakarta.validation.constraints.NotEmpty; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; @@ -10,7 +11,6 @@ import com.bakdata.conquery.util.CalculatedValue; import com.bakdata.conquery.util.CollectionsUtil; import com.fasterxml.jackson.annotation.JsonCreator; -import jakarta.validation.constraints.NotEmpty; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -21,13 +21,17 @@ /** * This condition requires the value of another column to be equal to a given value. */ -@CPSType(id="COLUMN_EQUAL", base=CTCondition.class) +@CPSType(id = "COLUMN_EQUAL", base = CTCondition.class) @AllArgsConstructor(access = AccessLevel.PRIVATE) public class ColumnEqualCondition implements CTCondition { - @Setter @Getter @NotEmpty + @Setter + @Getter + @NotEmpty private Set values; - @NotEmpty @Setter @Getter + @NotEmpty + @Setter + @Getter private String column; @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) @@ -38,7 +42,7 @@ public static ColumnEqualCondition create(Set values, String column) { @Override public boolean matches(String value, CalculatedValue> rowMap) { Object checkedValue = rowMap.getValue().get(column); - if(checkedValue == null) { + if (checkedValue == null) { return false; } return values.contains(checkedValue.toString()); @@ -49,4 +53,9 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { Field field = DSL.field(DSL.name(context.getConnectorTable().getName(), column), String.class); return new MultiSelectCondition(field, values.toArray(String[]::new), context.getFunctionProvider()); } + + @Override + public Set getAuxillaryColumns() { + return Set.of(column); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/EqualCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/EqualCondition.java index 80e3e104a6..de6ff96d60 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/EqualCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/EqualCondition.java @@ -1,8 +1,8 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; +import java.util.Collections; import java.util.Map; import java.util.Set; - import jakarta.validation.constraints.NotEmpty; import com.bakdata.conquery.io.cps.CPSType; @@ -21,11 +21,13 @@ /** * This condition requires each value to be exactly as given in the list. */ -@CPSType(id="EQUAL", base=CTCondition.class) +@CPSType(id = "EQUAL", base = CTCondition.class) @AllArgsConstructor public class EqualCondition implements CTCondition { - @Setter @Getter @NotEmpty + @Setter + @Getter + @NotEmpty private Set values; @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) @@ -43,4 +45,9 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { Field field = DSL.field(DSL.name(context.getConnectorTable().getName(), context.getConnectorColumn().getName()), String.class); return new MultiSelectCondition(field, values.toArray(String[]::new), context.getFunctionProvider()); } + + @Override + public Set getAuxillaryColumns() { + return Collections.emptySet(); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/GroovyCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/GroovyCondition.java index 6dc5829a95..910a932c1b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/GroovyCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/GroovyCondition.java @@ -2,8 +2,8 @@ import java.time.LocalDate; import java.util.Map; +import java.util.Set; import java.util.stream.Stream; - import jakarta.validation.constraints.NotEmpty; import com.bakdata.conquery.io.cps.CPSType; @@ -81,6 +81,10 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { throw new UnsupportedOperationException("SQL conversion of CTCondition %s not supported yet.".formatted(getClass())); } + @Override + public Set getAuxillaryColumns() { + return Set.of(); + } public abstract static class ConditionScript extends Script { diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/IsPresentCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/IsPresentCondition.java index 0db1083b1f..dd8848a0b6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/IsPresentCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/IsPresentCondition.java @@ -1,6 +1,7 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; import java.util.Map; +import java.util.Set; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; @@ -17,10 +18,11 @@ /** * This condition requires that the selected Column has a value. */ -@CPSType(id="PRESENT", base=CTCondition.class) +@CPSType(id = "PRESENT", base = CTCondition.class) public class IsPresentCondition implements CTCondition { - @Getter @Setter + @Getter + @Setter @NonNull private String column; @@ -34,4 +36,9 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { Condition condition = DSL.field(DSL.name(context.getConnectorTable().getName(), column)).isNotNull(); return new WhereConditionWrapper(condition, ConditionType.PREPROCESSING); } + + @Override + public Set getAuxillaryColumns() { + return Set.of(column); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/NotCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/NotCondition.java index ac628ab371..01dcc3b6ab 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/NotCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/NotCondition.java @@ -1,7 +1,7 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; import java.util.Map; - +import java.util.Set; import jakarta.validation.Valid; import com.bakdata.conquery.io.cps.CPSType; @@ -16,10 +16,12 @@ /** * This condition matches if its child does not. */ -@CPSType(id="NOT", base=CTCondition.class) +@CPSType(id = "NOT", base = CTCondition.class) public class NotCondition implements CTCondition { - @Setter @Getter @Valid + @Setter + @Getter + @Valid private CTCondition condition; @Override @@ -37,4 +39,9 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { WhereCondition whereCondition = condition.convertToSqlCondition(context); return whereCondition.negate(); } + + @Override + public Set getAuxillaryColumns() { + return condition.getAuxillaryColumns(); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/OrCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/OrCondition.java index 4773fdaf73..ef9bce878a 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/OrCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/OrCondition.java @@ -1,8 +1,9 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; +import java.util.HashSet; import java.util.List; import java.util.Map; - +import java.util.Set; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; @@ -53,4 +54,13 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { () -> new IllegalStateException("At least one condition is required to convert %s to a SQL condition.".formatted(getClass())) ); } + + @Override + public Set getAuxillaryColumns() { + final Set columns = new HashSet<>(); + for (CTCondition ctCondition : conditions) { + columns.addAll(ctCondition.getAuxillaryColumns()); + } + return columns; + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java index 0aa21b8cdc..5f53b98730 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java @@ -1,8 +1,11 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; import java.util.Arrays; +import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import jakarta.validation.constraints.NotEmpty; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; @@ -10,7 +13,6 @@ import com.bakdata.conquery.sql.conversion.model.filter.WhereCondition; import com.bakdata.conquery.sql.conversion.model.filter.WhereConditionWrapper; import com.bakdata.conquery.util.CalculatedValue; -import jakarta.validation.constraints.NotEmpty; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -47,4 +49,9 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { Condition condition = context.getFunctionProvider().likeRegex(field, pattern); return new WhereConditionWrapper(condition, ConditionType.PREPROCESSING); } + + @Override + public Set getAuxillaryColumns() { + return Collections.emptySet(); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java index 48002cc68d..e24f094813 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java @@ -1,6 +1,8 @@ package com.bakdata.conquery.models.datasets.concepts.conditions; +import java.util.Collections; import java.util.Map; +import java.util.Set; import jakarta.validation.constraints.NotEmpty; import com.bakdata.conquery.io.cps.CPSType; @@ -61,6 +63,11 @@ public WhereCondition convertToSqlCondition(CTConditionContext context) { return new WhereConditionWrapper(regexCondition, ConditionType.PREPROCESSING); } + @Override + public Set getAuxillaryColumns() { + return Collections.emptySet(); + } + private String buildSqlRegexPattern(SqlFunctionProvider functionProvider) { StringBuilder builder = new StringBuilder(); char[] minChars = min.toCharArray(); diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java index 9d5821b198..11546e447e 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java @@ -26,6 +26,7 @@ @Getter @ToString public class UpdateElementMatchingStats extends NamespaceMessage { + private final WorkerId source; @ToString.Exclude @@ -56,8 +57,9 @@ public void react(DistributedNamespace context) throws Exception { matchingStats = new MatchingStats(); target.setMatchingStats(matchingStats); } - matchingStats.putEntry(source, value); - } catch (Exception e) { + matchingStats.putEntry(source.getWorker(), value); + } + catch (Exception e) { log.error("Failed to set matching stats for '{}' (enable TRACE for exception)", entry.getKey(), (Exception) (log.isTraceEnabled() ? e : null)); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java index 58d00e6580..c8e62616f6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -10,11 +11,19 @@ import com.bakdata.conquery.io.storage.NamespaceStorage; import com.bakdata.conquery.mode.local.SqlEntityResolver; import com.bakdata.conquery.mode.local.SqlStorageHandler; +import com.bakdata.conquery.mode.local.UpdateMatchingStatsSqlJob; +import com.bakdata.conquery.models.config.DatabaseConfig; +import com.bakdata.conquery.models.config.SqlConnectorConfig; +import com.bakdata.conquery.models.config.ThreadPoolDefinition; import com.bakdata.conquery.models.datasets.Column; +import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId; +import com.bakdata.conquery.models.jobs.Job; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.FilterSearch; import com.bakdata.conquery.sql.DSLContextWrapper; +import com.bakdata.conquery.sql.conversion.dialect.SqlDialect; +import com.bakdata.conquery.sql.execution.SqlExecutionService; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -23,12 +32,23 @@ @Slf4j public class LocalNamespace extends Namespace { + private final ThreadPoolDefinition executionPool; + private final SqlConnectorConfig sqlConnectorConfig; + private final DatabaseConfig databaseConfig; + private final SqlDialect sqlDialect; + private final SqlExecutionService sqlExecutionService; private final DSLContextWrapper dslContextWrapper; private final SqlStorageHandler storageHandler; + private final ThreadPoolExecutor executorService; public LocalNamespace( ObjectMapper preprocessMapper, NamespaceStorage storage, + ThreadPoolDefinition executionPool, + SqlConnectorConfig sqlConnectorConfig, + DatabaseConfig databaseConfig, + SqlDialect sqlDialect, + SqlExecutionService sqlExecutionService, ExecutionManager executionManager, DSLContextWrapper dslContextWrapper, SqlStorageHandler storageHandler, @@ -38,13 +58,28 @@ public LocalNamespace( List injectables ) { super(preprocessMapper, storage, executionManager, jobManager, filterSearch, sqlEntityResolver, injectables); + this.executionPool = executionPool; + this.sqlConnectorConfig = sqlConnectorConfig; + this.databaseConfig = databaseConfig; + this.sqlDialect = sqlDialect; + this.sqlExecutionService = sqlExecutionService; this.dslContextWrapper = dslContextWrapper; this.storageHandler = storageHandler; + // TODO FK: hoist into Namespace and use at other places && is this the correct way to name them? + this.executorService = this.executionPool.createService("namespace %s worker".formatted(storage.getPathName())); } @Override void updateMatchingStats() { - // TODO Build basic statistic on data + final Set concepts = getConceptsWithoutMatchingStats(); + Job job = new UpdateMatchingStatsSqlJob( + databaseConfig, + sqlExecutionService, + sqlDialect.getFunctionProvider(), + concepts, + executorService + ); + getJobManager().addSlowJob(job); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java index 0e04fa8e81..7696cf4334 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/Namespace.java @@ -4,6 +4,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolver; import com.bakdata.conquery.io.jackson.Injectable; @@ -11,8 +12,10 @@ import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.PreviewConfig; +import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.Searchable; import com.bakdata.conquery.models.datasets.concepts.select.connector.specific.MappableSingleColumnSelect; +import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.jobs.SimpleJob; import com.bakdata.conquery.models.jobs.UpdateFilterSearchJob; @@ -54,14 +57,16 @@ public Dataset getDataset() { public void close() { try { jobManager.close(); - } catch (Exception e) { + } + catch (Exception e) { log.error("Unable to close namespace jobmanager of {}", this, e); } try { log.info("Closing namespace storage of {}", getStorage().getDataset().getId()); storage.close(); - } catch (IOException e) { + } + catch (IOException e) { log.error("Unable to close namespace storage of {}.", this, e); } } @@ -69,7 +74,8 @@ public void close() { public void remove() { try { jobManager.close(); - } catch (Exception e) { + } + catch (Exception e) { log.error("Unable to close namespace jobmanager of {}", this, e); } @@ -135,4 +141,11 @@ public void postprocessData() { )); } + + protected Set getConceptsWithoutMatchingStats() { + return getStorage().getAllConcepts() + .filter(concept -> concept.getMatchingStats() == null) + .map(Concept::getId) + .collect(Collectors.toSet()); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java index 1dcd4286cc..2e644ae12f 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java @@ -89,7 +89,8 @@ public ColumnDateRange forCDateRange(CDateRange daterange) { if (daterange.hasUpperBound()) { // end date is expected to be handled as exclusive, but if it's already the maximum date, we can't add +1 day if (Objects.equals(daterange.getMax(), LocalDate.ofEpochDay(CDateRange.POSITIVE_INFINITY))) { - throw new UnsupportedOperationException("Given daterange has an upper bound of CDateRange.POSITIVE_INFINITY, which is not supported by ConQuery's HANA dialect."); + throw new UnsupportedOperationException( + "Given daterange has an upper bound of CDateRange.POSITIVE_INFINITY, which is not supported by ConQuery's HANA dialect."); } LocalDate exclusiveMaxDate = daterange.getMax().plusDays(1); endDateExpression = exclusiveMaxDate.toString(); @@ -147,6 +148,11 @@ public ColumnDateRange aggregated(ColumnDateRange columnDateRange) { .as(columnDateRange.getAlias()); } + @Override + public ColumnDateRange nulled(ColumnDateRange columnDateRange) { + return ColumnDateRange.of(toDateField(null), toDateField(null)).as(columnDateRange.getAlias()); + } + @Override public ColumnDateRange toDualColumn(ColumnDateRange columnDateRange) { // HANA does not support single column ranges diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java index 8b1d166155..eca8ca1452 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java @@ -36,6 +36,8 @@ private static final String INFINITY_DATE_VALUE = "infinity"; private static final String MINUS_INFINITY_DATE_VALUE = "-infinity"; private static final String ANY_CHAR_REGEX = "%"; + private static final String RANGE_EXCLUSIVE_END = "[)"; + private static final String RANGE_INCLUSIVE_END = "[]"; @Override public String getMaxDateExpression() { @@ -90,7 +92,7 @@ public ColumnDateRange forCDateRange(CDateRange daterange) { Object.class, DSL.val(startDateExpression), DSL.val(endDateExpression), - DSL.val("[]") + DSL.val(RANGE_INCLUSIVE_END) ); return ColumnDateRange.of(daterangeField); @@ -133,6 +135,12 @@ public ColumnDateRange aggregated(ColumnDateRange columnDateRange) { return ColumnDateRange.of(rangeAgg(columnDateRange)).as(columnDateRange.getAlias()); } + @Override + public ColumnDateRange nulled(ColumnDateRange columnDateRange) { + ensureIsSingleColumnRange(columnDateRange); + return ColumnDateRange.of(DSL.field("null::daterange")).as(columnDateRange.getAlias()); + } + @Override public ColumnDateRange toDualColumn(ColumnDateRange columnDateRange) { Field daterange = columnDateRange.getRange(); @@ -176,10 +184,14 @@ public Field daterangeStringAggregation(ColumnDateRange columnDateRange) @Override public Field daterangeStringExpression(ColumnDateRange columnDateRange) { + Field daterange; if (!columnDateRange.isSingleColumnRange()) { - throw new UnsupportedOperationException("All column date ranges should have been converted to single column ranges."); + daterange = daterange(columnDateRange.getStart(), columnDateRange.getEnd(), RANGE_EXCLUSIVE_END); + } + else { + daterange = columnDateRange.getRange(); } - Field aggregatedValidityDate = DSL.field("({0})::{1}", String.class, columnDateRange.getRange(), DSL.keyword("varchar")); + Field aggregatedValidityDate = DSL.field("({0})::{1}", String.class, daterange, DSL.keyword("varchar")); return replace(aggregatedValidityDate, INFINITY_DATE_VALUE, INFINITY_SIGN); } @@ -303,7 +315,7 @@ private ColumnDateRange toColumnDateRange(CDateRange dateRestriction) { Object.class, toDateField(startDateExpression), toDateField(endDateExpression), - DSL.val("[]") + DSL.val(RANGE_INCLUSIVE_END) ); return ColumnDateRange.of(dateRestrictionRange); @@ -333,14 +345,14 @@ private ColumnDateRange ofSingleColumn(String tableName, Column column) { DSL.function("upper", Date.class, daterange), toDateField(INFINITY_DATE_VALUE) ); - yield daterange(startColumn, endColumn, "[]"); + yield daterange(startColumn, endColumn, RANGE_INCLUSIVE_END); } // if the validity date column is not of daterange type, we construct it manually case DATE -> { Field singleDate = DSL.field(DSL.name(tableName, column.getName()), Date.class); Field startColumn = DSL.coalesce(singleDate, toDateField(MINUS_INFINITY_DATE_VALUE)); Field endColumn = DSL.coalesce(singleDate, toDateField(INFINITY_DATE_VALUE)); - yield daterange(startColumn, endColumn, "[]"); + yield daterange(startColumn, endColumn, RANGE_INCLUSIVE_END); } default -> throw new IllegalArgumentException( "Given column type '%s' can't be converted to a proper date restriction.".formatted(column.getType()) @@ -361,13 +373,13 @@ private ColumnDateRange ofStartAndEnd(String tableName, Column startColumn, Colu toDateField(INFINITY_DATE_VALUE) ); - return ColumnDateRange.of(daterange(start, end, "[]")); + return ColumnDateRange.of(daterange(start, end, RANGE_INCLUSIVE_END)); } private ColumnDateRange ensureIsSingleColumnRange(ColumnDateRange daterange) { return daterange.isSingleColumnRange() ? daterange - : ColumnDateRange.of(daterange(daterange.getStart(), daterange.getEnd(), "[)")); // end is already exclusive + : ColumnDateRange.of(daterange(daterange.getStart(), daterange.getEnd(), RANGE_EXCLUSIVE_END)); // end is already exclusive } } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java index 46572215ea..f6fd446e9f 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java @@ -84,6 +84,11 @@ public interface SqlFunctionProvider { ColumnDateRange aggregated(ColumnDateRange columnDateRange); + /** + * Create an aliased null-value {@link ColumnDateRange} from the given range. Example: {@code null::daterange as "date_range"} + */ + ColumnDateRange nulled(ColumnDateRange columnDateRange); + /** * Given a single-column {@link ColumnDateRange}, it will create a new {@link ColumnDateRange} with a start and end field. * For dialects that don't support single-column ranges, it will create a copy of the given {@link ColumnDateRange}. diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultResultSetProcessor.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultResultSetProcessor.java index b17d3db261..531bd7fbe5 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultResultSetProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultResultSetProcessor.java @@ -11,13 +11,15 @@ import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.util.DateReader; +import lombok.Getter; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor class DefaultResultSetProcessor implements ResultSetProcessor { private final ConqueryConfig config; - private final SqlCDateSetParser sqlCDateSetParser; + @Getter + private final SqlCDateSetParser cDateSetParser; @Override public String getString(ResultSet resultSet, int columnIndex) throws SQLException { @@ -60,12 +62,12 @@ public Integer getDate(ResultSet resultSet, int columnIndex) throws SQLException @Override public List getDateRange(ResultSet resultSet, int columnIndex) throws SQLException { - return this.sqlCDateSetParser.toEpochDayRange(resultSet.getString(columnIndex)); + return this.cDateSetParser.toEpochDayRange(resultSet.getString(columnIndex)); } @Override public List> getDateRangeList(ResultSet resultSet, int columnIndex) throws SQLException { - return this.sqlCDateSetParser.toEpochDayRangeList(resultSet.getString(columnIndex)); + return this.cDateSetParser.toEpochDayRangeList(resultSet.getString(columnIndex)); } @Override @@ -119,7 +121,7 @@ private List fromString(ResultSet resultSet, int columnIndex, Function * For example, calling a primitives' ResultSet getter like getDouble, getInt etc. straightaway will never return null. */ - private static T checkForNullElseGet(ResultSet resultSet, int columnIndex, Getter getter, Class resultType) throws SQLException { + private static T checkForNullElseGet(ResultSet resultSet, int columnIndex, GetMethod getter, Class resultType) throws SQLException { if (resultSet.getObject(columnIndex) == null) { return null; diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultSqlCDateSetParser.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultSqlCDateSetParser.java index 29b7fca7dc..8f7bf0c839 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultSqlCDateSetParser.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/DefaultSqlCDateSetParser.java @@ -12,6 +12,11 @@ public class DefaultSqlCDateSetParser implements SqlCDateSetParser { + /** + * Postgres daterange function creates this expression when called with null-arguments instead of null. + */ + public static final String POSTGRES_NULL_RANGE = "(,)"; + public static final String EMPTY_RANGE_BRACES = "{}"; public static final String DATE_SEPARATOR = ","; public static final char INCLUDED_START_CHAR = '['; @@ -37,12 +42,12 @@ public List> toEpochDayRangeList(String multiDateRange) { @Override public List toEpochDayRange(String daterange) { - if (daterange == null) { + if (daterange == null || daterange.equals(POSTGRES_NULL_RANGE)) { return Collections.emptyList(); } String[] dates = daterange.split(DATE_SEPARATOR); - Preconditions.checkArgument(dates.length == 2, "Dateranges must have a start and end."); + Preconditions.checkArgument(dates.length == 2, "Dateranges must have a start and end. Input was: %s".formatted(daterange)); // the dateranges have always an included start date marked by a [ String startDateExpression = dates[0]; diff --git a/backend/src/main/java/com/bakdata/conquery/sql/execution/ResultSetProcessor.java b/backend/src/main/java/com/bakdata/conquery/sql/execution/ResultSetProcessor.java index 074716073d..a7e6498751 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/execution/ResultSetProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/execution/ResultSetProcessor.java @@ -9,6 +9,8 @@ public interface ResultSetProcessor { char UNIT_SEPARATOR = (char) 31; // https://www.ascii-code.com/character/%E2%90%9F + SqlCDateSetParser getCDateSetParser(); + String getString(ResultSet resultSet, int columnIndex) throws SQLException; Integer getInteger(ResultSet resultSet, int columnIndex) throws SQLException; diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java index c6573cb181..353fd2d31c 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java @@ -3,17 +3,15 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.LocalDate; +import java.util.Set; import com.bakdata.conquery.integration.IntegrationTest; import com.bakdata.conquery.integration.json.ConqueryTestSpec; import com.bakdata.conquery.integration.json.JsonIntegrationTest; import com.bakdata.conquery.models.common.daterange.CDateRange; -import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; import com.bakdata.conquery.models.exceptions.ValidatorHelper; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.messages.namespaces.specific.UpdateMatchingStatsMessage; -import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.util.support.StandaloneSupport; import com.github.powerlibraries.io.In; import lombok.extern.slf4j.Slf4j; @@ -21,6 +19,11 @@ @Slf4j public class MetadataCollectionTest extends IntegrationTest.Simple implements ProgrammaticIntegrationTest { + @Override + public Set forModes() { + return Set.of(StandaloneSupport.Mode.SQL, StandaloneSupport.Mode.WORKER); + } + @Override public void execute(StandaloneSupport conquery) throws Exception { //read test sepcification @@ -34,10 +37,7 @@ public void execute(StandaloneSupport conquery) throws Exception { test.importRequiredData(conquery); //ensure the metadata is collected - DistributedNamespace namespace = (DistributedNamespace) conquery.getNamespace(); - namespace.getWorkerHandler() - .sendToAll(new UpdateMatchingStatsMessage(conquery.getNamespace().getStorage().getAllConcepts().map(Concept::getId).toList())); - + conquery.getNamespace().postprocessData(); conquery.waitUntilWorkDone(); TreeConcept concept = (TreeConcept) conquery.getNamespace().getStorage().getAllConcepts().iterator().next(); @@ -47,13 +47,13 @@ public void execute(StandaloneSupport conquery) throws Exception { assertThat(concept.getChildren()).allSatisfy(c -> { assertThat(c.getMatchingStats().countEvents()).isEqualTo(2); }); - + //check the date ranges assertThat(concept.getMatchingStats().spanEvents()) - .isEqualTo(CDateRange.of(LocalDate.parse("2010-07-15"), LocalDate.parse("2013-11-10"))); + .isEqualTo(CDateRange.of(LocalDate.parse("2010-07-15"), LocalDate.parse("2013-11-10"))); assertThat(concept.getChildren().get(0).getMatchingStats().spanEvents()) - .isEqualTo(CDateRange.of(LocalDate.parse("2012-01-01"), LocalDate.parse("2013-11-10"))); + .isEqualTo(CDateRange.of(LocalDate.parse("2012-01-01"), LocalDate.parse("2013-11-10"))); assertThat(concept.getChildren().get(1).getMatchingStats().spanEvents()) - .isEqualTo(CDateRange.of(LocalDate.parse("2010-07-15"), LocalDate.parse("2012-11-11"))); + .isEqualTo(CDateRange.of(LocalDate.parse("2010-07-15"), LocalDate.parse("2012-11-11"))); } } diff --git a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/MatchingStatsTests.java b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/MatchingStatsTests.java index 8938a83e1c..f8e9e4e576 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/MatchingStatsTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/MatchingStatsTests.java @@ -11,80 +11,78 @@ public class MatchingStatsTests { - private final WorkerId workerId1 = new WorkerId(new DatasetId("sampleDataset"), "sampleWorker"); - private final WorkerId workerId2 = new WorkerId(new DatasetId("sampleDataset2"), "sampleWorker2"); + private final WorkerId workerId1 = new WorkerId(new DatasetId("sampleDataset"), "sampleWorker"); + private final WorkerId workerId2 = new WorkerId(new DatasetId("sampleDataset2"), "sampleWorker2"); - @Test - public void entitiesCountTest() { + @Test + public void entitiesCountTest() { - MatchingStats stats = new MatchingStats(); + MatchingStats stats = new MatchingStats(); - assertThat(stats.countEntities()).isEqualTo(0); + assertThat(stats.countEntities()).isEqualTo(0); - stats.putEntry(workerId1, new MatchingStats.Entry(5, 5, 10, 20)); - assertThat(stats.countEntities()).isEqualTo(5); + stats.putEntry(workerId1.getWorker(), new MatchingStats.Entry(5, 5, 10, 20)); + assertThat(stats.countEntities()).isEqualTo(5); - stats.putEntry(workerId1, new MatchingStats.Entry(5, 8, 10, 20)); - assertThat(stats.countEntities()).isEqualTo(8); + stats.putEntry(workerId1.getWorker(), new MatchingStats.Entry(5, 8, 10, 20)); + assertThat(stats.countEntities()).isEqualTo(8); - stats.putEntry(workerId2, new MatchingStats.Entry(5, 2, 10, 20)); - assertThat(stats.countEntities()).isEqualTo(10); + stats.putEntry(workerId2.getWorker(), new MatchingStats.Entry(5, 2, 10, 20)); + assertThat(stats.countEntities()).isEqualTo(10); - } + } - @Test - public void addEventTest(){ - MatchingStats stats = new MatchingStats(); - Table table = new Table(); - table.setColumns(new Column[0]); + @Test + public void addEventTest() { + MatchingStats stats = new MatchingStats(); + Table table = new Table(); + table.setColumns(new Column[0]); - assertThat(stats.countEvents()).isEqualTo(0); - assertThat(stats.countEntities()).isEqualTo(0); + assertThat(stats.countEvents()).isEqualTo(0); + assertThat(stats.countEntities()).isEqualTo(0); - MatchingStats.Entry entry1 = new MatchingStats.Entry(); - entry1.addEvent(table, null, 1, "1"); - entry1.addEvent(table, null, 2, "1"); + MatchingStats.Entry entry1 = new MatchingStats.Entry(); + entry1.addEvent(table, null, 1, "1"); + entry1.addEvent(table, null, 2, "1"); - entry1.addEvent(table, null, 3, "2"); - entry1.addEvent(table, null, 4, "2"); + entry1.addEvent(table, null, 3, "2"); + entry1.addEvent(table, null, 4, "2"); - entry1.addEvent(table, null, 5, "3"); - entry1.addEvent(table, null, 6, "3"); + entry1.addEvent(table, null, 5, "3"); + entry1.addEvent(table, null, 6, "3"); - entry1.addEvent(table, null, 7, "4"); - entry1.addEvent(table, null, 8, "4"); + entry1.addEvent(table, null, 7, "4"); + entry1.addEvent(table, null, 8, "4"); + stats.putEntry(workerId1.getWorker(), entry1); + assertThat(stats.countEvents()).isEqualTo(8); + assertThat(stats.countEntities()).isEqualTo(4); - stats.putEntry(workerId1, entry1); - assertThat(stats.countEvents()).isEqualTo(8); - assertThat(stats.countEntities()).isEqualTo(4); + MatchingStats.Entry entry2 = new MatchingStats.Entry(); - MatchingStats.Entry entry2 = new MatchingStats.Entry(); + entry2.addEvent(table, null, 1, "1"); + entry2.addEvent(table, null, 2, "2"); - entry2.addEvent(table, null, 1, "1"); - entry2.addEvent(table, null, 2, "2"); + entry2.addEvent(table, null, 3, "3"); + entry2.addEvent(table, null, 4, "4"); - entry2.addEvent(table, null, 3, "3"); - entry2.addEvent(table, null, 4, "4"); + entry2.addEvent(table, null, 5, "5"); + entry2.addEvent(table, null, 6, "6"); - entry2.addEvent(table, null, 5, "5"); - entry2.addEvent(table, null, 6, "6"); + entry2.addEvent(table, null, 7, "7"); + entry2.addEvent(table, null, 8, "8"); - entry2.addEvent(table, null, 7, "7"); - entry2.addEvent(table, null, 8, "8"); + entry2.addEvent(table, null, 9, "9"); + entry2.addEvent(table, null, 10, "10"); - entry2.addEvent(table, null, 9, "9"); - entry2.addEvent(table, null, 10, "10"); + stats.putEntry(workerId2.getWorker(), entry2); + assertThat(stats.countEvents()).isEqualTo(18); + assertThat(stats.countEntities()).isEqualTo(14); - stats.putEntry(workerId2, entry2); - assertThat(stats.countEvents()).isEqualTo(18); - assertThat(stats.countEntities()).isEqualTo(14); - - - } + } }