Merge pull request #3192 from ingef/feature/cache-MultiSelectFilterNode
move caching of selectedValues into QueryExecutionContext
awildturtok authored Nov 1, 2023
2 parents afc7843 + beb8fee commit 8a3dc9e
Showing 2 changed files with 41 additions and 23 deletions.
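For orientation before the diff: the commit removes the per-node selectedValues cache from MultiSelectFilterNode and replaces it with a single cache owned by the shared QueryExecutionContext, so all filter nodes of one query execution reuse the same resolved ids. The following is a minimal, self-contained sketch of that caching pattern; the class, method, and parameter names are illustrative and not the actual Conquery API.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;

// Sketch: a cache scoped to one query execution. Resolving the string values of a
// multi-select filter to integer ids happens at most once per (column, import, values)
// combination; the result is shared by every filter node of that execution.
final class ExecutionScopedIdCache {

    // Record used as the cache key; List.of(values) gives content-based equality for the values.
    private record Key(String columnId, String importId, List<String> values) {}

    private final Map<Key, int[]> cache = new ConcurrentHashMap<>();

    int[] getIdsFor(String columnId, String importId, String[] values, ToIntFunction<String> resolveId) {
        final Key key = new Key(columnId, importId, List.of(values));
        // computeIfAbsent runs the (comparatively expensive) dictionary lookup only once
        // per key, even when several worker threads process buckets of the same import.
        return cache.computeIfAbsent(key, ignored -> {
            final int[] ids = new int[values.length];
            for (int i = 0; i < values.length; i++) {
                ids[i] = resolveId.applyAsInt(values[i]);
            }
            return ids;
        });
    }
}

A caller would hold one such cache per execution and invoke, for example, cache.getIdsFor("column", "import-2023-11", new String[]{"A", "B"}, dictionary::getId), where dictionary::getId stands in for the StringStore lookup used in the real code.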
File 1 of 2 (QueryExecutionContext):
@@ -2,21 +2,28 @@

import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import com.bakdata.conquery.io.storage.ModificationShieldedWorkerStorage;
import com.bakdata.conquery.models.common.CDate;
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Import;
import com.bakdata.conquery.models.datasets.SecondaryIdDescription;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.ValidityDate;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.BucketManager;
import com.bakdata.conquery.models.events.stores.root.StringStore;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.query.entity.Entity;
import com.bakdata.conquery.models.query.queryplan.aggregators.Aggregator;
import groovy.lang.Tuple3;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
@@ -43,6 +50,28 @@ public class QueryExecutionContext {
@NonNull
private Optional<Aggregator<CDateSet>> queryDateAggregator = Optional.empty();

@Getter(AccessLevel.NONE)
private final Map<Tuple3<Column, Import, String[]>, int[]> multiSelectValuesCache = new ConcurrentHashMap<>();


private static int[] findIds(Column column, Bucket bucket, String[] values) {
final int[] selectedValues = new int[values.length];

final StringStore type = (StringStore) bucket.getStore(column);

for (int index = 0; index < values.length; index++) {
final String select = values[index];
final int parsed = type.getId(select);

selectedValues[index] = parsed;
}

return selectedValues;
}

public int[] getIdsFor(Column column, Bucket bucket, String[] values) {
return multiSelectValuesCache.computeIfAbsent(new Tuple3<>(column, bucket.getImp(), values), (ignored) -> findIds(column, bucket, values));
}

/**
* Only set when in {@link com.bakdata.conquery.models.query.queryplan.SecondaryIdQueryPlan}, to the selected {@link SecondaryIdDescriptionId}.
@@ -58,4 +87,5 @@ public List<Bucket> getEntityBucketsForTable(Entity entity, Table table) {
boolean isQueryCancelled() {
return executor.isCancelled(executionId);
}

}
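One detail of the new cache worth noting (an observation about the code, not part of the commit): the key is a Tuple3 whose last element is a String[], and Java arrays compare by identity rather than by content (groovy.lang.Tuple3 is list-backed and compares its elements with equals). The cache therefore only hits when the very same filterValue array instance is passed again, which holds here because each MultiSelectFilterNode keeps reusing its own array; two distinct arrays with equal contents would occupy separate entries. A short demonstration of the array behaviour:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Arrays used as (part of) a map key are compared by identity, not by content.
public final class ArrayKeyDemo {
    public static void main(String[] args) {
        final Map<String[], Integer> cache = new ConcurrentHashMap<>();
        final String[] reused = {"A", "B"};

        cache.put(reused, 1);
        System.out.println(cache.containsKey(reused));                  // true: same array instance
        System.out.println(cache.containsKey(new String[]{"A", "B"}));  // false: equal content, different instance
    }
}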
File 2 of 2 (MultiSelectFilterNode):
@@ -2,16 +2,15 @@

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.validation.constraints.NotNull;

import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Import;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.stores.root.StringStore;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.entity.Entity;
import com.bakdata.conquery.models.query.queryplan.filter.EventFilterNode;
import lombok.Getter;
import lombok.Setter;
@@ -32,46 +31,35 @@ public class MultiSelectFilterNode extends EventFilterNode<String[]> {

private final boolean empty;

/**
* Shared between all executing Threads to maximize utilization.
*/
private ConcurrentMap<Import, int[]> selectedValuesCache;
private int[] selectedValues;
private QueryExecutionContext context;

public MultiSelectFilterNode(Column column, String[] filterValue) {
super(filterValue);
this.column = column;
selectedValuesCache = new ConcurrentHashMap<>();
empty = Arrays.stream(filterValue).anyMatch(Strings::isEmpty);
}


@Override
public void init(Entity entity, QueryExecutionContext context) {
super.init(entity, context);
this.context = context;
selectedValues = null;
}

@Override
public void setFilterValue(String[] strings) {
selectedValuesCache = new ConcurrentHashMap<>();
selectedValues = null;
super.setFilterValue(strings);
}

@Override
public void nextBlock(Bucket bucket) {
selectedValues = selectedValuesCache.computeIfAbsent(bucket.getImp(),imp -> findIds(bucket, filterValue));
selectedValues = context.getIdsFor(column, bucket, filterValue);
}

private int[] findIds(Bucket bucket, String[] values) {
final int[] selectedValues = new int[values.length];

final StringStore type = (StringStore) bucket.getStore(getColumn());

for (int index = 0; index < values.length; index++) {
final String select = values[index];
final int parsed = type.getId(select);

selectedValues[index] = parsed;
}

return selectedValues;
}


@Override
(remainder of the file's diff not shown)
