Skip to content

Commit

Permalink
addressed nits
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Jul 16, 2024
1 parent 106500b commit bd317d8
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
if (compositeFieldSet.isEmpty()) {
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService);
starTreesBuilder.build();
starTreesBuilder.close();
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService)) {
starTreesBuilder.build();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.apache.lucene.index;
package org.opensearch.index.compositeindex.datacube.startree.builder;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
Expand All @@ -19,9 +22,6 @@
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.ValueAggregator;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreeBuilder;
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreeDocValuesIteratorAdapter;
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode;
import org.opensearch.index.fielddata.IndexNumericFieldData;
Expand Down Expand Up @@ -70,6 +70,7 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder {

protected SequentialDocValuesIterator[] dimensionReaders;

// We do not close these producers as they are empty doc value producers (where close() is unsupported)
protected Map<String, DocValuesProducer> fieldProducerMap;

private final StarTreeDocValuesIteratorAdapter starTreeDocValuesIteratorAdapter;
Expand Down Expand Up @@ -206,10 +207,9 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma
* Sorts and aggregates the star-tree document in the segment, and returns a star-tree document iterator for all the
* aggregated star-tree document.
*
* @param numDocs number of documents in the given segment
* @return Iterator for the aggregated star-tree document
*/
public abstract Iterator<StarTreeDocument> sortAndAggregateStarTreeDocuments(int numDocs) throws IOException;
public abstract Iterator<StarTreeDocument> sortAndAggregateStarTreeDocuments() throws IOException;

/**
* Generates aggregated star-tree documents for star-node.
Expand Down Expand Up @@ -242,25 +242,35 @@ protected StarTreeDocument getSegmentStarTreeDocument(int currentDocId) throws I
private Long[] getStarTreeDimensionsFromSegment(int currentDocId) throws IOException {
Long[] dimensions = new Long[numDimensions];
for (int i = 0; i < numDimensions; i++) {
if (dimensionReaders[i] != null) {
try {
starTreeDocValuesIteratorAdapter.nextDoc(dimensionReaders[i], currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
} catch (Exception e) {
logger.error("unable to read the dimension values from the segment", e);
throw new IllegalStateException("unable to read the dimension values from the segment", e);
}

dimensions[i] = starTreeDocValuesIteratorAdapter.getNextValue(dimensionReaders[i], currentDocId);
} else {
throw new IllegalStateException("dimension readers are empty");
try {
dimensions[i] = getValuesFromSegment(dimensionReaders[i], currentDocId);
} catch (Exception e) {
logger.error("unable to read the dimension values from the segment", e);
throw new IllegalStateException("unable to read the dimension values from the segment", e);
}

}
return dimensions;
}

/**
 * Advances the given field iterator to {@code currentDocId} and reads the value stored for that document.
 * <p>
 * A failure while advancing the iterator is logged and rethrown as an unchecked {@link RuntimeException}
 * carrying the original {@link IOException} as its cause; it is not propagated as a checked exception.
 *
 * @param iterator     respective field iterator
 * @param currentDocId current document id
 * @return the value the adapter reports for the field at the current document
 * @throws IOException declared for the value read that follows the advance (an I/O failure during the
 *                     advance itself surfaces as a {@link RuntimeException} instead)
 */
private Long getValuesFromSegment(SequentialDocValuesIterator iterator, int currentDocId) throws IOException {
    try {
        // position the iterator on the requested document before reading from it
        starTreeDocValuesIteratorAdapter.nextDoc(iterator, currentDocId);
    } catch (IOException ioException) {
        final String failureMessage = "unable to iterate to next doc";
        logger.error(failureMessage, ioException);
        throw new RuntimeException(failureMessage, ioException);
    }

    final Long nextValue = starTreeDocValuesIteratorAdapter.getNextValue(iterator, currentDocId);
    return nextValue;
}

/**
* Returns the metric values for the next document from the segment
*
Expand All @@ -273,15 +283,11 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId) throws IOExcept
SequentialDocValuesIterator metricStatReader = metricAggregatorInfos.get(i).getMetricStatReader();
if (metricStatReader != null) {
try {
starTreeDocValuesIteratorAdapter.nextDoc(metricStatReader, currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
metrics[i] = getValuesFromSegment(metricStatReader, currentDocId);
} catch (Exception e) {
logger.error("unable to read the metric values from the segment", e);
throw new IllegalStateException("unable to read the metric values from the segment", e);
}
metrics[i] = starTreeDocValuesIteratorAdapter.getNextValue(metricStatReader, currentDocId);
} else {
throw new IllegalStateException("metric readers are empty");
}
Expand All @@ -297,6 +303,7 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId) throws IOExcept
* @param segmentDocument segment star-tree document
* @return merged star-tree document
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
protected StarTreeDocument reduceSegmentStarTreeDocuments(
StarTreeDocument aggregatedSegmentDocument,
StarTreeDocument segmentDocument
Expand Down Expand Up @@ -370,6 +377,7 @@ private static long getLong(Object metric) {
* @param starTreeDocument segment star-tree document
* @return merged star-tree document
*/
@SuppressWarnings("unchecked")
public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocument, StarTreeDocument starTreeDocument) {
// aggregate the documents
if (aggregatedDocument == null) {
Expand Down Expand Up @@ -410,7 +418,12 @@ public void build() throws IOException {
long startTime = System.currentTimeMillis();
logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName());

Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateStarTreeDocuments(totalSegmentDocs);
if (totalSegmentDocs == 0) {
logger.debug("No documents found in the segment");
return;
}

Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateStarTreeDocuments();
logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime));
build(starTreeDocumentIterator);
logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime));
Expand All @@ -422,7 +435,7 @@ public void build() throws IOException {
* @param starTreeDocumentIterator contains the sorted and aggregated documents
* @throws IOException when we are unable to build star-tree
*/
public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IOException {
void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IOException {
int numSegmentStarTreeDocument = totalSegmentDocs;

while (starTreeDocumentIterator.hasNext()) {
Expand All @@ -432,7 +445,7 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument);

if (numStarTreeDocs == 0) {
// TODO: Uncomment when segment codec is ready
// TODO: Uncomment when segment codec and file formats are ready
// StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.opensearch.index.compositeindex.datacube.startree.builder;

import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BaseStarTreeBuilder;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
Expand Down Expand Up @@ -72,45 +71,42 @@ public Long getDimensionValue(int docId, int dimensionId) throws IOException {
}

@Override
public Iterator<StarTreeDocument> sortAndAggregateStarTreeDocuments(int numDocs) throws IOException {
public Iterator<StarTreeDocument> sortAndAggregateStarTreeDocuments() throws IOException {
int numDocs = totalSegmentDocs;
StarTreeDocument[] starTreeDocuments = new StarTreeDocument[numDocs];
for (int currentDocId = 0; currentDocId < numDocs; currentDocId++) {
starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId);
}

return sortAndAggregateStarTreeDocuments(starTreeDocuments);
}

/**
* Sorts, aggregates and merges the star-tree documents
*
* @param starTreeDocuments star-tree documents
* @return iterator for star-tree documents
* @throws IOException throws when unable to sort, merge and aggregate star-tree documents
*/
Iterator<StarTreeDocument> sortAndAggregateStarTreeDocuments(StarTreeDocument[] starTreeDocuments) throws IOException {
Iterator<StarTreeDocument> sortAndAggregateStarTreeDocuments(StarTreeDocument[] starTreeDocuments) {

// sort the documents
Arrays.sort(starTreeDocuments, (o1, o2) -> {
for (int i = 0; i < numDimensions; i++) {
if (!Objects.equals(o1.dimensions[i], o2.dimensions[i])) {
return Long.compare(o1.dimensions[i], o2.dimensions[i]);
}
}
return 0;
});
// sort all the documents
sortStarTreeDocumentsFromDimensionId(starTreeDocuments, 0);

// merge the documents
return mergeStarTreeDocuments(starTreeDocuments);
}

/**
* Merges the star-tree documents
*
* @param starTreeDocuments star-tree documents
* @return iterator to aggregate star-tree documents
*/
private Iterator<StarTreeDocument> mergeStarTreeDocuments(StarTreeDocument[] starTreeDocuments) {
return new Iterator<>() {
boolean hasNext = true;
StarTreeDocument currentStarTreeDocument = starTreeDocuments[0];
// starting from 1 since we have already fetched the 0th document
int docId = 1;

@Override
Expand All @@ -123,8 +119,9 @@ public StarTreeDocument next() {
// aggregate as we move on to the next doc
StarTreeDocument next = reduceSegmentStarTreeDocuments(null, currentStarTreeDocument);
while (docId < starTreeDocuments.length) {
StarTreeDocument starTreeDocument = starTreeDocuments[docId++];
if (!Arrays.equals(starTreeDocument.dimensions, next.dimensions)) {
StarTreeDocument starTreeDocument = starTreeDocuments[docId];
docId++;
if (Arrays.equals(starTreeDocument.dimensions, next.dimensions) == false) {
currentStarTreeDocument = starTreeDocument;
return next;
} else {
Expand All @@ -139,6 +136,7 @@ public StarTreeDocument next() {

/**
* Generates a star-tree for a given star-node
*
* @param startDocId Start document id in the star-tree
* @param endDocId End document id (exclusive) in the star-tree
* @param dimensionId Dimension id of the star-node
Expand All @@ -153,14 +151,10 @@ public Iterator<StarTreeDocument> generateStarTreeDocumentsForStarNode(int start
for (int i = 0; i < numDocs; i++) {
starTreeDocuments[i] = getStarTreeDocument(startDocId + i);
}
Arrays.sort(starTreeDocuments, (o1, o2) -> {
for (int i = dimensionId + 1; i < numDimensions; i++) {
if (!Objects.equals(o1.dimensions[i], o2.dimensions[i])) {
return Long.compare(o1.dimensions[i], o2.dimensions[i]);
}
}
return 0;
});

// sort star tree documents from given dimension id (as previous dimension ids have already been processed)
sortStarTreeDocumentsFromDimensionId(starTreeDocuments, dimensionId + 1);

return new Iterator<StarTreeDocument>() {
boolean hasNext = true;
StarTreeDocument currentStarTreeDocument = starTreeDocuments[0];
Expand All @@ -185,7 +179,8 @@ public StarTreeDocument next() {
StarTreeDocument next = reduceStarTreeDocuments(null, currentStarTreeDocument);
next.dimensions[dimensionId] = Long.valueOf(STAR_IN_DOC_VALUES_INDEX);
while (docId < numDocs) {
StarTreeDocument starTreeDocument = starTreeDocuments[docId++];
StarTreeDocument starTreeDocument = starTreeDocuments[docId];
docId++;
if (!hasSameDimensions(starTreeDocument, currentStarTreeDocument)) {
currentStarTreeDocument = starTreeDocument;
return next;
Expand All @@ -198,4 +193,21 @@ public StarTreeDocument next() {
}
};
}

/**
 * Sorts the star-tree documents in place, comparing dimension values from the given dimension id
 * (inclusive) up to the last dimension. Dimensions before {@code dimensionId} are ignored.
 * <p>
 * Within a dimension, values are ordered ascending and a missing ({@code null}) value is ordered after
 * every present value, so a document lacking a dimension value no longer triggers a
 * {@link NullPointerException} during unboxing.
 *
 * @param starTreeDocuments star-tree documents to sort (the array is reordered in place)
 * @param dimensionId       id of the first dimension that takes part in the comparison
 */
private void sortStarTreeDocumentsFromDimensionId(StarTreeDocument[] starTreeDocuments, int dimensionId) {
    Arrays.sort(starTreeDocuments, (o1, o2) -> {
        for (int i = dimensionId; i < numDimensions; i++) {
            Long left = o1.dimensions[i];
            Long right = o2.dimensions[i];
            if (Objects.equals(left, right)) {
                // identical (or both missing) in this dimension: decide on the next one
                continue;
            }
            // exactly one side may be null here; nulls sort last
            if (left == null) {
                return 1;
            }
            if (right == null) {
                return -1;
            }
            return Long.compare(left, right);
        }
        return 0;
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ public void build() throws IOException {
logger.debug("Starting building {} star-trees with star-tree fields", numStarTrees);

// Build all star-trees
for (int i = 0; i < numStarTrees; i++) {
StarTreeField starTreeField = starTreeFields.get(i);
try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(starTreeField, fieldProducerMap, state, mapperService)) {
for (StarTreeField starTreeField : starTreeFields) {
try (StarTreeBuilder starTreeBuilder = getStarTreeBuilder(starTreeField, fieldProducerMap, state, mapperService)) {
starTreeBuilder.build();
}
}
Expand All @@ -93,7 +92,7 @@ public void close() throws IOException {

}

static StarTreeBuilder getSingleTreeBuilder(
StarTreeBuilder getStarTreeBuilder(
StarTreeField starTreeField,
Map<String, DocValuesProducer> fieldProducerMap,
SegmentWriteState state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.lucene99.Lucene99Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.BaseDocValuesFormatTestCase;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.common.Rounding;
import org.opensearch.index.codec.composite.Composite99Codec;
Expand All @@ -31,7 +26,6 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -77,34 +71,4 @@ private static StarTreeField getStarTreeField(List<Rounding.DateTimeUnit> d1Cale

return new StarTreeField("starTree", dims, metrics, config);
}

/**
 * Indexes four documents carrying sorted-numeric doc values (two with value 1, two with value 2),
 * force-merging to a single segment after each pair.
 * <p>
 * The same {@link Document} instance is reused for every add, so the fields accumulate from one
 * indexed document to the next.
 */
public void testStarTreeDocValues() throws IOException {
    Directory directory = newDirectory();
    IndexWriterConfig conf = newIndexWriterConfig(null);
    conf.setMergePolicy(newLogMergePolicy());
    RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf);
    Document doc = new Document();

    final String[] fieldNames = { "sndv", "dv", "field" };
    for (long fieldValue = 1; fieldValue <= 2; fieldValue++) {
        // two documents per value, then collapse everything into one segment
        for (int copy = 0; copy < 2; copy++) {
            for (String fieldName : fieldNames) {
                doc.add(new SortedNumericDocValuesField(fieldName, fieldValue));
            }
            iw.addDocument(doc);
        }
        iw.forceMerge(1);
    }
    iw.close();

    // TODO : validate star tree structures that got created
    directory.close();
}
}
Loading

0 comments on commit bd317d8

Please sign in to comment.