Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change consumer to read metadata in blob header for version number #650

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
* memory.
*/
public class TestBlobRetriever implements BlobRetriever {
private final Map<Long, Blob> snapshots = new HashMap<>();
private final Map<Long, Blob> deltas = new HashMap<>();
private final Map<Long, Blob> reverseDeltas = new HashMap<>();
private final Map<Long, HeaderBlob> headers = new HashMap<>();
protected final Map<Long, Blob> snapshots = new HashMap<>();
protected final Map<Long, Blob> deltas = new HashMap<>();
protected final Map<Long, Blob> reverseDeltas = new HashMap<>();
protected final Map<Long, HeaderBlob> headers = new HashMap<>();

@Override
public HeaderBlob retrieveHeaderBlob(long desiredVersion) {
Expand Down Expand Up @@ -60,7 +60,7 @@ public Blob retrieveReverseDeltaBlob(long currentVersion) {
}

// so blob can be reused
private void resetStream(Blob b) {
protected void resetStream(Blob b) {
try {
if (b!= null && b.getInputStream() != null) {
b.getInputStream().reset();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.netflix.hollow.test.consumer;

import com.netflix.hollow.api.consumer.HollowConsumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TestBlobRetrieverWithNearestSnapshotMatch extends TestBlobRetriever {

@Override
public HollowConsumer.Blob retrieveSnapshotBlob(long desiredVersion) {
long version = findNearestSnapshotVersion(desiredVersion);
HollowConsumer.Blob b = snapshots.get(version);
resetStream(b);
return b;
}


private long findNearestSnapshotVersion(long desiredVersion) {
List<Long> snapshotVersions = new ArrayList<>();
snapshotVersions.addAll(snapshots.keySet());
Collections.sort(snapshotVersions);
int start = 0;
int end = snapshotVersions.size() - 1;
int mid = 0;
while (start + 1< end) {
mid = (start + end) / 2;
if (mid < desiredVersion) {
start = mid;
} else if (mid > desiredVersion){
end = mid;
} else {
return snapshotVersions.get(mid);
}
}
if (end <= desiredVersion) {
return snapshotVersions.get(end);
}
return snapshotVersions.get(start);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.hollow.api.client;

import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_SCHEMA_HASH;

import com.netflix.hollow.api.consumer.HollowConsumer;
Expand Down Expand Up @@ -119,7 +120,7 @@ public synchronized boolean updateTo(long requestedVersion) throws Throwable {
public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersionInfo) throws Throwable {
long requestedVersion = requestedVersionInfo.getVersion();
if (requestedVersion == getCurrentVersionId()) {
if (requestedVersion == HollowConstants.VERSION_NONE && hollowDataHolderVolatile == null) {
if (requestedVersion == VERSION_NONE && hollowDataHolderVolatile == null) {
LOG.warning("No versions to update to, initializing to empty state");
// attempting to refresh, but no available versions - initialize to empty state
hollowDataHolderVolatile = newHollowDataHolder();
Expand Down Expand Up @@ -148,16 +149,17 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
? planner.planInitializingUpdate(requestedVersion)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
doubleSnapshotConfig.allowDoubleSnapshot());
boolean isInitialUpdate = getCurrentVersionId() == VERSION_NONE;

for (HollowConsumer.RefreshListener listener : localListeners)
if (listener instanceof HollowConsumer.TransitionAwareRefreshListener)
((HollowConsumer.TransitionAwareRefreshListener)listener).transitionsPlanned(beforeVersion, requestedVersion, updatePlan.isSnapshotPlan(), updatePlan.getTransitionSequence());

if (updatePlan.destinationVersion() == HollowConstants.VERSION_NONE
if (updatePlan.destinationVersion() == VERSION_NONE
&& requestedVersion != HollowConstants.VERSION_LATEST) {
String msg = String.format("Could not create an update plan for version %s, because "
+ "that version or any qualifying previous versions could not be retrieved.", requestedVersion);
if (beforeVersion != HollowConstants.VERSION_NONE) {
if (beforeVersion != VERSION_NONE) {
msg += String.format(" Consumer will remain at current version %s until next update attempt.", beforeVersion);
}
throw new IllegalArgumentException(msg);
Expand Down Expand Up @@ -185,7 +187,7 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
* Also note that hollowDataHolderVolatile only changes for snapshot plans,
* and it is only for snapshot plans that HollowDataHolder#initializeAPI is
* called. */
newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh);
newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh, isInitialUpdate);
} catch (Throwable t) {
// If the update plan failed then revert back to the old holder
hollowDataHolderVolatile = oldDh;
Expand All @@ -194,7 +196,7 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
forceDoubleSnapshot = false;
}
} else { // 0 snapshot and 1+ delta transitions
hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {});
hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {}, isInitialUpdate);
}

for(HollowConsumer.RefreshListener refreshListener : localListeners)
Expand Down Expand Up @@ -245,7 +247,7 @@ public synchronized void removeRefreshListener(HollowConsumer.RefreshListener re
public long getCurrentVersionId() {
HollowDataHolder hollowDataHolderLocal = hollowDataHolderVolatile;
return hollowDataHolderLocal != null ? hollowDataHolderLocal.getCurrentVersion()
: HollowConstants.VERSION_NONE;
: VERSION_NONE;
}

public void forceDoubleSnapshotNextUpdate() {
Expand All @@ -256,7 +258,7 @@ public void forceDoubleSnapshotNextUpdate() {
* Whether or not a snapshot plan should be created. Visible for testing.
*/
boolean shouldCreateSnapshotPlan(HollowConsumer.VersionInfo incomingVersionInfo) {
if (getCurrentVersionId() == HollowConstants.VERSION_NONE
if (getCurrentVersionId() == VERSION_NONE
|| (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.HollowConsumer.TransitionAwareRefreshListener;
import com.netflix.hollow.api.custom.HollowAPI;
import com.netflix.hollow.core.HollowConstants;
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.OptionalBlobPartInput;
Expand All @@ -35,6 +34,8 @@
import java.lang.ref.WeakReference;
import java.util.logging.Logger;

import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;

/**
* A class comprising much of the internal state of a {@link HollowConsumer}. Not intended for external consumption.
*/
Expand All @@ -56,7 +57,7 @@ class HollowDataHolder {

private WeakReference<HollowHistoricalStateDataAccess> priorHistoricalDataAccess;

private long currentVersion = HollowConstants.VERSION_NONE;
private long currentVersion = VERSION_NONE;

HollowDataHolder(HollowReadStateEngine stateEngine,
HollowAPIFactory apiFactory,
Expand Down Expand Up @@ -106,7 +107,7 @@ HollowDataHolder setSkipTypeShardUpdateWithNoAdditions(boolean skipTypeShardUpda
}

void update(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners,
Runnable apiInitCallback) throws Throwable {
Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable {
// Only fail if double snapshot is configured.
// This is a short term solution until it is decided to either remove this feature
// or refine it.
Expand All @@ -123,19 +124,19 @@ void update(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refres
}

if (updatePlan.isSnapshotPlan()) {
applySnapshotPlan(updatePlan, refreshListeners, apiInitCallback);
applySnapshotPlan(updatePlan, refreshListeners, apiInitCallback, isInitialUpdate);
} else {
applyDeltaOnlyPlan(updatePlan, refreshListeners);
applyDeltaOnlyPlan(updatePlan, refreshListeners, isInitialUpdate);
}
}

private void applySnapshotPlan(HollowUpdatePlan updatePlan,
HollowConsumer.RefreshListener[] refreshListeners,
Runnable apiInitCallback) throws Throwable {
applySnapshotTransition(updatePlan.getSnapshotTransition(), refreshListeners, apiInitCallback);
Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable {
applySnapshotTransition(updatePlan.getSnapshotTransition(), refreshListeners, apiInitCallback, isInitialUpdate);

for(HollowConsumer.Blob blob : updatePlan.getDeltaTransitions()) {
applyDeltaTransition(blob, true, refreshListeners);
applyDeltaTransition(blob, true, refreshListeners, isInitialUpdate);
}

try {
Expand All @@ -149,10 +150,10 @@ private void applySnapshotPlan(HollowUpdatePlan updatePlan,

private void applySnapshotTransition(HollowConsumer.Blob snapshotBlob,
HollowConsumer.RefreshListener[] refreshListeners,
Runnable apiInitCallback) throws Throwable {
Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable {
try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, snapshotBlob);
OptionalBlobPartInput optionalPartIn = snapshotBlob.getOptionalBlobPartInputs()) {
applyStateEngineTransition(in, optionalPartIn, snapshotBlob, refreshListeners);
applyStateEngineTransition(in, optionalPartIn, snapshotBlob, refreshListeners, isInitialUpdate);
initializeAPI(apiInitCallback);

for (HollowConsumer.RefreshListener refreshListener : refreshListeners) {
Expand All @@ -165,7 +166,11 @@ private void applySnapshotTransition(HollowConsumer.Blob snapshotBlob,
}
}

private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInput optionalPartIn, HollowConsumer.Blob transition, HollowConsumer.RefreshListener[] refreshListeners) throws IOException {
private void applyStateEngineTransition(HollowBlobInput in,
OptionalBlobPartInput optionalPartIn,
HollowConsumer.Blob transition,
HollowConsumer.RefreshListener[] refreshListeners,
boolean isInitialUpdate) throws IOException {
if(transition.isSnapshot()) {
if(filter == null) {
reader.readSnapshot(in, optionalPartIn);
Expand All @@ -174,11 +179,11 @@ private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInpu
reader.readSnapshot(in, optionalPartIn, filter);
}
} else {
reader.applyDelta(in, optionalPartIn);
long expectedToVersion = transition.getToVersion();
reader.applyDelta(in, optionalPartIn, expectedToVersion, isInitialUpdate);
}

setVersion(transition.getToVersion());

for(HollowConsumer.RefreshListener refreshListener : refreshListeners)
refreshListener.blobLoaded(transition);
}
Expand All @@ -199,21 +204,22 @@ private void initializeAPI(Runnable r) {
}
}

private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable {
private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners,
boolean isInitialUpdate) throws Throwable {
for(HollowConsumer.Blob blob : updatePlan) {
applyDeltaTransition(blob, false, refreshListeners);
applyDeltaTransition(blob, false, refreshListeners, isInitialUpdate);
}
}

private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable {
private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners, boolean isInitialUpdate) throws Throwable {
if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
LOG.warning("Skipping delta transition in shared-memory mode");
return;
}

try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, blob);
OptionalBlobPartInput optionalPartIn = blob.getOptionalBlobPartInputs()) {
applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners);
applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners, isInitialUpdate);

if(objLongevityConfig.enableLongLivedObjectSupport()) {
HollowDataAccess previousDataAccess = currentAPI.getDataAccess();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.netflix.hollow.api.error;

public class VersionMismatchException extends HollowException {
private final long expectedVersion;

private final long actualVersion;

public VersionMismatchException(long expectedVersion, long actualVersion) {
super("toVersion in blob did not match toVersion requested in transition; actualToVersion=" + actualVersion + ", expectedToVersion=" + expectedVersion);
this.expectedVersion = expectedVersion;
this.actualVersion = actualVersion;
}

public long getExpectedVersion() {
return expectedVersion;
}

public long getActualVersion() {
return actualVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.api.error.VersionMismatchException;
import com.netflix.hollow.core.HollowBlobHeader;
import com.netflix.hollow.core.HollowBlobOptionalPartHeader;
import com.netflix.hollow.core.memory.MemoryMode;
Expand Down Expand Up @@ -44,6 +45,8 @@
import java.util.TreeSet;
import java.util.logging.Logger;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;

/**
* A HollowBlobReader is used to populate and update data in a {@link HollowReadStateEngine}, via the consumption
* of snapshot and delta blobs. Caller can choose between on-heap or shared-memory mode; defaults to (and for
Expand Down Expand Up @@ -149,7 +152,8 @@ public void readSnapshot(HollowBlobInput in, OptionalBlobPartInput optionalParts
if(optionalParts != null)
optionalPartInputs = optionalParts.getInputsByPartName(in.getMemoryMode());

HollowBlobHeader header = readHeader(in, false);
HollowBlobHeader header = headerReader.readHeader(in);
applyHeader(header, false);
List<HollowBlobOptionalPartHeader> partHeaders = readPartHeaders(header, optionalPartInputs, in.getMemoryMode());
List<HollowSchema> allSchemas = combineSchemas(header, partHeaders);

Expand Down Expand Up @@ -218,12 +222,24 @@ public void applyDelta(HollowBlobInput in) throws IOException {
}

public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts) throws IOException {
applyDelta(in, optionalParts, 0, false);
}
public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts, long expectedVersion, boolean isInitialUpdate) throws IOException {
validateMemoryMode(in.getMemoryMode());
Map<String, HollowBlobInput> optionalPartInputs = null;
if(optionalParts != null)
optionalPartInputs = optionalParts.getInputsByPartName(in.getMemoryMode());

HollowBlobHeader header = readHeader(in, true);
HollowBlobHeader header = headerReader.readHeader(in);
if (expectedVersion != 0 && !isInitialUpdate) {
String to_version_tag = header.getHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION);
if (to_version_tag != null) {
long to_version = Long.parseLong(to_version_tag);
if (expectedVersion != to_version) {
throw new VersionMismatchException(expectedVersion, to_version);
}
}
}
applyHeader(header, true);
List<HollowBlobOptionalPartHeader> partHeaders = readPartHeaders(header, optionalPartInputs, in.getMemoryMode());
notifyBeginUpdate();

Expand Down Expand Up @@ -258,16 +274,13 @@ public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts)
notifyEndUpdate();
}

private HollowBlobHeader readHeader(HollowBlobInput in, boolean isDelta) throws IOException {
HollowBlobHeader header = headerReader.readHeader(in);

private void applyHeader(HollowBlobHeader header, boolean isDelta) throws IOException {
if(isDelta && header.getOriginRandomizedTag() != stateEngine.getCurrentRandomizedTag())
throw new IOException("Attempting to apply a delta to a state from which it was not originated!");

stateEngine.setCurrentRandomizedTag(header.getDestinationRandomizedTag());
stateEngine.setOriginRandomizedTag(header.getOriginRandomizedTag());
stateEngine.setHeaderTags(header.getHeaderTags());
return header;
}

private List<HollowBlobOptionalPartHeader> readPartHeaders(HollowBlobHeader header, Map<String, HollowBlobInput> inputsByPartName, MemoryMode mode) throws IOException {
Expand Down
Loading
Loading