Skip to content

Commit

Permalink
fix: The aggs result of NestedAggregator with sub NestedAggregator ma…
Browse files Browse the repository at this point in the history
…y be not accurately

Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Apr 22, 2024
1 parent b4692c8 commit e9775d7
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,16 @@ protected AggregatorFactory doBuild(QueryShardContext queryShardContext, Aggrega
ObjectMapper childObjectMapper = queryShardContext.getObjectMapper(path);
if (childObjectMapper == null) {
// in case the path has been unmapped:
return new NestedAggregatorFactory(name, null, null, queryShardContext, parent, subFactoriesBuilder, metadata);
return new NestedAggregatorFactory(name, null, queryShardContext, parent, subFactoriesBuilder, metadata);
}

if (childObjectMapper.nested().isNested() == false) {
throw new AggregationExecutionException("[nested] nested path [" + path + "] is not nested");
}
try {
ObjectMapper parentObjectMapper = queryShardContext.nestedScope().nextLevel(childObjectMapper);
queryShardContext.nestedScope().nextLevel(childObjectMapper);
return new NestedAggregatorFactory(
name,
parentObjectMapper,
childObjectMapper,
queryShardContext,
parent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.ParseField;
import org.opensearch.index.mapper.ObjectMapper;
Expand Down Expand Up @@ -79,7 +80,6 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
NestedAggregator(
String name,
AggregatorFactories factories,
ObjectMapper parentObjectMapper,
ObjectMapper childObjectMapper,
SearchContext context,
Aggregator parent,
Expand All @@ -88,8 +88,7 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
) throws IOException {
super(name, factories, context, parent, cardinality, metadata);

Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(Queries.newNonNestedFilter());
this.childFilter = childObjectMapper.nestedTypeFilter();
this.collectsFromSingleBucket = cardinality.map(estimate -> estimate < 2);
}
Expand All @@ -108,19 +107,16 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentDoc can be 0 when searching:
if (parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getChildAndRootParent(parentDocs, childDocs, parentDoc);
int currentRootDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentRootDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
Expand All @@ -130,6 +126,30 @@ public void collect(int parentDoc, long bucket) throws IOException {
}
}

static Tuple<Integer, Integer> getChildAndRootParent(BitSet parentDocs, DocIdSetIterator childDocs, int parentDoc
) throws IOException {
int currentRootDoc;
int prevParentDoc = parentDocs.prevSetBit(parentDoc);
if (prevParentDoc == -1) {
currentRootDoc = parentDocs.nextSetBit(0);
} else if (prevParentDoc == parentDoc) {
currentRootDoc = parentDoc;
if (currentRootDoc == 0) {
prevParentDoc = -1;
} else {
prevParentDoc = parentDocs.prevSetBit(currentRootDoc - 1);
}
} else {
currentRootDoc = parentDocs.nextSetBit(prevParentDoc + 1);
}

int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
return Tuple.tuple(currentRootDoc, childDocId);
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
Expand Down Expand Up @@ -191,9 +211,8 @@ public void setScorer(Scorable scorer) throws IOException {

@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentDoc can be 0 when searching:
if (parentDocs == null || childDocs == null) {
return;
}

Expand All @@ -214,13 +233,11 @@ void processBufferedChildBuckets() throws IOException {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getChildAndRootParent(parentDocs, childDocs, currentParentDoc);
int currentRootDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentRootDoc; childDocId = childDocs.nextDoc()) {
cachedScorer.doc = childDocId;
for (var bucket : bucketBuffer) {
collectBucket(sub, childDocId, bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,17 @@
*/
public class NestedAggregatorFactory extends AggregatorFactory {

private final ObjectMapper parentObjectMapper;
private final ObjectMapper childObjectMapper;

NestedAggregatorFactory(
String name,
ObjectMapper parentObjectMapper,
ObjectMapper childObjectMapper,
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subFactories,
Map<String, Object> metadata
) throws IOException {
super(name, queryShardContext, parent, subFactories, metadata);
this.parentObjectMapper = parentObjectMapper;
this.childObjectMapper = childObjectMapper;
}

Expand All @@ -79,7 +76,7 @@ public Aggregator createInternal(
if (childObjectMapper == null) {
return new Unmapped(name, searchContext, parent, factories, metadata);
}
return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, searchContext, parent, cardinality, metadata);
return new NestedAggregator(name, factories, childObjectMapper, searchContext, parent, cardinality, metadata);
}

/**
Expand Down
Loading

0 comments on commit e9775d7

Please sign in to comment.