Skip to content

Commit

Permalink
improve server thread; add command pipelining
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Fiddian-Green <[email protected]>
  • Loading branch information
andrewfg committed Dec 19, 2024
1 parent f02252c commit b6595be
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 221 deletions.
1 change: 1 addition & 0 deletions bundles/org.openhab.binding.govee/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Here is a list of the supported devices (the ones marked with * have been tested
- H706A Permanent Outdoor Lights Pro 30M
- H706B Permanent Outdoor Lights Pro 45M
- H706C Permanent Outdoor Lights Pro 60M
- H7070 Outdoor Projector Light (*)
- H7075 Outdoor Wall Light
- H70B1 520 LED Curtain Lights
- H70BC 400 LED Curtain Lights
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -46,18 +47,18 @@
import com.google.gson.JsonParseException;

/**
* The {@link CommunicationManager} is a thread that handles the answers of all devices.
* Therefore it needs to apply the information to the right thing.
*
* Discovery uses the same response code, so we must not refresh the status during discovery.
* The {@link CommunicationManager} component implements a sender to send commands to Govee devices,
* and implements a thread that handles the notifications from all devices. It applies the status
* information to the right Thing. It supports both discovery and status commands and notifications
* concurrently.
*
* @author Stefan Höhn - Initial contribution
* @author Danny Baumann - Thread-Safe design refactoring
* @author Andrew Fiddian-Green - Extensive refactoring
* @author Andrew Fiddian-Green - New threading model using java.nio channel
*/
@NonNullByDefault
@Component(service = CommunicationManager.class)
public class CommunicationManager implements Runnable {
public class CommunicationManager {
private final Logger logger = LoggerFactory.getLogger(CommunicationManager.class);
private final Gson gson = new Gson();

Expand All @@ -66,8 +67,11 @@ public class CommunicationManager implements Runnable {

private @Nullable GoveeDiscoveryListener discoveryListener;
private @Nullable Thread serverThread;
private @Nullable DatagramSocket serverSocket;
private boolean serverStopFlag = false;

private final Object paramsLock = new Object();
private final Object serverLock = new Object();
private final Object senderLock = new Object();

private static final String DISCOVERY_MULTICAST_ADDRESS = "239.255.255.250";
private static final int DISCOVERY_PORT = 4001;
Expand All @@ -87,56 +91,58 @@ public interface GoveeDiscoveryListener {

@Activate
public CommunicationManager() {
serverStart();
}

@Deactivate
public void deactivate() {
thingHandlerListeners.clear();
discoveryListener = null;
listenerCountDecreased();
serverStop();
}

/**
* Thing handlers register themselves to receive state updates when they are initalized.
* Thing handlers register themselves to receive state updates when they are initialized.
*/
public void registerHandler(GoveeHandler handler) {
thingHandlerListeners.put(ipAddressFrom(handler.getHostname()), handler);
listenerCountIncreased();
}

/**
* Thing handlers unregister themselves when they are destroyed.
*/
public void unregisterHandler(GoveeHandler handler) {
thingHandlerListeners.remove(ipAddressFrom(handler.getHostname()));
listenerCountDecreased();
}

/**
* Send a unicast command request to the device.
*/
public void sendRequest(GoveeHandler handler, GenericGoveeRequest request) throws IOException {
try (DatagramSocket socket = new DatagramSocket()) {
socket.setReuseAddress(true);
String message = gson.toJson(request);
byte[] data = message.getBytes();
String hostname = handler.getHostname();
InetAddress address = InetAddress.getByName(hostname);
DatagramPacket packet = new DatagramPacket(data, data.length, address, REQUEST_PORT);
socket.send(packet);
logger.trace("Sent request to {} on {} with content = {}", handler.getThing().getUID(),
address.getHostAddress(), message);
serverStart();
synchronized (senderLock) {
try (DatagramSocket socket = new DatagramSocket()) {
socket.setReuseAddress(true);
String message = gson.toJson(request);
byte[] data = message.getBytes();
String hostname = handler.getHostname();
InetAddress address = InetAddress.getByName(hostname);
DatagramPacket packet = new DatagramPacket(data, data.length, address, REQUEST_PORT);
socket.send(packet);
logger.trace("Sent request to {} on {} with content = {}", handler.getThing().getUID(),
address.getHostAddress(), message);
}
}
}

/**
* Send discovery multicast pings on any ipv4 address bound to any network interface in the given list and
* then sleep for sufficient time until responses may have been received.
* Send discovery multicast pings on any ipv4 address bound to any network interface in the given
* list and then sleep for sufficient time until responses may have been received.
*/
public void runDiscoveryForInterfaces(List<NetworkInterface> interfaces, GoveeDiscoveryListener listener) {
serverStart();
try {
discoveryListener = listener;
listenerCountIncreased();
Instant sleepUntil = Instant.now().plusSeconds(SCAN_TIMEOUT_SEC);

interfaces.parallelStream() // send on all interfaces in parallel
Expand All @@ -154,80 +160,96 @@ public void runDiscoveryForInterfaces(List<NetworkInterface> interfaces, GoveeDi
}
} finally {
discoveryListener = null;
listenerCountDecreased();
}
}

/**
* This is a {@link Runnable} 'run()' method which gets executed on the server thread.
* This method gets executed on the server thread. It uses a {@link DatagramChannel} to listen on port
* 4002 and it processes any notifications received. The task runs continuously in a loop until the
* thread is externally interrupted.
*
* <li>In case of status notifications it forwards the message to the Thing handler listener.</li>
* <li>In case of discovery notifications it forwards the message to the discovery listener.</li>
* <li>If there is neither a Thing handler listener, nor a discovery listener, it logs an error.</li>
*/
@Override
public synchronized void run() {
while (!Thread.currentThread().isInterrupted()) {
logger.trace("Server thread started");
try (DatagramSocket socket = new DatagramSocket(RESPONSE_PORT)) {
serverSocket = socket;
socket.setReuseAddress(true);
byte[] buffer = new byte[1024];
private void serverThreadTask() {
synchronized (serverLock) {
try {
logger.trace("Server thread started.");
ByteBuffer buffer = ByteBuffer.allocate(1024);

while (!Thread.currentThread().isInterrupted()) {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
socket.receive(packet);
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
return; // terminate thread
} else {
logger.debug("Unexpected receive exception {}", e.getMessage());
break; // recycle socket and retry
}
}
while (!serverStopFlag) {
try (DatagramChannel channel = DatagramChannel.open()
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(new InetSocketAddress(RESPONSE_PORT))) {

String notification = new String(packet.getData(), packet.getOffset(), packet.getLength());
String ipAddress = packet.getAddress().toString().replace("/", "");
logger.trace("Received notification from {} with content = {}", ipAddress, notification);
while (!serverStopFlag) {
String sourceIp = "";
try {
SocketAddress socketAddress = channel.receive(buffer.clear());
if ((socketAddress instanceof InetSocketAddress inetSocketAddress)
&& (inetSocketAddress.getAddress() instanceof InetAddress inetAddress)) {
sourceIp = inetAddress.getHostAddress();
} else {
logger.debug("Receive() - bad socketAddress={}", socketAddress);
return;
}
} catch (ClosedByInterruptException e) {
// thrown if 'Thread.interrupt()' is called during 'channel.receive()'
logger.trace(
"Receive exception=ClosedByInterruptException, isInterrupted={}, serverStopFlag={}",
Thread.currentThread().isInterrupted(), serverStopFlag);
Thread.interrupted(); // clear 'interrupted' flag
if (serverStopFlag) {
return;
} else {
break;
}
} catch (IOException e) {
logger.debug("Receive unexpected exception={}", e.getMessage());
break;
}

GoveeHandler handler = thingHandlerListeners.get(ipAddress);
boolean isStatusNotification = notification.contains("devStatus");
GoveeDiscoveryListener discoveryListener = this.discoveryListener;
String message = new String(buffer.array(), 0, buffer.position());
logger.trace("Receive from sourceIp={}, message={}", sourceIp, message);

// it is a status notification and there is a Thing handler listener, so notify that listener
if (handler != null && isStatusNotification) {
logger.debug("Sending state notification to {} on {}", handler.getThing().getUID(), ipAddress);
handler.handleIncomingStatus(notification);
continue;
}
GoveeHandler handler = thingHandlerListeners.get(sourceIp);
boolean devStatus = message.contains("devStatus");
if (handler != null && devStatus) {
logger.debug("Notifying status of thing={} on sourcecIp={}",
handler.getThing().getUID(), sourceIp);
handler.handleIncomingStatus(message);
continue;
}

// it is a discovery notification and there is a discoveryListener, so notify that listener
if (!isStatusNotification && discoveryListener != null) {
// send discovery notification only when there is no Thing handler listener
if (handler == null) {
try {
DiscoveryResponse response = gson.fromJson(notification, DiscoveryResponse.class);
if (response != null) {
logger.debug("Notifying potential new Thing discovered on {}", ipAddress);
discoveryListener.onDiscoveryResponse(response);
GoveeDiscoveryListener discoveryListener = this.discoveryListener;
if (!devStatus && discoveryListener != null) {
try {
DiscoveryResponse response = gson.fromJson(message, DiscoveryResponse.class);
if (response != null) {
logger.debug("Notifying discovery of device on sourceIp={}", sourceIp);
discoveryListener.onDiscoveryResponse(response);
}
} catch (JsonParseException e) {
logger.debug("Discovery notification parse exception={}", e.getMessage());
}
} catch (JsonParseException e) {
logger.debug("Failed to parse discovery notification", e);
continue;
}

logger.warn(
"Unhandled message with sourceIp={}, devStatus={}, handler={}, discoveryListener={}",
sourceIp, devStatus, handler, discoveryListener);
}
continue;
} catch (IOException e) {
logger.debug("Datagram channel create exception={}", e.getMessage());
}

// none of the above conditions apply just so log it
logger.warn(
"Unhandled notification for ipAddress:{}, handler:{}, stateNotification:{}, discoveryListener:{}",
ipAddress, handler, isStatusNotification, discoveryListener);
} // {while}
} catch (SocketException e) {
logger.debug("Unexpected socket exception {}", e.getMessage());
}
} finally {
serverSocket = null;
serverThread = null;
logger.trace("Server thread finished");
serverStopFlag = false;
logger.trace("Server thread terminated.");
}
} // {while}
}
}

/**
Expand All @@ -242,37 +264,29 @@ private static String ipAddressFrom(String host) {
}

/**
* Call this after one or more listeners have been added.
* Starts the server thread if it is not already running.
*/
private void listenerCountIncreased() {
synchronized (serverLock) {
Thread thread = serverThread;
if ((thread == null) || thread.isInterrupted() || !thread.isAlive()) {
thread = new Thread(this, "OH-binding-" + GoveeBindingConstants.BINDING_ID);
serverThread = thread;
thread.start();
private void serverStart() {
synchronized (paramsLock) {
Thread serverthread = serverThread;
if (serverthread == null) {
serverthread = new Thread(this::serverThreadTask, "OH-binding-" + GoveeBindingConstants.BINDING_ID);
serverThread = serverthread;
serverStopFlag = false;
serverthread.start();
}
}
}

/**
* Call this after one or more listeners have been removed.
* Stops the server thread when listener count reaches zero.
* Stops the server thread.
*/
private void listenerCountDecreased() {
synchronized (serverLock) {
if (thingHandlerListeners.isEmpty() && (discoveryListener == null)) {
Thread thread = serverThread;
DatagramSocket socket = serverSocket;
if (thread != null) {
thread.interrupt(); // set interrupt flag before closing socket
}
if (socket != null) {
socket.close();
}
serverThread = null;
serverSocket = null;
private void serverStop() {
synchronized (paramsLock) {
serverStopFlag = true;
Thread serverthread = serverThread;
if (serverthread != null) {
serverthread.interrupt();
}
}
}
Expand Down
Loading

0 comments on commit b6595be

Please sign in to comment.