Resharding: Consumer-side support for list, set, map types
Sunjeet committed Oct 11, 2024
1 parent 4d7eba8 commit 60e0fdd
Showing 75 changed files with 3,905 additions and 1,526 deletions.
2 changes: 1 addition & 1 deletion docs/advanced-topics.md
@@ -372,7 +372,7 @@ The number of bits used to represent a field which is one of the types (`INT`, `

32 bits are used to represent a `FLOAT`, and 64 bits are used to represent a `DOUBLE`.

`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of number of bits required to represent the maximum offset, plus one.
`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of bits required to represent the maximum offset, plus one.

Each field type may be assigned a null value. For `INT`, `LONG`, and `REFERENCE` fields, null is encoded as a value with all ones. For `FLOAT` and `DOUBLE` fields, null is encoded as special bit sequences. For `STRING` and `BYTES` fields, null is encoded by setting a designated null bit at the beginning of each field, followed by the end offset of the last populated value for that field.
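To make the lookup concrete, a minimal sketch follows; the accessor names are hypothetical, not Hollow APIs, and in the real layout the designated null bit would be masked off the fixed-length value before it is interpreted as an offset.

// Hypothetical illustration of locating a STRING/BYTES value for a given ordinal.
// The fixed-length value for ordinal n is the end offset of its bytes in the field's shared byte array,
// so the begin offset comes from ordinal n-1 (or 0 for the first record).
long endByte   = readFixedLengthValue(field, ordinal);                          // hypothetical accessor
long beginByte = (ordinal == 0) ? 0 : readFixedLengthValue(field, ordinal - 1); // previous record's end offset
int  length    = (int) (endByte - beginByte);                                   // value occupies [beginByte, endByte)
byte[] value   = copyBytes(fieldByteArray, beginByte, length);                  // hypothetical accessor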

@@ -10,7 +10,7 @@ public class HollowDiffUIServerTest {
public void test() throws Exception {
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();

HollowDiffUIServer server = new HollowDiffUIServer();
HollowDiffUIServer server = new HollowDiffUIServer(0);

server.addDiff("diff", testDiff);

@@ -22,7 +22,7 @@ public void test() throws Exception {
public void testBackwardsCompatibiltyWithJettyImplementation() throws Exception {
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();

com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer();
com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer(0);

server.addDiff("diff", testDiff);

@@ -337,30 +337,25 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
if(!filter.includes(typeName)) {
HollowListTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema), numShards);
}
} else if(schema instanceof HollowSetSchema) {
if(!filter.includes(typeName)) {
HollowSetTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema), numShards);
}
} else if(schema instanceof HollowMapSchema) {
if(!filter.includes(typeName)) {
HollowMapTypeReadState.discardSnapshot(in, numShards);
} else {
populateTypeStateSnapshot(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema), numShards);
}
}

return typeName;
}

private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState typeState) throws IOException {
stateEngine.addTypeState(typeState);
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}

private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
if (numShards<=0 || ((numShards&(numShards-1))!=0)) {
throw new IllegalArgumentException("Number of shards must be a power of 2!");
@@ -376,6 +371,10 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
int numShards = readNumShards(in);
HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
if (shouldReshard(typeState.numShards(), numShards)) {
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);
reshardingStrategy.reshard(typeState, typeState.numShards(), numShards);
}
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
discardDelta(in, schema, numShards);
@@ -384,6 +383,10 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
return schema.getName();
}

private boolean shouldReshard(int currNumShards, int deltaNumShards) {
return currNumShards != 0 && deltaNumShards != 0 && currNumShards != deltaNumShards;
}
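
As an illustration of the check above, resharding is triggered only when both shard counts are known and differ; a value of 0 is treated as unspecified:

// shouldReshard(4, 8) -> true  : the delta doubled the shard count, so the type state is resharded (split) first
// shouldReshard(8, 2) -> true  : the shard count shrank, so shards are joined first
// shouldReshard(4, 4) -> false : shard count unchanged
// shouldReshard(4, 0) -> false : the delta did not announce a shard count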

private int readNumShards(HollowBlobInput in) throws IOException {
int backwardsCompatibilityBytes = VarInt.readVInt(in);

@@ -0,0 +1,23 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;

public abstract class HollowTypeDataElements {

public int maxOrdinal;

public GapEncodedVariableLengthIntegerReader encodedAdditions;
public GapEncodedVariableLengthIntegerReader encodedRemovals;

public final ArraySegmentRecycler memoryRecycler;
public final MemoryMode memoryMode;

public HollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
}

public abstract void destroy();
}
@@ -0,0 +1,75 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

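/**
 * Join multiple {@code HollowTypeDataElements}s into 1 {@code HollowTypeDataElements}.
 * Ordinals are remapped and corresponding data is copied over.
 * The original data elements are not destroyed.
 * The no. of passed data elements must be a power of 2.
 */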
public abstract class HollowTypeDataElementsJoiner<T extends HollowTypeDataElements> {
public final int fromMask;
public final int fromOrdinalShift;
public final T[] from;

public T to;

public HollowTypeDataElementsJoiner(T[] from) {
this.from = from;
this.fromMask = from.length - 1;
this.fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);

if (from.length<=0 || !((from.length&(from.length-1))==0)) {
throw new IllegalStateException("No. of DataElements to be joined must be a power of 2");
}

for (int i=0;i<from.length;i++) {
if (from[i].maxOrdinal == -1) {
continue;
}
if (from[i].maxOrdinal > (1<<29)
|| from[i].maxOrdinal != 0 && (from.length > (1<<29)/from[i].maxOrdinal)
|| from[i].maxOrdinal * from.length + i > (1<<29)) {
throw new IllegalArgumentException("Too large to join, maxOrdinal would exceed 2<<29");
}
}

for (HollowTypeDataElements elements : from) {
if (elements.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
}
}

public T join() {

initToElements();
to.maxOrdinal = -1;

populateStats();

copyRecords();

GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length];
for (int i=0;i<from.length;i++) {
fromRemovals[i] = from[i].encodedRemovals;
}
to.encodedRemovals = GapEncodedVariableLengthIntegerReader.join(fromRemovals);

return to;
}

/**
* Initialize the target data elements.
*/
public abstract void initToElements();

/**
* Populate the stats of the target data elements.
*/
public abstract void populateStats();

/**
* Copy records from the source data elements to the target data elements.
*/
public abstract void copyRecords();


}
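
Assuming the same low-bits-select-the-shard convention Hollow uses for sharded type states, the mask and shift computed above let a joiner locate the source of each joined ordinal; a minimal illustrative sketch:

// Illustrative only: remapping a joined ordinal back to its source data elements.
int fromIndex   = ordinal & fromMask;          // low bits select which source element the record came from
int fromOrdinal = ordinal >> fromOrdinalShift; // remaining bits are the ordinal local to that source
// e.g. joining 4 sources: joined ordinal 13 (0b1101) -> source index 1, local ordinal 3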
@@ -0,0 +1,73 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

/**
 * Split a {@code HollowTypeDataElements} into multiple {@code HollowTypeDataElements}s.
 * Ordinals are remapped and corresponding data is copied over.
 * The original data elements are not destroyed.
 * The no. of splits must be a power of 2.
*/
public abstract class HollowTypeDataElementsSplitter<T extends HollowTypeDataElements> {
public final int numSplits;
public final int toMask;
public final int toOrdinalShift;
public final T from;

public T[] to;

public HollowTypeDataElementsSplitter(T from, int numSplits) {
this.from = from;
this.numSplits = numSplits;
this.toMask = numSplits - 1;
this.toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);

if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
throw new IllegalStateException("Must split by power of 2");
}

if (from.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
"delta data elements are never split/joined");
}
}

public T[] split() {

initToElements();
for(int i=0;i<to.length;i++) {
to[i].maxOrdinal = -1;
}

populateStats();

copyRecords();

if (from.encodedRemovals != null) {
GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
for(int i=0;i<to.length;i++) {
to[i].encodedRemovals = splitRemovals[i];
}
}

return to;
}

/**
* Initialize the target data elements.
*/
public abstract void initToElements();

/**
* Populate the stats of the target data elements.
*/
public abstract void populateStats();

/**
* Copy records from the source data elements to the target data elements.
*/
public abstract void copyRecords();


}
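
The splitter applies the inverse mapping, again assuming low bits select the destination; an illustrative sketch of where each source ordinal lands:

// Illustrative only: assigning a source ordinal to one of the numSplits target data elements.
int toIndex   = ordinal & toMask;          // low bits select the destination split
int toOrdinal = ordinal >> toOrdinalShift; // remaining bits become the ordinal local to that split
// e.g. splitting into 2: source ordinal 5 (0b101) -> split 1, local ordinal 2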
@@ -31,7 +31,7 @@
import java.util.stream.Stream;

/**
* A HollowTypeReadState contains and is the root handle to all of the records of a specific type in
* A HollowTypeReadState contains and is the root handle to all the records of a specific type in
* a {@link HollowReadStateEngine}.
*/
public abstract class HollowTypeReadState implements HollowTypeDataAccess {
@@ -121,16 +121,10 @@ public BitSet getPreviousOrdinals() {
*/
public abstract int maxOrdinal();

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
return currNumShards!=0 && deltaNumShards!=0 && currNumShards!=deltaNumShards;
}

public HollowSchema getSchema() {
return schema;
}
@@ -206,4 +200,18 @@ public long getApproximateShardSizeInBytes() {
*/
public abstract int numShards();

public abstract ShardsHolder getShardsVolatile();

public abstract void updateShardsVolatile(HollowTypeReadStateShard[] shards);

public abstract HollowTypeDataElements[] createTypeDataElements(int len);

public abstract HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift);

public void destroyOriginalDataElements(HollowTypeDataElements dataElements) {
dataElements.destroy();
if (dataElements.encodedRemovals != null) {
dataElements.encodedRemovals.destroy();
}
}
}
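
Taken together with the joiner and splitter above, these hooks give a resharding strategy what it needs to swap a type's shards in place. A rough sketch of doubling the shard count follows; it assumes ShardsHolder exposes the current shard array (its accessor is not shown in this excerpt), uses a hypothetical splitInTwo helper in place of a concrete HollowTypeDataElementsSplitter, and glosses over the intermediate steps a real strategy would take to keep concurrent readers consistent.

// Rough, simplified sketch; not the actual HollowTypeReshardingStrategy implementation.
HollowTypeReadStateShard[] currentShards = shardsOf(typeState.getShardsVolatile()); // assumed accessor around ShardsHolder
int newNumShards    = currentShards.length * 2;
int newOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowTypeReadStateShard[] newShards = new HollowTypeReadStateShard[newNumShards];
for (int i = 0; i < currentShards.length; i++) {
    HollowTypeDataElements original = currentShards[i].getDataElements();
    HollowTypeDataElements[] split  = splitInTwo(original); // hypothetical: type-specific splitter with numSplits = 2
    // with low bits selecting the shard, old shard i feeds new shards i and i + currentShards.length
    newShards[i]                        = typeState.createTypeReadStateShard(typeState.getSchema(), split[0], newOrdinalShift);
    newShards[i + currentShards.length] = typeState.createTypeReadStateShard(typeState.getSchema(), split[1], newOrdinalShift);
    typeState.destroyOriginalDataElements(original);
}
typeState.updateShardsVolatile(newShards);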
@@ -0,0 +1,8 @@
package com.netflix.hollow.core.read.engine;

public interface HollowTypeReadStateShard {

HollowTypeDataElements getDataElements();

int getShardOrdinalShift();
}
