From e97b0c32df563d839723279ad30bc837e120a0ac Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 9 Feb 2024 19:54:28 +0530 Subject: [PATCH] fixing merge aggregation Signed-off-by: Bharathwaj G --- .../builder/BaseSingleTreeBuilder.java | 4 +++- .../builder/StarTreeBuilderUtils.java | 1 + .../codec/Lucene90DocValuesProducerCopy.java | 7 +++++++ .../codec/StarTreeDocValuesReader.java | 10 +++++++++- .../codec/StarTreeDocValuesWriter.java | 15 ++++++++++++--- .../codec/freshstartree/node/OffHeapStarTree.java | 6 ++++++ .../codec/freshstartree/query/StarTreeFilter.java | 4 +++- .../bucket/startree/StarTreeAggregator.java | 1 - 8 files changed, 41 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java index e77b30336104c..4faa60d4d7929 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java @@ -147,7 +147,7 @@ public abstract class BaseSingleTreeBuilder { } // TODO : Removing hardcoding - _maxLeafRecords = 100; // builderConfig.getMaxLeafRecords(); + _maxLeafRecords = 1; // builderConfig.getMaxLeafRecords(); } private void constructStarTree(StarTreeBuilderUtils.TreeNode node, int startDocId, int endDocId) throws IOException { @@ -317,6 +317,7 @@ private void createDocValuesIndices(DocValuesConsumer docValuesConsumer) throws for (int docId = 0; docId < _numDocs; docId++) { Record record = getStarTreeRecord(docId); + //logger.info("Record during doc values indices : {} ", record.toString()); for (int i = 0; i < record._dimensions.length; i++) { long val = record._dimensions[i]; pendingDimArr[i].add(val); @@ -381,6 +382,7 @@ private void appendToStarTree(Record record) throws IOException { // if(star) { // System.out.println("======Overall sum =====" + (long) record._metrics[0]); // } + //logger.info("Record : {}", record.toString()); appendRecord(record); _numDocs++; } diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/StarTreeBuilderUtils.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/StarTreeBuilderUtils.java index f86468299d02f..ecb907db2d0cf 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/StarTreeBuilderUtils.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/StarTreeBuilderUtils.java @@ -116,6 +116,7 @@ private static void writeNodes(IndexOutput output, TreeNode rootNode) throws IOE } private static void writeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId) throws IOException { + //logger.info("Builder util : dim id : {} , dim value : {}", node._dimensionId, node._dimensionValue); output.writeInt(node._dimensionId); output.writeLong(node._dimensionValue); output.writeInt(node._startDocId); diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java index 11b43c9c06916..9fdae1aacbcd7 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/Lucene90DocValuesProducerCopy.java @@ -16,6 +16,8 @@ */ package org.opensearch.index.codec.freshstartree.codec; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene90.IndexedDISI; @@ -47,6 +49,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.opensearch.index.codec.freshstartree.query.StarTreeFilter; + /** * Created a copy to initialize producer without field info stored in state which is the case for @@ -55,6 +59,9 @@ * We don't create aggregated doc value fields in traditional add/update doc workflow where fieldInfo gets populated */ public class Lucene90DocValuesProducerCopy extends DocValuesProducer { + + private static final Logger logger = LogManager.getLogger(Lucene90DocValuesProducerCopy.class); + private final Map numerics; private final Map sortedNumerics; private final IndexInput data; diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java index adc43aef29335..fad61a5a90d2e 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesReader.java @@ -16,6 +16,9 @@ */ package org.opensearch.index.codec.freshstartree.codec; +import java.util.LinkedHashMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.BinaryDocValues; @@ -34,9 +37,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.opensearch.index.codec.freshstartree.query.StarTreeFilter; + /** Custom star tree doc values reader */ public class StarTreeDocValuesReader extends DocValuesProducer { + private static final Logger logger = LogManager.getLogger(StarTreeDocValuesReader.class); + private DocValuesProducer delegate; private IndexInput data; @@ -59,7 +66,7 @@ public StarTreeDocValuesReader(DocValuesProducer producer, SegmentReadState stat CodecUtil.checkIndexHeader(data, "STARTreeCodec", 0, 0, state.segmentInfo.getId(), state.segmentSuffix); starTree = new OffHeapStarTree(data); valuesProducer = new Lucene90DocValuesProducerCopy(state, DATA_CODEC, "sttd", META_CODEC, "sttm", starTree.getDimensionNames()); - dimensionValues = new HashMap<>(); + dimensionValues = new LinkedHashMap<>(); } @Override @@ -71,6 +78,7 @@ public NumericDocValues getNumeric(FieldInfo field) throws IOException { public StarTreeAggregatedValues getAggregatedDocValues() throws IOException { // starTree.printTree(new HashMap<>()); List dimensionsSplitOrder = starTree.getDimensionNames(); + //logger.info("Dimension order {}", dimensionsSplitOrder); for (int i = 0; i < dimensionsSplitOrder.size(); i++) { dimensionValues.put(dimensionsSplitOrder.get(i), valuesProducer.getNumeric(dimensionsSplitOrder.get(i) + "_dim")); } diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java index afb8a15f5717a..2ccadf0422a9a 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/codec/StarTreeDocValuesWriter.java @@ -16,6 +16,8 @@ */ package org.opensearch.index.codec.freshstartree.codec; +import java.util.Collection; +import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.DocValuesConsumer; @@ -60,7 +62,12 @@ public StarTreeDocValuesWriter(DocValuesConsumer delegate, SegmentWriteState seg this.state = segmentWriteState; dimensionReaders = new HashMap<>(); dimensionsSplitOrder = new ArrayList<>(); - +// dimensionsSplitOrder.add("minute"); +// dimensionsSplitOrder.add("hour"); +// dimensionsSplitOrder.add("day"); +// dimensionsSplitOrder.add("month"); +// // dimensionsSplitOrder.add("year"); +// dimensionsSplitOrder.add("status"); docValuesConsumer = new Lucene90DocValuesConsumerCopy(state, DATA_CODEC, "sttd", META_CODEC, "sttm"); } @@ -100,7 +107,7 @@ public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProdu } else { // logger.info("Adding field : " + field.name); dimensionReaders.put(field.name + "_dim", valuesProducer.getSortedNumeric(field)); - dimensionsSplitOrder.add(field.name); + //dimensionsSplitOrder.add(field.name); } if (field.name.contains("status")) { // TODO : change this metric type @@ -121,16 +128,18 @@ public void merge(MergeState mergeState) throws IOException { public void mergeAggregatedValues(MergeState mergeState) throws IOException { List aggrList = new ArrayList<>(); + List dimNames = new ArrayList<>(); for (int i = 0; i < mergeState.docValuesProducers.length; i++) { DocValuesProducer producer = mergeState.docValuesProducers[i]; Object obj = producer.getAggregatedDocValues(); StarTreeAggregatedValues starTree = (StarTreeAggregatedValues) obj; + dimNames = starTree.dimensionValues.keySet().stream().collect(Collectors.toList()); aggrList.add(starTree); } long startTime = System.currentTimeMillis(); builder = new OffHeapBufferedSingleTreeBuilder( data, - dimensionsSplitOrder, + dimNames, dimensionReaders, state.segmentInfo.maxDoc(), docValuesConsumer, diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/node/OffHeapStarTree.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/node/OffHeapStarTree.java index 62d3c9c0342b5..a82e536d2f4c9 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/node/OffHeapStarTree.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/node/OffHeapStarTree.java @@ -16,6 +16,8 @@ */ package org.opensearch.index.codec.freshstartree.node; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RandomAccessInput; @@ -26,9 +28,12 @@ import java.util.List; import java.util.Map; import java.util.StringJoiner; +import org.opensearch.index.codec.freshstartree.builder.OffHeapBufferedSingleTreeBuilder; + /** Off heap implementation of star tree. */ public class OffHeapStarTree implements StarTree { + private static final Logger logger = LogManager.getLogger(OffHeapStarTree.class); public static final long MAGIC_MARKER = 0xBADDA55B00DAD00DL; public static final int VERSION = 1; private final OffHeapStarTreeNode _root; @@ -108,6 +113,7 @@ private void printTreeHelper(Map> dictionaryMap, Off .toString(); stringBuilder.append(formattedOutput); + logger.info(stringBuilder.toString()); if (!node.isLeaf()) { Iterator childrenIterator = node.getChildrenIterator(); diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java index 3f754f9286b6e..f8a203ac36414 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/query/StarTreeFilter.java @@ -166,7 +166,8 @@ private StarTreeResult traverseStarTree() throws IOException { // If all predicate columns and group-by columns are matched, we can use aggregated document if (remainingPredicateColumns.isEmpty() && remainingGroupByColumns.isEmpty()) { adder = docsWithField.grow(1); - adder.add(starTreeNode.getAggregatedDocId()); + int aggrDocId = starTreeNode.getAggregatedDocId(); + adder.add(aggrDocId); continue; } @@ -178,6 +179,7 @@ private StarTreeResult traverseStarTree() throws IOException { if (starTreeNode.isLeaf()) { for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { adder = docsWithField.grow(1); + logger.info("Adding normal doc id {}", i); adder.add((int) i); } continue; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java index 015ab02495bf0..941c75580dd42 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/StarTreeAggregator.java @@ -178,7 +178,6 @@ public void collect(int doc, long bucket) throws IOException { for (String field : fieldCols) { fieldColToDocValuesMap.put(field, aggrVals.dimensionValues.get(field)); } - NumericDocValues dv = aggrVals.metricValues.get("status_sum"); if (dv.advanceExact(doc)) {