Skip to content

Commit

Permalink
Merge pull request #67 from SoftwareAG/develop
Browse files Browse the repository at this point in the history
Adding monitoring of connector connections
  • Loading branch information
ck-c8y authored Dec 23, 2023
2 parents d1abbb6 + fe7e8d1 commit 65eb0e1
Show file tree
Hide file tree
Showing 67 changed files with 1,793 additions and 1,687 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
import dynamic.mapping.configuration.ServiceConfiguration;
import dynamic.mapping.connector.core.callback.ConnectorMessage;
import dynamic.mapping.core.C8YAgent;
import dynamic.mapping.core.ConnectorStatus;
import dynamic.mapping.core.ConnectorStatusEvent;
import dynamic.mapping.core.MappingComponent;
import dynamic.mapping.core.Status;
import dynamic.mapping.core.ConnectorStatus;
import dynamic.mapping.processor.model.ProcessingContext;

@Slf4j
Expand Down Expand Up @@ -110,14 +110,13 @@ public abstract class AConnectorClient {
@Setter
public ConnectorConfiguration configuration;


@Getter
@Setter
public ServiceConfiguration serviceConfiguration;

@Getter
@Setter
public ConnectorStatus connectorStatus = ConnectorStatus.unknown();
public ConnectorStatusEvent connectorStatus = ConnectorStatusEvent.unknown();

public void submitInitialize() {
// test if init task is still running, then we don't need to start another task
Expand All @@ -133,7 +132,7 @@ public void submitInitialize() {

public void loadConfiguration() {
configuration = connectorConfigurationComponent.getConnectorConfiguration(this.getConnectorIdent(), tenant);
connectorStatus.updateStatus(Status.CONFIGURED);
connectorStatus.updateStatus(ConnectorStatus.CONFIGURED);
connectorStatus.clearMessage();
sendConnectorLifecycle();
// log.info("Tenant {} - DANGEROUS-LOG reload configuration: {} , {}", tenant,
Expand All @@ -149,7 +148,7 @@ public void submitConnect() {
if (connectTask == null || connectTask.isDone()) {
connectTask = cachedThreadPool.submit(() -> connect());
}
connectorStatus.updateStatus(Status.CONNECTING);
connectorStatus.updateStatus(ConnectorStatus.CONNECTING);
connectorStatus.clearMessage();
sendConnectorLifecycle();
}
Expand All @@ -162,7 +161,7 @@ public void submitDisconnect() {
if (connectTask == null || connectTask.isDone()) {
connectTask = cachedThreadPool.submit(() -> disconnect());
}
connectorStatus.updateStatus(Status.DISCONNECTING);
connectorStatus.updateStatus(ConnectorStatus.DISCONNECTING);
connectorStatus.clearMessage();
sendConnectorLifecycle();
}
Expand All @@ -182,7 +181,9 @@ public void submitHouskeeping() {
* Should return true when connector is enabled and provided properties are
* valid
***/
public abstract boolean shouldConnect();
public boolean shouldConnect() {
return isConfigValid(configuration) && configuration.isEnabled();
}

/***
* Returns true if the connector is currently connected
Expand Down Expand Up @@ -246,15 +247,16 @@ public void runHouskeeping() {
}
mappingComponent.cleanDirtyMappings(tenant);
mappingComponent.sendMappingStatus(tenant);
mappingComponent.sendConnectorLifecycle(tenant, getConnectorStatus(), getConnectorIdent(), getConnectorName());
mappingComponent.sendConnectorLifecycle(tenant, getConnectorStatus(), getConnectorIdent(),
getConnectorName());
sendConnectorLifecycle();
} catch (Exception ex) {
log.error("Error during house keeping execution: {}", ex);
}
}

public boolean hasError() {
return !(connectorStatus.status).equals(Status.FAILED);
return !(connectorStatus.status).equals(ConnectorStatus.FAILED);
}

public List<ProcessingContext<?>> test(String topic, boolean send, Map<String, Object> payload)
Expand Down Expand Up @@ -424,7 +426,7 @@ public void sendConnectorLifecycle() {
}

public void sendSubscriptionEvents(String topic, String action) {
if(serviceConfiguration.sendSubscriptionEvents) {
if (serviceConfiguration.sendSubscriptionEvents) {
String msg = action + " topic: " + topic;
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
Expand All @@ -436,7 +438,7 @@ public void sendSubscriptionEvents(String topic, String action) {
c8yAgent.createEvent(msg,
STATUS_SUBSCRIPTION_EVENT_TYPE,
DateTime.now(), mappingServiceRepresentation, tenant, stMap);
}
}
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import dynamic.mapping.connector.core.ConnectorProperty;
import dynamic.mapping.core.C8YAgent;
import dynamic.mapping.core.MappingComponent;
import dynamic.mapping.core.Status;
import dynamic.mapping.core.ConnectorStatus;

@Slf4j
// @EnableScheduling
Expand Down Expand Up @@ -260,7 +260,7 @@ tenant, isConnected(),
mqttClient.connect(connOpts);
log.info("Tenant {} - Successfully connected to broker {}", tenant,
mqttClient.getServerURI());
connectorStatus.updateStatus(Status.CONNECTED);
connectorStatus.updateStatus(ConnectorStatus.CONNECTED);
connectorStatus.clearMessage();
sendConnectorLifecycle();
} catch (MqttException e) {
Expand Down Expand Up @@ -313,7 +313,7 @@ private void updateConnectorStatusToFailed(Exception e) {
msg = msg + " --- Caused by " + e.getCause().getClass().getName() + ": " + e.getCause().getMessage();
}
connectorStatus.setMessage(msg);
connectorStatus.updateStatus(Status.FAILED);
connectorStatus.updateStatus(ConnectorStatus.FAILED);
}

@Override
Expand All @@ -327,11 +327,6 @@ public void close() {
}
}

@Override
public boolean shouldConnect() {
return isConfigValid(configuration) && configuration.isEnabled();
}

@Override
public boolean isConfigValid(ConnectorConfiguration configuration) {
if (configuration == null)
Expand Down Expand Up @@ -380,7 +375,7 @@ public void disconnect() {
});
mqttClient.unsubscribe("$SYS");
mqttClient.disconnect();
connectorStatus.updateStatus(Status.DISCONNECTED);
connectorStatus.updateStatus(ConnectorStatus.DISCONNECTED);
connectorStatus.clearMessage();
sendConnectorLifecycle();
log.info("Tenant {} - Disconnected from MQTT broker II: {}", tenant, mqttClient.getServerURI());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,66 +21,13 @@

package dynamic.mapping.core;

import java.io.Serializable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.validation.constraints.NotNull;

import lombok.Data;

@Data
public class ConnectorStatus implements Serializable {
@NotNull
public Status status;

@NotNull
public String message;

@NotNull
public String date;

public ConnectorStatus() {
this.status = Status.UNKNOWN;
}

public ConnectorStatus(Status status) {
this.status = status;
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
this.date = dateFormat.format(now);
this.message = "";
}

public static ConnectorStatus connected() {
return new ConnectorStatus(Status.CONNECTED);
}

public static ConnectorStatus disconnected() {
return new ConnectorStatus(Status.DISCONNECTED);
}

public static ConnectorStatus enabled() {
return new ConnectorStatus(Status.ENABLED);
}

public static ConnectorStatus failed(String errorMessage) {
return new ConnectorStatus(Status.FAILED);
}

public static ConnectorStatus unknown() {
return new ConnectorStatus(Status.UNKNOWN);
}

public void updateStatus(Status st) {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
date = dateFormat.format(now);
status = st;
}

public void clearMessage() {
this.message = "";
}
public enum ConnectorStatus {
UNKNOWN,
CONFIGURED,
ENABLED,
CONNECTING,
CONNECTED,
DISCONNECTED,
DISCONNECTING,
FAILED,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2022 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA,
* and/or its subsidiaries and/or its affiliates and/or their licensors.
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @authors Christof Strack, Stefan Witschel
*/

package dynamic.mapping.core;

import java.io.Serializable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.validation.constraints.NotNull;

import lombok.Data;

@Data
public class ConnectorStatusEvent implements Serializable {
@NotNull
public ConnectorStatus status;

@NotNull
public String message;

@NotNull
public String date;

public ConnectorStatusEvent() {
this.status = ConnectorStatus.UNKNOWN;
}

public ConnectorStatusEvent(ConnectorStatus status) {
this.status = status;
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
this.date = dateFormat.format(now);
this.message = "";
}

public static ConnectorStatusEvent connected() {
return new ConnectorStatusEvent(ConnectorStatus.CONNECTED);
}

public static ConnectorStatusEvent disconnected() {
return new ConnectorStatusEvent(ConnectorStatus.DISCONNECTED);
}

public static ConnectorStatusEvent enabled() {
return new ConnectorStatusEvent(ConnectorStatus.ENABLED);
}

public static ConnectorStatusEvent failed(String errorMessage) {
return new ConnectorStatusEvent(ConnectorStatus.FAILED);
}

public static ConnectorStatusEvent unknown() {
return new ConnectorStatusEvent(ConnectorStatus.UNKNOWN);
}

public void updateStatus(ConnectorStatus st) {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
date = dateFormat.format(now);
status = st;
}

public void clearMessage() {
this.message = "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void sendMappingStatus(String tenant) {
}
}

public void sendConnectorLifecycle(String tenant, ConnectorStatus connectorStatus, String connectorIdent,
public void sendConnectorLifecycle(String tenant, ConnectorStatusEvent connectorStatus, String connectorIdent,
String connectorName) {
if (serviceConfigurations.get(tenant).sendConnectorLifecycle) {
subscriptionsService.runForTenant(tenant, () -> {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import dynamic.mapping.core.ConnectorStatus;
import dynamic.mapping.core.ConnectorStatusEvent;

@Data
@NoArgsConstructor
Expand Down Expand Up @@ -57,6 +57,6 @@ public class MappingServiceRepresentation implements Serializable {
private ArrayList<MappingStatus> mappingStatus;

@JsonProperty(value = CONNECTOR_FRAGMENT)
private ConnectorStatus connectorStatus;
private ConnectorStatusEvent connectorStatus;

}
Loading

0 comments on commit 65eb0e1

Please sign in to comment.