Skip to content

Commit

Permalink
Avoid sending duplicate status messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Mar 17, 2024
1 parent 0b30b49 commit 6a15b95
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public abstract class AConnectorClient {

private Instant start = Instant.now();

private ConnectorStatus previousConnectorStatus = ConnectorStatus.UNKNOWN;

@Getter
@Setter
public ConnectorConfiguration connectorConfiguration;
Expand Down Expand Up @@ -145,8 +147,7 @@ public void loadConfiguration() {
serviceConfiguration = serviceConfigurationComponent.getServiceConfiguration(tenant);
configurationRegistry.getServiceConfigurations().put(tenant, serviceConfiguration);

connectorStatus.updateStatus(ConnectorStatus.CONFIGURED, true);
sendConnectorLifecycle();
updateConnectorStatusAndSend(ConnectorStatus.CONFIGURED, true, true);
}

public void submitConnect() {
Expand Down Expand Up @@ -261,9 +262,11 @@ public void runHousekeeping() {
// check if connector is in DISCONNECTED state and then move it to CONFIGURED
// state.
if (ConnectorStatus.DISCONNECTED.equals(connectorStatus.status) && isConfigValid(connectorConfiguration)) {
connectorStatus.updateStatus(ConnectorStatus.CONFIGURED, true);
updateConnectorStatusAndSend(ConnectorStatus.CONFIGURED, true, true);
}
sendConnectorLifecycle();
// else {
// sendConnectorLifecycle();
// }
} catch (Exception ex) {
log.error("Tenant {} - Error during house keeping execution: ", tenant, ex);
}
Expand Down Expand Up @@ -474,6 +477,14 @@ public void connectionLost(String closeMessage, Throwable closeException) {
reconnect();
}

public void updateConnectorStatusAndSend(ConnectorStatus status, boolean clearMessage, boolean send) {
previousConnectorStatus = connectorStatus.getStatus();
connectorStatus.updateStatus(status, clearMessage);
if (send && !(status.equals(previousConnectorStatus))) {
sendConnectorLifecycle();
}
}

@Data
@AllArgsConstructor
public static class Certificate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ public ConnectorSpecification getSpecification() {

@Override
public void connect() {
connectorStatus.updateStatus(ConnectorStatus.CONNECTING, true);
sendConnectorLifecycle();
updateConnectorStatusAndSend(ConnectorStatus.CONNECTING, true, true);

log.info("Tenant {} - Establishing the MQTT connection now - phase I: (isConnected:shouldConnect) ({}:{})",
tenant, isConnected(),
shouldConnect());
Expand Down Expand Up @@ -287,8 +287,7 @@ tenant, isConnected(),
// connectionState.setTrue();
log.info("Tenant {} - Successfully connected to broker {}", tenant,
mqttClient.getConfig().getServerHost());
connectorStatus.updateStatus(ConnectorStatus.CONNECTED, true);
sendConnectorLifecycle();
updateConnectorStatusAndSend(ConnectorStatus.CONNECTED, true,true);
} catch (Exception e) {
log.error("Tenant {} - Failed to connect to broker {}, {}, {}, {}", tenant,
mqttClient.getConfig().getServerHost(), e.getMessage(), connectionState.booleanValue(),
Expand Down Expand Up @@ -330,14 +329,15 @@ tenant, isConnected(),
}
}


private void updateConnectorStatusToFailed(Exception e) {
String msg = " --- " + e.getClass().getName() + ": "
+ e.getMessage();
if (!(e.getCause() == null)) {
msg = msg + " --- Caused by " + e.getCause().getClass().getName() + ": " + e.getCause().getMessage();
}
connectorStatus.setMessage(msg);
connectorStatus.updateStatus(ConnectorStatus.FAILED, false);
updateConnectorStatusAndSend(ConnectorStatus.FAILED, false,true);
}

@Override
Expand Down Expand Up @@ -373,8 +373,7 @@ public boolean isConnected() {

@Override
public void disconnect() {
connectorStatus.updateStatus(ConnectorStatus.DISCONNECTING, true);
sendConnectorLifecycle();
updateConnectorStatusAndSend(ConnectorStatus.DISCONNECTING, true,true);
log.info("Tenant {} - Disconnecting from MQTT broker: {}", tenant,
(mqttClient == null ? null : mqttClient.getConfig().getServerHost()));

Expand All @@ -401,8 +400,7 @@ public void disconnect() {
log.error("Tenant {} - Error disconnecting from MQTT broker:", tenant,
e);
}
connectorStatus.updateStatus(ConnectorStatus.DISCONNECTED, true);
sendConnectorLifecycle();
updateConnectorStatusAndSend(ConnectorStatus.DISCONNECTED, true,true);
log.info("Tenant {} - Disconnected from MQTT broker II: {}", tenant,
mqttClient.getConfig().getServerHost());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@
@Component
public class C8YAgent implements ImportBeanDefinitionRegistrar {

ConnectorStatus previousConnectorStatus = ConnectorStatus.UNKNOWN;

@Autowired
private EventApi eventApi;

Expand Down Expand Up @@ -422,7 +424,8 @@ public void loadProcessorExtensions(String tenant) {
external);
}
} catch (IOException e) {
log.error("Tenant {} - Exception occurred, When loading extension, starting without extensions: ", tenant,
log.error("Tenant {} - Exception occurred, When loading extension, starting without extensions: ",
tenant,
e);
}
}
Expand Down Expand Up @@ -611,7 +614,9 @@ public void createExtensibleProcessor(String tenant) {
}

public void sendNotificationLifecycle(String tenant, ConnectorStatus connectorStatus, String message) {
if (configurationRegistry.getServiceConfigurations().get(tenant).sendNotificationLifecycle) {
if (configurationRegistry.getServiceConfigurations().get(tenant).sendNotificationLifecycle
&& !(connectorStatus.equals(previousConnectorStatus))) {
previousConnectorStatus = connectorStatus;
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
String date = dateFormat.format(now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,32 +173,6 @@ public void sendMappingStatus(String tenant) {
}
}

public void sendConnectorLifecycle(String tenant, String connectorIdent, ConnectorStatusEvent connectorStatus,
String connectorName) {
if (configurationRegistry.getServiceConfigurations().get(tenant).sendConnectorLifecycle) {
subscriptionsService.runForTenant(tenant, () -> {
MappingServiceRepresentation mappingServiceRepresentation = configurationRegistry
.getMappingServiceRepresentations().get(tenant);
Map<String, Map<String, String>> ccs = consolidatedConnectorStatus.getOrDefault(tenant,
new HashMap<String, Map<String, String>>());
log.debug("Tenant {} - Sending status connector: {}", tenant, ccs);
Map<String, String> stMap = Map.ofEntries(
entry("status", connectorStatus.getStatus().name()),
entry("message", connectorStatus.message),
entry("connectorName", connectorName),
entry("date", connectorStatus.date));
ccs.put(connectorIdent, stMap);
consolidatedConnectorStatus.put(tenant, ccs);
Map<String, Object> service = new HashMap<String, Object>();
service.put(C8YAgent.CONNECTOR_FRAGMENT, ccs);
ManagedObjectRepresentation updateMor = new ManagedObjectRepresentation();
updateMor.setId(GId.asGId(mappingServiceRepresentation.getId()));
updateMor.setAttrs(service);
this.inventoryApi.update(updateMor);
});
}
}

public MappingStatus getMappingStatus(String tenant, Mapping m) {
// log.info("Tenant {} - get MappingStatus: {}", tenant, m.ident);
Map<String, MappingStatus> statusMapping = tenantStatusMapping.get(tenant);
Expand Down

0 comments on commit 6a15b95

Please sign in to comment.