Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Ring websocket API (WIP) #30

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
:license {:name "MIT License"
:url "http://opensource.org/licenses/MIT"}
:dependencies [[io.undertow/undertow-core "2.3.5.Final"]
[ring/ring-core "1.9.6"]]
[ring/ring-core "1.11.0-beta2"]]
:profiles {:dev {:dependencies [[org.clojure/clojure "1.11.1"]
[clj-http "3.12.3"]
[stylefruits/gniazdo "1.2.1"]
Expand Down
8 changes: 6 additions & 2 deletions src/ring/adapter/undertow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
[ring.adapter.undertow.request :refer [build-exchange-map]]
[ring.adapter.undertow.response :refer [set-exchange-response]]
[ring.adapter.undertow.ssl :refer [keystore->ssl-context]]
[ring.adapter.undertow.websocket :as ws])
[ring.adapter.undertow.websocket :as ws]
[ring.adapter.undertow.ring-websocket :as rws]
[ring.websocket :as ring-ws])
(:import
[io.undertow Undertow Undertow$Builder UndertowOptions]
[org.xnio Options SslClientAuthMode]
Expand All @@ -20,7 +22,9 @@
(if websocket?
(if-let [ws-config (:undertow/websocket response-map)]
(->> ws-config (ws/ws-callback) (ws/ws-request exchange (:headers response-map)))
(set-exchange-response exchange response-map))
(if (ring-ws/websocket-response? response-map)
(->> (rws/ws-callback response-map) (ws/ws-request exchange (:headers response-map)))
(set-exchange-response exchange response-map)))
(set-exchange-response exchange response-map)))

(defn wrap-with-session-handler
Expand Down
71 changes: 71 additions & 0 deletions src/ring/adapter/undertow/ring_websocket.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
(ns ring.adapter.undertow.ring-websocket
(:require [ring.websocket.protocols :as wsp])
(:import
[io.undertow.websockets
WebSocketConnectionCallback]
[io.undertow.websockets.core
AbstractReceiveListener
BufferedBinaryMessage
BufferedTextMessage
CloseMessage
WebSocketChannel
WebSockets
WebSocketCallback]
[io.undertow.websockets.spi WebSocketHttpExchange]
[java.nio ByteBuffer]))

(defn ws-socket [^WebSocketChannel channel]
(reify wsp/Socket
(-open? [_]
(.isOpen channel))
(-send [_ message]
(if (instance? CharSequence message)
(WebSockets/sendTextBlocking (.toString ^CharSequence message) channel)
(WebSockets/sendBinaryBlocking ^ByteBuffer message channel)))
(-ping [_ data]
(WebSockets/sendPingBlocking ^ByteBuffer data channel))
(-pong [_ data]
(WebSockets/sendPongBlocking ^ByteBuffer data channel))
(-close [_ code reason]
(WebSockets/sendCloseBlocking ^long code ^String reason channel))
wsp/AsyncSocket
(-send-async [_ message succeed fail]
(let [callback (reify WebSocketCallback
(complete [_ _ _] (succeed))
(onError [_ _ _ ex] (fail ex)))]
(if (instance? CharSequence message)
(WebSockets/sendText (.toString ^CharSequence message) channel callback)
(WebSockets/sendBinary ^ByteBuffer message channel callback))))))

(defn ws-listener [listener socket]
(proxy [AbstractReceiveListener] []
(onFullTextMessage [^WebSocketChannel _channel ^BufferedTextMessage message]
(wsp/on-message listener socket (.getData message)))
(onFullBinaryMessage [^WebSocketChannel _channel ^BufferedBinaryMessage message]
(let [pooled (.getData message)]
(try
(wsp/on-message listener socket (.getResource pooled))
(finally (.free pooled)))))
(onCloseMessage [^CloseMessage message ^WebSocketChannel _channel]
(wsp/on-close listener socket (.getCode message) (.getReason message)))
(onError [^WebSocketChannel channel ^Throwable error]
(wsp/on-error listener socket error))
(onFullPingMessage [^WebSocketChannel channel ^BufferedBinaryMessage message]
(when (satisfies? wsp/PingListener listener)
(let [pooled (.getData message)]
(try
(wsp/on-ping listener socket (.getResource pooled))
(finally (.free pooled))))))
(onFullPongMessage [^WebSocketChannel channel ^BufferedBinaryMessage message]
(let [pooled (.getData message)]
(try
(wsp/on-pong listener socket (.getResource pooled))
(finally (.free pooled)))))))

(defn ws-callback [{:keys [ring.websocket/listener]}]
(reify WebSocketConnectionCallback
(^void onConnect [_ ^WebSocketHttpExchange _exchange ^WebSocketChannel channel]
(let [socket (ws-socket channel)]
(wsp/on-open listener socket)
(.set (.getReceiveSetter channel) (ws-listener listener socket))
(.resumeReceives channel)))))
34 changes: 33 additions & 1 deletion test/ring/adapter/test/undertow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
(:require
[clojure.test :refer :all]
[ring.adapter.undertow :refer :all]
[ring.websocket :as ws]
[ring.websocket.protocols :as wsp]
[clj-http.client :as http]
[gniazdo.core :as gniazdo]
[clojure.java.io :as io])
Expand Down Expand Up @@ -209,4 +211,34 @@
(is (= (:status response) 200))
(is (.startsWith (get-in response [:headers "content-type"])
"text/plain"))
(is (= (:body response) "Hello World"))))))
(is (= (:body response) "Hello World"))))))

(deftest undertow-ring-websockets
(let [events (atom [])
received (atom [])
socket (atom nil)
result (promise)
listener (reify wsp/Listener
(on-open [_ sock]
(reset! socket sock)
(swap! events conj [:open]))
(on-message [_ sock mesg]
(swap! events conj [:message mesg])
(ws/send sock mesg))
(on-pong [_ _ _]
(swap! events conj :pong))
(on-close [_ _sock code reason]
(deliver result (swap! events conj [:close code reason]))))
handler (constantly {:ring.websocket/listener listener})]
(with-server handler {:port test-port}
(let [socket (gniazdo/connect "ws://localhost:4347/"
:on-receive #(swap! received conj %))]
(gniazdo/send-msg socket "hello")
(wait-until #(seq @received))
(gniazdo/close socket 1000 "normal closure"))
(is (= ["hello"] @received))
(is (= [[:open]
[:message "hello"]
[:close 1000 "normal closure"]]
(deref result 2000 :fail)))
(is (wait-until #(not (ws/open? @socket))) "Client close acknowledged"))))