From 6a15b95e1bbaa4713d8e85e79dad3e8674efd929 Mon Sep 17 00:00:00 2001 From: sagIoTPower Date: Sun, 17 Mar 2024 10:50:57 +0100 Subject: [PATCH] Avoid sending duplicate status messages --- .../core/client/AConnectorClient.java | 19 +++++++++++--- .../mapping/connector/mqtt/MQTTClient.java | 16 +++++------- .../java/dynamic/mapping/core/C8YAgent.java | 9 +++++-- .../mapping/core/MappingComponent.java | 26 ------------------- 4 files changed, 29 insertions(+), 41 deletions(-) diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java index 1a60cfd1..c85f5c85 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java @@ -112,6 +112,8 @@ public abstract class AConnectorClient { private Instant start = Instant.now(); + private ConnectorStatus previousConnectorStatus = ConnectorStatus.UNKNOWN; + @Getter @Setter public ConnectorConfiguration connectorConfiguration; @@ -145,8 +147,7 @@ public void loadConfiguration() { serviceConfiguration = serviceConfigurationComponent.getServiceConfiguration(tenant); configurationRegistry.getServiceConfigurations().put(tenant, serviceConfiguration); - connectorStatus.updateStatus(ConnectorStatus.CONFIGURED, true); - sendConnectorLifecycle(); + updateConnectorStatusAndSend(ConnectorStatus.CONFIGURED, true, true); } public void submitConnect() { @@ -261,9 +262,11 @@ public void runHousekeeping() { // check if connector is in DISCONNECTED state and then move it to CONFIGURED // state. if (ConnectorStatus.DISCONNECTED.equals(connectorStatus.status) && isConfigValid(connectorConfiguration)) { - connectorStatus.updateStatus(ConnectorStatus.CONFIGURED, true); + updateConnectorStatusAndSend(ConnectorStatus.CONFIGURED, true, true); } - sendConnectorLifecycle(); + // else { + // sendConnectorLifecycle(); + // } } catch (Exception ex) { log.error("Tenant {} - Error during house keeping execution: ", tenant, ex); } @@ -474,6 +477,14 @@ public void connectionLost(String closeMessage, Throwable closeException) { reconnect(); } + public void updateConnectorStatusAndSend(ConnectorStatus status, boolean clearMessage, boolean send) { + previousConnectorStatus = connectorStatus.getStatus(); + connectorStatus.updateStatus(status, clearMessage); + if (send && !(status.equals(previousConnectorStatus))) { + sendConnectorLifecycle(); + } + } + @Data @AllArgsConstructor public static class Certificate { diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java index d8be8634..14b5cb7d 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java @@ -199,8 +199,8 @@ public ConnectorSpecification getSpecification() { @Override public void connect() { - connectorStatus.updateStatus(ConnectorStatus.CONNECTING, true); - sendConnectorLifecycle(); + updateConnectorStatusAndSend(ConnectorStatus.CONNECTING, true, true); + log.info("Tenant {} - Establishing the MQTT connection now - phase I: (isConnected:shouldConnect) ({}:{})", tenant, isConnected(), shouldConnect()); @@ -287,8 +287,7 @@ tenant, isConnected(), // connectionState.setTrue(); log.info("Tenant {} - Successfully connected to broker {}", tenant, mqttClient.getConfig().getServerHost()); - connectorStatus.updateStatus(ConnectorStatus.CONNECTED, true); - sendConnectorLifecycle(); + updateConnectorStatusAndSend(ConnectorStatus.CONNECTED, true,true); } catch (Exception e) { log.error("Tenant {} - Failed to connect to broker {}, {}, {}, {}", tenant, mqttClient.getConfig().getServerHost(), e.getMessage(), connectionState.booleanValue(), @@ -330,6 +329,7 @@ tenant, isConnected(), } } + private void updateConnectorStatusToFailed(Exception e) { String msg = " --- " + e.getClass().getName() + ": " + e.getMessage(); @@ -337,7 +337,7 @@ private void updateConnectorStatusToFailed(Exception e) { msg = msg + " --- Caused by " + e.getCause().getClass().getName() + ": " + e.getCause().getMessage(); } connectorStatus.setMessage(msg); - connectorStatus.updateStatus(ConnectorStatus.FAILED, false); + updateConnectorStatusAndSend(ConnectorStatus.FAILED, false,true); } @Override @@ -373,8 +373,7 @@ public boolean isConnected() { @Override public void disconnect() { - connectorStatus.updateStatus(ConnectorStatus.DISCONNECTING, true); - sendConnectorLifecycle(); + updateConnectorStatusAndSend(ConnectorStatus.DISCONNECTING, true,true); log.info("Tenant {} - Disconnecting from MQTT broker: {}", tenant, (mqttClient == null ? null : mqttClient.getConfig().getServerHost())); @@ -401,8 +400,7 @@ public void disconnect() { log.error("Tenant {} - Error disconnecting from MQTT broker:", tenant, e); } - connectorStatus.updateStatus(ConnectorStatus.DISCONNECTED, true); - sendConnectorLifecycle(); + updateConnectorStatusAndSend(ConnectorStatus.DISCONNECTED, true,true); log.info("Tenant {} - Disconnected from MQTT broker II: {}", tenant, mqttClient.getConfig().getServerHost()); } diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java index 4d6f2657..d3d2f947 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/core/C8YAgent.java @@ -98,6 +98,8 @@ @Component public class C8YAgent implements ImportBeanDefinitionRegistrar { + ConnectorStatus previousConnectorStatus = ConnectorStatus.UNKNOWN; + @Autowired private EventApi eventApi; @@ -422,7 +424,8 @@ public void loadProcessorExtensions(String tenant) { external); } } catch (IOException e) { - log.error("Tenant {} - Exception occurred, When loading extension, starting without extensions: ", tenant, + log.error("Tenant {} - Exception occurred, When loading extension, starting without extensions: ", + tenant, e); } } @@ -611,7 +614,9 @@ public void createExtensibleProcessor(String tenant) { } public void sendNotificationLifecycle(String tenant, ConnectorStatus connectorStatus, String message) { - if (configurationRegistry.getServiceConfigurations().get(tenant).sendNotificationLifecycle) { + if (configurationRegistry.getServiceConfigurations().get(tenant).sendNotificationLifecycle + && !(connectorStatus.equals(previousConnectorStatus))) { + previousConnectorStatus = connectorStatus; DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date now = new Date(); String date = dateFormat.format(now); diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/core/MappingComponent.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/core/MappingComponent.java index b743c59e..5213512e 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/core/MappingComponent.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/core/MappingComponent.java @@ -173,32 +173,6 @@ public void sendMappingStatus(String tenant) { } } - public void sendConnectorLifecycle(String tenant, String connectorIdent, ConnectorStatusEvent connectorStatus, - String connectorName) { - if (configurationRegistry.getServiceConfigurations().get(tenant).sendConnectorLifecycle) { - subscriptionsService.runForTenant(tenant, () -> { - MappingServiceRepresentation mappingServiceRepresentation = configurationRegistry - .getMappingServiceRepresentations().get(tenant); - Map> ccs = consolidatedConnectorStatus.getOrDefault(tenant, - new HashMap>()); - log.debug("Tenant {} - Sending status connector: {}", tenant, ccs); - Map stMap = Map.ofEntries( - entry("status", connectorStatus.getStatus().name()), - entry("message", connectorStatus.message), - entry("connectorName", connectorName), - entry("date", connectorStatus.date)); - ccs.put(connectorIdent, stMap); - consolidatedConnectorStatus.put(tenant, ccs); - Map service = new HashMap(); - service.put(C8YAgent.CONNECTOR_FRAGMENT, ccs); - ManagedObjectRepresentation updateMor = new ManagedObjectRepresentation(); - updateMor.setId(GId.asGId(mappingServiceRepresentation.getId())); - updateMor.setAttrs(service); - this.inventoryApi.update(updateMor); - }); - } - } - public MappingStatus getMappingStatus(String tenant, Mapping m) { // log.info("Tenant {} - get MappingStatus: {}", tenant, m.ident); Map statusMapping = tenantStatusMapping.get(tenant);