Skip to content

Commit

Permalink
fixing NPE in C8YNotificationSubscriber, when connector was never act…
Browse files Browse the repository at this point in the history
…ivated
  • Loading branch information
ck-c8y committed Mar 5, 2024
1 parent 03a2a52 commit ad146b4
Showing 1 changed file with 102 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,10 @@ public void setConfigurationRegistry(@Lazy ConfigurationRegistry configurationRe
private Map<String, Integer> tenantWSStatusCode = new HashMap<>();
private Map<String, Integer> deviceWSStatusCode = new HashMap<>();

// structure: <tenant, <connectorIdent, tokenSeed>>
// structure: <tenant, <connectorIdent, tokenSeed>>
private Map<String, Map<String, String>> deviceTokenPerConnector = new HashMap<>();
private Map<String, String> tenantToken = new HashMap<>();


public void addSubscriber(String tenant, String ident, AsynchronousDispatcherOutbound dispatcherOutbound) {
Map<String, AsynchronousDispatcherOutbound> dispatcherOutboundMap = getDispatcherOutboundMaps().get(tenant);
if (dispatcherOutboundMap == null) {
Expand All @@ -128,12 +127,12 @@ public void addSubscriber(String tenant, String ident, AsynchronousDispatcherOut
// section 1: initializing tenant client and device client
//
/**
public void initTenantClient() {
// Subscribe on Tenant do get informed when devices get deleted/added
String tenant = subscriptionsService.getTenant();
log.info("Tenant {} - Initializing Operation Subscriptions...", tenant);
//subscribeTenantAndConnect(subscriptionsService.getTenant());
}
* public void initTenantClient() {
* // Subscribe on Tenant do get informed when devices get deleted/added
* String tenant = subscriptionsService.getTenant();
* log.info("Tenant {} - Initializing Operation Subscriptions...", tenant);
* //subscribeTenantAndConnect(subscriptionsService.getTenant());
* }
**/

public void initDeviceClient() {
Expand All @@ -147,26 +146,27 @@ public void initDeviceClient() {
// Getting existing subscriptions
deviceSubList = getNotificationSubscriptionForDevices(null, DEVICE_SUBSCRIPTION).get();
log.info("Tenant {} - Subscribing to devices {}", tenant, deviceSubList);
} catch (InterruptedException | ExecutionException e ) {
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
// When one subscription exists, connect...
if (deviceSubList.size() > 0) {
try {
// For each dispatcher/connector create a new connection
if (dispatcherOutboundMaps.get(tenant) != null) {
for (AsynchronousDispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant).values()) {
String tokenSeed = DEVICE_SUBSCRIBER
+ dispatcherOutbound.getConnectorClient().getConnectorIdent()
+ additionalSubscriptionIdTest;
String token = createToken(DEVICE_SUBSCRIPTION,
tokenSeed);
deviceTokens.put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), tokenSeed);
CustomWebSocketClient client = connect(token, dispatcherOutbound);
deviceClientMap.get(tenant).put(dispatcherOutbound.getConnectorClient().getConnectorIdent(),
client);

}
for (AsynchronousDispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant)
.values()) {
String tokenSeed = DEVICE_SUBSCRIBER
+ dispatcherOutbound.getConnectorClient().getConnectorIdent()
+ additionalSubscriptionIdTest;
String token = createToken(DEVICE_SUBSCRIPTION,
tokenSeed);
deviceTokens.put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), tokenSeed);
CustomWebSocketClient client = connect(token, dispatcherOutbound);
deviceClientMap.get(tenant).put(dispatcherOutbound.getConnectorClient().getConnectorIdent(),
client);

}
}
} catch (URISyntaxException e) {
log.error("Tenant {} - Error connecting device subscription: {}", tenant, e.getLocalizedMessage());
Expand All @@ -178,7 +178,7 @@ public void notificationSubscriberReconnect(String tenant) {
subscriptionsService.runForTenant(tenant, () -> {
disconnect(tenant, false);
// notificationSubscriber.init();
//initTenantClient();
// initTenantClient();
initDeviceClient();
});
}
Expand All @@ -189,66 +189,74 @@ public void notificationSubscriberReconnect(String tenant) {

// Not needed anymore
/**
public void subscribeTenantAndConnect(String tenant) {
log.info("Tenant {} - Creating new Tenant Subscription", tenant);
NotificationSubscriptionRepresentation notificationSubscriptionRepresentation = createTenantSubscription();
String tenantToken = createToken(notificationSubscriptionRepresentation.getSubscription(),
TENANT_SUBSCRIBER + additionalSubscriptionIdTest);
this.tenantToken.put(tenant, tenantToken);
try {
NotificationCallback tenantCallback = new NotificationCallback() {
@Override
public void onOpen(URI uri) {
log.info("Tenant {} - Connected to Cumulocity notification service over WebSocket {}", tenant, uri);
tenantWSStatusCode.put(tenant, 200);
}
@Override
public void onNotification(Notification notification) {
try {
log.debug("Tenant {} - Tenant Notification received: <{}>", tenant, notification.getMessage());
log.debug("Tenant {} - Notification headers: <{}>", tenant,
notification.getNotificationHeaders());
String tenant = notification.getTenantFromNotificationHeaders();
ManagedObjectRepresentation mor = JSONBase.getJSONParser()
.parse(ManagedObjectRepresentation.class, notification.getMessage());
if (notification.getNotificationHeaders().contains("DELETE")) {
log.info("Tenant {} - Device deleted with name {} and id {}", tenant, mor.getName(),
mor.getId().getValue());
final ManagedObjectRepresentation morRetrieved = configurationRegistry.getC8yAgent()
.getManagedObjectForId(tenant, mor.getId().getValue());
if (morRetrieved != null) {
unsubscribeDeviceAndDisconnect(morRetrieved);
}
}
} catch (Exception e) {
log.error("Tenant {} - Error on processing Tenant Notification {}: {}", tenant, notification.getMessage(),
e.getLocalizedMessage());
}
}
@Override
public void onError(Throwable t) {
log.error("Tenant {} - We got an exception: {}", tenant, t);
}
@Override
public void onClose(int statusCode, String reason) {
log.info("Tenant {} - Tenant ws connection closed.", tenant);
if (reason.contains("401"))
tenantWSStatusCode.put(tenant, 401);
else
tenantWSStatusCode.put(tenant, 0);
}
};
connect(tenantToken, tenantCallback);
// tenantClientMap.put(tenant, tenant_client);
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
* public void subscribeTenantAndConnect(String tenant) {
* log.info("Tenant {} - Creating new Tenant Subscription", tenant);
* NotificationSubscriptionRepresentation notificationSubscriptionRepresentation
* = createTenantSubscription();
* String tenantToken =
* createToken(notificationSubscriptionRepresentation.getSubscription(),
* TENANT_SUBSCRIBER + additionalSubscriptionIdTest);
* this.tenantToken.put(tenant, tenantToken);
*
* try {
* NotificationCallback tenantCallback = new NotificationCallback() {
*
* @Override
* public void onOpen(URI uri) {
* log.info("Tenant {} - Connected to Cumulocity notification service
* over WebSocket {}", tenant, uri);
* tenantWSStatusCode.put(tenant, 200);
* }
*
* @Override
* public void onNotification(Notification notification) {
* try {
* log.debug("Tenant {} - Tenant Notification received: <{}>", tenant,
* notification.getMessage());
* log.debug("Tenant {} - Notification headers: <{}>", tenant,
* notification.getNotificationHeaders());
* String tenant = notification.getTenantFromNotificationHeaders();
* ManagedObjectRepresentation mor = JSONBase.getJSONParser()
* .parse(ManagedObjectRepresentation.class,
* notification.getMessage());
* if (notification.getNotificationHeaders().contains("DELETE")) {
* log.info("Tenant {} - Device deleted with name {} and id {}",
* tenant, mor.getName(),
* mor.getId().getValue());
* final ManagedObjectRepresentation morRetrieved =
* configurationRegistry.getC8yAgent()
* .getManagedObjectForId(tenant, mor.getId().getValue());
* if (morRetrieved != null) {
* unsubscribeDeviceAndDisconnect(morRetrieved);
* }
* }
* } catch (Exception e) {
* log.error("Tenant {} - Error on processing Tenant Notification {}:
* {}", tenant, notification.getMessage(),
* e.getLocalizedMessage());
* }
* }
*
* @Override
* public void onError(Throwable t) {
* log.error("Tenant {} - We got an exception: {}", tenant, t);
* }
*
* @Override
* public void onClose(int statusCode, String reason) {
* log.info("Tenant {} - Tenant ws connection closed.", tenant);
* if (reason.contains("401"))
* tenantWSStatusCode.put(tenant, 401);
* else
* tenantWSStatusCode.put(tenant, 0);
* }
* };
* connect(tenantToken, tenantCallback);
* // tenantClientMap.put(tenant, tenant_client);
* } catch (URISyntaxException e) {
* e.printStackTrace();
* }
* }
**/

public NotificationSubscriptionRepresentation createTenantSubscription() {
Expand Down Expand Up @@ -499,13 +507,16 @@ public void setDeviceConnectionStatus(String tenant, int status) {

public void removeConnector(String tenant, String connectorIdent) {
// Remove Dispatcher from list
if(this.dispatcherOutboundMaps.get(tenant) != null)
if (this.dispatcherOutboundMaps.get(tenant) != null)
this.dispatcherOutboundMaps.get(tenant).remove(connectorIdent);
if(this.deviceClientMap.get(tenant) != null) {
// Close WS connection for connector
this.deviceClientMap.get(tenant).get(connectorIdent).close();
// Remove client from client Map
this.deviceClientMap.get(tenant).remove(connectorIdent);
if (this.deviceClientMap.get(tenant) != null) {
// Test if connector was created at all and then close WS connection for
// connector
if (this.deviceClientMap.get(tenant).get(connectorIdent) != null) {
this.deviceClientMap.get(tenant).get(connectorIdent).close();
// Remove client from client Map
this.deviceClientMap.get(tenant).remove(connectorIdent);
}
}
if (this.dispatcherOutboundMaps.get(tenant) != null && dispatcherOutboundMaps.get(tenant).keySet().isEmpty())
disconnect(tenant, false);
Expand Down Expand Up @@ -608,9 +619,9 @@ public void reconnect() {
if (tenantWSStatusCode.get(tenant) == 401
|| tenantClient.getReadyState().equals(ReadyState.NOT_YET_CONNECTED)) {
log.info("Tenant {} - Trying to reconnect ws tenant client... ", tenant);
//subscriptionsService.runForEachTenant(() -> {
// initTenantClient();
//});
// subscriptionsService.runForEachTenant(() -> {
// initTenantClient();
// });
} else if (tenantClient.getReadyState().equals(ReadyState.CLOSING)
|| tenantClient.getReadyState().equals(ReadyState.CLOSED)) {
log.info("Tenant {} - Trying to reconnect ws tenant client... ", tenant);
Expand Down

0 comments on commit ad146b4

Please sign in to comment.