Skip to content

Commit

Permalink
fix: synchronize cloud data client operations properly (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
saranyailla authored May 30, 2024
1 parent 1c4e1ce commit 132bf74
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 99 deletions.
16 changes: 16 additions & 0 deletions gdk-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"component": {
"aws.greengrass.ShadowManager": {
"author": "Me",
"version": "NEXT_PATCH",
"build": {
"build_system": "maven"
},
"publish": {
"bucket": "ggv2componentartifacts",
"region": "us-east-1"
}
}
},
"gdk_version": "1.0.0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void onConnectionInterrupted(int errorCode) {

@Override
public void onConnectionResumed(boolean sessionPresent) {
// Make sure that it is non-blocking as it is run on mqtt event loop thread.
if (inState(State.RUNNING)) {
startSyncingShadows(StartSyncInfo.builder().startSyncStrategy(true)
.updateCloudSubscriptions(true).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -48,9 +49,9 @@ public class CloudDataClient {
private final MqttClient mqttClient;
private final ExecutorService executorService;
@Getter(AccessLevel.PACKAGE)
private final Set<String> subscribedUpdateShadowTopics = new HashSet<>();
private final Set<String> subscribedUpdateShadowTopics = ConcurrentHashMap.newKeySet();
@Getter(AccessLevel.PACKAGE)
private final Set<String> subscribedDeleteShadowTopics = new HashSet<>();
private final Set<String> subscribedDeleteShadowTopics = ConcurrentHashMap.newKeySet();
private final Pattern shadowPattern = Pattern.compile("\\$aws\\/things\\/(.*)\\/shadow(\\/name\\/(.*))?"
+ "\\/(update|delete)\\/(accepted|rejected|delta|documents)");
private static final RetryUtils.RetryConfig RETRY_CONFIG = RetryUtils.RetryConfig.builder()
Expand All @@ -60,6 +61,7 @@ public class CloudDataClient {
.retryableExceptions(Collections.singletonList(SubscriptionRetryException.class))
.build();
private Future<?> syncLoopFuture;
private final Object subscriptionLock = new Object();

/**
* Ctr for CloudDataClient.
Expand Down Expand Up @@ -87,37 +89,10 @@ public void stopSubscribing() {
/**
* Unsubscribe to all shadow topics.
*/
public synchronized void unsubscribeForAllShadowsTopics() {
unsubscribeForAllShadowsTopics(subscribedUpdateShadowTopics, this::handleUpdate);
unsubscribeForAllShadowsTopics(subscribedDeleteShadowTopics, this::handleDelete);
}

/**
* Unsubscribe from all the shadow topics.
*
* @param topics topics to unsubscribe
* @param callback Callback function applied to shadow topic
*/
private synchronized void unsubscribeForAllShadowsTopics(Set<String> topics, Consumer<MqttMessage> callback) {
Set<String> topicsToUnsubscribe = new HashSet<>(topics);
for (String topic : topicsToUnsubscribe) {
try {
mqttClient.unsubscribe(UnsubscribeRequest.builder().callback(callback).topic(topic).build());
logger.atDebug().log("Unsubscribed from {}", topic);
topics.remove(topic);
} catch (TimeoutException | ExecutionException e) {
logger.atWarn()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.kv(LOG_TOPIC, topic)
.setCause(e)
.log("Failed to unsubscribe from shadow topic");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Failed from unsubscribe to all shadow topics");
}
}
public void unsubscribeForAllShadowsTopics() {
// There are no new topics to add to update/delete sets, so no new topics are subscribed.
// There are no new topics to remove from update/delete sets, so all the existing topics are unsubscribed.
updateSubscriptions(new HashSet<>());
}

/**
Expand Down Expand Up @@ -145,74 +120,71 @@ public synchronized void updateSubscriptions(Set<Pair<String, String>> shadowSet
* @param updateTopics Set of update shadow topics to subscribe to
* @param deleteTopics Set of delete shadow topics to subscribe to
*/
private synchronized void updateSubscriptions(Set<String> updateTopics, Set<String> deleteTopics) {
private void updateSubscriptions(Set<String> updateTopics, Set<String> deleteTopics) {
if (!mqttClient.connected()) {
logger.atWarn()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Attempting to update subscriptions when offline");
return;
}
// It is possible for a thread to hold the lock indefinitely as updating subscriptions is retried forever.
// The lock is released only when updating the subscriptions is successful or when the thread is interrupted
// (happens when we cancel the syncLoopFuture)
synchronized (subscriptionLock) {
// get update topics to remove and subscribe
Set<String> updateTopicsToRemove = new HashSet<>(subscribedUpdateShadowTopics);
updateTopicsToRemove.removeAll(updateTopics);

// get update topics to remove and subscribe
Set<String> updateTopicsToRemove = new HashSet<>(subscribedUpdateShadowTopics);
updateTopicsToRemove.removeAll(updateTopics);

Set<String> updateTopicsToSubscribe = new HashSet<>(updateTopics);
updateTopicsToSubscribe.removeAll(subscribedUpdateShadowTopics);

Set<String> deleteTopicsToRemove = new HashSet<>(subscribedDeleteShadowTopics);
deleteTopicsToRemove.removeAll(deleteTopics);

Set<String> deleteTopicsToSubscribe = new HashSet<>(deleteTopics);
deleteTopicsToSubscribe.removeAll(subscribedDeleteShadowTopics);

boolean success;
try {
success = RetryUtils.runWithRetry(RETRY_CONFIG, () -> {
unsubscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToRemove, this::handleUpdate);
subscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToSubscribe, this::handleUpdate);

unsubscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToRemove, this::handleDelete);
subscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToSubscribe, this::handleDelete);
Set<String> updateTopicsToSubscribe = new HashSet<>(updateTopics);
updateTopicsToSubscribe.removeAll(subscribedUpdateShadowTopics);

if (!updateTopicsToRemove.isEmpty() || !updateTopicsToSubscribe.isEmpty()
|| !deleteTopicsToRemove.isEmpty() || !deleteTopicsToSubscribe.isEmpty()
&& !Thread.currentThread().isInterrupted()) {
Set<String> deleteTopicsToRemove = new HashSet<>(subscribedDeleteShadowTopics);
deleteTopicsToRemove.removeAll(deleteTopics);

throw new SubscriptionRetryException("Missed shadow topics to (un)subscribe to");
}
Set<String> deleteTopicsToSubscribe = new HashSet<>(deleteTopics);
deleteTopicsToSubscribe.removeAll(subscribedDeleteShadowTopics);

// if interrupted then handle
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Failed to update subscriptions");
return false;
}

return true;
}, LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code(), logger);
boolean success;
try {
success = RetryUtils.runWithRetry(RETRY_CONFIG, () -> {
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
logger.atWarn()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Could not update the shadow subscriptions as the thread is interrupted");
return false;
}
unsubscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToRemove, this::handleUpdate);
subscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToSubscribe, this::handleUpdate);

unsubscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToRemove, this::handleDelete);
subscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToSubscribe, this::handleDelete);

if (!updateTopicsToRemove.isEmpty() || !updateTopicsToSubscribe.isEmpty()
|| !deleteTopicsToRemove.isEmpty() || !deleteTopicsToSubscribe.isEmpty()) {
throw new SubscriptionRetryException("Missed shadow topics to (un)subscribe to");
}
return true;
}, LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code(), logger);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.setCause(e)
.log("Failed to update subscriptions");
return;
} catch (Exception e) { // NOPMD - thrown by RetryUtils.runWithRetry()
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.setCause(e)
.log("Failed to update subscriptions");
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atError().setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()).setCause(e)
.log("Got interrupted while updating the shadow subscriptions");
return;
} catch (Exception e) { // NOPMD - thrown by RetryUtils.runWithRetry()
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.setCause(e)
.log("Failed to update subscriptions");
return;
}

if (success) {
logger.atDebug()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Finished updating subscriptions");
if (success) {
logger.atDebug()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Finished updating subscriptions");
}
}
}

Expand All @@ -224,8 +196,9 @@ private synchronized void updateSubscriptions(Set<String> updateTopics, Set<Stri
* @param callback Callback function applied to shadow topic
* @throws InterruptedException Interrupt occurred while trying to unsubscribe to shadows
*/
private synchronized void unsubscribeToShadows(Set<String> currentTopics, Set<String> topicsToUnsubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
@SuppressWarnings("PMD.PreserveStackTrace")
private void unsubscribeToShadows(Set<String> currentTopics, Set<String> topicsToUnsubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
Set<String> tempHashSet = new HashSet<>(topicsToUnsubscribe);
for (String topic : tempHashSet) {
try {
Expand All @@ -251,22 +224,18 @@ private synchronized void unsubscribeToShadows(Set<String> currentTopics, Set<St
* @param callback Callback function applied to shadow topic
* @throws InterruptedException Interrupt occurred while trying to subscribe to shadows
*/
private synchronized void subscribeToShadows(Set<String> currentTopics, Set<String> topicsToSubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
private void subscribeToShadows(Set<String> currentTopics, Set<String> topicsToSubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
Set<String> tempHashSet = new HashSet<>(topicsToSubscribe);

for (String topic : tempHashSet) {
try {
mqttClient.subscribe(SubscribeRequest.builder().topic(topic).callback(callback).build());
topicsToSubscribe.remove(topic);
currentTopics.add(topic);
logger.atDebug().log("Subscribed to {}", topic);
} catch (TimeoutException | ExecutionException e) {
logger.atWarn()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.kv(LOG_TOPIC, topic)
.setCause(e)
.log("Failed to subscribe to shadow topic");
logger.atWarn().setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()).kv(LOG_TOPIC, topic)
.setCause(e).log("Failed to subscribe to shadow topic");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ void GIVEN_100_synced_shadows_WHEN_unsubscribeForAllShadowsTopics_THEN_unsubscri
}

cloudDataClient.unsubscribeForAllShadowsTopics();
TimeUnit.MILLISECONDS.sleep(5000);

verify(mockMqttClient, times(200)).unsubscribe(any());
assertThat(cloudDataClient.getSubscribedUpdateShadowTopics().size(), is(0));
Expand Down

0 comments on commit 132bf74

Please sign in to comment.