From ad146b4ac826b567740dfda6e8703dc0ec155dd4 Mon Sep 17 00:00:00 2001 From: sagIoTPower Date: Tue, 5 Mar 2024 15:46:24 +0100 Subject: [PATCH] fixing NPE in C8YNotificationSubscriber, when connector was never activated --- .../C8YNotificationSubscriber.java | 193 +++++++++--------- 1 file changed, 102 insertions(+), 91 deletions(-) diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/notification/C8YNotificationSubscriber.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/notification/C8YNotificationSubscriber.java index 8915c183..4ab208bc 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/notification/C8YNotificationSubscriber.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/notification/C8YNotificationSubscriber.java @@ -107,11 +107,10 @@ public void setConfigurationRegistry(@Lazy ConfigurationRegistry configurationRe private Map tenantWSStatusCode = new HashMap<>(); private Map deviceWSStatusCode = new HashMap<>(); - // structure: > + // structure: > private Map> deviceTokenPerConnector = new HashMap<>(); private Map tenantToken = new HashMap<>(); - public void addSubscriber(String tenant, String ident, AsynchronousDispatcherOutbound dispatcherOutbound) { Map dispatcherOutboundMap = getDispatcherOutboundMaps().get(tenant); if (dispatcherOutboundMap == null) { @@ -128,12 +127,12 @@ public void addSubscriber(String tenant, String ident, AsynchronousDispatcherOut // section 1: initializing tenant client and device client // /** - public void initTenantClient() { - // Subscribe on Tenant do get informed when devices get deleted/added - String tenant = subscriptionsService.getTenant(); - log.info("Tenant {} - Initializing Operation Subscriptions...", tenant); - //subscribeTenantAndConnect(subscriptionsService.getTenant()); - } + * public void initTenantClient() { + * // Subscribe on Tenant do get informed when devices get deleted/added + * String tenant = subscriptionsService.getTenant(); + * log.info("Tenant {} - Initializing Operation Subscriptions...", tenant); + * //subscribeTenantAndConnect(subscriptionsService.getTenant()); + * } **/ public void initDeviceClient() { @@ -147,7 +146,7 @@ public void initDeviceClient() { // Getting existing subscriptions deviceSubList = getNotificationSubscriptionForDevices(null, DEVICE_SUBSCRIPTION).get(); log.info("Tenant {} - Subscribing to devices {}", tenant, deviceSubList); - } catch (InterruptedException | ExecutionException e ) { + } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } // When one subscription exists, connect... @@ -155,18 +154,19 @@ public void initDeviceClient() { try { // For each dispatcher/connector create a new connection if (dispatcherOutboundMaps.get(tenant) != null) { - for (AsynchronousDispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant).values()) { - String tokenSeed = DEVICE_SUBSCRIBER - + dispatcherOutbound.getConnectorClient().getConnectorIdent() - + additionalSubscriptionIdTest; - String token = createToken(DEVICE_SUBSCRIPTION, - tokenSeed); - deviceTokens.put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), tokenSeed); - CustomWebSocketClient client = connect(token, dispatcherOutbound); - deviceClientMap.get(tenant).put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), - client); - - } + for (AsynchronousDispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant) + .values()) { + String tokenSeed = DEVICE_SUBSCRIBER + + dispatcherOutbound.getConnectorClient().getConnectorIdent() + + additionalSubscriptionIdTest; + String token = createToken(DEVICE_SUBSCRIPTION, + tokenSeed); + deviceTokens.put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), tokenSeed); + CustomWebSocketClient client = connect(token, dispatcherOutbound); + deviceClientMap.get(tenant).put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), + client); + + } } } catch (URISyntaxException e) { log.error("Tenant {} - Error connecting device subscription: {}", tenant, e.getLocalizedMessage()); @@ -178,7 +178,7 @@ public void notificationSubscriberReconnect(String tenant) { subscriptionsService.runForTenant(tenant, () -> { disconnect(tenant, false); // notificationSubscriber.init(); - //initTenantClient(); + // initTenantClient(); initDeviceClient(); }); } @@ -189,66 +189,74 @@ public void notificationSubscriberReconnect(String tenant) { // Not needed anymore /** - public void subscribeTenantAndConnect(String tenant) { - log.info("Tenant {} - Creating new Tenant Subscription", tenant); - NotificationSubscriptionRepresentation notificationSubscriptionRepresentation = createTenantSubscription(); - String tenantToken = createToken(notificationSubscriptionRepresentation.getSubscription(), - TENANT_SUBSCRIBER + additionalSubscriptionIdTest); - this.tenantToken.put(tenant, tenantToken); - - try { - NotificationCallback tenantCallback = new NotificationCallback() { - - @Override - public void onOpen(URI uri) { - log.info("Tenant {} - Connected to Cumulocity notification service over WebSocket {}", tenant, uri); - tenantWSStatusCode.put(tenant, 200); - } - - @Override - public void onNotification(Notification notification) { - try { - log.debug("Tenant {} - Tenant Notification received: <{}>", tenant, notification.getMessage()); - log.debug("Tenant {} - Notification headers: <{}>", tenant, - notification.getNotificationHeaders()); - String tenant = notification.getTenantFromNotificationHeaders(); - ManagedObjectRepresentation mor = JSONBase.getJSONParser() - .parse(ManagedObjectRepresentation.class, notification.getMessage()); - if (notification.getNotificationHeaders().contains("DELETE")) { - log.info("Tenant {} - Device deleted with name {} and id {}", tenant, mor.getName(), - mor.getId().getValue()); - final ManagedObjectRepresentation morRetrieved = configurationRegistry.getC8yAgent() - .getManagedObjectForId(tenant, mor.getId().getValue()); - if (morRetrieved != null) { - unsubscribeDeviceAndDisconnect(morRetrieved); - } - } - } catch (Exception e) { - log.error("Tenant {} - Error on processing Tenant Notification {}: {}", tenant, notification.getMessage(), - e.getLocalizedMessage()); - } - } - - @Override - public void onError(Throwable t) { - log.error("Tenant {} - We got an exception: {}", tenant, t); - } - - @Override - public void onClose(int statusCode, String reason) { - log.info("Tenant {} - Tenant ws connection closed.", tenant); - if (reason.contains("401")) - tenantWSStatusCode.put(tenant, 401); - else - tenantWSStatusCode.put(tenant, 0); - } - }; - connect(tenantToken, tenantCallback); - // tenantClientMap.put(tenant, tenant_client); - } catch (URISyntaxException e) { - e.printStackTrace(); - } - } + * public void subscribeTenantAndConnect(String tenant) { + * log.info("Tenant {} - Creating new Tenant Subscription", tenant); + * NotificationSubscriptionRepresentation notificationSubscriptionRepresentation + * = createTenantSubscription(); + * String tenantToken = + * createToken(notificationSubscriptionRepresentation.getSubscription(), + * TENANT_SUBSCRIBER + additionalSubscriptionIdTest); + * this.tenantToken.put(tenant, tenantToken); + * + * try { + * NotificationCallback tenantCallback = new NotificationCallback() { + * + * @Override + * public void onOpen(URI uri) { + * log.info("Tenant {} - Connected to Cumulocity notification service + * over WebSocket {}", tenant, uri); + * tenantWSStatusCode.put(tenant, 200); + * } + * + * @Override + * public void onNotification(Notification notification) { + * try { + * log.debug("Tenant {} - Tenant Notification received: <{}>", tenant, + * notification.getMessage()); + * log.debug("Tenant {} - Notification headers: <{}>", tenant, + * notification.getNotificationHeaders()); + * String tenant = notification.getTenantFromNotificationHeaders(); + * ManagedObjectRepresentation mor = JSONBase.getJSONParser() + * .parse(ManagedObjectRepresentation.class, + * notification.getMessage()); + * if (notification.getNotificationHeaders().contains("DELETE")) { + * log.info("Tenant {} - Device deleted with name {} and id {}", + * tenant, mor.getName(), + * mor.getId().getValue()); + * final ManagedObjectRepresentation morRetrieved = + * configurationRegistry.getC8yAgent() + * .getManagedObjectForId(tenant, mor.getId().getValue()); + * if (morRetrieved != null) { + * unsubscribeDeviceAndDisconnect(morRetrieved); + * } + * } + * } catch (Exception e) { + * log.error("Tenant {} - Error on processing Tenant Notification {}: + * {}", tenant, notification.getMessage(), + * e.getLocalizedMessage()); + * } + * } + * + * @Override + * public void onError(Throwable t) { + * log.error("Tenant {} - We got an exception: {}", tenant, t); + * } + * + * @Override + * public void onClose(int statusCode, String reason) { + * log.info("Tenant {} - Tenant ws connection closed.", tenant); + * if (reason.contains("401")) + * tenantWSStatusCode.put(tenant, 401); + * else + * tenantWSStatusCode.put(tenant, 0); + * } + * }; + * connect(tenantToken, tenantCallback); + * // tenantClientMap.put(tenant, tenant_client); + * } catch (URISyntaxException e) { + * e.printStackTrace(); + * } + * } **/ public NotificationSubscriptionRepresentation createTenantSubscription() { @@ -499,13 +507,16 @@ public void setDeviceConnectionStatus(String tenant, int status) { public void removeConnector(String tenant, String connectorIdent) { // Remove Dispatcher from list - if(this.dispatcherOutboundMaps.get(tenant) != null) + if (this.dispatcherOutboundMaps.get(tenant) != null) this.dispatcherOutboundMaps.get(tenant).remove(connectorIdent); - if(this.deviceClientMap.get(tenant) != null) { - // Close WS connection for connector - this.deviceClientMap.get(tenant).get(connectorIdent).close(); - // Remove client from client Map - this.deviceClientMap.get(tenant).remove(connectorIdent); + if (this.deviceClientMap.get(tenant) != null) { + // Test if connector was created at all and then close WS connection for + // connector + if (this.deviceClientMap.get(tenant).get(connectorIdent) != null) { + this.deviceClientMap.get(tenant).get(connectorIdent).close(); + // Remove client from client Map + this.deviceClientMap.get(tenant).remove(connectorIdent); + } } if (this.dispatcherOutboundMaps.get(tenant) != null && dispatcherOutboundMaps.get(tenant).keySet().isEmpty()) disconnect(tenant, false); @@ -608,9 +619,9 @@ public void reconnect() { if (tenantWSStatusCode.get(tenant) == 401 || tenantClient.getReadyState().equals(ReadyState.NOT_YET_CONNECTED)) { log.info("Tenant {} - Trying to reconnect ws tenant client... ", tenant); - //subscriptionsService.runForEachTenant(() -> { - // initTenantClient(); - //}); + // subscriptionsService.runForEachTenant(() -> { + // initTenantClient(); + // }); } else if (tenantClient.getReadyState().equals(ReadyState.CLOSING) || tenantClient.getReadyState().equals(ReadyState.CLOSED)) { log.info("Tenant {} - Trying to reconnect ws tenant client... ", tenant);