Skip to content

Commit

Permalink
fix: update necessary if shadow cleared
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Oct 10, 2023
1 parent e97c818 commit f21c308
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
Expand Down Expand Up @@ -104,7 +105,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("PMD.ExcessiveClassLength")
@SuppressWarnings({"PMD.ExcessiveClassLength", "PMD.CouplingBetweenObjects"})
@ExtendWith({MockitoExtension.class, GGExtension.class})
class SyncTest extends NucleusLaunchUtils {
public static final String MOCK_THING_NAME_1 = "Thing1";
Expand Down Expand Up @@ -1480,13 +1481,76 @@ void GIVEN_local_shadow_state_empty_WHEN_shadow_manager_syncs_THEN_cloud_shadow_
assertLocalShadowEquals(initialLocalState);

resp = updateHandler.handleRequest(updateRequest2, "DoAll");
assertUpdateThingShadowHandlerResponseStateEquals(localUpdate1, resp); // null is not a valid document, so {} is returned
assertUpdateThingShadowHandlerResponseStateEquals(localUpdate2, resp);
assertEmptySyncQueue(clazz);
assertThat("sync info exists", () -> syncInfo.get().isPresent(), eventuallyEval(is(true)));
assertThat("cloud version", () -> syncInfo.get().get().getCloudVersion(), eventuallyEval(is(2L)));
assertThat("local version", () -> syncInfo.get().get().getLocalVersion(), eventuallyEval(is(3L)));
assertLocalShadowEquals(finalLocalState);
assertCloudUpdateEquals(finalLocalState);
assertCloudUpdateEquals("{\"state\":null}"); // verify clear request forwarded to cloud
}

@ParameterizedTest
@ValueSource(classes = {RealTimeSyncStrategy.class, PeriodicSyncStrategy.class})
void GIVEN_synced_shadow_WHEN_cloud_cleared_THEN_local_cleared(Class<?extends BaseSyncStrategy> clazz, ExtensionContext context) throws IOException, InterruptedException, IoTDataPlaneClientCreationException {
ignoreExceptionOfType(context, InterruptedException.class);
ignoreExceptionOfType(context, ResourceNotFoundException.class);

mockCloudUpdateResponsesWithIncreasingVersions(1);
setCloudThingShadow("{\"version\":1,\"state\":{\"desired\":{\"SomeKey\":\"foo\"}}}");

startNucleusWithConfig(NucleusLaunchUtilsConfig.builder()
.configFile(getSyncConfigFile(clazz))
.syncClazz(clazz)
.mockCloud(true)
.build());
waitForInitialSync(clazz, 1L, 1L);

// empty state update does not affect shadow state
sendCloudUpdate(MOCK_THING_NAME_1, CLASSIC_SHADOW, "{\"version\":2,\"state\":{}}");
assertSyncInfo(2L, 1L);
assertLocalShadowEquals("{\"state\":{\"desired\":{\"SomeKey\":\"foo\"}}}");

// null state update clears the shadow
sendCloudUpdate(MOCK_THING_NAME_1, CLASSIC_SHADOW, "{\"version\":3,\"state\":null}");
assertSyncInfo(3L, 2L);
assertLocalShadowEquals("{\"state\":{}}");
}

private void mockCloudUpdateResponsesWithIncreasingVersions(int initialVersion) throws IoTDataPlaneClientCreationException {
AtomicInteger version = new AtomicInteger(initialVersion);
when(iotDataPlaneClientFactory.getIotDataPlaneClient()
.updateThingShadow(cloudUpdateThingShadowRequestCaptor.capture()))
.thenAnswer(invocation -> {
UpdateThingShadowResponse response = mock(UpdateThingShadowResponse.class);
String responseDocument = String.format("{\"version\": %d}", version.incrementAndGet());
when(response.payload()).thenReturn(SdkBytes.fromString(responseDocument, UTF_8));
return response;
});
}

private void sendCloudUpdate(String thingName, String shadowName, String document) {
SyncHandler syncHandler = kernel.getContext().get(SyncHandler.class);
syncHandler.pushLocalUpdateSyncRequest(thingName, shadowName, document.getBytes(UTF_8));
}

private void waitForInitialSync(Class<? extends BaseSyncStrategy> strategy,
long expectedCloudVersion, long expectedLocalVersion) throws InterruptedException {
verify(syncQueue, after(7000).atMost(4)).put(any(FullShadowSyncRequest.class));
assertEmptySyncQueue(strategy);
assertSyncInfo(expectedCloudVersion, expectedLocalVersion);
}

private void assertSyncInfo(long expectedCloudVersion, long expectedLocalVersion) {
assertThat("sync info exists", () -> syncInfo.get().isPresent(), eventuallyEval(is(true)));
assertThat("cloud version", () -> syncInfo.get().get().getCloudVersion(), eventuallyEval(is(expectedCloudVersion)));
assertThat("local version", syncInfo.get().get().getLocalVersion(), is(expectedLocalVersion));
}

private void setCloudThingShadow(String document) throws IoTDataPlaneClientCreationException {
GetThingShadowResponse initialCloudStateShadowResponse = mock(GetThingShadowResponse.class, Answers.RETURNS_DEEP_STUBS);
lenient().when(initialCloudStateShadowResponse.payload().asByteArray()).thenReturn(document.getBytes(UTF_8));
when(iotDataPlaneClientFactory.getIotDataPlaneClient().getThingShadow(any(GetThingShadowRequest.class))).thenReturn(initialCloudStateShadowResponse);
}

private void assertUpdateThingShadowHandlerResponseStateEquals(String expectedDocument, UpdateThingShadowHandlerResponse resp) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,7 @@ public UpdateThingShadowHandlerResponse handleRequest(UpdateThingShadowRequest r
.withVersion(updatedDocument.getVersion())
.withClientToken(clientToken)
.withTimestamp(Instant.now())
// explicitly convert to shadow document to return valid state.
// this is to prevent edge cases like returning null
.withState(new ShadowDocument(updateDocumentRequest, false).getState().toJson())
.withState(updateDocumentRequest.get("state"))
.withMetadata(metadata)
.build();
byte[] responseNodeBytes = JsonUtil.getPayloadBytes(responseNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.aws.greengrass.util.SerializerFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
Expand All @@ -28,6 +29,7 @@
* Class for managing operations on the Shadow Document.
*/
@Getter
@JsonDeserialize(using = ShadowDocumentDeserializer.class)
public class ShadowDocument {
@JsonProperty(value = SHADOW_DOCUMENT_STATE, required = true)
private ShadowState state;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.shadowmanager.model;

import com.aws.greengrass.util.SerializerFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

import java.io.IOException;

public class ShadowDocumentDeserializer extends JsonDeserializer<ShadowDocument> {
// hack to prevent StackOverflowException for custom deserialize,
// allows us to use jackson's default deserialization for type within
// a custom deserializer
static {
SerializerFactory.getFailSafeJsonObjectMapper().addMixIn(ShadowDocument.class, DefaultJsonDeserializer.class);
}

@JsonDeserialize
private interface DefaultJsonDeserializer {
// Reset default json deserializer
}

@Override
public ShadowDocument deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {

ShadowDocument document = ctxt.readValue(p, ShadowDocument.class);
if (document.getState() == null) { // handle {"state": null} document
ShadowDocument clearDocument = new ShadowDocument(document);
clearDocument.getState().setClear(true);
return clearDocument;
}
return document;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

import com.aws.greengrass.shadowmanager.util.JsonMerger;
import com.aws.greengrass.shadowmanager.util.JsonUtil;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.Setter;

import java.util.Iterator;

Expand All @@ -33,6 +35,13 @@ public class ShadowState {
@JsonProperty(SHADOW_DOCUMENT_STATE_REPORTED)
private JsonNode reported;

/**
* If true, this {@link ShadowState} represents the {"state": null} document, which resets shadow state.
*/
@Setter
@JsonIgnore
private boolean clear;

public ShadowState() {
this(null, null);
}
Expand All @@ -42,6 +51,11 @@ public ShadowState(final JsonNode desired, final JsonNode reported) {
this.reported = nullIfEmpty(reported);
}

private ShadowState(final JsonNode desired, final JsonNode reported, boolean clear) {
this(desired, reported);
this.clear = clear;
}

/**
* Creates a new instance of the shadow state by deep copying the desired and reported nodes.
*
Expand All @@ -50,7 +64,8 @@ public ShadowState(final JsonNode desired, final JsonNode reported) {
public ShadowState deepCopy() {
return new ShadowState(
isNullOrMissing(this.desired) ? this.desired : this.desired.deepCopy(),
isNullOrMissing(this.reported) ? this.reported : this.reported.deepCopy());
isNullOrMissing(this.reported) ? this.reported : this.reported.deepCopy(),
this.clear);
}

/**
Expand Down Expand Up @@ -103,6 +118,9 @@ public void update(JsonNode updatedStateNode) {
* @return a JSON node containing the shadow state.
*/
public JsonNode toJson() {
if (clear) {
return JsonUtil.OBJECT_MAPPER.nullNode();
}
final ObjectNode result = JsonUtil.OBJECT_MAPPER.createObjectNode();
if (this.desired != null) {
result.set(SHADOW_DOCUMENT_STATE_DESIRED, this.desired);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ abstract boolean isUpdateNecessary(SyncContext context) throws RetryableExceptio
boolean isUpdateNecessary(JsonNode baseDocument, JsonNode update) {
JsonNode merged = baseDocument.deepCopy();
JsonMerger.merge(merged.get(SHADOW_DOCUMENT_STATE), update.get(SHADOW_DOCUMENT_STATE));
// explicitly handle case where state is cleared with null,
// since resulting merge will have equal documents
if (!JsonUtil.isNullStateDocument(baseDocument) && JsonUtil.isNullStateDocument(update)) {
return true;
}
return !baseDocument.equals(merged);
}

Expand Down

0 comments on commit f21c308

Please sign in to comment.