From 4ff8cd5d05ed1c5c3f12459ab520e7f32e43feb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81lvarez?= Date: Mon, 24 Jun 2024 14:29:34 +0200 Subject: [PATCH] [openhabcloud] Handle WebSocket connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miguel Álvarez --- .../io/openhabcloud/internal/CloudClient.java | 361 +++++++++++++++++- 1 file changed, 360 insertions(+), 1 deletion(-) diff --git a/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java b/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java index c7bc8790dfe8f..c485404f9e55b 100644 --- a/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java +++ b/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java @@ -18,28 +18,49 @@ import java.net.URISyntaxException; import java.net.URL; import java.net.URLEncoder; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpConversation; +import org.eclipse.jetty.client.HttpRequest; +import org.eclipse.jetty.client.HttpResponse; +import org.eclipse.jetty.client.HttpResponseException; +import org.eclipse.jetty.client.ProtocolHandler; +import org.eclipse.jetty.client.ResponseNotifier; import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.URIUtil; +import org.eclipse.jetty.util.thread.Locker; import org.json.JSONException; import org.json.JSONObject; import org.openhab.core.OpenHAB; @@ -70,6 +91,7 @@ * * @author Victor Belov - Initial contribution * @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs + * @author Miguel Álvarez Díez - Proxy upgraded WebSocket connections */ public class CloudClient { @@ -115,6 +137,14 @@ public class CloudClient { * This map holds HTTP requests to local openHAB which are currently running */ private final Map runningRequests = new ConcurrentHashMap<>(); + /* + * This weak map holds HTTP requests identifiers + */ + private final WeakHashMap requestIds = new WeakHashMap<>(); + /* + * This weak map holds HTTP requests upgraded websocket connection + */ + private final WeakHashMap websocketConnections = new WeakHashMap<>(); /* * This variable indicates if connection to the openHAB Cloud is currently in an established state @@ -173,6 +203,9 @@ public CloudClient(HttpClient httpClient, String uuid, String secret, String bas this.remoteAccessEnabled = remoteAccessEnabled; this.exposedItems = exposedItems; this.jettyClient = httpClient; + // configure websocket upgrade handler + jettyClient.getProtocolHandlers() + .put(new WebSocketProxyUpgrade(() -> socket, requestIds, websocketConnections, httpClient)); reconnectBackoff.setMin(RECONNECT_MIN); reconnectBackoff.setMax(RECONNECT_MAX); reconnectBackoff.setJitter(RECONNECT_JITTER); @@ -352,6 +385,7 @@ public void call(Object... args) { .on(Socket.EVENT_PING, args -> logger.debug("Socket.IO ping"))// .on(Socket.EVENT_PONG, args -> logger.debug("Socket.IO pong: {} ms", args[0]))// .on("request", args -> onEvent("request", (JSONObject) args[0]))// + .on("websocket", args -> onWSEvent((Integer) args[0], (byte[]) args[1]))// .on("cancel", args -> onEvent("cancel", (JSONObject) args[0]))// .on("command", args -> onEvent("command", (JSONObject) args[0]))// ; @@ -399,6 +433,36 @@ public void onEvent(String event, JSONObject data) { } else { logger.warn("Unsupported event from openHAB Cloud: {}", event); } + } else { + logger.warn("Ignoring incoming {} event, remote access not enabled", event); + } + } + + /** + * Callback method for socket.io client which handles the websocket connection data events, + * it writes the data to the upgraded request connection. + */ + public void onWSEvent(int requestId, byte[] data) { + logger.debug("on({}): websocket incoming {} bytes", requestId, data.length); + var request = runningRequests.get(requestId); + if (request == null) { + logger.warn("Received ws data for missed request {}", requestId); + return; + } + OpenHABWebSocketConnection webSocketConnection = websocketConnections.get(request); + if (webSocketConnection == null) { + logger.warn("WebSocket connection missed for request {}", requestId); + return; + } + if (remoteAccessEnabled) { + try { + webSocketConnection.write(ByteBuffer.wrap(data)); + } catch (IOException e) { + logger.warn("IOException writing WebSocket data: {}", e.getMessage()); + webSocketConnection.close(); + } + } else { + logger.warn("Ignoring incoming WebSocket data, remote access not enabled"); } } @@ -526,6 +590,8 @@ private void handleRequestEvent(JSONObject data) { // If successfully submitted request to http client, add it to the list of currently // running requests to be able to cancel it if needed runningRequests.put(requestId, request); + // To recover id from request instance + requestIds.put(request, requestId); } catch (JSONException | IOException | URISyntaxException e) { logger.debug("{}", e.getMessage()); } @@ -556,7 +622,12 @@ private void handleCancelEvent(JSONObject data) { // Find and abort running request Request request = runningRequests.get(requestId); if (request != null) { - request.abort(new InterruptedException()); + var webSocketConnection = websocketConnections.get(request); + if (webSocketConnection == null) { + request.abort(new InterruptedException()); + } else { + webSocketConnection.close(); + } runningRequests.remove(requestId); } } catch (JSONException e) { @@ -721,4 +792,292 @@ private static String censored(String secret) { } return secret.substring(0, 2) + "..." + secret.substring(secret.length() - 2, secret.length()); } + + /** + * A {@link ProtocolHandler} implementation that intercept websocket upgraded connections and + * creates {@link OpenHABWebSocketConnection} instances to proxy the WebSocket data between the Cloud Connector + * and the OpenHAB server. + * + * @author Miguel Álvarez Díez - Initial contribution + */ + private static class WebSocketProxyUpgrade implements ProtocolHandler { + private final Logger logger = LoggerFactory.getLogger(WebSocketProxyUpgrade.class); + private final String name = "upgrade"; + private final List protocols = List.of("websocket"); + private final Supplier ioSocketSupplier; + private final Map requestIds; + private final WeakHashMap websocketConnections; + private final HttpClient client; + + public WebSocketProxyUpgrade(Supplier ioSocketSupplier, Map requestIds, + WeakHashMap websocketConnections, HttpClient httpClient) { + this.ioSocketSupplier = ioSocketSupplier; + this.requestIds = requestIds; + this.websocketConnections = websocketConnections; + this.client = httpClient; + } + + @Override + public String getName() { + return name; + } + + @Override + public boolean accept(Request request, Response response) { + boolean upgraded = HttpStatus.SWITCHING_PROTOCOLS_101 == response.getStatus(); + return upgraded && checkProtocol(request); + } + + protected boolean checkProtocol(Request request) { + HttpField requestUpgrade = request.getHeaders().getField(HttpHeader.UPGRADE); + return requestUpgrade != null && protocols.stream().anyMatch(requestUpgrade::contains); + } + + @Override + public Response.Listener getResponseListener() { + return new Response.Listener.Adapter() { + @Override + public void onComplete(Result result) { + HttpResponse response = (HttpResponse) result.getResponse(); + HttpRequest request = (HttpRequest) response.getRequest(); + if (result.isSucceeded()) { + if (!requestIds.containsKey(request)) { + throw new HttpResponseException("Upgrade without request id", response); + } + int requestId = requestIds.get(request); + logger.debug("Upgrading request {}", requestId); + try { + HttpConversation conversation = request.getConversation(); + EndPoint endPoint = (EndPoint) conversation.getAttribute(EndPoint.class.getName()); + if (endPoint == null) + throw new HttpResponseException("Upgrade without " + EndPoint.class.getSimpleName(), + response); + OpenHABWebSocketConnection ohWebSocketConnection = new OpenHABWebSocketConnection(requestId, + endPoint, response, ioSocketSupplier, client); + websocketConnections.put(request, ohWebSocketConnection); + endPoint.upgrade(ohWebSocketConnection); + // disable further request listeners + conversation.updateResponseListeners(null); + } catch (Throwable x) { + forwardFailureComplete(request, null, response, x); + } + } else { + forwardFailureComplete(request, result.getRequestFailure(), response, + result.getResponseFailure()); + } + } + }; + } + + private void forwardFailureComplete(HttpRequest request, Throwable requestFailure, Response response, + Throwable responseFailure) { + HttpConversation conversation = request.getConversation(); + conversation.updateResponseListeners(null); + List responseListeners = conversation.getResponseListeners(); + ResponseNotifier notifier = new ResponseNotifier(); + notifier.forwardFailure(responseListeners, response, responseFailure); + notifier.notifyComplete(responseListeners, new Result(request, requestFailure, response, responseFailure)); + } + } + + /** + * A {@link Connection} implementation used to proxy WebSocket connections between OpenHAB and the Cloud connector. + * It emits the response headers to the connector on initialization, then emits the incoming socket data and + * allows to write to the socket. + */ + private static class OpenHABWebSocketConnection extends AbstractConnection + implements Runnable, Connection.UpgradeTo { + private final Logger logger = LoggerFactory.getLogger(OpenHABWebSocketConnection.class); + private final int requestId; + private final EndPoint endPoint; + private final HttpResponse upgradeResponse; + private final Supplier socketIOSupplier; + private final Locker lock; + private final ByteBufferPool byteBufferPool; + private final AtomicBoolean isFilling = new AtomicBoolean(false); + private final AtomicBoolean writing = new AtomicBoolean(false); + private final ConcurrentLinkedDeque writeQueue = new ConcurrentLinkedDeque<>(); + private @Nullable RetainableByteBuffer networkBuffer; + + public OpenHABWebSocketConnection(int requestId, EndPoint endPoint, HttpResponse upgradeResponse, + Supplier socketIOSupplier, HttpClient client) { + super(endPoint, client.getExecutor()); + this.requestId = requestId; + this.endPoint = endPoint; + this.upgradeResponse = upgradeResponse; + this.socketIOSupplier = socketIOSupplier; + this.byteBufferPool = client.getByteBufferPool(); + this.lock = new Locker(); + setInputBufferSize(4 * 1024); // Default jetty WebSocket network transport read size + emitUpgradeHeaders(); + } + + private void emitUpgradeHeaders() { + logger.debug("onUpgradeHeaders {}", requestId); + JSONObject upgradeJson = new JSONObject(); + JSONObject headersJSON = new JSONObject(); + try { + for (HttpField field : upgradeResponse.getHeaders()) { + headersJSON.put(field.getName(), field.getValue()); + } + upgradeJson.put("id", requestId); + upgradeJson.put("headers", headersJSON); + upgradeJson.put("responseStatusCode", upgradeResponse.getStatus()); + upgradeJson.put("responseStatusText", upgradeResponse.getReason()); + } catch (JSONException e) { + logger.debug("Error writing upgrade headers: {}", e.getMessage()); + close(); + return; + } + var socketIO = socketIOSupplier.get(); + if (socketIO == null || !socketIO.connected()) { + logger.warn("Missing socket"); + close(); + return; + } + socketIO.emit("responseHeader", upgradeJson); + } + + public synchronized void write(ByteBuffer byteBuffer) throws IOException { + if (writing.compareAndSet(true, true)) { + writeQueue.add(byteBuffer); + } else { + endPoint.write(Callback.from(this::writeNext, this::onWriteError), byteBuffer); + } + } + + private synchronized void writeNext() { + if (writeQueue.isEmpty()) { + writing.set(false); + } else { + endPoint.write(Callback.from(this::writeNext, this::onWriteError), writeQueue.getFirst()); + } + } + + private void onWriteError(Throwable throwable) { + logger.warn("WebSocket write error: {}", throwable.getMessage()); + close(); + } + + @Override + public void run() { + startReadAsync(); + } + + private void startReadAsync() { + acquireNetworkBuffer(); + try { + int totalRead = 0; + while (true) { + if (!getEndPoint().isOpen()) { + releaseNetworkBuffer(); + return; + } + assert (networkBuffer != null); + int read = endPoint.fill(networkBuffer.getBuffer()); + if (read == 0) { + emitDataToConnector(networkBuffer.getBuffer(), totalRead); + releaseNetworkBuffer(); + isFilling.set(false); + fillInterested(); + return; + } else if (read > 0) { + if (networkBuffer.hasRemaining()) { + totalRead += read; + } else { + emitDataToConnector(networkBuffer.getBuffer(), networkBuffer.getBuffer().limit()); + networkBuffer.getBuffer().clear(); + totalRead = 0; + } + } else { + logger.debug("WebSocket {} is closed, stop reading", requestId); + releaseNetworkBuffer(); + isFilling.set(false); + return; + } + } + } catch (IOException e) { + logger.warn("IOException reading socket data"); + releaseNetworkBuffer(); + isFilling.set(false); + } + } + + private void emitDataToConnector(ByteBuffer dataBuffer, int length) { + byte[] data = new byte[length]; + dataBuffer.get(0, data, 0, length); + var socket = socketIOSupplier.get(); + if (socket != null) { + socket.emit("websocket", requestId, data); + } + } + + @Override + public void onFillable() { + if (isFilling.compareAndSet(false, true)) { + getExecutor().execute(this); + } + } + + @Override + public boolean onIdleExpired() { + // no idle timeout for websocket + return false; + } + + @Override + protected boolean onReadTimeout(Throwable timeout) { + // no read timeout for websocket + return false; + } + + @Override + public EndPoint getEndPoint() { + return endPoint; + } + + @Override + public void close() { + logger.debug("Closing websocket connection {}", requestId); + // emit request error to close websocket at connector + JSONObject responseJson = new JSONObject(); + try { + responseJson.put("id", requestId); + responseJson.put("responseStatusText", "openHAB connection closed"); + socketIOSupplier.get().emit("responseError", responseJson); + } catch (JSONException e) { + logger.debug("{}", e.getMessage()); + } + super.close(); + } + + @Override + public void onUpgradeTo(ByteBuffer byteBuffer) { + // Handle data emitted on endPoint upgrade + emitDataToConnector(byteBuffer, byteBuffer.limit()); + } + + @Override + public void onOpen() { + super.onOpen(); + // start data read + fillInterested(); + } + + private void acquireNetworkBuffer() { + try (var l = lock.lock()) { + if (networkBuffer == null) + networkBuffer = new RetainableByteBuffer(byteBufferPool, getInputBufferSize()); + } + } + + private void releaseNetworkBuffer() { + try (var l = lock.lock()) { + if (networkBuffer == null) + throw new IllegalStateException(); + networkBuffer.release(); + networkBuffer = null; + } + } + } }