Skip to content

dkdhub/clj-mqtt-broker

Repository files navigation

MQTT broker for Clojure

GitHub clj mqtt broker GitHub tag (latest by date) GitHub last commit

Rationale

Unfortunately there are few to nothing libraries, natively suitable to be used in Clojure development that implement MQTT server side functionalities.

The Moquette library is Java MQTT broker based on an event model by Netty. The library has a good performance and is designed with embedding support available out of the box. Its configuration is compatible with well known Mosquitto Open Source MQTT server.

When being combined - these tools open the way to painless M2M communications for services written in Clojure.

As always we prefer to keep things as tiny and as simple as it ever possible, so the library interface is very ascetic and pure.

Compatibility

The following implementations are successfully passed compatibility tests:

  • Moquette

  • Mosquitto

  • Paho

  • MQTT.js

  • DKD/Brownie

any other tool that conform MQTT specs should pass too, we believe.

Usage

Add the following dependency into your project.clj’s `:dependecies section

[com.dkdhub/clj-mqtt-broker "0.0.5"]

include the library into your code, like:

(ns my.namespace
  (:require [clj-mqtt-broker.core :as mqtt-core])
  (:import (clj_mqtt_broker.core Broker)
           (com.dkdhub.mqtt_broker SimpleBroker)))

and start coding.

(defrecord BasicHandler [id]
  InterceptHandler
  (onPublish [_ msg]
    (log/info "got a message")
    (let [payload (-> msg .getPayload (.toString StandardCharsets/UTF_8))]
      (log/info "Received on topic: " + (.getTopicName msg) + " content: " + payload)))
  (getID [_] id)
  (getInterceptedMessageTypes [_] InterceptHandler/ALL_MESSAGE_TYPES))

(def config-name "moquette.conf")

(let [b (Broker. (SimpleBroker. config-name))]
          (with-open [srv (open b (BasicHandler. "12345"))]
            ;; do what ever your need here upon sending
            (send srv "MY-SERVER" "/MY_TOPIC" "TEST MESSAGE" 1 false)
            ;; do what ever you need here after sending
            ;; once you leave the `with-open` closure - the instance will be stopped automatically
            ;; once you leave the `let` statement - the instance will be destroyed automatically
            ))

The usage pattern is clear:

  1. define handlers

  2. create broker instance

  3. start the service

  4. start serving messages

That is - no complexity and no headaches, at all.

Defining handlers

The handlers record should implement InterceptHandler interface

(defrecord BasicHandler [id]
  InterceptHandler
  (onPublish [_ msg]
    (let [payload (-> msg .getPayload (.toString StandardCharsets/UTF_8))]
      (log/info "Received on topic: " + (.getTopicName msg) + " content: " + payload)))
  (getID [_] id)
  (getInterceptedMessageTypes [_] InterceptHandler/ALL_MESSAGE_TYPES))

The full set of method to be overridden:

  1. onPublish

  2. onConnect

  3. onDisconnect

  4. onConnectionLost

  5. onSubscribe

  6. onUnsubscribe

  7. onMessageAcknowledged

  8. getID

  9. getInterceptedMessageTypes

The ALL_MESSAGE_TYPES vector contains a full set of the related messages types.

Creating instance

The library contains default SimpleBroker implementation written in Java that requires resources' configuration file name to be passed into constructor.

In order to manage the instance comfortably it should be passed as a parameter into the Clojure record implements CljBroker interface.

CljBroker interface definition
(defprotocol CljBroker
  (start [o ^InterceptHandler handlers])
  (open [o ^InterceptHandler handlers])
  (stop [o])
  (close [o])
  (send [o from to data qos retain?]))

so the instantiation of the complete Broker is looks like:

(def config-name "my-broker-settings.conf")
(def srv-java (SimpleBroker. config-name))
(def srv-clj (Broker. srv-java))

Sample configuration

my-broker-settings.conf
port 1883
host 0.0.0.0
allow_anonymous true
telemetry_enabled false

Starting the service

The Clojure interface supports two approaches:

  1. controlling the instance by calling start/stop methods (that fully corresponds to its Java interface)

  2. controlling the instance by with-open macro

start/stop
...
(let [srv (Broker. (SimpleBroker. config-name))]
    (start srv (BasicHandler. "My Instance"))
    ;; your code here
    (stop srv))
...
with-open
...
(with-open [srv (Broker. (SimpleBroker. config-name))]
    (start srv (BasicHandler. "My Instance")))

;; or even

(def config-name "my-broker-settings.conf")
(def srv-java (SimpleBroker. config-name))
(def srv-clj (Broker. srv-java))

(with-open [srv (open srv-clj (BasicHandler. "My Instance"))]
    (comment "Do your stuff here"))

For more examples see test sources.

License

© 2022-2023 Fern Flower Lab

Distributed under the MIT Licence.