Skip to content

Commit

Permalink
change consumer to read metadata in blob header for version number
Browse files Browse the repository at this point in the history
  • Loading branch information
workeatsleep committed Nov 16, 2023
1 parent e7ce7db commit 5e4cb58
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
import com.netflix.hollow.tools.history.HollowHistoricalStateDataAccess;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.logging.Logger;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;

/**
* A class comprising much of the internal state of a {@link HollowConsumer}. Not intended for external consumption.
*/
Expand Down Expand Up @@ -177,7 +181,21 @@ private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInpu
reader.applyDelta(in, optionalPartIn);
}

setVersion(transition.getToVersion());
long expectedToVersion = transition.getToVersion();
Long actualToVersion = null;
String actualToVersionStr = stateEngine.getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION);
if (actualToVersionStr != null) {
actualToVersion = Long.valueOf(actualToVersionStr);
}

if (actualToVersion != null) {
if (actualToVersion.longValue() != expectedToVersion) {
LOG.warning("toVersion in blob didn't match toVersion seen in metadata; actualToVersion=" + actualToVersion + ", expectedToVersion=" + expectedToVersion);
}
setVersion(actualToVersion.longValue());
} else {
setVersion(transition.getToVersion());
}

for(HollowConsumer.RefreshListener refreshListener : refreshListeners)
refreshListener.blobLoaded(transition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.netflix.hollow.core.HollowConstants.VERSION_LATEST;
import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_SCHEMA_HASH;
import static java.util.Collections.emptyList;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -46,6 +47,7 @@
import com.netflix.hollow.test.consumer.TestHollowConsumer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -55,6 +57,8 @@
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -98,6 +102,26 @@ public void testUpdateTo_noVersions() throws Throwable {
assertTrue("Should still have no types", readStateEngine.getAllTypes().isEmpty());
}

@Test
public void testUpdateTo_updateToLatestButBlobOnDifferentVersion() throws Throwable {
long updateToVersion = 1234;
long blobToVersion = 1235; //this needs to be different from updateToVersion
HollowWriteStateEngine writeStateEngine = new HollowWriteStateEngine();
writeStateEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(blobToVersion));
ByteArrayOutputStream os = new ByteArrayOutputStream();
new HollowBlobWriter(writeStateEngine).writeSnapshot(os);
ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray());
HollowConsumer.Blob blob = mock(HollowConsumer.Blob.class);
when(blob.isSnapshot())
.thenReturn(true);
when(blob.getInputStream())
.thenReturn(is);
when(retriever.retrieveSnapshotBlob(anyLong()))
.thenReturn(blob);
subject.updateTo(updateToVersion);
Assert.assertEquals(subject.getCurrentVersionId(), blobToVersion);
}

@Rule
public ExpectedException expectedException = ExpectedException.none();

Expand Down

0 comments on commit 5e4cb58

Please sign in to comment.