From 60e0fdd5cfeb23d2c09cea21eda1fe416b3f4fd4 Mon Sep 17 00:00:00 2001 From: Sunjeet Singh Date: Fri, 30 Aug 2024 16:35:03 -0700 Subject: [PATCH] Resharding: Consumer-side support for list, set, map types --- docs/advanced-topics.md | 2 +- .../diffview/HollowDiffUIServerTest.java | 4 +- .../core/read/engine/HollowBlobReader.java | 19 +- .../read/engine/HollowTypeDataElements.java | 23 ++ .../engine/HollowTypeDataElementsJoiner.java | 75 +++++ .../HollowTypeDataElementsSplitter.java | 73 +++++ .../core/read/engine/HollowTypeReadState.java | 22 +- .../read/engine/HollowTypeReadStateShard.java | 8 + .../engine/HollowTypeReshardingStrategy.java | 191 +++++++++++ .../hollow/core/read/engine/ShardsHolder.java | 6 + ...HollowListDeltaHistoricalStateCreator.java | 14 +- .../list/HollowListTypeDataElements.java | 38 ++- .../HollowListTypeDataElementsJoiner.java | 73 +++++ .../HollowListTypeDataElementsSplitter.java | 83 +++++ .../engine/list/HollowListTypeReadState.java | 185 +++++++---- .../list/HollowListTypeReadStateShard.java | 112 ++----- .../HollowListTypeReshardingStrategy.java | 18 ++ .../list/HollowListTypeShardsHolder.java | 47 +++ .../HollowMapDeltaHistoricalStateCreator.java | 23 +- .../engine/map/HollowMapTypeDataElements.java | 49 ++- .../map/HollowMapTypeDataElementsJoiner.java | 92 ++++++ .../HollowMapTypeDataElementsSplitter.java | 91 ++++++ .../engine/map/HollowMapTypeReadState.java | 302 +++++++++++++----- .../map/HollowMapTypeReadStateShard.java | 232 +++----------- .../map/HollowMapTypeReshardingStrategy.java | 18 ++ .../engine/map/HollowMapTypeShardsHolder.java | 47 +++ ...llowObjectDeltaHistoricalStateCreator.java | 3 +- .../object/HollowObjectTypeDataElements.java | 50 +-- .../HollowObjectTypeDataElementsJoiner.java | 89 +++--- .../HollowObjectTypeDataElementsSplitter.java | 79 ++--- .../object/HollowObjectTypeReadState.java | 239 +++----------- .../HollowObjectTypeReadStateShard.java | 17 +- .../HollowObjectTypeReshardingStrategy.java | 18 ++ .../object/HollowObjectTypeShardsHolder.java | 47 +++ .../HollowSetDeltaHistoricalStateCreator.java | 15 +- .../engine/set/HollowSetTypeDataElements.java | 47 ++- .../set/HollowSetTypeDataElementsJoiner.java | 89 ++++++ .../HollowSetTypeDataElementsSplitter.java | 89 ++++++ .../engine/set/HollowSetTypeReadState.java | 267 +++++++++++----- .../set/HollowSetTypeReadStateShard.java | 182 +++-------- .../set/HollowSetTypeReshardingStrategy.java | 18 ++ .../engine/set/HollowSetTypeShardsHolder.java | 47 +++ .../write/HollowObjectTypeWriteState.java | 11 - .../core/write/HollowTypeWriteState.java | 16 + .../hollow/tools/history/HollowHistory.java | 6 +- .../HollowIncrementalProducerTest.java | 3 +- .../api/producer/HollowProducerTest.java | 80 +++-- .../producer/model/CustomReferenceType.java | 8 + .../api/producer/model/HasAllTypeStates.java | 19 ++ .../core/read/HollowBlobOptionalPartTest.java | 3 +- ...ctHollowTypeDataElementsSplitJoinTest.java | 55 ++++ .../HollowTypeReshardingStrategyTest.java | 236 ++++++++++++++ ...llowListTypeDataElementsSplitJoinTest.java | 87 +++++ .../HollowListTypeDataElementsJoinerTest.java | 91 ++++++ ...llowListTypeDataElementsSplitJoinTest.java | 72 +++++ ...ollowListTypeDataElementsSplitterTest.java | 49 +++ .../list/HollowListTypeReadStateTest.java | 45 +++ ...ollowMapTypeDataElementsSplitJoinTest.java | 121 +++++++ .../HollowMapTypeDataElementsJoinerTest.java | 112 +++++++ ...ollowMapTypeDataElementsSplitJoinTest.java | 71 ++++ ...HollowMapTypeDataElementsSplitterTest.java | 52 +++ 
.../map/HollowMapTypeReadStateTest.java | 45 +++ ...owObjectTypeDataElementsSplitJoinTest.java | 47 +-- ...ollowObjectTypeDataElementsJoinerTest.java | 16 +- ...owObjectTypeDataElementsSplitJoinTest.java | 93 +++--- ...lowObjectTypeDataElementsSplitterTest.java | 14 +- .../object/HollowObjectTypeReadStateTest.java | 166 +--------- ...ollowSetTypeDataElementsSplitJoinTest.java | 89 ++++++ .../HollowSetTypeDataElementsJoinerTest.java | 105 ++++++ ...ollowSetTypeDataElementsSplitJoinTest.java | 71 ++++ ...HollowSetTypeDataElementsSplitterTest.java | 49 +++ .../set/HollowSetTypeReadStateTest.java | 45 +++ .../core/read/map/HollowMapDeltaTest.java | 4 - .../write/HollowObjectTypeWriteStateTest.java | 178 ----------- .../core/write/HollowTypeWriteStateTest.java | 229 ++++++++++++- 75 files changed, 3905 insertions(+), 1526 deletions(-) create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElements.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsJoiner.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsSplitter.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadStateShard.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReshardingStrategy.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/ShardsHolder.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReshardingStrategy.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeShardsHolder.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitter.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReshardingStrategy.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeShardsHolder.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReshardingStrategy.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeShardsHolder.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsJoiner.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitter.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReshardingStrategy.java create mode 100644 hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeShardsHolder.java create mode 100644 hollow/src/test/java/com/netflix/hollow/api/producer/model/CustomReferenceType.java create mode 100644 hollow/src/test/java/com/netflix/hollow/api/producer/model/HasAllTypeStates.java create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/AbstractHollowTypeDataElementsSplitJoinTest.java create mode 100644 
hollow/src/test/java/com/netflix/hollow/core/read/engine/HollowTypeReshardingStrategyTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/list/AbstractHollowListTypeDataElementsSplitJoinTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoinerTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitJoinTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitterTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadStateTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/map/AbstractHollowMapTypeDataElementsSplitJoinTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoinerTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitJoinTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitterTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadStateTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/set/AbstractHollowSetTypeDataElementsSplitJoinTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsJoinerTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitJoinTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitterTest.java
 create mode 100644 hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadStateTest.java

diff --git a/docs/advanced-topics.md b/docs/advanced-topics.md
index a6bc52f871..89e0a60dda 100644
--- a/docs/advanced-topics.md
+++ b/docs/advanced-topics.md
@@ -372,7 +372,7 @@ The number of bits used to represent a field which is one of the types (`INT`, `
 32 bits are used to represent a `FLOAT`, and 64 bits are used to represent a `DOUBLE`.
 
-`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of number of bits required to represent the maximum offset, plus one.
+`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of bits required to represent the maximum offset, plus one.
 
 Each field type may be assigned a null value. For `INT`, `LONG`, and `REFERENCE` fields, null is encoded as a value with all ones. For `FLOAT` and `DOUBLE` fields, null is encoded as special bit sequences.
 For `STRING` and `BYTES` fields, null is encoded by setting a designated null bit at the beginning of each field, followed by the end offset of the last populated value for that field.
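Before the engine diffs below, it helps to see the shard-mapping arithmetic this patch leans on everywhere: shard counts are powers of 2, so a record ordinal factors into a shard number and an in-shard ordinal with mask and shift operations. A minimal, self-contained sketch — illustrative only, the class below is hypothetical and is not code from this patch:

```java
// Hypothetical sketch: factoring an ordinal into (shard, shardOrdinal) when numShards is a power of 2.
public class ShardMappingSketch {
    public static void main(String[] args) {
        int numShards = 4;                                                    // must be a power of 2
        int shardNumberMask = numShards - 1;                                  // low bits select the shard
        int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); // log2(numShards)

        int ordinal = 13;
        int shardNumber = ordinal & shardNumberMask;     // 13 & 3  == 1
        int shardOrdinal = ordinal >> shardOrdinalShift; // 13 >> 2 == 3

        System.out.println("ordinal " + ordinal + " -> shard " + shardNumber + ", shardOrdinal " + shardOrdinal);
    }
}
```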
diff --git a/hollow-diff-ui/src/test/java/com/netflix/hollow/diffview/HollowDiffUIServerTest.java b/hollow-diff-ui/src/test/java/com/netflix/hollow/diffview/HollowDiffUIServerTest.java
index e69d8cbdf2..077c8756dd 100644
--- a/hollow-diff-ui/src/test/java/com/netflix/hollow/diffview/HollowDiffUIServerTest.java
+++ b/hollow-diff-ui/src/test/java/com/netflix/hollow/diffview/HollowDiffUIServerTest.java
@@ -10,7 +10,7 @@ public class HollowDiffUIServerTest {
     public void test() throws Exception {
         HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();
 
-        HollowDiffUIServer server = new HollowDiffUIServer();
+        HollowDiffUIServer server = new HollowDiffUIServer(0);
 
         server.addDiff("diff", testDiff);
 
@@ -22,7 +22,7 @@ public void test() throws Exception {
     public void testBackwardsCompatibiltyWithJettyImplementation() throws Exception {
         HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();
 
-        com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer();
+        com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer(0);
 
         server.addDiff("diff", testDiff);
 
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java
index 8c6bf60893..072775afaa 100644
--- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java
@@ -337,30 +337,25 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
             if(!filter.includes(typeName)) {
                 HollowListTypeReadState.discardSnapshot(in, numShards);
             } else {
-                populateTypeStateSnapshot(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema, numShards));
+                populateTypeStateSnapshotWithNumShards(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema), numShards);
             }
         } else if(schema instanceof HollowSetSchema) {
             if(!filter.includes(typeName)) {
                 HollowSetTypeReadState.discardSnapshot(in, numShards);
             } else {
-                populateTypeStateSnapshot(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema, numShards));
+                populateTypeStateSnapshotWithNumShards(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema), numShards);
             }
         } else if(schema instanceof HollowMapSchema) {
             if(!filter.includes(typeName)) {
                 HollowMapTypeReadState.discardSnapshot(in, numShards);
             } else {
-                populateTypeStateSnapshot(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema, numShards));
+                populateTypeStateSnapshotWithNumShards(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema), numShards);
             }
         }
 
         return typeName;
     }
 
-    private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState typeState) throws IOException {
-        stateEngine.addTypeState(typeState);
-        typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
-    }
-
     private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
         if (numShards<=0 || ((numShards&(numShards-1))!=0)) {
             throw new IllegalArgumentException("Number of shards must be a power of 2!");
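The guard above rejects shard counts that are not powers of 2 via the `(n & (n-1))` identity: a power of two has exactly one set bit, so subtracting 1 flips all lower bits and the AND comes out zero. A quick standalone illustration — hypothetical helper, not part of the patch:

```java
// Hypothetical sketch of the power-of-2 test used by populateTypeStateSnapshotWithNumShards.
public class PowerOfTwoCheck {
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 2, 3, 4, 6, 8, 1024}) {
            System.out.println(n + " -> " + isPowerOfTwo(n)); // true only for 1, 2, 4, 8, 1024
        }
    }
}
```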
@@ -376,6 +371,10 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
         int numShards = readNumShards(in);
         HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
         if(typeState != null) {
+            if (shouldReshard(typeState.numShards(), numShards)) {
+                HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);
+                reshardingStrategy.reshard(typeState, typeState.numShards(), numShards);
+            }
             typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
         } else {
             discardDelta(in, schema, numShards);
@@ -384,6 +383,10 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
         return schema.getName();
     }
 
+    private boolean shouldReshard(int currNumShards, int deltaNumShards) {
+        return currNumShards != 0 && deltaNumShards != 0 && currNumShards != deltaNumShards;
+    }
+
     private int readNumShards(HollowBlobInput in) throws IOException {
         int backwardsCompatibilityBytes = VarInt.readVInt(in);
 
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElements.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElements.java
new file mode 100644
index 0000000000..43fdc8e29e
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElements.java
@@ -0,0 +1,23 @@
+package com.netflix.hollow.core.read.engine;
+
+import com.netflix.hollow.core.memory.MemoryMode;
+import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
+import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
+
+public abstract class HollowTypeDataElements {
+
+    public int maxOrdinal;
+
+    public GapEncodedVariableLengthIntegerReader encodedAdditions;
+    public GapEncodedVariableLengthIntegerReader encodedRemovals;
+
+    public final ArraySegmentRecycler memoryRecycler;
+    public final MemoryMode memoryMode;
+
+    public HollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
+        this.memoryMode = memoryMode;
+        this.memoryRecycler = memoryRecycler;
+    }
+
+    public abstract void destroy();
+}
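The joiner introduced next maps a record at `fromOrdinal` in shard `fromIndex` to joined ordinal `(fromOrdinal * numShards) + fromIndex`, and the `fromMask`/`fromOrdinalShift` pair recovers the original coordinates. A runnable round-trip sketch — illustrative only, not code from this patch:

```java
// Hypothetical sketch of the joiner's ordinal remapping and its mask/shift inverse.
public class JoinOrdinalMappingSketch {
    public static void main(String[] args) {
        int numShards = 2;                                                   // power of 2
        int fromMask = numShards - 1;
        int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);

        for (int fromIndex = 0; fromIndex < numShards; fromIndex++) {
            for (int fromOrdinal = 0; fromOrdinal <= 2; fromOrdinal++) {
                int joined = (fromOrdinal * numShards) + fromIndex; // forward mapping used when joining
                if ((joined & fromMask) != fromIndex || (joined >> fromOrdinalShift) != fromOrdinal) {
                    throw new AssertionError("mask/shift did not invert the mapping");
                }
                System.out.println("shard " + fromIndex + ", ordinal " + fromOrdinal + " -> joined ordinal " + joined);
            }
        }
    }
}
```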
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsJoiner.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsJoiner.java
new file mode 100644
index 0000000000..0ca7d2a029
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsJoiner.java
@@ -0,0 +1,75 @@
+package com.netflix.hollow.core.read.engine;
+
+import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
+
+public abstract class HollowTypeDataElementsJoiner<T extends HollowTypeDataElements> {
+    public final int fromMask;
+    public final int fromOrdinalShift;
+    public final T[] from;
+
+    public T to;
+
+    public HollowTypeDataElementsJoiner(T[] from) {
+        this.from = from;
+        this.fromMask = from.length - 1;
+        this.fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);
+
+        if (from.length<=0 || !((from.length&(from.length-1))==0)) {
+            throw new IllegalStateException("No. of DataElements to be joined must be a power of 2");
+        }
+
+        for (int i=0;i<from.length;i++) {
+            if (from[i].maxOrdinal > (1<<29)
+                    || from[i].maxOrdinal != 0 && (from.length > (1<<29)/from[i].maxOrdinal)
+                    || from[i].maxOrdinal * from.length + i > (1<<29)) {
+                throw new IllegalArgumentException("Too large to join, maxOrdinal would exceed 2<<29");
+            }
+        }
+
+        for (HollowTypeDataElements elements : from) {
+            if (elements.encodedAdditions != null) {
+                throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
+                        "since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
+                        "delta data elements are never split/joined");
+            }
+        }
+    }
+
+    public T join() {
+
+        initToElements();
+        to.maxOrdinal = -1;
+
+        populateStats();
+
+        copyRecords();
+
+        GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length];
+        for (int i=0;i<from.length;i++) {
+            fromRemovals[i] = from[i].encodedRemovals;
+        }
+        to.encodedRemovals = GapEncodedVariableLengthIntegerReader.join(fromRemovals);
+
+        return to;
+    }
+
+    public abstract void initToElements();
+
+    public abstract void populateStats();
+
+    public abstract void copyRecords();
+}
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsSplitter.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsSplitter.java
new file mode 100644
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeDataElementsSplitter.java
@@ -0,0 +1,73 @@
+package com.netflix.hollow.core.read.engine;
+
+import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
+
+public abstract class HollowTypeDataElementsSplitter<T extends HollowTypeDataElements> {
+    public final int numSplits;
+    public final int toMask;
+    public final int toOrdinalShift;
+    public final T from;
+
+    public T[] to;
+
+    public HollowTypeDataElementsSplitter(T from, int numSplits) {
+        this.from = from;
+        this.numSplits = numSplits;
+        this.toMask = numSplits - 1;
+        this.toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
+
+        if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
+            throw new IllegalStateException("Must split by power of 2");
+        }
+
+        if (from.encodedAdditions != null) {
+            throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
+                    "since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
+                    "delta data elements are never split/joined");
+        }
+    }
+
+    public T[] split() {
+
+        initToElements();
+        for(int i=0;i<numSplits;i++) {
+            to[i].maxOrdinal = -1;
+        }
+
+        populateStats();
+
+        copyRecords();
+
+        if (from.encodedRemovals != null) {
+            GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
+            for(int i=0;i<numSplits;i++) {
+                to[i].encodedRemovals = splitRemovals[i];
+            }
+        }
+        return to;
+    }
+
+    public abstract void initToElements();
+
+    public abstract void populateStats();
+
+    public abstract void copyRecords();
+}
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadStateShard.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadStateShard.java
new file mode 100644
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReadStateShard.java
@@ -0,0 +1,8 @@
+package com.netflix.hollow.core.read.engine;
+
+public interface HollowTypeReadStateShard {
+
+    HollowTypeDataElements getDataElements();
+
+    int getShardOrdinalShift();
+}
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReshardingStrategy.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReshardingStrategy.java
new file mode 100644
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowTypeReshardingStrategy.java
@@ -0,0 +1,191 @@
+package com.netflix.hollow.core.read.engine;
+
+import java.util.Arrays;
+
+public abstract class HollowTypeReshardingStrategy {
+
+    public abstract HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataElements from, int shardingFactor);
+
+    public abstract HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from);
+
+    public void reshard(HollowTypeReadState typeState, int prevNumShards, int newNumShards) {
+        try {
+            int shardingFactor = shardingFactor(prevNumShards, newNumShards);
+            HollowTypeDataElements[] newDataElements;
+            int[] shardOrdinalShifts;
+
+            if(newNumShards > prevNumShards) { // split existing shards
+                // Step 1: Grow the number of shards. Each original shard will result in N child shards where N is the sharding factor.
+                // The child shards will reference into the existing data elements as-is, and reuse existing shardOrdinalShift.
+                // However since the shards array is resized, a read will map into the new shard index, as a result a subset of
+                // ordinals in each shard will be accessed. In the next "splitting" step, the data elements in these new shards
+                // will be filtered to only retain the subset of ordinals that are actually accessed.
+                //
+                // This is an atomic update to shardsVolatile: full construction happens-before the store to shardsVolatile,
+                // in other words a fully constructed object as visible to this thread will be visible to other threads that
+                // load the new shardsVolatile.
+                typeState.updateShardsVolatile(expandWithOriginalDataElements(typeState.getShardsVolatile(), shardingFactor));
+
+                // Step 2: Split each original data element into N child data elements where N is the sharding factor.
+                // Then update each of the N child shards with the respective split of data element, this will be
+                // sufficient to serve all reads into this shard. Once all child shards for a pre-split parent
+                // shard have been assigned the split data elements, the parent data elements can be discarded.
+                for (int i = 0; i < prevNumShards; i++) {
+                    HollowTypeDataElements originalDataElements = typeState.getShardsVolatile().getShards()[i].getDataElements();
+
+                    typeState.updateShardsVolatile(splitDataElementsForOneShard(typeState, i, prevNumShards, shardingFactor));
+
+                    typeState.destroyOriginalDataElements(originalDataElements);
+                }
+                // Re-sharding done.
+ // shardsVolatile now contains newNumShards shards where each shard contains + // a split of original data elements. + + } else { // join existing shards + // Step 1: Join N data elements to create one, where N is the sharding factor. Then update each of the + // N shards to reference the joined result, but with a new shardOrdinalShift. + // Reads will continue to reference the same shard index as before, but the new shardOrdinalShift + // will help these reads land at the right ordinal in the joined shard. When all N old shards + // corresponding to one new shard have been updated, the N pre-join data elements can be destroyed. + for (int i = 0; i < newNumShards; i++) { + HollowTypeDataElements destroyCandidates[] = joinCandidates(typeState, i, shardingFactor); + + typeState.updateShardsVolatile(joinDataElementsForOneShard(typeState, i, shardingFactor)); // atomic update to shardsVolatile + + for (int j = 0; j < shardingFactor; j++) { + typeState.destroyOriginalDataElements(destroyCandidates[j]); + } + } + + // Step 2: Resize the shards array to only keep the first newNumShards shards. + newDataElements = typeState.createTypeDataElements(typeState.getShardsVolatile().getShards().length); + shardOrdinalShifts = new int[typeState.getShardsVolatile().getShards().length]; + copyShardDataElements(typeState.getShardsVolatile(), newDataElements, shardOrdinalShifts); + + HollowTypeReadStateShard[] newShards = Arrays.copyOfRange(typeState.getShardsVolatile().getShards(), 0, newNumShards); + typeState.updateShardsVolatile(newShards); + + // Re-sharding done. + // shardsVolatile now contains newNumShards shards where each shard contains + // a join of original data elements. + } + } catch (Exception e) { + throw new RuntimeException("Error in re-sharding", e); + } + } + + /** + * Given old and new numShards, this method returns the shard resizing multiplier. + */ + public static int shardingFactor(int oldNumShards, int newNumShards) { + if (newNumShards <= 0 || oldNumShards <= 0 || newNumShards == oldNumShards) { + throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards); + } + + boolean isNewGreater = newNumShards > oldNumShards; + int dividend = isNewGreater ? newNumShards : oldNumShards; + int divisor = isNewGreater ? oldNumShards : newNumShards; + + if (dividend % divisor != 0) { + throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards); + } + return dividend / divisor; + } + + private void copyShardDataElements(ShardsHolder from, HollowTypeDataElements[] newDataElements, int[] shardOrdinalShifts) { + for (int i=0; i historicalDataElements.bitsPerElement) { + historicalDataElements.bitsPerElement = stateEngineDataElements[i].bitsPerElement; + } + } ordinalMapping = new IntMap(removedEntryCount); } @@ -110,8 +112,8 @@ private void copyRecord(int ordinal) { int shardOrdinal = ordinal >> shardOrdinalShift; long bitsPerElement = stateEngineDataElements[shard].bitsPerElement; - long fromStartElement = shardOrdinal == 0 ? 
0 : stateEngineDataElements[shard].listPointerData.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer); - long fromEndElement = stateEngineDataElements[shard].listPointerData.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer); + long fromStartElement = stateEngineDataElements[shard].getStartElement(shardOrdinal); + long fromEndElement = stateEngineDataElements[shard].getEndElement(shardOrdinal); long size = fromEndElement - fromStartElement; historicalDataElements.elementData.copyBits(stateEngineDataElements[shard].elementData, fromStartElement * bitsPerElement, nextStartElement * bitsPerElement, size * bitsPerElement); diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElements.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElements.java index 385e15b02b..ff5b85ad82 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElements.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElements.java @@ -23,6 +23,7 @@ import com.netflix.hollow.core.memory.encoding.VarInt; import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler; import com.netflix.hollow.core.read.HollowBlobInput; +import com.netflix.hollow.core.read.engine.HollowTypeDataElements; import java.io.IOException; /** @@ -31,30 +32,21 @@ * During a delta, the HollowListTypeReadState will create a new HollowListTypeDataElements and atomically swap * with the existing one to make sure a consistent view of the data is always available. */ -public class HollowListTypeDataElements { - - int maxOrdinal; +public class HollowListTypeDataElements extends HollowTypeDataElements { FixedLengthData listPointerData; FixedLengthData elementData; - GapEncodedVariableLengthIntegerReader encodedAdditions; - GapEncodedVariableLengthIntegerReader encodedRemovals; - int bitsPerListPointer; int bitsPerElement; - long totalNumberOfElements = 0; - - final ArraySegmentRecycler memoryRecycler; - final MemoryMode memoryMode; + long totalNumberOfElements; public HollowListTypeDataElements(ArraySegmentRecycler memoryRecycler) { this(MemoryMode.ON_HEAP, memoryRecycler); } public HollowListTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) { - this.memoryMode = memoryMode; - this.memoryRecycler = memoryRecycler; + super(memoryMode, memoryRecycler); } void readSnapshot(HollowBlobInput in) throws IOException { @@ -109,9 +101,31 @@ public void applyDelta(HollowListTypeDataElements fromData, HollowListTypeDataEl new HollowListDeltaApplicator(fromData, deltaData, this).applyDelta(); } + @Override public void destroy() { FixedLengthDataFactory.destroy(listPointerData, memoryRecycler); FixedLengthDataFactory.destroy(elementData, memoryRecycler); } + long getStartElement(int ordinal) { + return ordinal == 0 ? 
0 : listPointerData.getElementValue(((long)(ordinal-1) * bitsPerListPointer), bitsPerListPointer);
+    }
+
+    long getEndElement(int ordinal) {
+        return listPointerData.getElementValue((long)ordinal * bitsPerListPointer, bitsPerListPointer);
+    }
+
+    void copyElementsFrom(long startElement, HollowListTypeDataElements src, long srcStartElement, long srcEndElement) {
+        if (bitsPerElement == src.bitsPerElement) {
+            // fast path can bulk copy elements
+            long numElements = srcEndElement - srcStartElement;
+            elementData.copyBits(src.elementData, srcStartElement * bitsPerElement, startElement * bitsPerElement, numElements * bitsPerElement);
+        } else {
+            for (long element=srcStartElement;element<srcEndElement;element++) {
+                long elementVal = src.elementData.getElementValue(element * src.bitsPerElement, src.bitsPerElement);
+                elementData.setElementValue(startElement * bitsPerElement, bitsPerElement, elementVal);
+                startElement++;
+            }
+        }
+    }
 }
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java
new file mode 100644
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsJoiner.java
@@ -0,0 +1,73 @@
+package com.netflix.hollow.core.read.engine.list;
+
+import com.netflix.hollow.core.memory.FixedLengthDataFactory;
+import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner;
+
+/**
+ * Join multiple {@code HollowListTypeDataElements}s into 1 {@code HollowListTypeDataElements}.
+ * Ordinals are remapped and corresponding data is copied over.
+ * The original data elements are not destroyed.
+ * The no. of passed data elements must be a power of 2.
+ */
+public class HollowListTypeDataElementsJoiner extends HollowTypeDataElementsJoiner<HollowListTypeDataElements> {
+
+    public HollowListTypeDataElementsJoiner(HollowListTypeDataElements[] from) {
+        super(from);
+    }
+
+    @Override
+    public void initToElements() {
+        this.to = new HollowListTypeDataElements(from[0].memoryMode, from[0].memoryRecycler);
+    }
+
+    @Override
+    public void populateStats() {
+        for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
+            int mappedMaxOrdinal = from[fromIndex].maxOrdinal == -1 ? -1 : (from[fromIndex].maxOrdinal * from.length) + fromIndex;
+            to.maxOrdinal = Math.max(to.maxOrdinal, mappedMaxOrdinal);
+            if (from[fromIndex].bitsPerElement > to.bitsPerElement) {
+                // uneven bitsPerElement could be the case for consumers that skip type shards with no additions, so pick max across all shards
+                to.bitsPerElement = from[fromIndex].bitsPerElement;
+            }
+        }
+
+        long totalOfListSizes = 0;
+        for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
+            int fromIndex = ordinal & fromMask;
+            int fromOrdinal = ordinal >> fromOrdinalShift;
+
+            long startElement = from[fromIndex].getStartElement(fromOrdinal);
+            long endElement = from[fromIndex].getEndElement(fromOrdinal);
+            long numElements = endElement - startElement;
+            totalOfListSizes += numElements;
+
+        }
+        to.bitsPerListPointer = totalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalOfListSizes);
+        to.totalNumberOfElements = totalOfListSizes;
+    }
+
+    @Override
+    public void copyRecords() {
+        long elementCounter = 0;
+
+        to.listPointerData = FixedLengthDataFactory.get((long)to.bitsPerListPointer * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);
+        to.elementData = FixedLengthDataFactory.get(to.bitsPerElement * to.totalNumberOfElements, to.memoryMode, to.memoryRecycler);
+
+        for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
+            int fromIndex = ordinal & fromMask;
+            int fromOrdinal = ordinal >> fromOrdinalShift;
+
+            if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shard for e.g. when consumers skip type shards with no additions
+                HollowListTypeDataElements source = from[fromIndex];
+                long startElement = source.getStartElement(fromOrdinal);
+                long endElement = source.getEndElement(fromOrdinal);
+
+                long numElements = endElement - startElement;
+                to.copyElementsFrom(elementCounter, source, startElement, endElement);
+                elementCounter += numElements;
+            }
+            to.listPointerData.setElementValue((long)to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter);
+        }
+    }
+}
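The `getStartElement`/`getEndElement` pair above recovers list boundaries from an end-offset encoding: only each record's end offset is stored, and a record's start is the previous record's end. A standalone sketch of that encoding — hypothetical values, not code from this patch:

```java
// Hypothetical sketch: recovering [start, end) list boundaries from stored end offsets.
public class ListPointerEncodingSketch {
    public static void main(String[] args) {
        long[] endOffsets = {3, 3, 7}; // lists of sizes 3, 0, and 4, packed back to back
        for (int ordinal = 0; ordinal < endOffsets.length; ordinal++) {
            long start = ordinal == 0 ? 0 : endOffsets[ordinal - 1]; // previous record's end
            long end = endOffsets[ordinal];
            System.out.println("ordinal " + ordinal + ": elements [" + start + ", " + end + "), size " + (end - start));
        }
    }
}
```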
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java
new file mode 100644
index 0000000000..ca2802d07c
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitter.java
@@ -0,0 +1,83 @@
+package com.netflix.hollow.core.read.engine.list;
+
+import com.netflix.hollow.core.memory.FixedLengthDataFactory;
+import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter;
+
+/**
+ * Split a {@code HollowListTypeDataElements} into multiple {@code HollowListTypeDataElements}s.
+ * Ordinals are remapped and corresponding data is copied over.
+ * The original data elements are not destroyed.
+ * {@code numSplits} must be a power of 2.
+ */
+public class HollowListTypeDataElementsSplitter extends HollowTypeDataElementsSplitter<HollowListTypeDataElements> {
+
+    public HollowListTypeDataElementsSplitter(HollowListTypeDataElements from, int numSplits) {
+        super(from, numSplits);
+    }
+
+    @Override
+    public void initToElements() {
+        this.to = new HollowListTypeDataElements[numSplits];
+        for(int i=0;i<numSplits;i++) {
+            to[i] = new HollowListTypeDataElements(from.memoryMode, from.memoryRecycler);
+        }
+    }
+
+    @Override
+    public void populateStats() {
+        long[] totalOfListSizes = new long[numSplits];
+        for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
+            int toIndex = ordinal & toMask;
+            int toOrdinal = ordinal >> toOrdinalShift;
+            to[toIndex].maxOrdinal = toOrdinal;
+
+            long startElement = from.getStartElement(ordinal);
+            long endElement = from.getEndElement(ordinal);
+            long numElements = endElement - startElement;
+            totalOfListSizes[toIndex] += numElements;
+        }
+
+        long maxShardTotalOfListSizes = 0;
+        for(int toIndex=0;toIndex<numSplits;toIndex++) {
+            if(totalOfListSizes[toIndex] > maxShardTotalOfListSizes)
+                maxShardTotalOfListSizes = totalOfListSizes[toIndex];
+        }
+
+        for(int toIndex=0;toIndex<numSplits;toIndex++) {
+            to[toIndex].bitsPerListPointer = maxShardTotalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(maxShardTotalOfListSizes);
+            to[toIndex].bitsPerElement = from.bitsPerElement;
+            to[toIndex].totalNumberOfElements = totalOfListSizes[toIndex];
+        }
+    }
+
+    @Override
+    public void copyRecords() {
+        long[] elementCounter = new long[numSplits];
+
+        for(int toIndex=0;toIndex<numSplits;toIndex++) {
+            HollowListTypeDataElements target = to[toIndex];
+            target.listPointerData = FixedLengthDataFactory.get((long)target.bitsPerListPointer * (target.maxOrdinal + 1), target.memoryMode, target.memoryRecycler);
+            target.elementData = FixedLengthDataFactory.get(target.bitsPerElement * target.totalNumberOfElements, target.memoryMode, target.memoryRecycler);
+        }
+
+        for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
+            int toIndex = ordinal & toMask;
+            int toOrdinal = ordinal >> toOrdinalShift;
+
+            long startElement = from.getStartElement(ordinal);
+            long endElement = from.getEndElement(ordinal);
+            HollowListTypeDataElements target = to[toIndex];
+
+            long numElements = endElement - startElement;
+            target.copyElementsFrom(elementCounter[toIndex], from, startElement, endElement);
+            elementCounter[toIndex] += numElements;
+
+            target.listPointerData.setElementValue((long)target.bitsPerListPointer * toOrdinal, target.bitsPerListPointer, elementCounter[toIndex]);
+        }
+    }
+}
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java
index 7ff29b01c3..7d41dd47d1 100644
--- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReadState.java
@@ -20,6 +20,7 @@
 import com.netflix.hollow.api.sampling.HollowListSampler;
 import com.netflix.hollow.api.sampling.HollowSampler;
 import com.netflix.hollow.api.sampling.HollowSamplingDirector;
+import com.netflix.hollow.core.memory.HollowUnsafeHandle;
 import com.netflix.hollow.core.memory.MemoryMode;
 import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
 import com.netflix.hollow.core.memory.encoding.VarInt;
@@ -28,8 +29,11 @@
 import com.netflix.hollow.core.read.dataaccess.HollowListTypeDataAccess;
 import com.netflix.hollow.core.read.engine.HollowCollectionTypeReadState;
 import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
+import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
 import com.netflix.hollow.core.read.engine.HollowTypeReadState;
+import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
 import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener;
+import com.netflix.hollow.core.read.engine.ShardsHolder;
 import com.netflix.hollow.core.read.engine.SnapshotPopulatedOrdinalsReader;
 import com.netflix.hollow.core.read.filter.HollowFilterConfig;
 import com.netflix.hollow.core.read.iterator.HollowListOrdinalIterator;
@@ -44,75 +48,81 @@
 * A {@link HollowTypeReadState} for LIST type records.
*/ public class HollowListTypeReadState extends HollowCollectionTypeReadState implements HollowListTypeDataAccess { - private final HollowListSampler sampler; - private final int shardNumberMask; - private final int shardOrdinalShift; - private final HollowListTypeReadStateShard shards[]; - private int maxOrdinal; - public HollowListTypeReadState(HollowReadStateEngine stateEngine, HollowListSchema schema, int numShards) { - this(stateEngine, MemoryMode.ON_HEAP, schema, numShards); + volatile HollowListTypeShardsHolder shardsVolatile; + + @Override + public ShardsHolder getShardsVolatile() { + return shardsVolatile; + } + + @Override + public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { + this.shardsVolatile = new HollowListTypeShardsHolder(shards); + } + + @Override + public HollowTypeDataElements[] createTypeDataElements(int len) { + return new HollowListTypeDataElements[len]; + } + + @Override + public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { + return new HollowListTypeReadStateShard((HollowListTypeDataElements) dataElements, shardOrdinalShift); } - public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema, int numShards) { + public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowListSchema schema) { super(stateEngine, memoryMode, schema); this.sampler = new HollowListSampler(schema.getName(), DisabledSamplingDirector.INSTANCE); - this.shardNumberMask = numShards - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - - if(numShards < 1 || 1 << shardOrdinalShift != numShards) - throw new IllegalArgumentException("Number of shards must be a power of 2!"); - - HollowListTypeReadStateShard shards[] = new HollowListTypeReadStateShard[numShards]; - for(int i=0;i 1) + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - - for(int i=0;i= endElement) - throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement)); - - elementOrdinal = (int)currentData.elementData.getElementValue(elementIndex * currentData.bitsPerElement, currentData.bitsPerElement); - } while(readWasUnsafe(currentData)); - - return elementOrdinal; + @Override + public HollowListTypeDataElements getDataElements() { + return dataElements; } - public int size(int ordinal) { - HollowListTypeDataElements currentData; - int size; - - do { - currentData = this.currentDataVolatile; - - long startElement; - long endElement; - if (ordinal == 0) { - startElement = 0; - endElement = currentData.listPointerData.getElementValue(0, currentData.bitsPerListPointer); - } else { - long endFixedLengthOffset = (long)ordinal * currentData.bitsPerListPointer; - long startFixedLengthOffset = endFixedLengthOffset - currentData.bitsPerListPointer; - startElement = currentData.listPointerData.getElementValue(startFixedLengthOffset, currentData.bitsPerListPointer); - endElement = currentData.listPointerData.getElementValue(endFixedLengthOffset, currentData.bitsPerListPointer); - } - - size = (int)(endElement - startElement); - } while(readWasUnsafe(currentData)); - - return size; + @Override + public int getShardOrdinalShift() { + return shardOrdinalShift; } - void invalidate() { - setCurrentData(null); + public 
HollowListTypeReadStateShard(HollowListTypeDataElements dataElements, int shardOrdinalShift) { + this.shardOrdinalShift = shardOrdinalShift; + this.dataElements = dataElements; } - HollowListTypeDataElements currentDataElements() { - return currentDataVolatile; - } - - private boolean readWasUnsafe(HollowListTypeDataElements data) { - HollowUnsafeHandle.getUnsafe().loadFence(); - return data != currentDataVolatile; - } + public int size(int ordinal) { + long startElement = dataElements.getStartElement(ordinal); + long endElement = dataElements.getEndElement(ordinal); + int size = (int)(endElement - startElement); - void setCurrentData(HollowListTypeDataElements data) { - this.currentDataVolatile = data; + return size; } - protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) { + protected void applyShardToChecksum(HollowChecksum checksum, BitSet populatedOrdinals, int shardNumber, int numShards) { int ordinal = populatedOrdinals.nextSetBit(shardNumber); while(ordinal != ORDINAL_NONE) { if((ordinal & (numShards - 1)) == shardNumber) { @@ -109,8 +58,10 @@ protected void applyToChecksum(HollowChecksum checksum, BitSet populatedOrdinals int size = size(shardOrdinal); checksum.applyInt(ordinal); + long startElement = dataElements.getStartElement(shardOrdinal); + long endElement = dataElements.getEndElement(shardOrdinal); for(int i=0;i= endElement) + throw new ArrayIndexOutOfBoundsException("Array index out of bounds: " + listIndex + ", list size: " + (endElement - startElement)); + + int elementOrdinal = (int)dataElements.elementData.getElementValue(elementIndex * dataElements.bitsPerElement, dataElements.bitsPerElement); + return elementOrdinal; + } + public long getApproximateHeapFootprintInBytes() { - HollowListTypeDataElements currentData = currentDataVolatile; - long requiredListPointerBits = ((long)currentData.maxOrdinal + 1) * currentData.bitsPerListPointer; - long requiredElementBits = currentData.totalNumberOfElements * currentData.bitsPerElement; + long requiredListPointerBits = ((long)dataElements.maxOrdinal + 1) * dataElements.bitsPerListPointer; + long requiredElementBits = dataElements.totalNumberOfElements * dataElements.bitsPerElement; long requiredBits = requiredListPointerBits + requiredElementBits; return requiredBits / 8; } public long getApproximateHoleCostInBytes(BitSet populatedOrdinals, int shardNumber, int numShards) { - HollowListTypeDataElements currentData = currentDataVolatile; long holeBits = 0; int holeOrdinal = populatedOrdinals.nextClearBit(0); - while(holeOrdinal <= currentData.maxOrdinal) { + while(holeOrdinal <= dataElements.maxOrdinal) { if((holeOrdinal & (numShards - 1)) == shardNumber) - holeBits += currentData.bitsPerListPointer; + holeBits += dataElements.bitsPerListPointer; holeOrdinal = populatedOrdinals.nextClearBit(holeOrdinal + 1); } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReshardingStrategy.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReshardingStrategy.java new file mode 100644 index 0000000000..0aa7bba18f --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeReshardingStrategy.java @@ -0,0 +1,18 @@ +package com.netflix.hollow.core.read.engine.list; + +import com.netflix.hollow.core.read.engine.HollowTypeDataElements; +import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner; +import 
com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter; +import com.netflix.hollow.core.read.engine.HollowTypeReshardingStrategy; + +public class HollowListTypeReshardingStrategy extends HollowTypeReshardingStrategy { + @Override + public HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataElements from, int shardingFactor) { + return new HollowListTypeDataElementsSplitter((HollowListTypeDataElements) from, shardingFactor); + } + + @Override + public HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from) { + return new HollowListTypeDataElementsJoiner((HollowListTypeDataElements[]) from); + } +} diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeShardsHolder.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeShardsHolder.java new file mode 100644 index 0000000000..f5321c7395 --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeShardsHolder.java @@ -0,0 +1,47 @@ +package com.netflix.hollow.core.read.engine.list; + +import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard; +import com.netflix.hollow.core.read.engine.ShardsHolder; + +public class HollowListTypeShardsHolder implements ShardsHolder { + final HollowListTypeReadStateShard shards[]; + final int shardNumberMask; + + /** + * Thread safe construction of ShardHolder with given shards + * @param fromShards shards to be used + */ + public HollowListTypeShardsHolder(HollowTypeReadStateShard[] fromShards) { + this.shards = new HollowListTypeReadStateShard[fromShards.length]; + for (int i=0; i historicalDataElements.bitsPerKeyElement) { + historicalDataElements.bitsPerKeyElement = stateEngineDataElements[i].bitsPerKeyElement; + historicalDataElements.emptyBucketKeyValue = stateEngineDataElements[i].emptyBucketKeyValue; + } + if (stateEngineDataElements[i].bitsPerValueElement > historicalDataElements.bitsPerValueElement) { + historicalDataElements.bitsPerValueElement = stateEngineDataElements[i].bitsPerValueElement; + } + if (stateEngineDataElements[i].bitsPerMapEntry > historicalDataElements.bitsPerMapEntry) { + historicalDataElements.bitsPerMapEntry = stateEngineDataElements[i].bitsPerMapEntry; + } + } historicalDataElements.totalNumberOfBuckets = totalBucketCount; ordinalMapping = new IntMap(removedEntryCount); @@ -123,8 +130,8 @@ private void copyRecord(int ordinal) { long bitsPerBucket = historicalDataElements.bitsPerMapEntry; long size = typeState.size(ordinal); - long fromStartBucket = shardOrdinal == 0 ? 
0 : stateEngineDataElements[shard].mapPointerAndSizeData.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerFixedLengthMapPortion, stateEngineDataElements[shard].bitsPerMapPointer); - long fromEndBucket = stateEngineDataElements[shard].mapPointerAndSizeData.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerFixedLengthMapPortion, stateEngineDataElements[shard].bitsPerMapPointer); + long fromStartBucket = stateEngineDataElements[shard].getStartBucket(shardOrdinal); + long fromEndBucket = stateEngineDataElements[shard].getEndBucket(shardOrdinal); long numBuckets = fromEndBucket - fromStartBucket; historicalDataElements.mapPointerAndSizeData.setElementValue((long)nextOrdinal * historicalDataElements.bitsPerFixedLengthMapPortion, historicalDataElements.bitsPerMapPointer, nextStartBucket + numBuckets); diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElements.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElements.java index cc6b79e434..bd260597db 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElements.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElements.java @@ -23,6 +23,7 @@ import com.netflix.hollow.core.memory.encoding.VarInt; import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler; import com.netflix.hollow.core.read.HollowBlobInput; +import com.netflix.hollow.core.read.engine.HollowTypeDataElements; import java.io.IOException; /** @@ -31,16 +32,11 @@ * During a delta, the HollowMapTypeReadState will create a new HollowMapTypeDataElements and atomically swap * with the existing one to make sure a consistent view of the data is always available. */ -public class HollowMapTypeDataElements { - - int maxOrdinal; +public class HollowMapTypeDataElements extends HollowTypeDataElements { FixedLengthData mapPointerAndSizeData; FixedLengthData entryData; - GapEncodedVariableLengthIntegerReader encodedRemovals; - GapEncodedVariableLengthIntegerReader encodedAdditions; - int bitsPerMapPointer; int bitsPerMapSizeValue; int bitsPerFixedLengthMapPortion; @@ -50,16 +46,12 @@ public class HollowMapTypeDataElements { int emptyBucketKeyValue; long totalNumberOfBuckets; - final ArraySegmentRecycler memoryRecycler; - final MemoryMode memoryMode; - public HollowMapTypeDataElements(ArraySegmentRecycler memoryRecycler) { this(MemoryMode.ON_HEAP, memoryRecycler); } public HollowMapTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) { - this.memoryMode = memoryMode; - this.memoryRecycler = memoryRecycler; + super(memoryMode, memoryRecycler); } void readSnapshot(HollowBlobInput in) throws IOException { @@ -121,9 +113,44 @@ public void applyDelta(HollowMapTypeDataElements fromData, HollowMapTypeDataElem new HollowMapDeltaApplicator(fromData, deltaData, this).applyDelta(); } + @Override public void destroy() { FixedLengthDataFactory.destroy(mapPointerAndSizeData, memoryRecycler); FixedLengthDataFactory.destroy(entryData, memoryRecycler); } + long getStartBucket(int ordinal) { + return ordinal == 0 ? 
0 : mapPointerAndSizeData.getElementValue((long)(ordinal - 1) * bitsPerFixedLengthMapPortion, bitsPerMapPointer);
+    }
+
+    long getEndBucket(int ordinal) {
+        return mapPointerAndSizeData.getElementValue((long)ordinal * bitsPerFixedLengthMapPortion, bitsPerMapPointer);
+    }
+
+    int getBucketKeyByAbsoluteIndex(long absoluteBucketIndex) {
+        return (int)entryData.getElementValue(absoluteBucketIndex * bitsPerMapEntry, bitsPerKeyElement);
+    }
+
+    int getBucketValueByAbsoluteIndex(long absoluteBucketIndex) {
+        return (int)entryData.getElementValue((absoluteBucketIndex * bitsPerMapEntry) + bitsPerKeyElement, bitsPerValueElement);
+    }
+
+    void copyBucketsFrom(long startBucket, HollowMapTypeDataElements src, long srcStartBucket, long srcEndBucket) {
+        if (bitsPerKeyElement == src.bitsPerKeyElement && bitsPerValueElement == src.bitsPerValueElement) {
+            // fast path can bulk copy buckets. emptyBucketKeyValue is same since bitsPerKeyElement is the same
+            long numBuckets = srcEndBucket - srcStartBucket;
+            entryData.copyBits(src.entryData, srcStartBucket * bitsPerMapEntry, startBucket * bitsPerMapEntry, numBuckets * bitsPerMapEntry);
+        } else {
+            for (long bucket=srcStartBucket;bucket<srcEndBucket;bucket++) {
+                long bucketKey = src.getBucketKeyByAbsoluteIndex(bucket);
+                long bucketValue = src.getBucketValueByAbsoluteIndex(bucket);
+                if (bucketKey == src.emptyBucketKeyValue)
+                    bucketKey = emptyBucketKeyValue;
+                entryData.setElementValue(startBucket * bitsPerMapEntry, bitsPerKeyElement, bucketKey);
+                entryData.setElementValue((startBucket * bitsPerMapEntry) + bitsPerKeyElement, bitsPerValueElement, bucketValue);
+                startBucket++;
+            }
+        }
+    }
 }
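The bucket accessors above read a key and a value packed side by side into `bitsPerMapEntry` bits, with the all-ones key (`emptyBucketKeyValue`) marking an empty bucket. A standalone sketch of that packing — hypothetical bit widths, not code from this patch:

```java
// Hypothetical bit widths: each bucket entry packs a key in the low bits and a value above it.
public class MapBucketPackingSketch {
    public static void main(String[] args) {
        int bitsPerKeyElement = 5;
        int bitsPerValueElement = 7;
        int bitsPerMapEntry = bitsPerKeyElement + bitsPerValueElement;
        long emptyBucketKeyValue = (1L << bitsPerKeyElement) - 1; // all ones == empty-bucket sentinel

        long key = 9, value = 13;
        long entry = (value << bitsPerKeyElement) | key;          // pack: key at offset 0, value above it

        long unpackedKey = entry & ((1L << bitsPerKeyElement) - 1);
        long unpackedValue = (entry >>> bitsPerKeyElement) & ((1L << bitsPerValueElement) - 1);

        System.out.println(bitsPerMapEntry + "-bit entry: key=" + unpackedKey + ", value=" + unpackedValue
                + ", emptySentinel=" + emptyBucketKeyValue);
    }
}
```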
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java
new file mode 100644
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoiner.java
@@ -0,0 +1,92 @@
+package com.netflix.hollow.core.read.engine.map;
+
+import com.netflix.hollow.core.memory.FixedLengthDataFactory;
+import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner;
+
+/**
+ * Join multiple {@code HollowMapTypeDataElements}s into 1 {@code HollowMapTypeDataElements}.
+ * Ordinals are remapped and corresponding data is copied over.
+ * The original data elements are not destroyed.
+ * The no. of passed data elements must be a power of 2.
+ */
+public class HollowMapTypeDataElementsJoiner extends HollowTypeDataElementsJoiner<HollowMapTypeDataElements> {
+
+    public HollowMapTypeDataElementsJoiner(HollowMapTypeDataElements[] from) {
+        super(from);
+    }
+
+    @Override
+    public void initToElements() {
+        this.to = new HollowMapTypeDataElements(from[0].memoryMode, from[0].memoryRecycler);
+    }
+
+    @Override
+    public void populateStats() {
+        for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
+            HollowMapTypeDataElements source = from[fromIndex];
+            int mappedMaxOrdinal = source.maxOrdinal == -1 ? -1 : (source.maxOrdinal * from.length) + fromIndex;
+            to.maxOrdinal = Math.max(to.maxOrdinal, mappedMaxOrdinal);
+            if (source.bitsPerKeyElement > to.bitsPerKeyElement) {
+                to.bitsPerKeyElement = source.bitsPerKeyElement;
+            }
+            if (source.bitsPerValueElement > to.bitsPerValueElement) {
+                to.bitsPerValueElement = source.bitsPerValueElement;
+            }
+            if (source.bitsPerMapSizeValue > to.bitsPerMapSizeValue) {
+                to.bitsPerMapSizeValue = source.bitsPerMapSizeValue;
+            }
+        }
+        to.emptyBucketKeyValue = (1 << to.bitsPerKeyElement) - 1;
+
+        long totalOfMapBuckets = 0;
+        for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
+            int fromIndex = ordinal & fromMask;
+            int fromOrdinal = ordinal >> fromOrdinalShift;
+
+            HollowMapTypeDataElements source = from[fromIndex];
+
+            long startBucket = source.getStartBucket(fromOrdinal);
+            long endBucket = source.getEndBucket(fromOrdinal);
+            long numBuckets = endBucket - startBucket;
+
+            totalOfMapBuckets += numBuckets;
+        }
+
+        to.totalNumberOfBuckets = totalOfMapBuckets;
+        to.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(to.totalNumberOfBuckets);
+        to.bitsPerFixedLengthMapPortion = to.bitsPerMapSizeValue + to.bitsPerMapPointer;
+        to.bitsPerMapEntry = to.bitsPerKeyElement + to.bitsPerValueElement;
+    }
+
+    @Override
+    public void copyRecords() {
+        long bucketCounter = 0;
+
+        to.mapPointerAndSizeData = FixedLengthDataFactory.get((long)(to.maxOrdinal + 1) * to.bitsPerFixedLengthMapPortion, to.memoryMode, to.memoryRecycler);
+        to.entryData = FixedLengthDataFactory.get(to.totalNumberOfBuckets * to.bitsPerMapEntry, to.memoryMode, to.memoryRecycler);
+
+        for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
+            int fromIndex = ordinal & fromMask;
+            int fromOrdinal = ordinal >> fromOrdinalShift;
+
+            HollowMapTypeDataElements source = from[fromIndex];
+
+            long mapSize = 0;
+            if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shards resulting from skipping type shards with no additions, mapSize remains 0
+                long startBucket = source.getStartBucket(fromOrdinal);
+                long endBucket = source.getEndBucket(fromOrdinal);
+
+                long numBuckets = endBucket - startBucket;
+                to.copyBucketsFrom(bucketCounter, source, startBucket, endBucket);
+                bucketCounter += numBuckets;
+
+                mapSize = source.mapPointerAndSizeData.getElementValue((long)(fromOrdinal * source.bitsPerFixedLengthMapPortion) + source.bitsPerMapPointer, source.bitsPerMapSizeValue);
+            }
+            to.mapPointerAndSizeData.setElementValue((long)ordinal * to.bitsPerFixedLengthMapPortion, to.bitsPerMapPointer, bucketCounter);
+            to.mapPointerAndSizeData.setElementValue((long)(ordinal * to.bitsPerFixedLengthMapPortion) + to.bitsPerMapPointer, to.bitsPerMapSizeValue, mapSize);
+        }
+    }
+}
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitter.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitter.java
new file mode 100644
index 0000000000..fad478b567
--- /dev/null
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitter.java
@@ -0,0 +1,91 @@
+package com.netflix.hollow.core.read.engine.map;
+
+
+import com.netflix.hollow.core.memory.FixedLengthDataFactory;
+import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter;
+
+/**
+ * Split a {@code HollowMapTypeDataElements} into multiple {@code HollowMapTypeDataElements}s.
+ * Ordinals are remapped and corresponding data is copied over.
+ * The original data elements are not destroyed.
+ * {@code numSplits} must be a power of 2.
+ */
+public class HollowMapTypeDataElementsSplitter extends HollowTypeDataElementsSplitter<HollowMapTypeDataElements> {
+
+    public HollowMapTypeDataElementsSplitter(HollowMapTypeDataElements from, int numSplits) {
+        super(from, numSplits);
+    }
+
+    @Override
+    public void initToElements() {
+        this.to = new HollowMapTypeDataElements[numSplits];
+        for(int i=0;i<numSplits;i++) {
+            to[i] = new HollowMapTypeDataElements(from.memoryMode, from.memoryRecycler);
+        }
+    }
+
+    @Override
+    public void populateStats() {
+        long[] shardTotalOfMapBuckets = new long[numSplits];
+        long maxShardTotalOfMapBuckets = 0;
+
+        for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
+            int toIndex = ordinal & toMask;
+            int toOrdinal = ordinal >> toOrdinalShift;
+            to[toIndex].maxOrdinal = toOrdinal;
+
+            long startBucket = from.getStartBucket(ordinal);
+            long endBucket = from.getEndBucket(ordinal);
+            long numBuckets = endBucket - startBucket;
+
+            shardTotalOfMapBuckets[toIndex] += numBuckets;
+            if(shardTotalOfMapBuckets[toIndex] > maxShardTotalOfMapBuckets) {
+                maxShardTotalOfMapBuckets = shardTotalOfMapBuckets[toIndex];
+            }
+        }
+
+        for(int toIndex=0;toIndex<numSplits;toIndex++) {
+            HollowMapTypeDataElements target = to[toIndex];
+            target.bitsPerKeyElement = from.bitsPerKeyElement;
+            target.bitsPerValueElement = from.bitsPerValueElement;
+            target.bitsPerMapSizeValue = from.bitsPerMapSizeValue;
+            target.emptyBucketKeyValue = from.emptyBucketKeyValue;
+            target.totalNumberOfBuckets = shardTotalOfMapBuckets[toIndex];
+            target.bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);
+            target.bitsPerFixedLengthMapPortion = target.bitsPerMapSizeValue + target.bitsPerMapPointer;
+            target.bitsPerMapEntry = target.bitsPerKeyElement + target.bitsPerValueElement;
+        }
+    }
+
+    @Override
+    public void copyRecords() {
+        long[] bucketCounter = new long[numSplits];
+
+        for(int toIndex=0;toIndex<numSplits;toIndex++) {
+            HollowMapTypeDataElements target = to[toIndex];
+            target.mapPointerAndSizeData = FixedLengthDataFactory.get((long)(target.maxOrdinal + 1) * target.bitsPerFixedLengthMapPortion, target.memoryMode, target.memoryRecycler);
+            target.entryData = FixedLengthDataFactory.get(target.totalNumberOfBuckets * target.bitsPerMapEntry, target.memoryMode, target.memoryRecycler);
+        }
+
+        for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) {
+            int toIndex = ordinal & toMask;
+            int toOrdinal = ordinal >> toOrdinalShift;
+
+            HollowMapTypeDataElements target = to[toIndex];
+            long startBucket = from.getStartBucket(ordinal);
+            long endBucket = from.getEndBucket(ordinal);
+
+            long numBuckets = endBucket - startBucket;
+            target.copyBucketsFrom(bucketCounter[toIndex], from, startBucket, endBucket);
+            bucketCounter[toIndex] += numBuckets;
+
+            target.mapPointerAndSizeData.setElementValue((long)toOrdinal * target.bitsPerFixedLengthMapPortion, target.bitsPerMapPointer, bucketCounter[toIndex]);
+            long mapSize = from.mapPointerAndSizeData.getElementValue((long)(ordinal * from.bitsPerFixedLengthMapPortion) + from.bitsPerMapPointer, from.bitsPerMapSizeValue);
+            target.mapPointerAndSizeData.setElementValue((long)(toOrdinal * target.bitsPerFixedLengthMapPortion) + target.bitsPerMapPointer, target.bitsPerMapSizeValue, mapSize);
+        }
+    }
+}
diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java
index d916fed78b..74435c0360 100644
--- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java
+++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeReadState.java
@@ -16,12 +16,16 @@
  */
 package com.netflix.hollow.core.read.engine.map;
 
+import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE;
+import static 
com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE; + import com.netflix.hollow.api.sampling.DisabledSamplingDirector; import com.netflix.hollow.api.sampling.HollowMapSampler; import com.netflix.hollow.api.sampling.HollowSampler; import com.netflix.hollow.api.sampling.HollowSamplingDirector; import com.netflix.hollow.core.index.FieldPaths; import com.netflix.hollow.core.index.key.HollowPrimaryKeyValueDeriver; +import com.netflix.hollow.core.memory.HollowUnsafeHandle; import com.netflix.hollow.core.memory.MemoryMode; import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader; import com.netflix.hollow.core.memory.encoding.VarInt; @@ -29,8 +33,12 @@ import com.netflix.hollow.core.read.HollowBlobInput; import com.netflix.hollow.core.read.dataaccess.HollowMapTypeDataAccess; import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.core.read.engine.HollowTypeDataElements; import com.netflix.hollow.core.read.engine.HollowTypeReadState; +import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard; import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener; +import com.netflix.hollow.core.read.engine.SetMapKeyHasher; +import com.netflix.hollow.core.read.engine.ShardsHolder; import com.netflix.hollow.core.read.engine.SnapshotPopulatedOrdinalsReader; import com.netflix.hollow.core.read.filter.HollowFilterConfig; import com.netflix.hollow.core.read.iterator.EmptyMapOrdinalIterator; @@ -45,9 +53,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; -import static com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE; - /** * A {@link HollowTypeReadState} for MAP type records. 
*/ @@ -56,75 +61,80 @@ public class HollowMapTypeReadState extends HollowTypeReadState implements Hollo private final HollowMapSampler sampler; - private final int shardNumberMask; - private final int shardOrdinalShift; - private final HollowMapTypeReadStateShard shards[]; - - private HollowPrimaryKeyValueDeriver keyDeriver; - private int maxOrdinal; - public HollowMapTypeReadState(HollowReadStateEngine stateEngine, HollowMapSchema schema, int numShards) { - this(stateEngine, MemoryMode.ON_HEAP, schema, numShards); + private volatile HollowPrimaryKeyValueDeriver keyDeriver; + volatile HollowMapTypeShardsHolder shardsVolatile; + + @Override + public ShardsHolder getShardsVolatile() { + return shardsVolatile; } - public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowMapSchema schema, int numShards) { + @Override + public void updateShardsVolatile(HollowTypeReadStateShard[] shards) { + this.shardsVolatile = new HollowMapTypeShardsHolder(shards); + } + + @Override + public HollowTypeDataElements[] createTypeDataElements(int len) { + return new HollowMapTypeDataElements[len]; + } + + @Override + public HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift) { + return new HollowMapTypeReadStateShard((HollowMapTypeDataElements) dataElements, shardOrdinalShift); + } + + public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowMapSchema schema) { super(stateEngine, memoryMode, schema); this.sampler = new HollowMapSampler(schema.getName(), DisabledSamplingDirector.INSTANCE); - this.shardNumberMask = numShards - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - - if(numShards < 1 || 1 << shardOrdinalShift != numShards) - throw new IllegalArgumentException("Number of shards must be a power of 2!"); - - HollowMapTypeReadStateShard shards[] = new HollowMapTypeReadStateShard[numShards]; - for(int i=0; i 1) + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - - for(int i=0; i 56 ? 
+ from.fixedLengthData.getLargeElementValue(currentReadFixedLengthStartBit, from.bitsPerField[fieldIndex]) + : from.fixedLengthData.getElementValue(currentReadFixedLengthStartBit, from.bitsPerField[fieldIndex]); + + long toWriteFixedLengthStartBit = ((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex]; + if(to.varLengthData[fieldIndex] == null) { // fixed len data + if(readValue == from.nullValueForField[fieldIndex]) { + writeNullFixedLengthField(to, fieldIndex, toWriteFixedLengthStartBit); + } + else { + to.fixedLengthData.setElementValue(toWriteFixedLengthStartBit, to.bitsPerField[fieldIndex], readValue); + } } else { - long fromStartByte = varLengthStartByte(from, fromOrdinal, fieldIndex); - long fromEndByte = varLengthEndByte(from, fromOrdinal, fieldIndex); - long size = fromEndByte - fromStartByte; - - to.fixedLengthData.setElementValue(((long)toOrdinal * to.bitsPerRecord) + to.bitOffsetPerField[fieldIndex], to.bitsPerField[fieldIndex], currentWriteVarLengthDataPointers[fieldIndex] + size); - to.varLengthData[fieldIndex].copy(from.varLengthData[fieldIndex], fromStartByte, currentWriteVarLengthDataPointers[fieldIndex], size); - - currentWriteVarLengthDataPointers[fieldIndex] += size; + if ((readValue & (1L << (from.bitsPerField[fieldIndex] - 1))) != 0) { // null is encoded by setting the top bit; the remaining bits carry the end offset of the last non-null value + writeNullVarLengthField(to, fieldIndex, toWriteFixedLengthStartBit, currentWriteVarLengthDataPointers); + } else { + long fromStartByte = varLengthStartByte(from, fromOrdinal, fieldIndex); + long fromEndByte = varLengthEndByte(from, fromOrdinal, fieldIndex); + long size = fromEndByte - fromStartByte; + + to.fixedLengthData.setElementValue(toWriteFixedLengthStartBit, to.bitsPerField[fieldIndex], currentWriteVarLengthDataPointers[fieldIndex] + size); + to.varLengthData[fieldIndex].copy(from.varLengthData[fieldIndex], fromStartByte, currentWriteVarLengthDataPointers[fieldIndex], size); + currentWriteVarLengthDataPointers[fieldIndex] += size; + } } } } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsJoiner.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsJoiner.java index 218b538a8c..218685309e 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsJoiner.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsJoiner.java @@ -6,7 +6,8 @@ import com.netflix.hollow.core.memory.FixedLengthDataFactory; import com.netflix.hollow.core.memory.VariableLengthDataFactory; -import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader; +import com.netflix.hollow.core.read.engine.HollowTypeDataElementsJoiner; +import com.netflix.hollow.core.schema.HollowObjectSchema; /** @@ -15,62 +16,24 @@ * The original data elements are not destroyed. * The no. of passed data elements must be a power of 2. */ -class HollowObjectTypeDataElementsJoiner { +public class HollowObjectTypeDataElementsJoiner extends HollowTypeDataElementsJoiner<HollowObjectTypeDataElements> { - HollowObjectTypeDataElements join(HollowObjectTypeDataElements[] from) { - final int fromMask = from.length - 1; - final int fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length); - long[] currentWriteVarLengthDataPointers; + private HollowObjectSchema schema; - if (from.length<=0 || !((from.length&(from.length-1))==0)) { - throw new IllegalStateException("No.
of DataElements to be joined must be a power of 2"); - } - - HollowObjectTypeDataElements to = new HollowObjectTypeDataElements(from[0].schema, from[0].memoryMode, from[0].memoryRecycler); - currentWriteVarLengthDataPointers = new long[from[0].schema.numFields()]; - - populateStats(to, from); - - GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length]; - for (int i=0;i> fromOrdinalShift; - - if (fromOrdinal <= from[fromIndex].maxOrdinal) { - copyRecord(to, ordinal, from[fromIndex], fromOrdinal, currentWriteVarLengthDataPointers); - } else { - // lopsided shards could result for consumers that skip type shards with no additions - writeNullRecord(to, ordinal, currentWriteVarLengthDataPointers); - } - } - - return to; + public HollowObjectTypeDataElementsJoiner(HollowObjectTypeDataElements[] from) { + super(from); + this.schema = from[0].schema; } - private void writeNullRecord(HollowObjectTypeDataElements to, int toOrdinal, long[] currentWriteVarLengthDataPointers) { - for(int fieldIndex=0;fieldIndex> fromOrdinalShift; + + if (fromOrdinal <= from[fromIndex].maxOrdinal) { + copyRecord(to, ordinal, from[fromIndex], fromOrdinal, currentWriteVarLengthDataPointers); + } else { + // handle lopsided shards that could result from e.g. consumers skipping type shards with no additions + writeNullRecord(to, ordinal, currentWriteVarLengthDataPointers); + } + } + + } + + private void writeNullRecord(HollowObjectTypeDataElements to, int toOrdinal, long[] currentWriteVarLengthDataPointers) { + for(int fieldIndex=0;fieldIndex { + private HollowObjectSchema schema; - HollowObjectTypeDataElements[] split(HollowObjectTypeDataElements from, int numSplits) { - final int toMask = numSplits - 1; - final int toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits); - final long[][] currentWriteVarLengthDataPointers; - - if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) { - throw new IllegalStateException("Must split by power of 2"); - } - - HollowObjectTypeDataElements[] to = new HollowObjectTypeDataElements[numSplits]; - for(int i=0;i> toOrdinalShift; - copyRecord(to[toIndex], toOrdinal, from, i, currentWriteVarLengthDataPointers[toIndex]); + to[i] = new HollowObjectTypeDataElements(schema, from.memoryMode, from.memoryRecycler); } - return to; } - private void populateStats(HollowObjectTypeDataElements[] to, HollowObjectTypeDataElements from, int toMask, int toOrdinalShift) { + @Override + public void populateStats() { long[][] varLengthSizes = new long[to.length][from.schema.numFields()]; for(int ordinal=0;ordinal<=from.maxOrdinal;ordinal++) { @@ -93,4 +62,24 @@ private void populateStats(HollowObjectTypeDataElements[] to, HollowObjectTypeDa } } } + + @Override + public void copyRecords() { + final long[][] currentWriteVarLengthDataPointers = new long[to.length][from.schema.numFields()]; + + for(int i=0;i> toOrdinalShift; + copyRecord(to[toIndex], toOrdinal, from, i, currentWriteVarLengthDataPointers[toIndex]); + } + } } diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java index 42e1274993..39ea37d360 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadState.java @@ -31,7 +31,9 @@ import com.netflix.hollow.core.read.HollowBlobInput; import 
com.netflix.hollow.core.read.dataaccess.HollowObjectTypeDataAccess; import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.core.read.engine.HollowTypeDataElements; import com.netflix.hollow.core.read.engine.HollowTypeReadState; +import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard; import com.netflix.hollow.core.read.engine.SnapshotPopulatedOrdinalsReader; import com.netflix.hollow.core.read.filter.HollowFilterConfig; import com.netflix.hollow.core.schema.HollowObjectSchema; @@ -39,41 +41,37 @@ import com.netflix.hollow.core.write.HollowObjectWriteRecord; import com.netflix.hollow.tools.checksum.HollowChecksum; import java.io.IOException; -import java.util.Arrays; import java.util.BitSet; /** * A {@link HollowTypeReadState} for OBJECT type records. */ public class HollowObjectTypeReadState extends HollowTypeReadState implements HollowObjectTypeDataAccess { - private final HollowObjectSchema unfilteredSchema; private final HollowObjectSampler sampler; + private int maxOrdinal; - volatile ShardsHolder shardsVolatile; - static class ShardsHolder { - final HollowObjectTypeReadStateShard shards[]; - final int shardNumberMask; + volatile HollowObjectTypeShardsHolder shardsVolatile; - private ShardsHolder(HollowObjectTypeReadStateShard[] fromShards) { - this.shards = fromShards; - this.shardNumberMask = fromShards.length - 1; - } + @Override + public HollowObjectTypeShardsHolder getShardsVolatile() { + return shardsVolatile; + } - private ShardsHolder(HollowObjectTypeReadStateShard[] oldShards, HollowObjectTypeReadStateShard newShard, int newShardIndex) { - int numShards = oldShards.length; - HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards]; - for (int i=0; i 1) @@ -120,7 +113,7 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler shardDataElements.readSnapshot(in, unfilteredSchema); newShards[i] = new HollowObjectTypeReadStateShard(getSchema(), shardDataElements, shardOrdinalShift); } - shardsVolatile = new ShardsHolder(newShards); + shardsVolatile = new HollowObjectTypeShardsHolder(newShards); if(shardsVolatile.shards.length == 1) maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal; @@ -130,9 +123,6 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler @Override public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException { - if (shouldReshard(shardsVolatile.shards.length, deltaNumShards)) { - reshard(deltaNumShards); - } if(shardsVolatile.shards.length > 1) maxOrdinal = VarInt.readVInt(in); @@ -164,7 +154,7 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen nextData.applyDelta(oldData, deltaData); HollowObjectTypeReadStateShard newShard = new HollowObjectTypeReadStateShard(getSchema(), nextData, shardsVolatile.shards[i].shardOrdinalShift); - shardsVolatile = new ShardsHolder(shardsVolatile.shards, newShard, i); + shardsVolatile = new HollowObjectTypeShardsHolder(shardsVolatile.shards, newShard, i); notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length); deltaData.encodedAdditions.destroy(); @@ -178,157 +168,6 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal; } - /** - * Given old and new numShards, this method returns the shard resizing multiplier. 
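- * e.g. shardingFactor(2, 8) == 4 and shardingFactor(8, 2) == 4; one shard count must evenly divide the other.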
- */ - static int shardingFactor(int oldNumShards, int newNumShards) { - if (newNumShards <= 0 || oldNumShards <= 0 || newNumShards == oldNumShards) { - throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards); - } - - boolean isNewGreater = newNumShards > oldNumShards; - int dividend = isNewGreater ? newNumShards : oldNumShards; - int divisor = isNewGreater ? oldNumShards : newNumShards; - - if (dividend % divisor != 0) { - throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards); - } - return dividend / divisor; - } - - /** - * Reshards this type state to the desired shard count using O(shard size) space while supporting concurrent reads - * into the underlying data elements. - * - * @param newNumShards The desired number of shards - */ - void reshard(int newNumShards) { - int prevNumShards = shardsVolatile.shards.length; - int shardingFactor = shardingFactor(prevNumShards, newNumShards); - HollowObjectTypeDataElements[] newDataElements; - int[] shardOrdinalShifts; - - if (newNumShards>prevNumShards) { // split existing shards - // Step 1: Grow the number of shards. Each original shard will result in N child shards where N is the sharding factor. - // The child shards will reference into the existing data elements as-is, and reuse existing shardOrdinalShift. - // However since the shards array is resized, a read will map into the new shard index, as a result a subset of - // ordinals in each shard will be accessed. In the next "splitting" step, the data elements in these new shards - // will be filtered to only retain the subset of ordinals that are actually accessed. - // - // This is an atomic update to shardsVolatile: full construction happens-before the store to shardsVolatile, - // in other words a fully constructed object as visible to this thread will be visible to other threads that - // load the new shardsVolatile. - shardsVolatile = expandWithOriginalDataElements(shardsVolatile, shardingFactor); - - // Step 2: Split each original data element into N child data elements where N is the sharding factor. - // Then update each of the N child shards with the respective split of data element, this will be - // sufficient to serve all reads into this shard. Once all child shards for a pre-split parent - // shard have been assigned the split data elements, the parent data elements can be discarded. - for(int i=0; i historicalDataElements.bitsPerElement) { + historicalDataElements.bitsPerElement = stateEngineDataElements[i].bitsPerElement; + historicalDataElements.emptyBucketValue = stateEngineDataElements[i].emptyBucketValue; + } + } historicalDataElements.totalNumberOfBuckets = totalBucketCount; ordinalMapping = new IntMap(removedEntryCount); @@ -121,8 +124,8 @@ private void copyRecord(int ordinal) { long bitsPerBucket = historicalDataElements.bitsPerElement; long size = typeState.size(ordinal); - long fromStartBucket = shardOrdinal == 0 ? 
0 : stateEngineDataElements[shard].setPointerAndSizeData.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerFixedLengthSetPortion, stateEngineDataElements[shard].bitsPerSetPointer); - long fromEndBucket = stateEngineDataElements[shard].setPointerAndSizeData.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerFixedLengthSetPortion, stateEngineDataElements[shard].bitsPerSetPointer); + long fromStartBucket = stateEngineDataElements[shard].getStartBucket(shardOrdinal); + long fromEndBucket = stateEngineDataElements[shard].getEndBucket(shardOrdinal); long numBuckets = fromEndBucket - fromStartBucket; historicalDataElements.setPointerAndSizeData.setElementValue((long)nextOrdinal * historicalDataElements.bitsPerFixedLengthSetPortion, historicalDataElements.bitsPerSetPointer, nextStartBucket + numBuckets); diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElements.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElements.java index 8aecd840e5..c7387a1a52 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElements.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElements.java @@ -23,6 +23,7 @@ import com.netflix.hollow.core.memory.encoding.VarInt; import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler; import com.netflix.hollow.core.read.HollowBlobInput; +import com.netflix.hollow.core.read.engine.HollowTypeDataElements; import java.io.IOException; /** @@ -32,15 +33,10 @@ * with the existing one to make sure a consistent view of the data is always available. */ -public class HollowSetTypeDataElements { +public class HollowSetTypeDataElements extends HollowTypeDataElements { - int maxOrdinal; - - FixedLengthData setPointerAndSizeData; - FixedLengthData elementData; - - GapEncodedVariableLengthIntegerReader encodedRemovals; - GapEncodedVariableLengthIntegerReader encodedAdditions; + public FixedLengthData setPointerAndSizeData; + public FixedLengthData elementData; int bitsPerSetPointer; int bitsPerSetSizeValue; @@ -49,16 +45,12 @@ public class HollowSetTypeDataElements { int emptyBucketValue; long totalNumberOfBuckets; - final ArraySegmentRecycler memoryRecycler; - final MemoryMode memoryMode; - public HollowSetTypeDataElements(ArraySegmentRecycler memoryRecycler) { this(MemoryMode.ON_HEAP, memoryRecycler); } public HollowSetTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) { - this.memoryMode = memoryMode; - this.memoryRecycler = memoryRecycler; + super(memoryMode, memoryRecycler); } void readSnapshot(HollowBlobInput in) throws IOException { @@ -117,8 +109,37 @@ public void applyDelta(HollowSetTypeDataElements fromData, HollowSetTypeDataElem new HollowSetDeltaApplicator(fromData, deltaData, this).applyDelta(); } + @Override public void destroy() { FixedLengthDataFactory.destroy(setPointerAndSizeData, memoryRecycler); FixedLengthDataFactory.destroy(elementData, memoryRecycler); } + + long getStartBucket(int ordinal) { + return ordinal == 0 ? 
0 : setPointerAndSizeData.getElementValue((long)(ordinal - 1) * bitsPerFixedLengthSetPortion, bitsPerSetPointer); + } + + long getEndBucket(int ordinal) { + return setPointerAndSizeData.getElementValue((long) ordinal * bitsPerFixedLengthSetPortion, bitsPerSetPointer); + } + + int getBucketValue(long absoluteBucketIndex) { + return (int)elementData.getElementValue(absoluteBucketIndex * bitsPerElement, bitsPerElement); + } + + void copyBucketsFrom(long startBucket, HollowSetTypeDataElements src, long srcStartBucket, long srcEndBucket) { + if (bitsPerElement == src.bitsPerElement) { + // fast path can bulk copy buckets. emptyBucketValue is same since bitsPerElement is same + long numBuckets = srcEndBucket - srcStartBucket; + elementData.copyBits(src.elementData, srcStartBucket * bitsPerElement, startBucket * bitsPerElement, numBuckets * bitsPerElement); + } else { + for (long bucket=srcStartBucket;bucket { + + public HollowSetTypeDataElementsJoiner(HollowSetTypeDataElements[] from) { + super(from); + } + + @Override + public void initToElements() { + this.to = new HollowSetTypeDataElements(from[0].memoryMode, from[0].memoryRecycler); + to.bitsPerElement = 0; + } + + @Override + public void populateStats() { + for(int fromIndex=0;fromIndex to.bitsPerElement) { + to.bitsPerElement = source.bitsPerElement; + } + if (source.bitsPerSetSizeValue > to.bitsPerSetSizeValue) { + to.bitsPerSetSizeValue = source.bitsPerSetSizeValue; + } + } + to.emptyBucketValue = (1 << to.bitsPerElement) - 1; + + long totalOfSetBuckets = 0; + for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) { + int fromIndex = ordinal & fromMask; + int fromOrdinal = ordinal >> fromOrdinalShift; + + HollowSetTypeDataElements source = from[fromIndex]; + + long startBucket = source.getStartBucket(fromOrdinal); + long endBucket = source.getEndBucket(fromOrdinal); + long numBuckets = endBucket - startBucket; + + totalOfSetBuckets += numBuckets; + } + + to.totalNumberOfBuckets = totalOfSetBuckets; + to.bitsPerSetPointer = 64 - Long.numberOfLeadingZeros(to.totalNumberOfBuckets); + to.bitsPerFixedLengthSetPortion = to.bitsPerSetPointer + to.bitsPerSetSizeValue; + } + + @Override + public void copyRecords() { + long bucketCounter = 0; + + to.setPointerAndSizeData = FixedLengthDataFactory.get(((long)to.maxOrdinal + 1) * to.bitsPerFixedLengthSetPortion, to.memoryMode, to.memoryRecycler); + to.elementData = FixedLengthDataFactory.get(to.totalNumberOfBuckets * to.bitsPerElement, to.memoryMode, to.memoryRecycler); + + for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) { + int fromIndex = ordinal & fromMask; + int fromOrdinal = ordinal >> fromOrdinalShift; + + HollowSetTypeDataElements source = from[fromIndex]; + + long setSize = 0; + if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shards resulting from skipping type shards with no additions, setSize remains 0 + long startBucket = source.getStartBucket(fromOrdinal); + long endBucket = source.getEndBucket(fromOrdinal); + + long numBuckets = endBucket - startBucket; + to.copyBucketsFrom(bucketCounter, source, startBucket, endBucket); + bucketCounter += numBuckets; + + setSize = source.setPointerAndSizeData.getElementValue((long)(fromOrdinal * source.bitsPerFixedLengthSetPortion) + source.bitsPerSetPointer, source.bitsPerSetSizeValue); + } + to.setPointerAndSizeData.setElementValue((long)ordinal * to.bitsPerFixedLengthSetPortion, to.bitsPerSetPointer, bucketCounter); + to.setPointerAndSizeData.setElementValue((long)(ordinal * to.bitsPerFixedLengthSetPortion) + to.bitsPerSetPointer, 
to.bitsPerSetSizeValue, setSize); + } + } +} diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitter.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitter.java new file mode 100644 index 0000000000..cd535af3bb --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitter.java @@ -0,0 +1,89 @@ +package com.netflix.hollow.core.read.engine.set; + +import com.netflix.hollow.core.memory.FixedLengthDataFactory; +import com.netflix.hollow.core.read.engine.HollowTypeDataElementsSplitter; + +/** + * Split a {@code HollowSetTypeDataElements} into multiple {@code HollowSetTypeDataElements}s. + * Ordinals are remapped and corresponding data is copied over. + * The original data elements are not destroyed. + * {@code numSplits} must be a power of 2. + */ +public class HollowSetTypeDataElementsSplitter extends HollowTypeDataElementsSplitter { + + public HollowSetTypeDataElementsSplitter(HollowSetTypeDataElements from, int numSplits) { + super(from, numSplits); + } + + @Override + public void initToElements() { + this.to = new HollowSetTypeDataElements[numSplits]; + for(int i=0;i> toOrdinalShift; + to[toIndex].maxOrdinal = toOrdinal; + + long startBucket = from.getStartBucket(ordinal); + long endBucket = from.getEndBucket(ordinal); + long numBuckets = endBucket - startBucket; + + shardTotalOfSetBuckets[toIndex] += numBuckets; + if(shardTotalOfSetBuckets[toIndex] > maxShardTotalOfSetBuckets) { + maxShardTotalOfSetBuckets = shardTotalOfSetBuckets[toIndex]; + } + } + + for(int toIndex=0;toIndex> toOrdinalShift; + + long startBucket = from.getStartBucket(ordinal); + long endBucket = from.getEndBucket(ordinal); + HollowSetTypeDataElements target = to[toIndex]; + + long numBuckets = endBucket - startBucket; + target.copyBucketsFrom(bucketCounter[toIndex], from, startBucket, endBucket); + bucketCounter[toIndex] += numBuckets; + + target.setPointerAndSizeData.setElementValue((long)toOrdinal * target.bitsPerFixedLengthSetPortion, target.bitsPerSetPointer, bucketCounter[toIndex]); + long setSize = from.setPointerAndSizeData.getElementValue((long)(ordinal * from.bitsPerFixedLengthSetPortion) + from.bitsPerSetPointer, from.bitsPerSetSizeValue); + target.setPointerAndSizeData.setElementValue((long)(toOrdinal * target.bitsPerFixedLengthSetPortion) + target.bitsPerSetPointer, target.bitsPerSetSizeValue, setSize); + } + } +} diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java index 5a9bb85e84..2db7302330 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeReadState.java @@ -16,12 +16,16 @@ */ package com.netflix.hollow.core.read.engine.set; +import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; +import static com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE; + import com.netflix.hollow.api.sampling.DisabledSamplingDirector; import com.netflix.hollow.api.sampling.HollowSampler; import com.netflix.hollow.api.sampling.HollowSamplingDirector; import com.netflix.hollow.api.sampling.HollowSetSampler; import com.netflix.hollow.core.index.FieldPaths; import com.netflix.hollow.core.index.key.HollowPrimaryKeyValueDeriver; +import 
com.netflix.hollow.core.memory.HollowUnsafeHandle; import com.netflix.hollow.core.memory.MemoryMode; import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader; import com.netflix.hollow.core.memory.encoding.VarInt; @@ -30,8 +34,12 @@ import com.netflix.hollow.core.read.dataaccess.HollowSetTypeDataAccess; import com.netflix.hollow.core.read.engine.HollowCollectionTypeReadState; import com.netflix.hollow.core.read.engine.HollowReadStateEngine; +import com.netflix.hollow.core.read.engine.HollowTypeDataElements; import com.netflix.hollow.core.read.engine.HollowTypeReadState; +import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard; import com.netflix.hollow.core.read.engine.PopulatedOrdinalListener; +import com.netflix.hollow.core.read.engine.SetMapKeyHasher; +import com.netflix.hollow.core.read.engine.ShardsHolder; import com.netflix.hollow.core.read.engine.SnapshotPopulatedOrdinalsReader; import com.netflix.hollow.core.read.filter.HollowFilterConfig; import com.netflix.hollow.core.read.iterator.EmptyOrdinalIterator; @@ -46,9 +54,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import static com.netflix.hollow.core.HollowConstants.ORDINAL_NONE; -import static com.netflix.hollow.core.index.FieldPaths.FieldPathException.ErrorKind.NOT_BINDABLE; - /** * A {@link HollowTypeReadState} for OBJECT type records. */ @@ -56,76 +61,81 @@ public class HollowSetTypeReadState extends HollowCollectionTypeReadState implem private static final Logger LOG = Logger.getLogger(HollowSetTypeReadState.class.getName()); private final HollowSetSampler sampler; - - private final int shardNumberMask; - private final int shardOrdinalShift; - private final HollowSetTypeReadStateShard shards[]; - - private HollowPrimaryKeyValueDeriver keyDeriver; - + private int maxOrdinal; - public HollowSetTypeReadState(HollowReadStateEngine stateEngine, HollowSetSchema schema, int numShards) { - this(stateEngine, MemoryMode.ON_HEAP, schema, numShards); + private volatile HollowPrimaryKeyValueDeriver keyDeriver; + volatile HollowSetTypeShardsHolder shardsVolatile; + + @Override + public ShardsHolder getShardsVolatile() { + return shardsVolatile; } - public HollowSetTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memoryMode, HollowSetSchema schema, int numShards) { - super(stateEngine, memoryMode, schema); - this.sampler = new HollowSetSampler(schema.getName(), DisabledSamplingDirector.INSTANCE); - this.shardNumberMask = numShards - 1; - this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - - if(numShards < 1 || 1 << shardOrdinalShift != numShards) - throw new IllegalArgumentException("Number of shards must be a power of 2!"); - - HollowSetTypeReadStateShard shards[] = new HollowSetTypeReadStateShard[numShards]; - for(int i=0;i 1) + public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException { + if(numShards > 1) maxOrdinal = VarInt.readVInt(in); - - for(int i=0;i * diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java index 9c653082c5..c0308a52c2 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java @@ -32,12 +32,14 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.BitSet; +import java.util.logging.Logger; /** * The {@link 
HollowTypeWriteState} contains and is the root handle to all of the records of a specific type in * a {@link HollowWriteStateEngine}. */ public abstract class HollowTypeWriteState { + private static final Logger LOG = Logger.getLogger(HollowTypeWriteState.class.getName()); protected final HollowSchema schema; @@ -389,4 +391,18 @@ public HollowWriteStateEngine getStateEngine() { return stateEngine; } + public boolean allowTypeResharding() { + if (this instanceof HollowObjectTypeWriteState) { + if (stateEngine.allowTypeResharding()) { + if (isNumShardsPinned()) { + LOG.warning("Type re-sharding feature was enabled but num shards is pinned (likely using the " + + "HollowShardLargeType annotation in the data model). Proceeding with fixed num shards."); + return false; + } + } + return stateEngine.allowTypeResharding(); + } else { + return false; // only supported for object types + } + } } diff --git a/hollow/src/main/java/com/netflix/hollow/tools/history/HollowHistory.java b/hollow/src/main/java/com/netflix/hollow/tools/history/HollowHistory.java index 4cc8d4cafa..f54b3b123f 100644 --- a/hollow/src/main/java/com/netflix/hollow/tools/history/HollowHistory.java +++ b/hollow/src/main/java/com/netflix/hollow/tools/history/HollowHistory.java @@ -278,8 +278,8 @@ public void deltaOccurred(long newVersion) { // {@code latestVersion} is still the version from before the delta transition. {@code latestVersion} is // updated in this method. - // Update the state stored in keyIndex (in its member readStateEngine) with the passed read state engine. - // The readStateEngine within keyIndex stores an ever-growing state of all keys ever seen by this HollowHistory + // Update the state stored in keyIndex. + // The keyIndex stores an ever-growing state of all keys ever seen by this HollowHistory // instance i.e. all keys seen in initial load or a successive double-snapshot and all keys added/removed in // deltas and reverse deltas. It doesn't store a copy of the keyed records, instead just the primary key values // for each type that has a primary key defined (in schema or custom via history helpers). @@ -290,7 +290,7 @@ public void deltaOccurred(long newVersion) { // data corresponding to ghost records in the "to" state in any state transition (i.e. records corresponding to // ordinals were populated in the "from" state but are not populated in the "to" state) into a new state engine // where it assigns new ordinals serially(0, 1, 2, etc.) to each such record. A mapping of original ordinal - // in the read state to its new ordinal position in the historical state data access for all such records in + // in the read state to its reference in the historical state data access for all such records in // each type is stored in the member typeRemovedOrdinalMapping. 
HollowHistoricalStateDataAccess historicalDataAccess = creator.createBasedOnNewDelta(latestVersion, latestHollowReadStateEngine); historicalDataAccess.setNextState(latestHollowReadStateEngine); diff --git a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java index 26b3059855..cf9e2c7fad 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowIncrementalProducerTest.java @@ -38,6 +38,7 @@ import com.netflix.hollow.core.write.objectmapper.RecordPrimaryKey; import com.netflix.hollow.core.write.objectmapper.flatrecords.FakeHollowSchemaIdentifierMapper; import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter; +import com.netflix.hollow.test.InMemoryBlobStore; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -47,8 +48,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - -import com.netflix.hollow.test.InMemoryBlobStore; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; diff --git a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java index f9bd7257c3..0456a3fddd 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java @@ -38,6 +38,8 @@ import com.netflix.hollow.api.producer.fs.HollowFilesystemAnnouncer; import com.netflix.hollow.api.producer.fs.HollowInMemoryBlobStager; import com.netflix.hollow.api.producer.listener.VetoableListener; +import com.netflix.hollow.api.producer.model.CustomReferenceType; +import com.netflix.hollow.api.producer.model.HasAllTypeStates; import com.netflix.hollow.core.HollowBlobHeader; import com.netflix.hollow.core.read.engine.HollowBlobHeaderReader; import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState; @@ -426,53 +428,91 @@ private void restoreAndAssert(HollowProducer producer, long version, int size, i } @Test - public void testReshardingObjectTypes() { + public void testReshardingAllTypes() { for (boolean allowResharding : Arrays.asList(true, false)) { HollowProducer.Builder producerBuilder = HollowProducer.withPublisher(new FakeBlobPublisher()).withAnnouncer(new HollowFilesystemAnnouncer(tmpFolder.toPath())); if (allowResharding) producerBuilder = producerBuilder.withTypeResharding(true); HollowProducer producer = producerBuilder.withTargetMaxTypeShardSize(32).build(); + producer.initializeDataModel(HasAllTypeStates.class); producer.runCycle(ws -> { // causes 2 shards for Integer at shard size 32 for (int i=0;i<50;i++) { - Set set = new HashSet<>(Collections.singleton((long) i)); - ws.add(new HasNonObjectField(i, set)); + final long val = new Long(i); + Set set = new HashSet<>(Arrays.asList("e" + val)); + List list = Arrays.asList(i); + Map map = new HashMap(){{put("k"+val, new Long(val));}}; + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + set, + list, + map + )); } }); - assertEquals(2, producer.getWriteEngine().getTypeState("Integer").getNumShards()); - int numShardsNonObjectType = producer.getWriteEngine().getTypeState("SetOfLong").getNumShards(); - assertTrue(numShardsNonObjectType >= 1); + assertEquals(2, 
producer.getWriteEngine().getTypeState("Long").getNumShards()); + assertEquals(2, producer.getWriteEngine().getTypeState("CustomReferenceType").getNumShards()); + assertEquals(8, producer.getWriteEngine().getTypeState("SetOfString").getNumShards()); + assertEquals(4, producer.getWriteEngine().getTypeState("ListOfInteger").getNumShards()); + assertEquals(8, producer.getWriteEngine().getTypeState("MapOfStringToLong").getNumShards()); + producer.runCycle(ws -> { - // 2x the data, causes 4 shards for Integer at shard size 32 - // 2x the collection type, numShards should remain the same (until non object types support resharding) + // 2x the data causes more num shards at the same shard size for (int i=0;i<100;i++) { - Set<Long> set = new HashSet<>(Collections.singleton((long) i)); - ws.add(new HasNonObjectField(i, set)); + final long val = new Long(i); + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap<String, Long>(){{put("k"+val, new Long(val));}} + )); } }); if (allowResharding) { - assertEquals(4, producer.getWriteEngine().getTypeState("Integer").getNumShards()); + assertTrue(2 < producer.getWriteEngine().getTypeState("Long").getNumShards()); + assertTrue(2 < producer.getWriteEngine().getTypeState("CustomReferenceType").getNumShards()); + } else { - assertEquals(2, producer.getWriteEngine().getTypeState("Integer").getNumShards()); + assertEquals(2, producer.getWriteEngine().getTypeState("Long").getNumShards()); + assertEquals(2, producer.getWriteEngine().getTypeState("CustomReferenceType").getNumShards()); + } - assertEquals(numShardsNonObjectType, producer.getWriteEngine().getTypeState("SetOfLong").getNumShards()); + + // producer doesn't support resharding for these types yet + assertEquals(8, producer.getWriteEngine().getTypeState("SetOfString").getNumShards()); + assertEquals(4, producer.getWriteEngine().getTypeState("ListOfInteger").getNumShards()); + assertEquals(8, producer.getWriteEngine().getTypeState("MapOfStringToLong").getNumShards()); producer.runCycle(ws -> { - // still 4 shards, because ghost records + // still same num shards, because ghost records for (int i=0;i<50;i++) { - Set<Long> set = new HashSet<>(Collections.singleton((long) i)); - ws.add(new HasNonObjectField(i, set)); + final long val = new Long(i); + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap<String, Long>(){{put("k"+val, new Long(val));}} + )); } }); producer.runCycle(ws -> { - // back to 2 shards for Integer + // back to original shard count for (int i=0;i<49;i++) { // one change in runCycle - Set<Long> set = new HashSet<>(Collections.singleton((long) i)); - ws.add(new HasNonObjectField(i, set)); + final long val = new Long(i); + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap<String, Long>(){{put("k"+val, new Long(val));}} + )); } }); - assertEquals(2, producer.getWriteEngine().getTypeState("Integer").getNumShards()); + assertEquals(2, producer.getWriteEngine().getTypeState("Long").getNumShards()); + assertEquals(2, producer.getWriteEngine().getTypeState("CustomReferenceType").getNumShards()); + assertEquals(8, producer.getWriteEngine().getTypeState("SetOfString").getNumShards()); + assertEquals(4, producer.getWriteEngine().getTypeState("ListOfInteger").getNumShards()); + assertEquals(8, producer.getWriteEngine().getTypeState("MapOfStringToLong").getNumShards()); } }
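The shard-count assertions above follow from Hollow's power-of-two shard addressing: a record's ordinal selects its shard by the ordinal's low bits, and the remaining high bits form the record's ordinal within that shard. A minimal standalone sketch of that mapping (illustrative only, not part of this patch; the class name is hypothetical, but the mask and shift computations mirror those in the read states and split/join utilities above):

    // Sketch: how a type's ordinal space maps onto a power-of-2 number of shards.
    public class ShardAddressingSketch {
        public static void main(String[] args) {
            int numShards = 4;                                                    // must be a power of 2
            int shardNumberMask = numShards - 1;                                  // low bits select the shard
            int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); // log2(numShards)

            for (int ordinal = 0; ordinal <= 9; ordinal++) {
                int shardIndex = ordinal & shardNumberMask;      // which shard holds the record
                int shardOrdinal = ordinal >> shardOrdinalShift; // the record's ordinal within that shard
                System.out.println("ordinal " + ordinal + " -> shard " + shardIndex + ", shardOrdinal " + shardOrdinal);
            }
            // A joiner inverts the mapping: ordinal = (shardOrdinal << shardOrdinalShift) | shardIndex,
            // which is why shard counts (and split counts) must be powers of 2.
        }
    }

diff --git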
a/hollow/src/test/java/com/netflix/hollow/api/producer/model/CustomReferenceType.java b/hollow/src/test/java/com/netflix/hollow/api/producer/model/CustomReferenceType.java new file mode 100644 index 0000000000..0b3564fda1 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/model/CustomReferenceType.java @@ -0,0 +1,8 @@ +package com.netflix.hollow.api.producer.model; + +public class CustomReferenceType { + long id; + public CustomReferenceType(long id) { + this.id = id; + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/api/producer/model/HasAllTypeStates.java b/hollow/src/test/java/com/netflix/hollow/api/producer/model/HasAllTypeStates.java new file mode 100644 index 0000000000..15ab3c8b6a --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/model/HasAllTypeStates.java @@ -0,0 +1,19 @@ +package com.netflix.hollow.api.producer.model; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class HasAllTypeStates { + CustomReferenceType customReferenceType; + Set<String> setOfStrings; + List<Integer> listOfInt; + Map<String, Long> mapOfStringToLong; + + public HasAllTypeStates(CustomReferenceType customReferenceType, Set<String> setOfStrings, List<Integer> listOfInt, Map<String, Long> mapOfStringToLong) { + this.customReferenceType = customReferenceType; + this.setOfStrings = setOfStrings; + this.listOfInt = listOfInt; + this.mapOfStringToLong = mapOfStringToLong; + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/HollowBlobOptionalPartTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/HollowBlobOptionalPartTest.java index cd590abb5e..c195426389 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/read/HollowBlobOptionalPartTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/read/HollowBlobOptionalPartTest.java @@ -34,6 +34,7 @@ import com.netflix.hollow.core.write.objectmapper.TypeA; import com.netflix.hollow.core.write.objectmapper.TypeB; import com.netflix.hollow.core.write.objectmapper.TypeC; +import com.netflix.hollow.test.InMemoryBlobStore; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -43,8 +44,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; - -import com.netflix.hollow.test.InMemoryBlobStore; import org.junit.Assert; import org.junit.Test; diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/AbstractHollowTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/AbstractHollowTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..b75d776a81 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/AbstractHollowTypeDataElementsSplitJoinTest.java @@ -0,0 +1,55 @@ +package com.netflix.hollow.core.read.engine; + +import com.netflix.hollow.core.AbstractStateEngineTest; +import com.netflix.hollow.core.schema.HollowObjectSchema; +import com.netflix.hollow.core.write.HollowObjectTypeWriteState; +import com.netflix.hollow.core.write.HollowObjectWriteRecord; +import org.junit.Before; + +public class AbstractHollowTypeDataElementsSplitJoinTest extends AbstractStateEngineTest { + protected HollowObjectSchema schema; + + @Before + public void setUp() { + schema = new HollowObjectSchema("TestObject", 4); + schema.addField("longField", HollowObjectSchema.FieldType.LONG); + schema.addField("stringField", HollowObjectSchema.FieldType.STRING); + schema.addField("intField", HollowObjectSchema.FieldType.INT); + schema.addField("doubleField",
HollowObjectSchema.FieldType.DOUBLE); + + super.setUp(); + } + + @Override + protected void initializeTypeStates() { + writeStateEngine.addTypeState(new HollowObjectTypeWriteState(schema)); + } + + protected void populateWriteStateEngine(int numRecords) { + initWriteStateEngine(); + HollowObjectWriteRecord rec = new HollowObjectWriteRecord(schema); + for(int i=0;i> shardOrdinalShift; + shardOrdinals[shardIndex][shardOrdinal] = ordinal; + } + + for (int shardIndex=0; shardIndex shardingFactor(0, 1)); + assertIllegalStateException(() -> shardingFactor(2, 0)); + assertIllegalStateException(() -> shardingFactor(1, 1)); + assertIllegalStateException(() -> shardingFactor(1, -1)); + assertIllegalStateException(() -> shardingFactor(2, 3)); + } + + @Test + public void testReshardingIntermediateStages_expandWithOriginalDataElements() throws Exception { + for (int shardingFactor : new int[]{2}) { // , 4, 8, 16, 32, 64, 128, 256, 512, 1024 + for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000)) { + HollowObjectTypeReadState expectedTypeState = populateTypeStateWith(numRecords); + HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(expectedTypeState); + + HollowObjectTypeShardsHolder original = expectedTypeState.getShardsVolatile(); + HollowObjectTypeReadState actualTypeState = new HollowObjectTypeReadState(readStateEngine, MemoryMode.ON_HEAP, schema, schema); + actualTypeState.updateShardsVolatile(reshardingStrategy.expandWithOriginalDataElements(original, shardingFactor)); + + assertEquals(shardingFactor * expectedTypeState.numShards(), actualTypeState.numShards()); + assertDataUnchanged(actualTypeState, numRecords); + } + } + } + + @Test + public void testReshardingIntermediateStages_splitDataElementsForOneShard() throws Exception { + for (int shardingFactor : new int[]{2}) { // , 4, 8, 16, 32, 64, 128, 256, 512, 1024 + for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000)) { + HollowObjectTypeReadState typeState = populateTypeStateWith(numRecords); + HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState); + + HollowObjectTypeShardsHolder originalShardsHolder = typeState.getShardsVolatile(); + int originalNumShards = typeState.numShards(); + + // expand shards + typeState.updateShardsVolatile(reshardingStrategy.expandWithOriginalDataElements(originalShardsHolder, shardingFactor)); + + for(int i=0; i invocation) { + try { + invocation.get(); + Assert.fail(); + } catch (IllegalStateException e) { + // expected + } + } + +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/list/AbstractHollowListTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/list/AbstractHollowListTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..58d2982928 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/list/AbstractHollowListTypeDataElementsSplitJoinTest.java @@ -0,0 +1,87 @@ +package com.netflix.hollow.core.read.engine.list; + +import static org.mockito.Mockito.when; + +import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsSplitJoinTest; +import com.netflix.hollow.core.read.iterator.HollowOrdinalIterator; +import com.netflix.hollow.core.schema.HollowListSchema; +import com.netflix.hollow.core.write.HollowListTypeWriteState; +import com.netflix.hollow.core.write.HollowListWriteRecord; +import java.io.IOException; +import java.util.Arrays; +import 
org.junit.Assert; +import org.junit.Before; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class AbstractHollowListTypeDataElementsSplitJoinTest extends AbstractHollowTypeDataElementsSplitJoinTest { + protected HollowListSchema listSchema; + + @Mock + protected HollowListTypeReadState mockListTypeState; + + @Before + public void setUp() { + this.listSchema = new HollowListSchema("TestList", "TestObject"); + + super.setUp(); + + MockitoAnnotations.initMocks(this); + HollowListTypeDataElements[] fakeDataElements = new HollowListTypeDataElements[5]; + when(mockListTypeState.currentDataElements()).thenReturn(fakeDataElements); + } + + @Override + protected void initializeTypeStates() { + super.initializeTypeStates(); + writeStateEngine.addTypeState(new HollowListTypeWriteState(listSchema)); + writeStateEngine.setTargetMaxTypeShardSize(4 * 100 * 1000 * 1024); + } + + private void populateWriteStateEngine(int[][] listContents) { + for(int[] list : listContents) { + addRecord(Arrays.stream(list).toArray()); + } + } + + private void addRecord(int... ordinals) { + HollowListWriteRecord rec = new HollowListWriteRecord(); + + for(int i=0;i widthSmall); + + HollowListTypeDataElementsJoiner joiner = new HollowListTypeDataElementsJoiner(new HollowListTypeDataElements[] + {dataElementsSmall, dataElementsBig}); + HollowListTypeDataElements dataElementsJoined = joiner.join(); + int widthJoined = dataElementsJoined.bitsPerElement; + + long val0 = dataElementsJoined.elementData.getElementValue(0, widthJoined); + + assertEquals(widthBig, widthJoined); + assertEquals(valSmall, val0); + } + +// @Test +// public void testLopsidedShards() { +// // TODO: implement when producer allows enabling type sharding for List types +// } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..3849b4206f --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElementsSplitJoinTest.java @@ -0,0 +1,72 @@ +package com.netflix.hollow.core.read.engine.list; + +import static org.junit.Assert.assertEquals; + +import com.netflix.hollow.tools.checksum.HollowChecksum; +import java.io.IOException; +import java.util.BitSet; +import org.junit.Test; + +public class HollowListTypeDataElementsSplitJoinTest extends AbstractHollowListTypeDataElementsSplitJoinTest { + + @Test + public void testSplitThenJoin() throws IOException { + + int maxNumListRecords = 100; + + // 1->2->1, 1->4->1, ... 
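+ // e.g. 1->2->1 means the single shard's data elements are split into 2 and joined back into 1,
+ // with record data and checksums verified unchanged after the round trip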
+ for (int numRecords=0;numRecords 0) { + int numKeyValueOrdinals = 1 + Arrays.stream(maps) + .flatMap(Arrays::stream) + .flatMapToInt(Arrays::stream) + .max() + .orElseThrow(() -> new IllegalArgumentException("Array is empty")); + // populate write state with that many ordinals + super.populateWriteStateEngine(numKeyValueOrdinals); + } + for(int[][] map : maps) { + HollowMapWriteRecord rec = new HollowMapWriteRecord(); + for (int[] entry : map) { + assertEquals(2, entry.length); // key value pair + rec.addEntry(entry[0], entry[1]); + } + writeStateEngine.add("TestMap", rec); + } + } + + protected HollowMapTypeReadState populateTypeStateWith(int[][][] maps) throws IOException { + populateWriteStateEngine(maps); + roundTripSnapshot(); + return (HollowMapTypeReadState) readStateEngine.getTypeState("TestMap"); + } + + protected int[][][] generateListContents(int numRecords) { + int[][][] maps = new int[numRecords][][]; + Random random = new Random(); + int maxEntries = 10; + for (int i=0;i expected = convertToMap(maps[i]); + Map actual = readMap(typeState, i); + assertEquals(expected, actual); + } + } + + public static Map readMap(HollowMapTypeReadState typeState, int ordinal) { + Map result = new HashMap<>(); + HollowMapEntryOrdinalIterator iter = typeState.ordinalIterator(ordinal); + boolean hasMore = iter.next(); + while (hasMore) { + int key = iter.getKey(); + int value = iter.getValue(); + result.put(key, value); + hasMore = iter.next(); + } + return result; + } + + public static Map convertToMap(int[][] array) { + Map map = new HashMap<>(); + + for (int[] pair : array) { + if (pair.length == 2) { + map.put(pair[0], pair[1]); + } else { + throw new IllegalArgumentException("Each sub-array must have exactly 2 elements."); + } + } + + return map; + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoinerTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoinerTest.java new file mode 100644 index 0000000000..fcabd631b5 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsJoinerTest.java @@ -0,0 +1,112 @@ +package com.netflix.hollow.core.read.engine.map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; + +public class HollowMapTypeDataElementsJoinerTest extends AbstractHollowMapTypeDataElementsSplitJoinTest { + @Override + protected void initializeTypeStates() { + super.initializeTypeStates(); + writeStateEngine.setTargetMaxTypeShardSize(16); + } + + @Test + public void testJoin() throws IOException { + int[][][] maps = new int[][][] { { {1, 1}, {2, 2}, {3, 3} } }; + HollowMapTypeReadState typeReadState = populateTypeStateWith(maps); + assertEquals(1, typeReadState.numShards()); + + maps = new int[][][] { + { {1, 1}, {2, 2}, {3, 3} }, + { {1, 3}, {2, 1}, {3, 2} }, + {}, + }; + int entryLen = 20; + maps[2] = new int[entryLen][2]; + for (int i=0; i widthSmall); + + HollowMapTypeDataElementsJoiner joiner = new HollowMapTypeDataElementsJoiner(new HollowMapTypeDataElements[] + {dataElementsSmall, dataElementsBig}); + HollowMapTypeDataElements dataElementsJoined = joiner.join(); + int widthJoined = dataElementsJoined.bitsPerMapEntry; + + long keyJoined = dataElementsJoined.entryData.getElementValue(0, dataElementsJoined.bitsPerKeyElement); + long valJoined = 
dataElementsJoined.entryData.getElementValue(0 + dataElementsJoined.bitsPerKeyElement, dataElementsJoined.bitsPerValueElement); + + assertEquals(widthBig, widthJoined); + assertEquals(keySmall, keyJoined); + assertEquals(valSmall, valJoined); + + int ordinalFirstBig = 1; + long startBucketFirstBig = dataElementsJoined.getStartBucket(ordinalFirstBig); + long endBucketFirstBig = dataElementsJoined.getEndBucket(ordinalFirstBig); + Map bigValueMapJoined = new HashMap<>(); + for (long bucket=startBucketFirstBig;bucket expected = new HashMap() {{ put(bigValueMapOriginal[0][0][0], bigValueMapOriginal[0][0][1]); }}; + assertEquals(expected, bigValueMapJoined); + } + +// @Test +// public void testLopsidedShards() { +// // TODO: implement when producer supports enabling type sharding for Map types +// } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..89264e6272 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/map/HollowMapTypeDataElementsSplitJoinTest.java @@ -0,0 +1,71 @@ +package com.netflix.hollow.core.read.engine.map; + +import static org.junit.Assert.assertEquals; + +import com.netflix.hollow.tools.checksum.HollowChecksum; +import java.io.IOException; +import java.util.BitSet; +import org.junit.Test; + +public class HollowMapTypeDataElementsSplitJoinTest extends AbstractHollowMapTypeDataElementsSplitJoinTest { + + @Test + public void testSplitThenJoin() throws IOException { + int maxNumMapRecords = 100; + + // 1->2->1, 1->4->1, ... + for (int numRecords=0;numRecords widthSmall); - HollowObjectTypeDataElements dataElementsJoined = joiner.join(new HollowObjectTypeDataElements[] + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(new HollowObjectTypeDataElements[] {dataElementsSmall, dataElementsBig}); + HollowObjectTypeDataElements dataElementsJoined = joiner.join(); int intFieldPosJoined = dataElementsJoined.schema.getPosition("intField"); int widthJoined = dataElementsJoined.bitsPerField[intFieldPosJoined]; diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java index 5ac4b8cf82..8ad9d29819 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitJoinTest.java @@ -2,38 +2,26 @@ import static org.junit.Assert.assertEquals; -import com.netflix.hollow.api.consumer.HollowConsumer; -import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever; -import com.netflix.hollow.core.read.engine.HollowReadStateEngine; -import com.netflix.hollow.core.schema.HollowSchema; -import com.netflix.hollow.core.write.HollowObjectTypeWriteState; +import com.netflix.hollow.core.write.HollowObjectWriteRecord; import com.netflix.hollow.tools.checksum.HollowChecksum; import java.io.IOException; -import java.nio.file.Paths; import java.util.BitSet; import org.junit.Test; public class HollowObjectTypeDataElementsSplitJoinTest extends AbstractHollowObjectTypeDataElementsSplitJoinTest { - @Override - protected void initializeTypeStates() { - writeStateEngine.setTargetMaxTypeShardSize(4 * 1000 * 
1024); - writeStateEngine.addTypeState(new HollowObjectTypeWriteState(schema)); - } - @Test public void testSplitThenJoin() throws IOException { - HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); - HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); - for (int numRecords=0;numRecords<1*1000;numRecords++) { HollowObjectTypeReadState typeReadState = populateTypeStateWith(numRecords); assertEquals(1, typeReadState.numShards()); assertDataUnchanged(typeReadState, numRecords); - for (int numSplits : new int[]{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}) { - HollowObjectTypeDataElements[] splitElements = splitter.split(typeReadState.currentDataElements()[0], numSplits); - HollowObjectTypeDataElements joinedElements = joiner.join(splitElements); + for (int numSplits : new int[]{2}) { // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024 + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], numSplits); + HollowObjectTypeDataElements[] splitElements = splitter.split(); + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(splitElements); + HollowObjectTypeDataElements joinedElements = joiner.join(); HollowObjectTypeReadState resultTypeReadState = new HollowObjectTypeReadState(typeReadState.getSchema(), joinedElements); assertDataUnchanged(resultTypeReadState, numRecords); @@ -59,17 +47,16 @@ private void assertChecksumUnchanged(HollowObjectTypeReadState newTypeState, Hol @Test public void testSplitThenJoinWithFilter() throws IOException { - HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); - HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); - int numSplits = 2; for (int numRecords=0;numRecords<1*1000;numRecords++) { HollowObjectTypeReadState typeReadState = populateTypeStateWithFilter(numRecords); assertEquals(1, typeReadState.numShards()); assertDataUnchanged(typeReadState, numRecords); - HollowObjectTypeDataElements[] splitElements = splitter.split(typeReadState.currentDataElements()[0], numSplits); - HollowObjectTypeDataElements joinedElements = joiner.join(splitElements); + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], numSplits); + HollowObjectTypeDataElements[] splitElements = splitter.split(); + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(splitElements); + HollowObjectTypeDataElements joinedElements = joiner.join(); HollowObjectTypeReadState resultTypeReadState = new HollowObjectTypeReadState(typeReadState.getSchema(), joinedElements); assertDataUnchanged(resultTypeReadState, numRecords); @@ -79,51 +66,51 @@ public void testSplitThenJoinWithFilter() throws IOException { @Test public void testSplitThenJoinWithEmptyJoin() throws IOException { - HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); HollowObjectTypeReadState typeReadState = populateTypeStateWith(1); assertEquals(1, typeReadState.numShards()); - HollowObjectTypeDataElements[] splitBy4 = splitter.split(typeReadState.currentDataElements()[0], 4); + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], 4); + HollowObjectTypeDataElements[] splitBy4 = splitter.split(); assertEquals(-1, splitBy4[1].maxOrdinal); assertEquals(-1, splitBy4[3].maxOrdinal); 
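// the lone record (ordinal 0) maps to split 0 because 0 & 3 == 0, so splits 1 and 3 hold no records (maxOrdinal == -1)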
- HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); - HollowObjectTypeDataElements joined = joiner.join(new HollowObjectTypeDataElements[]{splitBy4[1], splitBy4[3]}); + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(new HollowObjectTypeDataElements[]{splitBy4[1], splitBy4[3]}); + HollowObjectTypeDataElements joined = joiner.join(); assertEquals(-1, joined.maxOrdinal); } - // manually invoked - // @Test - public void testSplittingAndJoiningWithSnapshotBlob() throws Exception { - - String blobPath = null; // dir where snapshot blob exists for e.g. "/tmp/"; - long v = 0l; // snapshot version for e.g. 20230915162636001l; - String objectTypeWithOneShard = null; // type name corresponding to an Object type with single shard for e.g. "Movie"; - int numSplits = 2; - - if (blobPath==null || v==0l || objectTypeWithOneShard==null) { - throw new IllegalArgumentException("These arguments need to be specified"); + @Test + public void testSplitThenJoinWithNullAndSpecialValues() throws IOException { + initWriteStateEngine(); + HollowObjectWriteRecord rec = new HollowObjectWriteRecord(schema); + for(int i=0;i<10;i++) { + rec.reset(); + rec.setLong("longField", i); + // other fields will be null + writeStateEngine.add("TestObject", rec); + } + for(int i=10;i<20;i++) { + rec.reset(); + rec.setLong("longField", Long.MIN_VALUE); + rec.setString("stringField", ""); + rec.setInt("intField", i); + rec.setDouble("doubleField", Double.NaN); + writeStateEngine.add("TestObject", rec); } - HollowFilesystemBlobRetriever hollowBlobRetriever = new HollowFilesystemBlobRetriever(Paths.get(blobPath)); - HollowConsumer c = HollowConsumer.withBlobRetriever(hollowBlobRetriever).build(); - c.triggerRefreshTo(v); - HollowReadStateEngine readStateEngine = c.getStateEngine(); - - HollowObjectTypeReadState typeState = (HollowObjectTypeReadState) readStateEngine.getTypeState(objectTypeWithOneShard); - HollowSchema origSchema = typeState.getSchema(); - - assertEquals(1, typeState.numShards()); - HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); - HollowObjectTypeDataElements[] splitElements = splitter.split(typeState.currentDataElements()[0], numSplits); + roundTripSnapshot(); + HollowObjectTypeReadState typeReadState = (HollowObjectTypeReadState) readStateEngine.getTypeState("TestObject"); + assertEquals(1, typeReadState.numShards()); - HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(); - HollowObjectTypeDataElements joinedElements = joiner.join(splitElements); + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], 4); + HollowObjectTypeDataElements[] splitBy4 = splitter.split(); - HollowObjectTypeReadState resultTypeState = new HollowObjectTypeReadState(typeState.getSchema(), joinedElements); + HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner(splitBy4); + HollowObjectTypeDataElements joined = joiner.join(); - assertChecksumUnchanged(resultTypeState, typeState, typeState.getPopulatedOrdinals()); + HollowObjectTypeReadState joinedTypeReadState = new HollowObjectTypeReadState(typeReadState.getSchema(), joined); + assertChecksumUnchanged(typeReadState, joinedTypeReadState, typeReadState.getPopulatedOrdinals()); } } diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java 
b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java index 7f76ab5b27..b5e4427021 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeDataElementsSplitterTest.java @@ -10,29 +10,31 @@ public class HollowObjectTypeDataElementsSplitterTest extends AbstractHollowObje @Test public void testSplit() throws IOException { - HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(); - HollowObjectTypeReadState typeReadState = populateTypeStateWith(5); assertEquals(1, typeReadState.numShards()); assertDataUnchanged(typeReadState, 5); - HollowObjectTypeDataElements[] result1 = splitter.split(typeReadState.currentDataElements()[0], 1); + HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], 1); + HollowObjectTypeDataElements[] result1 = splitter.split(); typeReadState = new HollowObjectTypeReadState(typeReadState.getSchema(), result1[0]); assertDataUnchanged(typeReadState, 5); - HollowObjectTypeDataElements[] result8 = splitter.split(typeReadState.currentDataElements()[0], 8); + splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], 8); + HollowObjectTypeDataElements[] result8 = splitter.split(); assertEquals(0, result8[0].maxOrdinal); // for index that landed one record after split assertEquals(-1, result8[7].maxOrdinal); // for index that landed no records after split try { - splitter.split(typeReadState.currentDataElements()[0], 3); // numSplits=3 + splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], 3); + splitter.split(); // numSplits=3 Assert.fail(); } catch (IllegalStateException e) { // expected, numSplits should be a power of 2 } try { - splitter.split(typeReadState.currentDataElements()[0], 0); // numSplits=0 + splitter = new HollowObjectTypeDataElementsSplitter(typeReadState.currentDataElements()[0], 0); + splitter.split(); // numSplits=0 Assert.fail(); } catch (IllegalStateException e) { // expected, numSplits should be a power of 2 diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateTest.java index ac1f9403ea..4cfc7602cc 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/object/HollowObjectTypeReadStateTest.java @@ -1,84 +1,27 @@ package com.netflix.hollow.core.read.engine.object; -import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState.shardingFactor; import static junit.framework.TestCase.assertEquals; -import com.netflix.hollow.api.objects.generic.GenericHollowObject; -import com.netflix.hollow.core.memory.MemoryMode; -import com.netflix.hollow.core.write.HollowObjectTypeWriteState; +import com.netflix.hollow.core.read.engine.HollowTypeReshardingStrategy; import java.util.Random; -import java.util.function.Supplier; -import org.junit.Assert; import org.junit.Test; public class HollowObjectTypeReadStateTest extends AbstractHollowObjectTypeDataElementsSplitJoinTest { - @Override - protected void initializeTypeStates() { - writeStateEngine.setTargetMaxTypeShardSize(4 * 100 * 1024); - 
writeStateEngine.addTypeState(new HollowObjectTypeWriteState(schema)); - } - - @Test - public void testMappingAnOrdinalToAShardAndBack() { - int maxOrdinal = 1000; - int numShards = 4; - int minRecordLocationsPerShard = (maxOrdinal + 1) / numShards; - int[][] shardOrdinals = new int[numShards][]; - for(int i=0;i<numShards;i++) { - int numRecordsInShard = minRecordLocationsPerShard + ((i < (maxOrdinal + 1) % numShards) ? 1 : 0); - shardOrdinals[i] = new int[numRecordsInShard]; - } - - int shardNumberMask = numShards - 1; - int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); - - for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) { - int shardIndex = ordinal & shardNumberMask; - int shardOrdinal = ordinal >> shardOrdinalShift; - shardOrdinals[shardIndex][shardOrdinal] = ordinal; - } - - for (int shardIndex=0; shardIndex<numShards; shardIndex++) { - for (int shardOrdinal=0; shardOrdinal<shardOrdinals[shardIndex].length; shardOrdinal++) { - int ordinal = (shardOrdinal << shardOrdinalShift) | shardIndex; - assertEquals(shardOrdinals[shardIndex][shardOrdinal], ordinal); - } - } - } - - @Test - public void testShardingFactor() { - assertIllegalStateException(() -> shardingFactor(0, 1)); - assertIllegalStateException(() -> shardingFactor(2, 0)); - assertIllegalStateException(() -> shardingFactor(1, 1)); - assertIllegalStateException(() -> shardingFactor(1, -1)); - assertIllegalStateException(() -> shardingFactor(2, 3)); - } - @Test public void testResharding() throws Exception { - for (int shardingFactor : new int[]{2, 4, 8, 16}) // 32, 64, 128, 256, 512, 1024... - { - for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(1000)) + for (int shardingFactor : new int[]{2}) { // , 4, 8, 16, 32, 64, 128, 256, 512, 1024 + for(int numRecords=1;numRecords<=10000;numRecords+=new Random().nextInt(1000)) { HollowObjectTypeReadState objectTypeReadState = populateTypeStateWith(numRecords); assertDataUnchanged(objectTypeReadState, numRecords); + HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(objectTypeReadState); // Splitting shards { int prevShardCount = objectTypeReadState.numShards(); int newShardCount = shardingFactor * prevShardCount; - objectTypeReadState.reshard(newShardCount); + reshardingStrategy.reshard(objectTypeReadState, objectTypeReadState.numShards(), newShardCount); assertEquals(newShardCount, objectTypeReadState.numShards()); assertEquals(newShardCount, shardingFactor * prevShardCount); @@ -89,7 +32,7 @@ public void testResharding() throws Exception { { int prevShardCount = objectTypeReadState.numShards(); int newShardCount = prevShardCount / shardingFactor; - objectTypeReadState.reshard(newShardCount); + reshardingStrategy.reshard(objectTypeReadState, objectTypeReadState.numShards(), newShardCount); assertEquals(newShardCount, objectTypeReadState.numShards()); assertEquals(shardingFactor * newShardCount, prevShardCount); @@ -101,19 +44,17 @@ public void testResharding() throws Exception { @Test public void testReshardingWithFilter() throws Exception { - - for (int shardingFactor : new int[]{2, 64}) - { - for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(10000)) - { + for (int shardingFactor : new int[]{2}) { // , 4, 8, 16, 32, 64, 128, 256, 512, 1024 + for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(10000)) { HollowObjectTypeReadState objectTypeReadState = populateTypeStateWithFilter(numRecords); assertDataUnchanged(objectTypeReadState, numRecords); + HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(objectTypeReadState); // Splitting shards { int prevShardCount = objectTypeReadState.numShards(); int newShardCount = shardingFactor * prevShardCount; - objectTypeReadState.reshard(newShardCount); + reshardingStrategy.reshard(objectTypeReadState, objectTypeReadState.numShards(), newShardCount); assertEquals(newShardCount, objectTypeReadState.numShards()); assertEquals(newShardCount, shardingFactor * prevShardCount); @@ -124,7 +65,7 @@ public void testReshardingWithFilter() throws Exception { { int prevShardCount = objectTypeReadState.numShards(); int newShardCount = prevShardCount / shardingFactor; - objectTypeReadState.reshard(newShardCount);
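+ // resharding is now routed through the type-agnostic strategy, which is handed the current and target shard counts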
+ reshardingStrategy.reshard(objectTypeReadState, objectTypeReadState.numShards(), newShardCount); assertEquals(newShardCount, objectTypeReadState.numShards()); assertEquals(shardingFactor * newShardCount, prevShardCount); @@ -133,89 +74,4 @@ public void testReshardingWithFilter() throws Exception { } } } - - @Test - public void testReshardingIntermediateStages_expandWithOriginalDataElements() throws Exception { - for (int shardingFactor : new int[]{2, 4}) { - for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000)) - { - HollowObjectTypeReadState expectedTypeState = populateTypeStateWith(numRecords); - - HollowObjectTypeReadState.ShardsHolder original = expectedTypeState.shardsVolatile; - HollowObjectTypeReadState.ShardsHolder expanded = expectedTypeState.expandWithOriginalDataElements(original, shardingFactor); - - HollowObjectTypeReadState actualTypeState = new HollowObjectTypeReadState(readStateEngine, MemoryMode.ON_HEAP, schema, schema); - actualTypeState.shardsVolatile = expanded; - - assertEquals(shardingFactor * expectedTypeState.numShards(), actualTypeState.numShards()); - assertDataUnchanged(actualTypeState, numRecords); - } - } - } - - @Test - public void testReshardingIntermediateStages_splitDataElementsForOneShard() throws Exception { - for (int shardingFactor : new int[]{2, 4}) { - for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000)) - { - HollowObjectTypeReadState typeState = populateTypeStateWith(numRecords); - - HollowObjectTypeReadState.ShardsHolder originalShardsHolder = typeState.shardsVolatile; - int originalNumShards = typeState.numShards(); - - // expand shards - typeState.shardsVolatile = typeState.expandWithOriginalDataElements(originalShardsHolder, shardingFactor); - - for(int i=0; i<originalNumShards; i++) { - typeState.shardsVolatile = typeState.splitDataElementsForOneShard(typeState, i, originalNumShards, shardingFactor); - } - - assertEquals(shardingFactor * originalNumShards, typeState.numShards()); - assertDataUnchanged(typeState, numRecords); - } - } - } - - private void assertIllegalStateException(Supplier<Integer> invocation) { - try { - invocation.get(); - Assert.fail(); - } catch (IllegalStateException e) { - // expected - } - } } diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/AbstractHollowSetTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/AbstractHollowSetTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..aed255cd2d --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/AbstractHollowSetTypeDataElementsSplitJoinTest.java @@ -0,0 +1,89 @@ +package com.netflix.hollow.core.read.engine.set; + +import static org.mockito.Mockito.when; + +import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsSplitJoinTest; +import com.netflix.hollow.core.read.iterator.HollowOrdinalIterator; +import com.netflix.hollow.core.schema.HollowSetSchema; +import com.netflix.hollow.core.write.HollowSetTypeWriteState; +import com.netflix.hollow.core.write.HollowSetWriteRecord; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Before; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class AbstractHollowSetTypeDataElementsSplitJoinTest extends AbstractHollowTypeDataElementsSplitJoinTest { + protected HollowSetSchema setSchema; + + @Mock + protected HollowSetTypeReadState mockSetTypeState; + + @Before + public void setUp() { + this.setSchema = new HollowSetSchema("TestSet", "TestObject"); + + super.setUp(); + + MockitoAnnotations.initMocks(this); + HollowSetTypeDataElements[] fakeDataElements = new HollowSetTypeDataElements[5]; + 
when(mockSetTypeState.currentDataElements()).thenReturn(fakeDataElements); + } + + @Override + protected void initializeTypeStates() { + super.initializeTypeStates(); + writeStateEngine.addTypeState(new HollowSetTypeWriteState(setSchema)); + writeStateEngine.setTargetMaxTypeShardSize(4 * 100 * 1000 * 1024); + } + + int[][] generateSetContents(int numRecords) { + int[][] setContents = new int[numRecords][]; + for (int i=0;i expected = Arrays.stream(listContents[i]).boxed().collect(Collectors.toSet()); + Set actual = new HashSet<>(); + int o = iter.next(); + while (o != HollowOrdinalIterator.NO_MORE_ORDINALS) { + actual.add(o); + o = iter.next(); + } + + Assert.assertEquals(expected, actual); + Assert.assertEquals(HollowOrdinalIterator.NO_MORE_ORDINALS, iter.next()); + } + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsJoinerTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsJoinerTest.java new file mode 100644 index 0000000000..79665ca836 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsJoinerTest.java @@ -0,0 +1,105 @@ +package com.netflix.hollow.core.read.engine.set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Assert; +import org.junit.Test; + +public class HollowSetTypeDataElementsJoinerTest extends AbstractHollowSetTypeDataElementsSplitJoinTest { + @Override + protected void initializeTypeStates() { + super.initializeTypeStates(); + writeStateEngine.setTargetMaxTypeShardSize(16); + } + + @Test + public void testJoin() throws IOException { + int[][] setContents = new int[][] { + {0, 1, 2} + }; + HollowSetTypeReadState typeReadState = populateTypeStateWith(setContents); + assertEquals(1, typeReadState.numShards()); + + setContents = new int[][] { + {0, 1, 2}, + {3}, + {} + }; + int setSize = 50; + setContents[2] = new int[setSize]; + for (int i=4; i setOfBigVals = IntStream.of(bigVals).boxed().collect(Collectors.toSet()); + assertEquals(1, typeReadStateBig.numShards()); + HollowSetTypeDataElements dataElementsBig = typeReadStateBig.currentDataElements()[0]; + int widthBig = dataElementsBig.bitsPerElement; + long bucketStart = 0; + long valBig = dataElementsBig.elementData.getElementValue(bucketStart, widthBig); + while (valBig == dataElementsBig.emptyBucketValue) { + bucketStart += widthBig; + valBig = dataElementsBig.elementData.getElementValue(bucketStart, widthBig); + } + + HollowSetTypeDataElementsJoiner joiner = new HollowSetTypeDataElementsJoiner(new HollowSetTypeDataElements[] + {dataElementsSmall, dataElementsBig}); + HollowSetTypeDataElements dataElementsJoined = joiner.join(); + int widthJoined = dataElementsJoined.bitsPerElement; + + long valSmallJoined = dataElementsJoined.elementData.getElementValue(0, widthJoined); + bucketStart = dataElementsJoined.getStartBucket(1); + long bucketEnd = dataElementsJoined.getEndBucket(1); + Set bigValsJoined = new HashSet<>(); + for (long bucket=bucketStart;bucket widthSmall); + assertEquals(widthBig, widthJoined); + assertEquals(valSmall, valSmallJoined); + assertEquals(setOfBigVals, bigValsJoined); + } + +// @Test +// public void testLopsidedShards() { +// // TODO: implement when producer supports enabling type sharding for Set types +// } +} diff --git 
a/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitJoinTest.java b/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitJoinTest.java new file mode 100644 index 0000000000..833f1237c8 --- /dev/null +++ b/hollow/src/test/java/com/netflix/hollow/core/read/engine/set/HollowSetTypeDataElementsSplitJoinTest.java @@ -0,0 +1,71 @@ +package com.netflix.hollow.core.read.engine.set; + +import static org.junit.Assert.assertEquals; + +import com.netflix.hollow.tools.checksum.HollowChecksum; +import java.io.IOException; +import java.util.BitSet; +import org.junit.Test; + +public class HollowSetTypeDataElementsSplitJoinTest extends AbstractHollowSetTypeDataElementsSplitJoinTest { + + @Test + public void testSplitThenJoin() throws IOException { + int maxNumMapRecords = 100; + + // 1->2->1, 1->4->1, ... + for (int numRecords=0;numRecords { - ws.add("A"); - }); - - HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager).withTargetMaxTypeShardSize(32).build(); - p2.initializeDataModel(String.class, Long.class); - p2.restore(v1, blobStore); - long v2 = p2.runCycle(ws -> { - ws.add("A"); - ws.add("B"); - for (int i=0; i<50; i++) { - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer.triggerRefreshTo(v2); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - - consumer.triggerRefreshTo(v1); // reverse delta transition for new type with customNumShards - assertEquals(v1, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - } - - @Test - public void testReverseDeltaNumShardsWhenTypeDropsToZeroRecords() { - InMemoryBlobStore blobStore = new InMemoryBlobStore(); - HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); - - HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) - .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); - p1.initializeDataModel(String.class, Long.class); - long v1 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - ws.add("A"); - for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer.triggerRefreshTo(v1); - assertEquals(v1, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - - long v2 = p1.runCycle(ws -> { - ws.add("A"); - }); - consumer.triggerRefreshTo(v2); - assertEquals(v2, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type has a ghost record - - long v3 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - ws.add("A"); - ws.add("B"); - }); - consumer.triggerRefreshTo(v3); - 
assertEquals(v3, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type dropped all records - - long v4 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - ws.add("A"); - for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - consumer.triggerRefreshTo(v4); - assertEquals(v4, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type has 1 record again - } - - @Test - public void testNoReshardingIfNumShardsPinnedByAnnotation() { - HollowWriteStateEngine wse = new HollowWriteStateEngine(); - new HollowObjectMapper(wse).initializeTypeState(TypeWithPinnedNumShards.class); - HollowObjectTypeWriteState typeWriteState = (HollowObjectTypeWriteState) wse.getTypeState("TypeWithPinnedNumShards"); - assertFalse(typeWriteState.allowTypeResharding()); - } - - @Test - public void testRestoreNumShardsButDoNotPin() { - InMemoryBlobStore blobStore = new InMemoryBlobStore(); - HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); - - HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) - .withTargetMaxTypeShardSize(32).build(); - p1.initializeDataModel(Long.class); - long v1 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer.triggerRefreshTo(v1); - assertEquals(v1, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - - HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) - .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); - p2.initializeDataModel(Long.class); - p2.restore(v1, blobStore); - assertEquals(2, p2.getWriteEngine().getTypeState("Long").numShards); - assertFalse(p2.getWriteEngine().getTypeState("Long").isNumShardsPinned()); - - long v2 = p2.runCycle(ws -> { - for (int i=0; i<100; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer2 = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer2.triggerRefreshTo(v2); - int newNumShards = consumer2.getStateEngine().getTypeState("Long").numShards(); - assertEquals(v2, consumer2.getCurrentVersionId()); - assertEquals(4, newNumShards); - } - - @HollowShardLargeType(numShards=4) - private static class TypeWithPinnedNumShards { - private int value; - } } diff --git a/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java b/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java index 5590beace4..6cbaf73f07 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java +++ 
b/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java @@ -1,12 +1,18 @@ package com.netflix.hollow.core.write; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.netflix.hollow.api.consumer.HollowConsumer; import com.netflix.hollow.api.producer.HollowProducer; import com.netflix.hollow.api.producer.fs.HollowInMemoryBlobStager; +import com.netflix.hollow.api.producer.model.CustomReferenceType; +import com.netflix.hollow.api.producer.model.HasAllTypeStates; +import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper; +import com.netflix.hollow.core.write.objectmapper.HollowShardLargeType; import com.netflix.hollow.test.InMemoryBlobStore; +import com.netflix.hollow.tools.checksum.HollowChecksum; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -27,10 +33,10 @@ public void testReverseDeltaNumShardsWhenNewTypes() { // add a new object type and all collection types to data model HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager).build(); - p2.initializeDataModel(HasAllTypes.class); + p2.initializeDataModel(HasAllTypeStates.class); p2.restore(v1, blobStore); long v2 = p2.runCycle(state -> { - HasAllTypes o1 = new HasAllTypes( + HasAllTypeStates o1 = new HasAllTypeStates( new CustomReferenceType(5l), new HashSet<>(Arrays.asList("e1")), Arrays.asList(1, 2, 3), @@ -70,24 +76,225 @@ public int maxDeltasBeforeDoubleSnapshot() { assertEquals(numShardsMap, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); } - private class HasAllTypes { + @Test + public void testNumShardsWhenTypeDropsToZeroRecords() { + InMemoryBlobStore blobStore = new InMemoryBlobStore(); + HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); + + HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) + .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); + p1.initializeDataModel(HasAllTypeStates.class); + long v1 = p1.runCycle(ws -> { + ws.add("A"); + for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 + final long val = new Long(i); + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap<String, Long>(){{put("k"+val, new Long(val));}} + )); + } + }); + + HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) + .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { + @Override + public boolean allowDoubleSnapshot() { + return false; + } + @Override + public int maxDeltasBeforeDoubleSnapshot() { + return Integer.MAX_VALUE; + } + }) + .build(); + consumer.triggerRefreshTo(v1); + assertEquals(v1, consumer.getCurrentVersionId()); + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + HollowChecksum origChecksum = new HollowChecksum().forStateEngineWithCommonSchemas(consumer.getStateEngine(), consumer.getStateEngine()); + + long v2 = p1.runCycle(ws -> { + ws.add("A"); + }); + consumer.triggerRefreshTo(v2); + assertEquals(v2, consumer.getCurrentVersionId());
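+ // records dropped in v2 linger for one cycle as ghost records, so per-type shard counts hold steady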
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // all types contain ghost records + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + long v3 = p1.runCycle(ws -> { + ws.add("A"); + ws.add("B"); + }); + consumer.triggerRefreshTo(v3); + assertEquals(v3, consumer.getCurrentVersionId()); + // All types dropped all records, no serialization in delta for these types (irrespective of dynamic type sharding) + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + long v4 = p1.runCycle(ws -> { + ws.add("A"); + for (int i=0; i<50; i++) { // back up to the original shard counts + final long val = new Long(i); + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap<String, Long>(){{put("k"+val, new Long(val));}} + )); + } + }); + consumer.triggerRefreshTo(v4); + assertEquals(v4, consumer.getCurrentVersionId()); + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + consumer.triggerRefreshTo(v1); + HollowChecksum finalChecksum = new HollowChecksum().forStateEngineWithCommonSchemas(consumer.getStateEngine(), consumer.getStateEngine()); + + assertEquals(origChecksum, finalChecksum); + } + + @Test + public void testNoReshardingIfNumShardsPinnedByAnnotation() { + HollowWriteStateEngine wse = new HollowWriteStateEngine(); + new HollowObjectMapper(wse).initializeTypeState(TypeWithPinnedNumShards.class); + HollowObjectTypeWriteState typeWriteState = (HollowObjectTypeWriteState) wse.getTypeState("TypeWithPinnedNumShards"); + assertFalse(typeWriteState.allowTypeResharding()); + + wse = new HollowWriteStateEngine(); + new HollowObjectMapper(wse).initializeTypeState(HasAllTypesWithPinnedNumShards.class); + for (HollowTypeWriteState writeState : wse.getOrderedTypeStates()) { + assertFalse(writeState.allowTypeResharding()); + } + } + + @Test + public void testRestoreNumShardsButDoNotPin() { + InMemoryBlobStore blobStore = new InMemoryBlobStore(); + HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); + + HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) + .withTargetMaxTypeShardSize(32).build(); + p1.initializeDataModel(HasAllTypeStates.class);
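+ // the small 32-byte target max shard size spreads each of these types across multiple shards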
+ long v1 = p1.runCycle(ws -> { + for (int i=0; i<50; i++) { + final long val = new Long(i); + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap<String, Long>(){{put("k"+val, new Long(val));}} + )); + } + }); + + HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) + .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { + @Override + public boolean allowDoubleSnapshot() { + return false; + } + @Override + public int maxDeltasBeforeDoubleSnapshot() { + return Integer.MAX_VALUE; + } + }) + .build(); + consumer.triggerRefreshTo(v1); + assertEquals(v1, consumer.getCurrentVersionId()); + // results in the following numShards per type at a shard size of 32 + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) + .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); + p2.initializeDataModel(HasAllTypeStates.class); + p2.restore(v1, blobStore); + assertEquals(2, p2.getWriteEngine().getTypeState("Long").numShards); + assertEquals(2, p2.getWriteEngine().getTypeState("CustomReferenceType").numShards); + assertEquals(8, p2.getWriteEngine().getTypeState("SetOfString").numShards); + assertEquals(4, p2.getWriteEngine().getTypeState("ListOfInteger").numShards); + assertEquals(8, p2.getWriteEngine().getTypeState("MapOfStringToLong").numShards); + + for (HollowTypeWriteState writeState : p2.getWriteEngine().getOrderedTypeStates()) { + assertFalse(writeState.isNumShardsPinned()); + } + + long v2 = p2.runCycle(ws -> { + for (int i=0; i<1000; i++) { // results in more shards at the same shard size + final long val = new Long(i); + ws.add(new HasAllTypeStates( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap<String, Long>(){{put("k"+val, new Long(val));}} + )); + } + }); + + HollowConsumer consumer2 = HollowConsumer.withBlobRetriever(blobStore) + .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { + @Override + public boolean allowDoubleSnapshot() { + return false; + } + @Override + public int maxDeltasBeforeDoubleSnapshot() { + return Integer.MAX_VALUE; + } + }) + .build(); + consumer2.triggerRefreshTo(v2); + int newNumShards = consumer2.getStateEngine().getTypeState("Long").numShards(); + assertEquals(v2, consumer2.getCurrentVersionId()); + assertTrue(2 < newNumShards); + assertTrue(2 < consumer2.getStateEngine().getTypeState("CustomReferenceType").numShards()); + + // producer doesn't support resharding for list, set, and map types yet, so their shard counts carry over + assertEquals(8, consumer2.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer2.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer2.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + } + + @HollowShardLargeType(numShards=4) + private static class TypeWithPinnedNumShards { + private int value; + } + + private class HasAllTypesWithPinnedNumShards { + @HollowShardLargeType(numShards = 32) CustomReferenceType customReferenceType; + @HollowShardLargeType(numShards = 32) Set<String> setOfStrings; + @HollowShardLargeType(numShards = 32) List<Integer> listOfInt; + @HollowShardLargeType(numShards = 32) Map<String, Long> mapOfStringToLong;
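+ // mirrors HasAllTypeStates, but pins numShards for the types reachable from these fields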
- private HasAllTypes(CustomReferenceType customReferenceType, Set<String> setOfStrings, List<Integer> listOfInt, Map<String, Long> mapOfStringToLong) { + private HasAllTypesWithPinnedNumShards(CustomReferenceType customReferenceType, Set<String> setOfStrings, List<Integer> listOfInt, Map<String, Long> mapOfStringToLong) { this.customReferenceType = customReferenceType; this.setOfStrings = setOfStrings; this.listOfInt = listOfInt; this.mapOfStringToLong = mapOfStringToLong; } } - - private class CustomReferenceType { - long id; - private CustomReferenceType(long id) { - this.id = id; - } - } }