Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hollow metrics collector bug fix #703

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public HollowClientUpdater(HollowConsumer.BlobRetriever transitionCreator,
this.staleReferenceDetector.startMonitoring();
this.metrics = metrics;
this.metricsCollector = metricsCollector;
this.metricsCollector.setMetrics(metrics);
this.initialLoad = new CompletableFuture<>();
}

Expand Down Expand Up @@ -127,102 +128,103 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
}
return true;
}
metrics.setLastRefreshStartNs(System.nanoTime());
try {
metrics.setLastRefreshStartNs(System.nanoTime());

// Take a snapshot of the listeners to ensure additions or removals may occur concurrently
// but will not take effect until a subsequent refresh
final HollowConsumer.RefreshListener[] localListeners =
refreshListeners.toArray(new HollowConsumer.RefreshListener[0]);
// Take a snapshot of the listeners to ensure additions or removals may occur concurrently
// but will not take effect until a subsequent refresh
final HollowConsumer.RefreshListener[] localListeners =
refreshListeners.toArray(new HollowConsumer.RefreshListener[0]);

for (HollowConsumer.RefreshListener listener : localListeners) {
listener.versionDetected(requestedVersionInfo);
}
for (HollowConsumer.RefreshListener listener : localListeners) {
listener.versionDetected(requestedVersionInfo);
}

long beforeVersion = getCurrentVersionId();
long beforeVersion = getCurrentVersionId();

for (HollowConsumer.RefreshListener listener : localListeners) {
listener.refreshStarted(beforeVersion, requestedVersion);
}
for (HollowConsumer.RefreshListener listener : localListeners) {
listener.refreshStarted(beforeVersion, requestedVersion);
}

try {
HollowUpdatePlan updatePlan = shouldCreateSnapshotPlan(requestedVersionInfo)
? planner.planInitializingUpdate(requestedVersion)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
try {
HollowUpdatePlan updatePlan = shouldCreateSnapshotPlan(requestedVersionInfo)
? planner.planInitializingUpdate(requestedVersion)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
doubleSnapshotConfig.allowDoubleSnapshot());

for (HollowConsumer.RefreshListener listener : localListeners)
if (listener instanceof HollowConsumer.TransitionAwareRefreshListener)
((HollowConsumer.TransitionAwareRefreshListener)listener).transitionsPlanned(beforeVersion, requestedVersion, updatePlan.isSnapshotPlan(), updatePlan.getTransitionSequence());
for (HollowConsumer.RefreshListener listener : localListeners)
if (listener instanceof HollowConsumer.TransitionAwareRefreshListener)
((HollowConsumer.TransitionAwareRefreshListener) listener).transitionsPlanned(beforeVersion, requestedVersion, updatePlan.isSnapshotPlan(), updatePlan.getTransitionSequence());

if (updatePlan.destinationVersion() == HollowConstants.VERSION_NONE
&& requestedVersion != HollowConstants.VERSION_LATEST) {
String msg = String.format("Could not create an update plan for version %s, because "
+ "that version or any qualifying previous versions could not be retrieved.", requestedVersion);
if (beforeVersion != HollowConstants.VERSION_NONE) {
msg += String.format(" Consumer will remain at current version %s until next update attempt.", beforeVersion);
if (updatePlan.destinationVersion() == HollowConstants.VERSION_NONE
&& requestedVersion != HollowConstants.VERSION_LATEST) {
String msg = String.format("Could not create an update plan for version %s, because "
+ "that version or any qualifying previous versions could not be retrieved.", requestedVersion);
if (beforeVersion != HollowConstants.VERSION_NONE) {
msg += String.format(" Consumer will remain at current version %s until next update attempt.", beforeVersion);
}
throw new IllegalArgumentException(msg);
}
throw new IllegalArgumentException(msg);
}

if (updatePlan.equals(HollowUpdatePlan.DO_NOTHING)
&& requestedVersion == HollowConstants.VERSION_LATEST)
throw new IllegalArgumentException("Could not create an update plan, because no existing versions could be retrieved.");
if (updatePlan.equals(HollowUpdatePlan.DO_NOTHING)
&& requestedVersion == HollowConstants.VERSION_LATEST)
throw new IllegalArgumentException("Could not create an update plan, because no existing versions could be retrieved.");

if (updatePlan.destinationVersion(requestedVersion) == getCurrentVersionId()) {
metrics.setLastRefreshEndNs(System.nanoTime());
return true;
}
if (updatePlan.destinationVersion(requestedVersion) == getCurrentVersionId()) {
return true;
}

if (updatePlan.isSnapshotPlan()) { // 1 snapshot and 0+ delta transitions
HollowDataHolder oldDh = hollowDataHolderVolatile;
if (oldDh == null || doubleSnapshotConfig.allowDoubleSnapshot()) {
HollowDataHolder newDh = newHollowDataHolder();
try {
/* We need to assign the volatile field after API init since it may be
* accessed during the update plan application, for example via a refresh
* listener (such as a unique key indexer) that calls getAPI. If we do it after
* newDh.update(), refresh listeners will see the old API. If we do it
* before then we open ourselves up to a race where a caller will get back
* null if they call getAPI after assigning the volatile but before the API
* is initialized in HollowDataHolder#initializeAPI.
* Also note that hollowDataHolderVolatile only changes for snapshot plans,
* and it is only for snapshot plans that HollowDataHolder#initializeAPI is
* called. */
newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh);
} catch (Throwable t) {
// If the update plan failed then revert back to the old holder
hollowDataHolderVolatile = oldDh;
throw t;
if (updatePlan.isSnapshotPlan()) { // 1 snapshot and 0+ delta transitions
HollowDataHolder oldDh = hollowDataHolderVolatile;
if (oldDh == null || doubleSnapshotConfig.allowDoubleSnapshot()) {
HollowDataHolder newDh = newHollowDataHolder();
try {
/* We need to assign the volatile field after API init since it may be
* accessed during the update plan application, for example via a refresh
* listener (such as a unique key indexer) that calls getAPI. If we do it after
* newDh.update(), refresh listeners will see the old API. If we do it
* before then we open ourselves up to a race where a caller will get back
* null if they call getAPI after assigning the volatile but before the API
* is initialized in HollowDataHolder#initializeAPI.
* Also note that hollowDataHolderVolatile only changes for snapshot plans,
* and it is only for snapshot plans that HollowDataHolder#initializeAPI is
* called. */
newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh);
} catch (Throwable t) {
// If the update plan failed then revert back to the old holder
hollowDataHolderVolatile = oldDh;
throw t;
}
forceDoubleSnapshot = false;
}
forceDoubleSnapshot = false;
} else { // 0 snapshot and 1+ delta transitions
hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {
});
}
} else { // 0 snapshot and 1+ delta transitions
hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {});
}

for(HollowConsumer.RefreshListener refreshListener : localListeners)
refreshListener.refreshSuccessful(beforeVersion, getCurrentVersionId(), requestedVersion);

metrics.updateTypeStateMetrics(getStateEngine(), requestedVersion);
if(metricsCollector != null)
metricsCollector.collect(metrics);

initialLoad.complete(getCurrentVersionId()); // only set the first time
metrics.setLastRefreshEndNs(System.nanoTime());
return getCurrentVersionId() == requestedVersion;
} catch(Throwable th) {
forceDoubleSnapshotNextUpdate();
metrics.updateRefreshFailed();
if(metricsCollector != null)
metricsCollector.collect(metrics);
for(HollowConsumer.RefreshListener refreshListener : localListeners)
refreshListener.refreshFailed(beforeVersion, getCurrentVersionId(), requestedVersion, th);

// intentionally omitting a call to initialLoad.completeExceptionally(th), for producers
// that write often a consumer has a chance to try another snapshot that might succeed
for (HollowConsumer.RefreshListener refreshListener : localListeners)
refreshListener.refreshSuccessful(beforeVersion, getCurrentVersionId(), requestedVersion);

metrics.updateTypeStateMetrics(getStateEngine(), requestedVersion);
if (metricsCollector != null)
metricsCollector.collect(metrics);

initialLoad.complete(getCurrentVersionId()); // only set the first time
return getCurrentVersionId() == requestedVersion;
} catch (Throwable th) {
forceDoubleSnapshotNextUpdate();
metrics.updateRefreshFailed();
if (metricsCollector != null)
metricsCollector.collect(metrics);
for (HollowConsumer.RefreshListener refreshListener : localListeners)
refreshListener.refreshFailed(beforeVersion, getCurrentVersionId(), requestedVersion, th);

// intentionally omitting a call to initialLoad.completeExceptionally(th), for producers
// that write often a consumer has a chance to try another snapshot that might succeed
throw th;
}
} finally {
metrics.setLastRefreshEndNs(System.nanoTime());

throw th;
}
}

Expand Down
Loading