diff --git a/src/clojure/catacumba/experimental/stomp.clj b/src/clojure/catacumba/experimental/stomp.clj deleted file mode 100644 index d88d497..0000000 --- a/src/clojure/catacumba/experimental/stomp.clj +++ /dev/null @@ -1,11 +0,0 @@ -(ns catacumba.experimental.stomp - (:require [catacumba.experimental.stomp parser broker] - [potemkin.namespaces :refer [import-vars]])) - -(import-vars - [catacumba.experimental.stomp.broker - message-broker - send! - subscribe! - unsubscribe!]) - diff --git a/src/clojure/catacumba/experimental/stomp/broker.clj b/src/clojure/catacumba/experimental/stomp/broker.clj deleted file mode 100644 index 790e4c3..0000000 --- a/src/clojure/catacumba/experimental/stomp/broker.clj +++ /dev/null @@ -1,90 +0,0 @@ -(ns catacumba.experimental.stomp.broker - "A simple, in memmory pub sub broker implementation." - (:require [clojure.core.async :as async] - [clojure.core.async.impl.protocols :as async-proto] - [cuerdas.core :as str])) - -(defn- chan? - "Returns true if `x` is satisfies a Channel protocol." - [x] - (satisfies? async-proto/Channel x)) - -(defn- start-listening - "Start the listeing loop until subscriber is closed." - [publication topic listener] - (let [subscriber (async/chan (async/dropping-buffer 256))] - (async/sub publication topic subscriber) - (async/go-loop [] - (when-let [message (async/ (str (name topic) "-" (name key)) - (str/slugify) - (keyword))) - -(defprotocol IBroker - (^:private subscribe* [_ topic key listener] "Add subscriptor.") - (^:private unsubscribe* [_ topic key] "Remove subscription.") - (^:private send* [_ topic message] "Send a message to the topic.")) - -(deftype Broker [inputchan publication state] - IBroker - (subscribe* [_ topic key listener] - (let [subscriber (start-listening publication topic listener) - composedkey (generate-key topic key)] - (send state assoc composedkey subscriber))) - - (unsubscribe* [_ topic key] - (let [composedkey (generate-key topic key) - subscriber (get @state composedkey nil)] - (when subscriber - (async/close! subscriber) - (send state dissoc composedkey)))) - - (send* [_ topic message] - (async/put! inputchan {:topic topic :payload message})) - - java.io.Closeable - (close [_] - (reduce (fn [_ [key channel]] - (async/close! channel) - (send state dissoc key)) - nil - @state))) - -(defn message-broker - "The in memory broker engine constructor." - [& [{:keys [inputbuffer] :or {inputbuffer 256}}]] - (let [inputchan (async/chan (async/sliding-buffer inputbuffer)) - publication (async/pub inputchan :topic) - state (agent {})] - (Broker. inputchan publication state))) - -(defn subscribe! - "Create new subscription." - [broker topic key listener] - {:pre [(fn? listener)]} - (let [topic (keyword topic) - key (keyword key)] - (subscribe* broker topic key listener))) - -(defn unsubscribe! - "Unsubscribe the subscriber from a topic." - [broker topic key] - (let [topic (keyword topic) - key (keyword key)] - (unsubscribe* broker topic key))) - -(defn send! - "Publish message to the topic." - [broker topic message] - (let [topic (keyword topic) - key (keyword key)] - (send* broker topic message))) diff --git a/src/clojure/catacumba/experimental/stomp/parser.clj b/src/clojure/catacumba/experimental/stomp/parser.clj deleted file mode 100644 index 2d75f10..0000000 --- a/src/clojure/catacumba/experimental/stomp/parser.clj +++ /dev/null @@ -1,172 +0,0 @@ -(ns catacumba.experimental.stomp.parser - (:import (java.io PushbackReader StringReader))) - -;; this character denotes the end of a frame. -(def *frame-end* 0) ;; ASCII null - -;; this character denotes the end of a line -(def *line-end* 10) ;; ASCII linefeed - -;; delimeter between attributes and values -(def *header-delimiter* 58) ;; ASCII colon - -(defrecord StompFrame [command headers body]) - -(defn byte-length - "Returns the length in bytes of the provided sequence. The sequence - needs to be implement CharSequence (i.e. a String, CharBuffer, etc.)" - [body] - (.codePointCount body 0 (count body))) - -(defn emit - "Prints the provided frame to *out*." - [frame] - (with-out-str - ;; the command - (print (str (.toUpperCase (name (:command frame))) (char *line-end*))) - - ;; the headers - (doseq [[key val] (:headers frame)] - (print (str (name key) ":" (.trim val) (char *line-end*)))) - - ;; calculate the length of the body for :send - (print (str "content-length:" - (byte-length (:body frame)) (char *line-end*))) - - ;; the body - (print (char *line-end*)) - (print (apply str (:body frame))) - (print (char *frame-end*)))) - -(defmulti get-reader - "Returns a PushBackReader for the provided Object. We want to wrap - another Reader but we'll cast to a String and read that if required." - :class) - -(defmethod get-reader Readable - [frame-seq] - (PushbackReader. frame-seq)) - -(defmethod get-reader :default - [frame-seq] - (PushbackReader. (StringReader. (apply str frame-seq)))) - -(defn peek-for - "Returns the next character from the Reader, checks it against the - expected value, then pushes it back on the reader." - [peek-int reader] - (let [int-in (.read reader)] - (.unread reader int-in) - (if (= int-in peek-int) true false))) - -(defn read-command - "Reads in the command on the STOMP frame." - [reader] - (loop [int-in (.read reader) buffer []] - (cond - - (= int-in -1) - (throw (Exception. "End of file reached while reading command")) - - (= int-in *frame-end*) - (throw (Exception. "End of frame reached while reading command")) - - (= int-in *line-end*) - (apply str buffer) - - :else - (recur (.read reader) (conj buffer (char int-in)))))) - -(defn read-header-key - "Reads in the key name for a STOMP header." - [reader] - (loop [int-in (.read reader) buffer []] - (cond - - (= int-in -1) - (throw (Exception. "End of file reached while reading header key")) - - (= int-in *frame-end*) - (throw (Exception. "End of frame reached while reading header key")) - - (= int-in *header-delimiter*) - (apply str buffer) - - :else - (recur (.read reader) (conj buffer (char int-in)))))) - -(defn read-header-value - "Reads in the value for a STOMP header." - [reader] - (loop [int-in (.read reader) buffer []] - (cond - - (= int-in -1) - (throw (Exception. "End of file reached while reading header value")) - - (= int-in *frame-end*) - (throw (Exception. "End of frame reached while reading header value")) - - (= int-in *line-end*) - (apply str buffer) - - :else - (recur (.read reader) (conj buffer (char int-in)))))) - -(defn read-body - "Lazily reads in the body of a STOMP frame." - [reader] - (let [sb (StringBuffer.)] - (loop [int-in (.read reader)] - (cond - ;; the frame end marker should be the last bit of data - (and (= int-in *frame-end*) (not (peek-for -1 reader))) - (throw (Exception. "End of frame reached while reading body")) - - (= int-in -1) - (.toString sb) - - (not= int-in *frame-end*) - (do - (.append sb (char int-in)) - (recur (.read reader))))))) - -(defn parse-frame - "Parses the STOMP frame data from the provided reader into a - hash-map (frame-struct)." - [reader] - (loop [int-in (.read reader) parsing :command frame {}] - (cond - - (= int-in -1) - (throw (Exception. "End of file reached without an end of frame")) - - (= parsing :command) - (do (.unread reader int-in) - (recur nil :headers (assoc frame :command (read-command reader)))) - - (= parsing :headers) - (recur nil - (if (peek-for *line-end* reader) :body :headers) - (if (peek-for *line-end* reader) frame - (assoc frame :headers (assoc (:headers frame) - (keyword (.toLowerCase (read-header-key reader))) - (read-header-value reader))))) - - (= parsing :body) - (do (.read reader) - (recur nil :complete - (assoc frame :body (read-body reader)))) - - (= parsing :complete) - (map->StompFrame frame) - - (not= int-in *frame-end*) - (recur (.read reader) parsing frame)))) - -(defn parse - "Parses the provided STOMP frame data into a - hash-map (frame-struct). It will read the headers greedily but - return the body of the frame lazily; the body will be a sequence." - [frame-data] - (parse-frame (get-reader frame-data)))