Skip to content

Commit

Permalink
fixing merge aggregation
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Feb 9, 2024
1 parent c0c8a28 commit e97b0c3
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public abstract class BaseSingleTreeBuilder {
}

// TODO: Remove hardcoding
_maxLeafRecords = 100; // builderConfig.getMaxLeafRecords();
_maxLeafRecords = 1; // builderConfig.getMaxLeafRecords();
}

private void constructStarTree(StarTreeBuilderUtils.TreeNode node, int startDocId, int endDocId) throws IOException {
Expand Down Expand Up @@ -317,6 +317,7 @@ private void createDocValuesIndices(DocValuesConsumer docValuesConsumer) throws

for (int docId = 0; docId < _numDocs; docId++) {
Record record = getStarTreeRecord(docId);
//logger.info("Record during doc values indices : {} ", record.toString());
for (int i = 0; i < record._dimensions.length; i++) {
long val = record._dimensions[i];
pendingDimArr[i].add(val);
Expand Down Expand Up @@ -381,6 +382,7 @@ private void appendToStarTree(Record record) throws IOException {
// if(star) {
// System.out.println("======Overall sum =====" + (long) record._metrics[0]);
// }
//logger.info("Record : {}", record.toString());
appendRecord(record);
_numDocs++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private static void writeNodes(IndexOutput output, TreeNode rootNode) throws IOE
}

private static void writeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId) throws IOException {
//logger.info("Builder util : dim id : {} , dim value : {}", node._dimensionId, node._dimensionValue);
output.writeInt(node._dimensionId);
output.writeLong(node._dimensionValue);
output.writeInt(node._startDocId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.opensearch.index.codec.freshstartree.codec;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.lucene90.IndexedDISI;
Expand Down Expand Up @@ -47,6 +49,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opensearch.index.codec.freshstartree.query.StarTreeFilter;


/**
* Created a copy to initialize producer without field info stored in state which is the case for
Expand All @@ -55,6 +59,9 @@
* We don't create aggregated doc value fields in the traditional add/update doc workflow, where fieldInfo gets populated.
*/
public class Lucene90DocValuesProducerCopy extends DocValuesProducer {

private static final Logger logger = LogManager.getLogger(Lucene90DocValuesProducerCopy.class);

private final Map<String, NumericEntry> numerics;
private final Map<String, SortedNumericEntry> sortedNumerics;
private final IndexInput data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.opensearch.index.codec.freshstartree.codec;

import java.util.LinkedHashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
Expand All @@ -34,9 +37,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opensearch.index.codec.freshstartree.query.StarTreeFilter;


/** Custom star tree doc values reader */
public class StarTreeDocValuesReader extends DocValuesProducer {
private static final Logger logger = LogManager.getLogger(StarTreeDocValuesReader.class);

private DocValuesProducer delegate;

private IndexInput data;
Expand All @@ -59,7 +66,7 @@ public StarTreeDocValuesReader(DocValuesProducer producer, SegmentReadState stat
CodecUtil.checkIndexHeader(data, "STARTreeCodec", 0, 0, state.segmentInfo.getId(), state.segmentSuffix);
starTree = new OffHeapStarTree(data);
valuesProducer = new Lucene90DocValuesProducerCopy(state, DATA_CODEC, "sttd", META_CODEC, "sttm", starTree.getDimensionNames());
dimensionValues = new HashMap<>();
dimensionValues = new LinkedHashMap<>();
}

@Override
Expand All @@ -71,6 +78,7 @@ public NumericDocValues getNumeric(FieldInfo field) throws IOException {
public StarTreeAggregatedValues getAggregatedDocValues() throws IOException {
// starTree.printTree(new HashMap<>());
List<String> dimensionsSplitOrder = starTree.getDimensionNames();
//logger.info("Dimension order {}", dimensionsSplitOrder);
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
dimensionValues.put(dimensionsSplitOrder.get(i), valuesProducer.getNumeric(dimensionsSplitOrder.get(i) + "_dim"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.opensearch.index.codec.freshstartree.codec;

import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
Expand Down Expand Up @@ -60,7 +62,12 @@ public StarTreeDocValuesWriter(DocValuesConsumer delegate, SegmentWriteState seg
this.state = segmentWriteState;
dimensionReaders = new HashMap<>();
dimensionsSplitOrder = new ArrayList<>();

// dimensionsSplitOrder.add("minute");
// dimensionsSplitOrder.add("hour");
// dimensionsSplitOrder.add("day");
// dimensionsSplitOrder.add("month");
// // dimensionsSplitOrder.add("year");
// dimensionsSplitOrder.add("status");
docValuesConsumer = new Lucene90DocValuesConsumerCopy(state, DATA_CODEC, "sttd", META_CODEC, "sttm");
}

Expand Down Expand Up @@ -100,7 +107,7 @@ public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProdu
} else {
// logger.info("Adding field : " + field.name);
dimensionReaders.put(field.name + "_dim", valuesProducer.getSortedNumeric(field));
dimensionsSplitOrder.add(field.name);
//dimensionsSplitOrder.add(field.name);
}
if (field.name.contains("status")) {
// TODO : change this metric type
Expand All @@ -121,16 +128,18 @@ public void merge(MergeState mergeState) throws IOException {

public void mergeAggregatedValues(MergeState mergeState) throws IOException {
List<StarTreeAggregatedValues> aggrList = new ArrayList<>();
List<String> dimNames = new ArrayList<>();
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
DocValuesProducer producer = mergeState.docValuesProducers[i];
Object obj = producer.getAggregatedDocValues();
StarTreeAggregatedValues starTree = (StarTreeAggregatedValues) obj;
dimNames = starTree.dimensionValues.keySet().stream().collect(Collectors.toList());
aggrList.add(starTree);
}
long startTime = System.currentTimeMillis();
builder = new OffHeapBufferedSingleTreeBuilder(
data,
dimensionsSplitOrder,
dimNames,
dimensionReaders,
state.segmentInfo.maxDoc(),
docValuesConsumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.opensearch.index.codec.freshstartree.node;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RandomAccessInput;

Expand All @@ -26,9 +28,12 @@
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.opensearch.index.codec.freshstartree.builder.OffHeapBufferedSingleTreeBuilder;


/** Off heap implementation of star tree. */
public class OffHeapStarTree implements StarTree {
private static final Logger logger = LogManager.getLogger(OffHeapStarTree.class);
public static final long MAGIC_MARKER = 0xBADDA55B00DAD00DL;
public static final int VERSION = 1;
private final OffHeapStarTreeNode _root;
Expand Down Expand Up @@ -108,6 +113,7 @@ private void printTreeHelper(Map<String, Map<String, String>> dictionaryMap, Off
.toString();

stringBuilder.append(formattedOutput);
logger.info(stringBuilder.toString());

if (!node.isLeaf()) {
Iterator<OffHeapStarTreeNode> childrenIterator = node.getChildrenIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ private StarTreeResult traverseStarTree() throws IOException {
// If all predicate columns and group-by columns are matched, we can use aggregated document
if (remainingPredicateColumns.isEmpty() && remainingGroupByColumns.isEmpty()) {
adder = docsWithField.grow(1);
adder.add(starTreeNode.getAggregatedDocId());
int aggrDocId = starTreeNode.getAggregatedDocId();
adder.add(aggrDocId);
continue;
}

Expand All @@ -178,6 +179,7 @@ private StarTreeResult traverseStarTree() throws IOException {
if (starTreeNode.isLeaf()) {
for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) {
adder = docsWithField.grow(1);
logger.info("Adding normal doc id {}", i);
adder.add((int) i);
}
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ public void collect(int doc, long bucket) throws IOException {
for (String field : fieldCols) {
fieldColToDocValuesMap.put(field, aggrVals.dimensionValues.get(field));
}

NumericDocValues dv = aggrVals.metricValues.get("status_sum");
if (dv.advanceExact(doc)) {

Expand Down

0 comments on commit e97b0c3

Please sign in to comment.