diff --git a/project.clj b/project.clj index 970f132..f7c5a70 100644 --- a/project.clj +++ b/project.clj @@ -4,7 +4,7 @@ :license {:name "MIT License" :url "http://opensource.org/licenses/MIT"} :dependencies [[io.undertow/undertow-core "2.3.5.Final"] - [ring/ring-core "1.9.6"]] + [ring/ring-core "1.11.0-beta2"]] :profiles {:dev {:dependencies [[org.clojure/clojure "1.11.1"] [clj-http "3.12.3"] [stylefruits/gniazdo "1.2.1"] diff --git a/src/ring/adapter/undertow.clj b/src/ring/adapter/undertow.clj index 27a4d32..2f7d7b9 100644 --- a/src/ring/adapter/undertow.clj +++ b/src/ring/adapter/undertow.clj @@ -4,7 +4,9 @@ [ring.adapter.undertow.request :refer [build-exchange-map]] [ring.adapter.undertow.response :refer [set-exchange-response]] [ring.adapter.undertow.ssl :refer [keystore->ssl-context]] - [ring.adapter.undertow.websocket :as ws]) + [ring.adapter.undertow.websocket :as ws] + [ring.adapter.undertow.ring-websocket :as rws] + [ring.websocket :as ring-ws]) (:import [io.undertow Undertow Undertow$Builder UndertowOptions] [org.xnio Options SslClientAuthMode] @@ -20,7 +22,9 @@ (if websocket? (if-let [ws-config (:undertow/websocket response-map)] (->> ws-config (ws/ws-callback) (ws/ws-request exchange (:headers response-map))) - (set-exchange-response exchange response-map)) + (if (ring-ws/websocket-response? response-map) + (->> (rws/ws-callback response-map) (ws/ws-request exchange (:headers response-map))) + (set-exchange-response exchange response-map))) (set-exchange-response exchange response-map))) (defn wrap-with-session-handler diff --git a/src/ring/adapter/undertow/ring_websocket.clj b/src/ring/adapter/undertow/ring_websocket.clj new file mode 100644 index 0000000..fcbcd68 --- /dev/null +++ b/src/ring/adapter/undertow/ring_websocket.clj @@ -0,0 +1,71 @@ +(ns ring.adapter.undertow.ring-websocket + (:require [ring.websocket.protocols :as wsp]) + (:import + [io.undertow.websockets + WebSocketConnectionCallback] + [io.undertow.websockets.core + AbstractReceiveListener + BufferedBinaryMessage + BufferedTextMessage + CloseMessage + WebSocketChannel + WebSockets + WebSocketCallback] + [io.undertow.websockets.spi WebSocketHttpExchange] + [java.nio ByteBuffer])) + +(defn ws-socket [^WebSocketChannel channel] + (reify wsp/Socket + (-open? [_] + (.isOpen channel)) + (-send [_ message] + (if (instance? CharSequence message) + (WebSockets/sendTextBlocking (.toString ^CharSequence message) channel) + (WebSockets/sendBinaryBlocking ^ByteBuffer message channel))) + (-ping [_ data] + (WebSockets/sendPingBlocking ^ByteBuffer data channel)) + (-pong [_ data] + (WebSockets/sendPongBlocking ^ByteBuffer data channel)) + (-close [_ code reason] + (WebSockets/sendCloseBlocking ^long code ^String reason channel)) + wsp/AsyncSocket + (-send-async [_ message succeed fail] + (let [callback (reify WebSocketCallback + (complete [_ _ _] (succeed)) + (onError [_ _ _ ex] (fail ex)))] + (if (instance? CharSequence message) + (WebSockets/sendText (.toString ^CharSequence message) channel callback) + (WebSockets/sendBinary ^ByteBuffer message channel callback)))))) + +(defn ws-listener [listener socket] + (proxy [AbstractReceiveListener] [] + (onFullTextMessage [^WebSocketChannel _channel ^BufferedTextMessage message] + (wsp/on-message listener socket (.getData message))) + (onFullBinaryMessage [^WebSocketChannel _channel ^BufferedBinaryMessage message] + (let [pooled (.getData message)] + (try + (wsp/on-message listener socket (.getResource pooled)) + (finally (.free pooled))))) + (onCloseMessage [^CloseMessage message ^WebSocketChannel _channel] + (wsp/on-close listener socket (.getCode message) (.getReason message))) + (onError [^WebSocketChannel channel ^Throwable error] + (wsp/on-error listener socket error)) + (onFullPingMessage [^WebSocketChannel channel ^BufferedBinaryMessage message] + (when (satisfies? wsp/PingListener listener) + (let [pooled (.getData message)] + (try + (wsp/on-ping listener socket (.getResource pooled)) + (finally (.free pooled)))))) + (onFullPongMessage [^WebSocketChannel channel ^BufferedBinaryMessage message] + (let [pooled (.getData message)] + (try + (wsp/on-pong listener socket (.getResource pooled)) + (finally (.free pooled))))))) + +(defn ws-callback [{:keys [ring.websocket/listener]}] + (reify WebSocketConnectionCallback + (^void onConnect [_ ^WebSocketHttpExchange _exchange ^WebSocketChannel channel] + (let [socket (ws-socket channel)] + (wsp/on-open listener socket) + (.set (.getReceiveSetter channel) (ws-listener listener socket)) + (.resumeReceives channel))))) diff --git a/test/ring/adapter/test/undertow.clj b/test/ring/adapter/test/undertow.clj index 4eb5afd..3623865 100644 --- a/test/ring/adapter/test/undertow.clj +++ b/test/ring/adapter/test/undertow.clj @@ -2,6 +2,8 @@ (:require [clojure.test :refer :all] [ring.adapter.undertow :refer :all] + [ring.websocket :as ws] + [ring.websocket.protocols :as wsp] [clj-http.client :as http] [gniazdo.core :as gniazdo] [clojure.java.io :as io]) @@ -209,4 +211,34 @@ (is (= (:status response) 200)) (is (.startsWith (get-in response [:headers "content-type"]) "text/plain")) - (is (= (:body response) "Hello World")))))) \ No newline at end of file + (is (= (:body response) "Hello World")))))) + +(deftest undertow-ring-websockets + (let [events (atom []) + received (atom []) + socket (atom nil) + result (promise) + listener (reify wsp/Listener + (on-open [_ sock] + (reset! socket sock) + (swap! events conj [:open])) + (on-message [_ sock mesg] + (swap! events conj [:message mesg]) + (ws/send sock mesg)) + (on-pong [_ _ _] + (swap! events conj :pong)) + (on-close [_ _sock code reason] + (deliver result (swap! events conj [:close code reason])))) + handler (constantly {:ring.websocket/listener listener})] + (with-server handler {:port test-port} + (let [socket (gniazdo/connect "ws://localhost:4347/" + :on-receive #(swap! received conj %))] + (gniazdo/send-msg socket "hello") + (wait-until #(seq @received)) + (gniazdo/close socket 1000 "normal closure")) + (is (= ["hello"] @received)) + (is (= [[:open] + [:message "hello"] + [:close 1000 "normal closure"]] + (deref result 2000 :fail))) + (is (wait-until #(not (ws/open? @socket))) "Client close acknowledged"))))