Skip to content

Commit

Permalink
Introduce reconnection handling for wireless mbus bridges.
Browse files Browse the repository at this point in the history
Signed-off-by: Łukasz Dywicki <[email protected]>
  • Loading branch information
splatch committed Jul 30, 2024
1 parent 191c9a1 commit 1941bd2
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.connectorio.addons.binding.handler.GenericBridgeHandlerBase;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.KeyStore;
Expand All @@ -30,10 +34,8 @@
import org.connectorio.addons.binding.wmbus.internal.security.MutableKeyStore;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.thing.binding.ThingHandler;
import org.openhab.core.thing.binding.ThingHandlerService;
import org.openhab.core.types.Command;
import org.openmuc.jmbus.wireless.WMBusConnection;
Expand All @@ -58,19 +60,45 @@ public WMBusBridgeBaseHandler(Bridge bridge, DiscoveryCoordinator discoveryCoord
@Override
public void initialize() {
DefaultWMBusMessageDispatcher messageDispatcher = new DefaultWMBusMessageDispatcher();
connection = initializeConnection(messageDispatcher);
connection.whenComplete((connection, error) -> {
BiConsumer<WMBusConnection, Throwable> callback = (connection, error) -> {
if (error != null) {
logger.error("Could not open serial connection", error);
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Could not open port");
return;
}
updateStatus(ThingStatus.ONLINE);
dispatcher.complete(messageDispatcher);
};

open(callback, messageDispatcher, new Consumer<>() {
@Override
public void accept(IOException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Error in receiver thread " + e.getMessage());
WMBusConnection connection = WMBusBridgeBaseHandler.this.connection.getNow(null);
if (connection != null) {
try {
connection.close();
} catch (IOException ex) {
logger.warn("Failed to close connection", e);
}
}

open(callback, messageDispatcher, this);
}
});
}

protected abstract CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher);
private void open(BiConsumer<WMBusConnection, Throwable> callback, WMBusMessageDispatcher dispatcher, Consumer<IOException> reconnect) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
connection = initializeConnection(dispatcher, reconnect);
connection.whenComplete(callback);
}
}, 5L, TimeUnit.SECONDS);
}

protected abstract CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher, Consumer<IOException> reconnect);

@Override
public void dispose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.config.SerialBridgeConfig;
import org.connectorio.addons.binding.wmbus.internal.discovery.DiscoveryCoordinator;
Expand All @@ -39,7 +41,8 @@ public WMBusSerialBridgeHandler(Bridge bridge, SerialPortManager serialPortManag
}

@Override
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher) {
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher,
Consumer<IOException> reconnect) {
CompletableFuture<WMBusConnection> connection = new CompletableFuture<>();
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);

Expand All @@ -60,9 +63,7 @@ protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDi
@Override
public void run() {
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, (message) -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
});
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, reconnect);
SerialTransportBuilder builder = new SerialTransportBuilder(serialPortManager, config.manufacturer, listener, config.serialPort);
builder.setMode(config.mode);
builder.setSerialPortConfig(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.config.SerialBridgeConfig;
import org.connectorio.addons.binding.wmbus.internal.discovery.DiscoveryCoordinator;
import org.connectorio.addons.binding.wmbus.internal.transport.WMBusMessageListenerAdapter;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
Expand All @@ -38,7 +39,8 @@ public WMBusSerialJrxtxBridgeHandler(Bridge bridge, DiscoveryCoordinator discove
}

@Override
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher) {
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher,
Consumer<IOException> reconnect) {
CompletableFuture<WMBusConnection> connection = new CompletableFuture<>();
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);

Expand All @@ -59,9 +61,7 @@ protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDi
@Override
public void run() {
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, (message) -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
});
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, reconnect);
WMBusSerialBuilder builder = new WMBusSerialBuilder(
config.manufacturer, listener, config.serialPort
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.config.TcpBridgeConfig;
import org.connectorio.addons.binding.wmbus.internal.discovery.DiscoveryCoordinator;
Expand All @@ -37,7 +39,7 @@ public WMBusTcpBridgeHandler(Bridge bridge, DiscoveryCoordinator discoveryCoordi
}

@Override
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher) {
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher, Consumer<IOException> reconnect) {
CompletableFuture<WMBusConnection> connection = new CompletableFuture<>();
scheduler.execute(new Runnable() {
@Override
Expand All @@ -56,9 +58,7 @@ public void run() {
return;
}

WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, (message) -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
});
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, reconnect);
WMBusTcpBuilder builder = new WMBusTcpBuilder(
config.manufacturer, listener, config.hostAddress.trim(), config.port
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class WMBusMessageListenerAdapter implements WMBusListener {

private final Logger logger = LoggerFactory.getLogger(WMBusMessageListenerAdapter.class);
private final WMBusMessageDispatcher dispatcher;
private final Consumer<String> offlineCallback;
private final Consumer<IOException> offlineCallback;

public WMBusMessageListenerAdapter(WMBusMessageDispatcher dispatcher, Consumer<String> offlineCallback) {
public WMBusMessageListenerAdapter(WMBusMessageDispatcher dispatcher, Consumer<IOException> offlineCallback) {
this.dispatcher = dispatcher;
this.offlineCallback = offlineCallback;
}
Expand All @@ -53,7 +53,7 @@ public void discardedBytes(byte[] bytes) {
public void stoppedListening(IOException cause) {
logger.error("Error while reading serial data stream", cause);

offlineCallback.accept("Error in receiver thread " + cause.getMessage());
offlineCallback.accept(cause);
}

}

0 comments on commit 1941bd2

Please sign in to comment.