diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java index 62da9fde6d831..3753b20a8bea3 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java @@ -99,9 +99,9 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, if (compositeFieldSet.isEmpty()) { for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) { if (mappedType instanceof StarTreeMapper.StarTreeFieldType) { - StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService); - starTreesBuilder.build(); - starTreesBuilder.close(); + try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService)) { + starTreesBuilder.build(); + } } } } diff --git a/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java similarity index 93% rename from server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java rename to server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index 69786ec60cb8b..0333db7256994 100644 --- a/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -5,11 +5,14 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.apache.lucene.index; +package org.opensearch.index.compositeindex.datacube.startree.builder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.SegmentWriteState; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; @@ -19,9 +22,6 @@ import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; import org.opensearch.index.compositeindex.datacube.startree.aggregators.ValueAggregator; import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType; -import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreeBuilder; -import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreeDocValuesIteratorAdapter; -import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder; import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; import org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode; import org.opensearch.index.fielddata.IndexNumericFieldData; @@ -70,6 +70,7 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder { protected SequentialDocValuesIterator[] dimensionReaders; + // We do not close these producers as they are empty doc value producers (where close() is unsupported) protected Map fieldProducerMap; private final StarTreeDocValuesIteratorAdapter starTreeDocValuesIteratorAdapter; @@ -206,10 +207,9 @@ public List generateMetricAggregatorInfos(MapperService ma * Sorts and aggregates the star-tree document in the segment, and returns a star-tree document iterator for all the * aggregated star-tree document. * - * @param numDocs number of documents in the given segment * @return Iterator for the aggregated star-tree document */ - public abstract Iterator sortAndAggregateStarTreeDocuments(int numDocs) throws IOException; + public abstract Iterator sortAndAggregateStarTreeDocuments() throws IOException; /** * Generates aggregated star-tree documents for star-node. @@ -242,25 +242,35 @@ protected StarTreeDocument getSegmentStarTreeDocument(int currentDocId) throws I private Long[] getStarTreeDimensionsFromSegment(int currentDocId) throws IOException { Long[] dimensions = new Long[numDimensions]; for (int i = 0; i < numDimensions; i++) { - if (dimensionReaders[i] != null) { - try { - starTreeDocValuesIteratorAdapter.nextDoc(dimensionReaders[i], currentDocId); - } catch (IOException e) { - logger.error("unable to iterate to next doc", e); - throw new RuntimeException("unable to iterate to next doc", e); - } catch (Exception e) { - logger.error("unable to read the dimension values from the segment", e); - throw new IllegalStateException("unable to read the dimension values from the segment", e); - } - - dimensions[i] = starTreeDocValuesIteratorAdapter.getNextValue(dimensionReaders[i], currentDocId); - } else { - throw new IllegalStateException("dimension readers are empty"); + try { + dimensions[i] = getValuesFromSegment(dimensionReaders[i], currentDocId); + } catch (Exception e) { + logger.error("unable to read the dimension values from the segment", e); + throw new IllegalStateException("unable to read the dimension values from the segment", e); } + } return dimensions; } + /** + * Returns the next value from the iterator of respective field + * + * @param iterator respective field iterator + * @param currentDocId current document id + * @return the next value for the field + * @throws IOException when we are unable to iterate to the next doc for the given iterator + */ + private Long getValuesFromSegment(SequentialDocValuesIterator iterator, int currentDocId) throws IOException { + try { + starTreeDocValuesIteratorAdapter.nextDoc(iterator, currentDocId); + } catch (IOException e) { + logger.error("unable to iterate to next doc", e); + throw new RuntimeException("unable to iterate to next doc", e); + } + return starTreeDocValuesIteratorAdapter.getNextValue(iterator, currentDocId); + } + /** * Returns the metric values for the next document from the segment * @@ -273,15 +283,11 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId) throws IOExcept SequentialDocValuesIterator metricStatReader = metricAggregatorInfos.get(i).getMetricStatReader(); if (metricStatReader != null) { try { - starTreeDocValuesIteratorAdapter.nextDoc(metricStatReader, currentDocId); - } catch (IOException e) { - logger.error("unable to iterate to next doc", e); - throw new RuntimeException("unable to iterate to next doc", e); + metrics[i] = getValuesFromSegment(metricStatReader, currentDocId); } catch (Exception e) { logger.error("unable to read the metric values from the segment", e); throw new IllegalStateException("unable to read the metric values from the segment", e); } - metrics[i] = starTreeDocValuesIteratorAdapter.getNextValue(metricStatReader, currentDocId); } else { throw new IllegalStateException("metric readers are empty"); } @@ -297,6 +303,7 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId) throws IOExcept * @param segmentDocument segment star-tree document * @return merged star-tree document */ + @SuppressWarnings({ "unchecked", "rawtypes" }) protected StarTreeDocument reduceSegmentStarTreeDocuments( StarTreeDocument aggregatedSegmentDocument, StarTreeDocument segmentDocument @@ -370,6 +377,7 @@ private static long getLong(Object metric) { * @param starTreeDocument segment star-tree document * @return merged star-tree document */ + @SuppressWarnings("unchecked") public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocument, StarTreeDocument starTreeDocument) { // aggregate the documents if (aggregatedDocument == null) { @@ -410,7 +418,12 @@ public void build() throws IOException { long startTime = System.currentTimeMillis(); logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName()); - Iterator starTreeDocumentIterator = sortAndAggregateStarTreeDocuments(totalSegmentDocs); + if (totalSegmentDocs == 0) { + logger.debug("No documents found in the segment"); + return; + } + + Iterator starTreeDocumentIterator = sortAndAggregateStarTreeDocuments(); logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); build(starTreeDocumentIterator); logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); @@ -422,7 +435,7 @@ public void build() throws IOException { * @param starTreeDocumentIterator contains the sorted and aggregated documents * @throws IOException when we are unable to build star-tree */ - public void build(Iterator starTreeDocumentIterator) throws IOException { + void build(Iterator starTreeDocumentIterator) throws IOException { int numSegmentStarTreeDocument = totalSegmentDocs; while (starTreeDocumentIterator.hasNext()) { @@ -432,7 +445,7 @@ public void build(Iterator starTreeDocumentIterator) throws IO logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument); if (numStarTreeDocs == 0) { - // TODO: Uncomment when segment codec is ready + // TODO: Uncomment when segment codec and file formats is ready // StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes); return; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index 489004884acd3..caeb24838da62 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -8,7 +8,6 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; import org.apache.lucene.codecs.DocValuesProducer; -import org.apache.lucene.index.BaseStarTreeBuilder; import org.apache.lucene.index.SegmentWriteState; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument; @@ -72,31 +71,26 @@ public Long getDimensionValue(int docId, int dimensionId) throws IOException { } @Override - public Iterator sortAndAggregateStarTreeDocuments(int numDocs) throws IOException { + public Iterator sortAndAggregateStarTreeDocuments() throws IOException { + int numDocs = totalSegmentDocs; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[numDocs]; for (int currentDocId = 0; currentDocId < numDocs; currentDocId++) { starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId); } + return sortAndAggregateStarTreeDocuments(starTreeDocuments); } /** * Sort, aggregates and merges the star-tree documents + * * @param starTreeDocuments star-tree documents * @return iterator for star-tree documents - * @throws IOException throws when unable to sort, merge and aggregate star-tree documents */ - Iterator sortAndAggregateStarTreeDocuments(StarTreeDocument[] starTreeDocuments) throws IOException { + Iterator sortAndAggregateStarTreeDocuments(StarTreeDocument[] starTreeDocuments) { - // sort the documents - Arrays.sort(starTreeDocuments, (o1, o2) -> { - for (int i = 0; i < numDimensions; i++) { - if (!Objects.equals(o1.dimensions[i], o2.dimensions[i])) { - return Long.compare(o1.dimensions[i], o2.dimensions[i]); - } - } - return 0; - }); + // sort all the documents + sortStarTreeDocumentsFromDimensionId(starTreeDocuments, 0); // merge the documents return mergeStarTreeDocuments(starTreeDocuments); @@ -104,6 +98,7 @@ Iterator sortAndAggregateStarTreeDocuments(StarTreeDocument[] /** * Merges the star-tree documents + * * @param starTreeDocuments star-tree documents * @return iterator to aggregate star-tree documents */ @@ -111,6 +106,7 @@ private Iterator mergeStarTreeDocuments(StarTreeDocument[] sta return new Iterator<>() { boolean hasNext = true; StarTreeDocument currentStarTreeDocument = starTreeDocuments[0]; + // starting from 1 since we have already fetched the 0th document int docId = 1; @Override @@ -123,8 +119,9 @@ public StarTreeDocument next() { // aggregate as we move on to the next doc StarTreeDocument next = reduceSegmentStarTreeDocuments(null, currentStarTreeDocument); while (docId < starTreeDocuments.length) { - StarTreeDocument starTreeDocument = starTreeDocuments[docId++]; - if (!Arrays.equals(starTreeDocument.dimensions, next.dimensions)) { + StarTreeDocument starTreeDocument = starTreeDocuments[docId]; + docId++; + if (Arrays.equals(starTreeDocument.dimensions, next.dimensions) == false) { currentStarTreeDocument = starTreeDocument; return next; } else { @@ -139,6 +136,7 @@ public StarTreeDocument next() { /** * Generates a star-tree for a given star-node + * * @param startDocId Start document id in the star-tree * @param endDocId End document id (exclusive) in the star-tree * @param dimensionId Dimension id of the star-node @@ -153,14 +151,10 @@ public Iterator generateStarTreeDocumentsForStarNode(int start for (int i = 0; i < numDocs; i++) { starTreeDocuments[i] = getStarTreeDocument(startDocId + i); } - Arrays.sort(starTreeDocuments, (o1, o2) -> { - for (int i = dimensionId + 1; i < numDimensions; i++) { - if (!Objects.equals(o1.dimensions[i], o2.dimensions[i])) { - return Long.compare(o1.dimensions[i], o2.dimensions[i]); - } - } - return 0; - }); + + // sort star tree documents from given dimension id (as previous dimension ids have already been processed) + sortStarTreeDocumentsFromDimensionId(starTreeDocuments, dimensionId + 1); + return new Iterator() { boolean hasNext = true; StarTreeDocument currentStarTreeDocument = starTreeDocuments[0]; @@ -185,7 +179,8 @@ public StarTreeDocument next() { StarTreeDocument next = reduceStarTreeDocuments(null, currentStarTreeDocument); next.dimensions[dimensionId] = Long.valueOf(STAR_IN_DOC_VALUES_INDEX); while (docId < numDocs) { - StarTreeDocument starTreeDocument = starTreeDocuments[docId++]; + StarTreeDocument starTreeDocument = starTreeDocuments[docId]; + docId++; if (!hasSameDimensions(starTreeDocument, currentStarTreeDocument)) { currentStarTreeDocument = starTreeDocument; return next; @@ -198,4 +193,21 @@ public StarTreeDocument next() { } }; } + + /** + * Sorts the star-tree documents from the given dimension id + * + * @param starTreeDocuments star-tree documents + * @param dimensionId id of the dimension + */ + private void sortStarTreeDocumentsFromDimensionId(StarTreeDocument[] starTreeDocuments, int dimensionId) { + Arrays.sort(starTreeDocuments, (o1, o2) -> { + for (int i = dimensionId; i < numDimensions; i++) { + if (!Objects.equals(o1.dimensions[i], o2.dimensions[i])) { + return Long.compare(o1.dimensions[i], o2.dimensions[i]); + } + } + return 0; + }); + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java index f9fd5fc91a9ad..eaf9ae1dcdaa1 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java @@ -79,9 +79,8 @@ public void build() throws IOException { logger.debug("Starting building {} star-trees with star-tree fields", numStarTrees); // Build all star-trees - for (int i = 0; i < numStarTrees; i++) { - StarTreeField starTreeField = starTreeFields.get(i); - try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(starTreeField, fieldProducerMap, state, mapperService)) { + for (StarTreeField starTreeField : starTreeFields) { + try (StarTreeBuilder starTreeBuilder = getStarTreeBuilder(starTreeField, fieldProducerMap, state, mapperService)) { starTreeBuilder.build(); } } @@ -93,7 +92,7 @@ public void close() throws IOException { } - static StarTreeBuilder getSingleTreeBuilder( + StarTreeBuilder getStarTreeBuilder( StarTreeField starTreeField, Map fieldProducerMap, SegmentWriteState state, diff --git a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java index 6c6d26656e4de..31df9a49bebfb 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java @@ -12,12 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene99.Lucene99Codec; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.SortedNumericDocValuesField; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.BaseDocValuesFormatTestCase; -import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.common.Rounding; import org.opensearch.index.codec.composite.Composite99Codec; @@ -31,7 +26,6 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.StarTreeMapper; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -77,34 +71,4 @@ private static StarTreeField getStarTreeField(List d1Cale return new StarTreeField("starTree", dims, metrics, config); } - - public void testStarTreeDocValues() throws IOException { - Directory directory = newDirectory(); - IndexWriterConfig conf = newIndexWriterConfig(null); - conf.setMergePolicy(newLogMergePolicy()); - RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); - Document doc = new Document(); - doc.add(new SortedNumericDocValuesField("sndv", 1)); - doc.add(new SortedNumericDocValuesField("dv", 1)); - doc.add(new SortedNumericDocValuesField("field", 1)); - iw.addDocument(doc); - doc.add(new SortedNumericDocValuesField("sndv", 1)); - doc.add(new SortedNumericDocValuesField("dv", 1)); - doc.add(new SortedNumericDocValuesField("field", 1)); - iw.addDocument(doc); - iw.forceMerge(1); - doc.add(new SortedNumericDocValuesField("sndv", 2)); - doc.add(new SortedNumericDocValuesField("dv", 2)); - doc.add(new SortedNumericDocValuesField("field", 2)); - iw.addDocument(doc); - doc.add(new SortedNumericDocValuesField("sndv", 2)); - doc.add(new SortedNumericDocValuesField("dv", 2)); - doc.add(new SortedNumericDocValuesField("field", 2)); - iw.addDocument(doc); - iw.forceMerge(1); - iw.close(); - - // TODO : validate star tree structures that got created - directory.close(); - } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java index a042cefea879f..b78130e72aba1 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java @@ -10,7 +10,6 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; -import org.apache.lucene.index.BaseStarTreeBuilder; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FieldInfos; @@ -176,7 +175,7 @@ public Long getDimensionValue(int docId, int dimensionId) throws IOException { } @Override - public Iterator sortAndAggregateStarTreeDocuments(int numDocs) throws IOException { + public Iterator sortAndAggregateStarTreeDocuments() throws IOException { return null; } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java index 88003c183f98b..518c6729c2e1a 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java @@ -94,17 +94,19 @@ public void test_buildWithNoStarTreeFields() throws IOException { verifyNoInteractions(docValuesProducer); } - public void test_getSingleTreeBuilder() throws IOException { + public void test_getStarTreeBuilder() throws IOException { when(mapperService.getCompositeFieldTypes()).thenReturn(Set.of(starTreeFieldType)); - StarTreeBuilder starTreeBuilder = StarTreesBuilder.getSingleTreeBuilder(starTreeField, fieldProducerMap, segmentWriteState, mapperService); + StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, segmentWriteState, mapperService); + StarTreeBuilder starTreeBuilder = starTreesBuilder.getStarTreeBuilder(starTreeField, fieldProducerMap, segmentWriteState, mapperService); assertTrue(starTreeBuilder instanceof OnHeapStarTreeBuilder); } - public void test_getSingleTreeBuilder_illegalArgument() { + public void test_getStarTreeBuilder_illegalArgument() { when(mapperService.getCompositeFieldTypes()).thenReturn(Set.of(starTreeFieldType)); StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration(1, new HashSet<>(), StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP); StarTreeField starTreeField = new StarTreeField("star_tree", new ArrayList<>(), new ArrayList<>(), starTreeFieldConfiguration); - assertThrows(IllegalArgumentException.class, () -> StarTreesBuilder.getSingleTreeBuilder(starTreeField, fieldProducerMap, segmentWriteState, mapperService)); + StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, segmentWriteState, mapperService); + assertThrows(IllegalArgumentException.class, () -> starTreesBuilder.getStarTreeBuilder(starTreeField, fieldProducerMap, segmentWriteState, mapperService)); } public void test_closeWithNoStarTreeFields() throws IOException {