Use a competitive iterator in FiltersAggregator (elastic#98360)
* Use a competitive iterator in FiltersAggregator.

The iterator is used to combine filtering with querying in leaf
collection. Its benefit is that ranges of docs that are filtered out
by all filters are skipped during doc collection.

The competitive iterator is restricted to FiltersAggregator; it is not
used in FilterByFilterAggregator, which is already optimized. It only
applies to top-level filter aggregations with no "other" bucket defined,
since the latter leads to collecting all docs, leaving no doc ranges to
skip. (A usage sketch follows the change list below.)

Fixes elastic#97544

* Fix function name.

* Advance iterator on two-phase mismatch.

* Restore docid tracking.

* Fix failing tests.

* Fix failing test.

* Fix more tests.

* Update docs/changelog/98360.yaml

* More test fixes.

* Update docs/changelog/98360.yaml

* Skip checking useCompetitiveIterator in collect

* Find approximate matches in CompetitiveIterator

* Use DisiPriorityQueue to simplify FiltersAggregator

* Skip competitive iterator when all docs match.

* Check for empty priority queue.
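
For illustration, here is a minimal sketch of a search request whose filters aggregation qualifies for the competitive iterator: it is top-level (no parent aggregation) and defines no "other" bucket. It assumes the standard FiltersAggregationBuilder API; the query, field names, and bucket keys are hypothetical.

```java
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CompetitiveFiltersExample {
    public static void main(String[] args) {
        // Top-level filters aggregation with keyed filters and no "other" bucket:
        // ranges of docs that match the main query but none of the filters can be
        // skipped during collection via the competitive iterator.
        SearchSourceBuilder source = new SearchSourceBuilder().size(0)
            .query(QueryBuilders.rangeQuery("@timestamp").gte("now-1d"))
            .aggregation(
                AggregationBuilders.filters(
                    "by_status",
                    new KeyedFilter("ok", QueryBuilders.termQuery("status", "ok")),
                    new KeyedFilter("error", QueryBuilders.termQuery("status", "error"))
                )
                // Calling .otherBucket(true) here would disable the optimization,
                // since every doc must then be collected into some bucket.
            );
        System.out.println(source);
    }
}
```

With a parent aggregation or an "other" bucket defined, the Compatible collector still has to visit every doc, so no competitive iterator is installed.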
kkrik-es authored Sep 5, 2023
1 parent 0421c4f commit 8af9b4a
Showing 4 changed files with 105 additions and 23 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/98360.yaml
@@ -0,0 +1,6 @@
+pr: 98360
+summary: Use a competitive iterator in `FiltersAggregator`
+area: Aggregations
+type: enhancement
+issues:
+ - 97544
FilterByFilterAggregator.java
Expand Up @@ -233,7 +233,7 @@ private FilterByFilterAggregator(
@Override
protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
assert scoreMode().needsScores() == false;
if (QueryToFilterAdapter.MatchesNoDocs(filters())) {
if (QueryToFilterAdapter.matchesNoDocs(filters())) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
Bits live = aggCtx.getLeafReaderContext().reader().getLiveDocs();
FiltersAggregator.java
@@ -9,8 +9,13 @@
 package org.elasticsearch.search.aggregations.bucket.filter;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.DisiPriorityQueue;
+import org.apache.lucene.search.DisiWrapper;
+import org.apache.lucene.search.DisjunctionDISIApproximation;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Scorable;
+import org.apache.lucene.search.Scorer;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -38,7 +43,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
-import java.util.function.IntPredicate;
 import java.util.function.LongPredicate;

/**
Expand Down Expand Up @@ -289,26 +293,80 @@ static class Compatible extends FiltersAggregator {

@Override
protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
if (QueryToFilterAdapter.MatchesNoDocs(filters()) && otherBucketKey == null) {
if (QueryToFilterAdapter.matchesNoDocs(filters()) && otherBucketKey == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}

IntPredicate[] docFilters = new IntPredicate[filters().size()];
for (int filterOrd = 0; filterOrd < filters().size(); filterOrd++) {
docFilters[filterOrd] = filters().get(filterOrd).matchingDocIds(aggCtx.getLeafReaderContext());
final int numFilters = filters().size();

// A DocIdSetIterator heap with one entry for each filter, ordered by doc ID
final DisiPriorityQueue filterIterators = new DisiPriorityQueue(numFilters);

long totalCost = 0;
for (int filterOrd = 0; filterOrd < numFilters; filterOrd++) {
Scorer randomAccessScorer = filters().get(filterOrd).randomAccessScorer(aggCtx.getLeafReaderContext());
if (randomAccessScorer == null) {
continue;
}
FilterMatchingDisiWrapper w = new FilterMatchingDisiWrapper(randomAccessScorer, filterOrd);
totalCost += randomAccessScorer.iterator().cost();
filterIterators.add(w);
}

// Restrict the use of competitive iterator when there's no parent agg, no 'other' bucket (all values are accessed then)
// and the total cost of per-filter doc iterators is smaller than maxDoc, indicating that there are docs matching the main
// query but no filter queries.
final boolean useCompetitiveIterator = (parent == null
&& otherBucketKey == null
&& filterIterators.size() > 0
&& totalCost < aggCtx.getLeafReaderContext().reader().maxDoc());

return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
boolean matched = false;
for (int i = 0; i < docFilters.length; i++) {
if (docFilters[i].test(doc)) {
collectBucket(sub, doc, bucketOrd(bucket, i));
matched = true;
if (filterIterators.size() > 0) {
// Advance filters if necessary. Filters will already be advanced if used as a competitive iterator.
DisiWrapper top = filterIterators.top();
while (top.doc < doc) {
top.doc = top.approximation.advance(doc);
top = filterIterators.updateTop();
}

if (top.doc == doc) {
for (DisiWrapper w = filterIterators.topList(); w != null; w = w.next) {
// It would be nice if DisiPriorityQueue supported generics to avoid unchecked casts.
FilterMatchingDisiWrapper topMatch = (FilterMatchingDisiWrapper) w;

// We need to cache the result of twoPhaseView.matches() since it's illegal to call it multiple times on the
// same doc, yet LeafBucketCollector#collect may be called multiple times with the same doc and multiple
// buckets.
if (topMatch.lastCheckedDoc < doc) {
topMatch.lastCheckedDoc = doc;
if (topMatch.twoPhaseView == null || topMatch.twoPhaseView.matches()) {
topMatch.lastMatchingDoc = doc;
}
}
if (topMatch.lastMatchingDoc == doc) {
collectBucket(sub, doc, bucketOrd(bucket, topMatch.filterOrd));
matched = true;
}
}
}
}

if (otherBucketKey != null && false == matched) {
collectBucket(sub, doc, bucketOrd(bucket, docFilters.length));
collectBucket(sub, doc, bucketOrd(bucket, numFilters));
}
}

@Override
public DocIdSetIterator competitiveIterator() throws IOException {
if (useCompetitiveIterator) {
// A DocIdSetIterator view of the filterIterators heap
return new DisjunctionDISIApproximation(filterIterators);
} else {
return null;
}
}
};
@@ -317,6 +375,20 @@ public void collect(int doc, long bucket) throws IOException {
         final long bucketOrd(long owningBucketOrdinal, int filterOrd) {
             return owningBucketOrdinal * totalNumKeys + filterOrd;
         }
+
+        private static class FilterMatchingDisiWrapper extends DisiWrapper {
+            final int filterOrd;
+
+            // Tracks the last doc that matches the filter.
+            int lastMatchingDoc = -1;
+            // Tracks the last doc that was checked for filter matching.
+            int lastCheckedDoc = -1;
+
+            FilterMatchingDisiWrapper(Scorer scorer, int ord) {
+                super(scorer);
+                this.filterOrd = ord;
+            }
+        }
}

/**
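
The heap-based matching in Compatible#collect above is easier to see with toy iterators. Below is a self-contained sketch (a plain java.util.PriorityQueue stands in for DisiPriorityQueue; two-phase confirmation is omitted, and the topList() walk of tied top entries is replaced by a heap scan) of how per-filter iterators advance to each candidate doc and how entries positioned on it identify the matching buckets:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of Compatible#collect: each filter contributes a sorted doc-ID
// iterator, and a heap keyed by each iterator's current doc plays the role
// of DisiPriorityQueue.
public class FilterHeapSketch {
    static final int NO_MORE_DOCS = Integer.MAX_VALUE;

    static class FilterIterator {
        final int filterOrd;
        final int[] docs; // sorted doc IDs matching this filter
        int pos = 0;
        int doc = -1;     // current position, like DisiWrapper#doc

        FilterIterator(int filterOrd, int[] docs) {
            this.filterOrd = filterOrd;
            this.docs = docs;
        }

        // Move to the first doc >= target, like DocIdSetIterator#advance.
        int advance(int target) {
            while (pos < docs.length && docs[pos] < target) {
                pos++;
            }
            return doc = pos < docs.length ? docs[pos] : NO_MORE_DOCS;
        }
    }

    public static void main(String[] args) {
        PriorityQueue<FilterIterator> heap = new PriorityQueue<>((a, b) -> Integer.compare(a.doc, b.doc));
        heap.add(new FilterIterator(0, new int[] { 3, 7, 12 }));
        heap.add(new FilterIterator(1, new int[] { 7, 9 }));

        for (int doc : new int[] { 3, 7, 9, 11 }) { // candidate docs from the main query
            // Advance iterators that are behind the candidate, smallest doc first,
            // mirroring the while (top.doc < doc) loop in collect().
            while (heap.peek().doc < doc) {
                FilterIterator top = heap.poll();
                top.advance(doc);
                heap.add(top);
            }
            // Every iterator positioned exactly on the candidate marks a matching bucket.
            List<Integer> buckets = new ArrayList<>();
            for (FilterIterator it : heap) {
                if (it.doc == doc) {
                    buckets.add(it.filterOrd);
                }
            }
            System.out.println("doc " + doc + " -> buckets " + buckets);
        }
    }
}
```

In the aggregator, the same heap doubles as the competitive iterator: DisjunctionDISIApproximation surfaces the minimum current doc across all filter iterators as the next candidate, which is what lets collection skip ranges of docs that no filter matches.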
QueryToFilterAdapter.java
@@ -21,17 +21,17 @@
 import org.apache.lucene.search.PointRangeQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.ScorerSupplier;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.Bits;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.function.BiConsumer;
-import java.util.function.IntPredicate;

/**
* Adapts a Lucene {@link Query} to the behaviors used by the
@@ -171,16 +171,20 @@ private static Query unwrap(Query query) {
     }
 
     /**
-     * Build a predicate that the "compatible" implementation of the
-     * {@link FiltersAggregator} will use to figure out if the filter matches.
-     * <p>
-     * Consumers of this method will always call it with non-negative,
-     * increasing {@code int}s. A sequence like {@code 0, 1, 7, 8, 10} is fine.
-     * It won't call with {@code 0, 1, 0} or {@code -1, 0, 1}.
+     * Returns the {@link Scorer} that the "compatible" implementation of the {@link FiltersAggregator} will use
+     * to get an iterator over the docs matching the filter. The scorer is optimized for random access, since
+     * it will be skipping documents that don't match the main query or other filters.
+     * Returns {@code null} if the passed context contains no scorer for the filter.
      */
-    @SuppressWarnings("resource") // Closing the reader is someone else's problem
-    IntPredicate matchingDocIds(LeafReaderContext ctx) throws IOException {
-        return Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), weight().scorerSupplier(ctx))::get;
+    Scorer randomAccessScorer(LeafReaderContext ctx) throws IOException {
+        Weight weight = weight();
+        ScorerSupplier scorerSupplier = weight.scorerSupplier(ctx);
+        if (scorerSupplier == null) {
+            return null;
+        }
+
+        // A leading cost of 0 instructs the scorer to optimize for random access as opposed to sequential access
+        return scorerSupplier.get(0L);
     }

/**
@@ -255,7 +259,7 @@ private Weight weight() throws IOException {
* @param filters list of filters to check
* @return true if all filters match no docs, otherwise false
*/
-    static boolean MatchesNoDocs(List<QueryToFilterAdapter> filters) {
+    static boolean matchesNoDocs(List<QueryToFilterAdapter> filters) {
static boolean matchesNoDocs(List<QueryToFilterAdapter> filters) {
for (QueryToFilterAdapter filter : filters) {
if (filter.query() instanceof MatchNoDocsQuery == false) {
return false;
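
For context, here is roughly how a caller obtains such a random-access scorer with stock Lucene APIs. This is a hedged sketch, not the exact Elasticsearch call chain; searcher, query, and ctx are assumed inputs:

```java
import java.io.IOException;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.Weight;

class RandomAccessScorerSketch {
    // Mirrors randomAccessScorer(): a leadCost of 0 tells the supplier to expect
    // few, scattered advance() calls, so it can prefer a scorer that is cheap to
    // build over one that is cheap to iterate exhaustively.
    static Scorer randomAccessScorer(IndexSearcher searcher, Query query, LeafReaderContext ctx) throws IOException {
        Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
        ScorerSupplier supplier = weight.scorerSupplier(ctx);
        if (supplier == null) {
            return null; // no docs in this segment match the query
        }
        return supplier.get(0L);
    }
}
```

Returning null when no supplier exists lets the caller simply leave that filter out of the heap, which is exactly what the loop in getLeafCollector above does.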
