- Definitions
- What is a graph transition?
- What is a message, and how is it dispatched?
- How many message at most at any moment?
- Usage
- Code walkthrough
- Visalisation export
Namespace nabab.specs
contains formal definitions of what’s been
described above.
;; Generated from litterate programming file `README.org`.
(ns nabab.specs
"Formal definitions of nabab domain concepts"
(:require [clojure.spec.alpha :as spec])
(:import (clojure.core.async.impl.channels ManyToManyChannel)))
(defn chan? [c] (instance? ManyToManyChannel c))
A ::graph
fully describes how data flow between channels. It’s
defined by its ::transitions
and several other features we can skip
for a general overview. This graph is passed to processing function
nabab.lifecycle/bootstrap!
which creates channels from transitions
and subscribe them to correct topics.
(spec/def ::nabab-graph (spec/keys :req [:nabab/fixed-buffer-size
:nabab/publication
:nabab/dispatch-ifn
:nabab/transitions
:nabab/publisher]))
::transitions
is a map of transition name and definition.
(spec/def :nabab/transitions (spec/map-of :nabab/transition-name ::transition))
(spec/def :nabab/transition-name any?)
<<::transition>>
Each ::transition
is made of a :nabab/subscriber
channel and a
:nabab/block
which processes messages received in the
subscriber. These attributes are usually deduced on bootstrapping from
:nabab/subscribed-topics
and :nabab/block-fn
. The name is derived
from the ::transitions
map key.
(spec/def ::transition (spec/keys :req [:nabab/subscriber
:nabab/block
:nabab/transition-name]
:opt [:nabab/subscribed-topics
:nabab/doc-published-topics
:nabab/block-fn]))
(spec/def :nabab/subscriber chan?)
(spec/def :nabab/block chan?)
:nabab/subscribed-topics
is a set of topics to subscribe the
transition to. Channel :nabab/subscriber
is made on
bootstrapping. It gets all messages sent to the subscribed topics.
(spec/def :nabab/subscribed-topics (spec/coll-of any? :kind set?))
(spec/def :nabab/block-fn ifn?)
Attribute :nabab/doc-published-topics
is entirely optional. Actually
it’s only for documentation purpose as it’s not even considered in the
code. Use it for more descriptive graph visualisations.
(spec/def :nabab/doc-published-topics (spec/map-of any? (spec/coll-of any? :kind set?)))
Messages could be of any kind. They are closely related to the
function which dispatch them to topics. An opinionated choice is to
define a valid message as a map with at least key
:message/topic
. You can override this spec if you need to.
(spec/def ::message (spec/keys :req [:message/topic]))
(spec/def :message/topic any?)
Message are read by :nabab/dispatch-ifn
. As ifn
implies, it can be
a function, or an invocable datastructure like a map or a set.
(spec/def :nabab/dispatch-ifn ifn?)
(defn example-dispatch-ifn
[message]
:message/topic)
(def other-dispatch-ifn {:message/some-attribute #{:topic/output}
:message/other-attribute #{:topic/failure :topic/report}})
Function :nabab/dispatch-ifn
is called for each message received by
the :nabab/publisher
. This publisher itself is a simple channel fed
by the :nabab/publication
of the publisher-subscriber pattern.
(spec/def :nabab/publisher chan?)
(spec/def :nabab/publication chan?)
:nabab/fixed-buffer-size
has quite an explicit name: throughout the
whole nabab graph some channels must be created. How large will their
buffers be? I’ve chosen to create all channels equals. You should set
it to the largest number of messages which can be waiting in a topic
before being processed. Perhaps a further version of nabab will give
you more freedom to use sliding buffers or that kind of stuff. For the
time being, it’s quite opinionated.
The best way to get acquainted to a library is to crawl its
tests. Nabab is quite a straightforward library so tests aren’t very
complex. They’re all in namespace nabab.nabab-test
.
(ns nabab.nabab-test
(:require [nabab.lifecycle :as nabab]
[nabab.viz :as viz]
[clojure.core.async :refer [close! chan go go-loop pub sub <! >! >!! <!! pipe pipeline alts!! timeout]]
[clojure.test :refer :all]))
Here is a simple yet pretty exhaustive nabab description:
(def test-description
{:nabab/fixed-buffer-size 2
:context/addition-value 2
:context/addition-timeout 80
:nabab/dispatch-ifn (fn [message]
(:message/topic message))
:nabab/transitions {:input
{:nabab/subscribed-topics #{:topic/input}
:nabab/doc-published-topics {:topic/input #{::1}}
:nabab/pipeline-transducer (map #(-> %
(update :message/content inc)
(assoc :message/topic ::1)))}
:simple-processing
{:nabab/subscribed-topics #{::1}
:nabab/doc-published-topics {::1 #{::2}}
:nabab/pipeline-transducer (map #(-> %
(update :message/content (partial * 2))
(assoc :message/topic ::2)))}
:fork
{:nabab/subscribed-topics #{::2}
:nabab/doc-published-topics {::2 #{::3 :topic/output}}
:nabab/pipeline-transducer (mapcat (juxt #(assoc % :message/topic :topic/output)
#(-> %
(assoc :message/topic ::3)
(update :message/content (partial * 2)))
#(-> %
(assoc :message/topic ::3)
(update :message/content (partial * 2)))))}
:output
{:nabab/subscribed-topics #{::3}
:nabab/doc-published-topics {::3 #{:topic/output}}
:nabab/block-fn (fn [description subscriber]
(go-loop []
(when-let [message (<! subscriber)]
(let [addition-value (:context/addition-value description)
addition-timeout (:context/addition-timeout description)
publisher (:nabab/publisher description)]
(<! (timeout addition-timeout))
(>! publisher {:message/topic :topic/output
:message/content (+ (:message/content message)
addition-value)})))
(recur)))}}})
You can already read it quite easily. There are several transitions:
:input
, simple-processing
, :fork
, and :output
. Most of them
are basically a pipeline from topic channel to publisher channel so
only the pipeline transducer is provided. Transition :fork
returns
several messages in different channels for each message it gets.
Finally :output
exhibits a more complex behaviour which would be
less explicit as a pipeline. Its :nabab/block-fn
has two parameters:
the runtime implemented nabab graph, and the subscriber; it’s really a
pure function which gets all it needs to setup a go-loop.
Try to figure out what the output of this graph when the following
message is sent in topic :topic/input
:
{:message/topic :topic/input
:message/content 5} ;; or anything else
Then you can keep reading and find the solution below.
(deftest nabab-test
(let [implementation (nabab/bootstrap! test-description)
output-chan (chan)
addition-value (-> implementation :context/addition-value)
input-chan (:nabab/publisher implementation)
n (rand-int 100)]
(sub (:nabab/publication implementation)
:topic/output
output-chan)
(>!! input-chan
{:message/topic :topic/input
:message/content n})
(is (= (<!! output-chan) {:message/topic :topic/output
:message/content (* 2 (inc n))}))
(is (= (<!! output-chan) {:message/topic :topic/output
:message/content (+ addition-value
(* 4 (inc n)))}))
(is (= (<!! output-chan) {:message/topic :topic/output
:message/content (+ addition-value
(* 4 (inc n)))}))
(testing "doesn't output more messages"
(when-let [[maybe-message _] (alts!! [output-chan (timeout 500)] :priority true)]
(is (nil? maybe-message))))
(nabab/shutdown! implementation)))
Finally, when you want to export your nabab graph to some graph visualisation tool, two handy functions come at play:
(deftest viz-test
(testing "edges"
(is (= (set (viz/graph-edges test-description))
#{#:edge{:name :input
:from :topic/input
:to ::1}
#:edge{:name :simple-processing
:from ::1
:to ::2}
#:edge{:name :fork
:from ::2
:to :topic/output}
#:edge{:name :fork
:from ::2
:to ::3}
#:edge{:name :output
:from ::3
:to :topic/output}})))
(testing "nodes"
(is (= (set (viz/graph-nodes test-description))
#{:topic/input ::1 ::2 ::3 :topic/output}))))
Namespace nabab.lifecycle
is the core of this library, but it’s
actually very tiny.
;; Generated from litterate programming file `README.org`.
(ns nabab.lifecycle
"Manage lifecycle of a nabab graph"
(:require [clojure.core.async :refer [close! chan go go-loop pub sub <! >! pipe pipeline]]
[clojure.spec.alpha :as spec]
[nabab.specs :as specs]))
Because of some curent technical limitation with org-mode litterate
programming, source code snippets will be appended to the file in the
order they appear here. Suffice to say this namespace has two main
function at the end of it: nabab.lifecycle/bootstrap!
and
nabab.lifecycle/shutdown!
. Your declarative map gets read by
nabab.lifecycle/bootstrap!
and a copy of that map is returned with
started, publisher-subsriber established, and useful vars to these
channels put in the map.
nabab.lifecycle/shutdown!
winds down this whole net of
channels. It’s actually not really difficult because default
core.async
options close a channel when its source gets
closed. Hence, simply closing the :nabab/publisher
is like
distributing a poison pill to all other related channels. If several
nabab graphes are communicating, beware undesirable side effects.
(defn topics->subscriptions!
"Subscribe a `subscriber` channel to topics of a `publication`."
[publication subscriber topics-to-subscribe]
(doseq [topic topics-to-subscribe]
(sub publication topic subscriber)))
We’ve seen that a =::transition= eventually has :nabab/subscriber
and :nabab/block
when the nabab graph is bootstrapped. However you
usually provide :nabab/subscribed-topics
, :nabab/block-fn
, or
nabab/pipeline-transducer
which other attributes are derived
from. This allow different graphs to share same transitions, so they
can communicate.
(defn transducer-pipeline
[description subscriber transducer]
(pipeline (:nabab/fixed-buffer-size description)
(:nabab/publisher description)
transducer
subscriber))
(defn ->transition-block
[description transition-name transition]
(let [subscriber (or (:nabab/subscriber transition)
(chan (:nabab/fixed-buffer-size description)))]
(topics->subscriptions! (:nabab/publication description)
subscriber
(:nabab/subscribed-topics transition))
(merge {:nabab/subscriber subscriber
:nabab/transition-name transition-name
:nabab/block (or (:nabab/block transition)
(condp #(get %2 %1) transition
:nabab/block-fn :>> #(% description subscriber)
:nabab/pipeline-transducer :>> #(transducer-pipeline description subscriber %)
(comment "else, will invalidate spec")))}
transition)))
(defn implement-transitions [description]
(reduce (fn [acc [transition-name transition]]
(assoc-in acc
[:nabab/transitions transition-name]
(->transition-block acc transition-name transition)))
description
(:nabab/transitions description)))
nabab.lifecycle/bootstrap!
ensures the output is a valid nabab
graph, and raise the explanation string if it isn’t.
(defn bootstrap!
[description]
(let [publisher (or (:nabab/publisher description)
(chan (:nabab/fixed-buffer-size description)))
publication (pub publisher (:nabab/dispatch-ifn description))
description (-> description
(assoc :nabab/publisher publisher)
(assoc :nabab/publication publication)
implement-transitions
doall)]
(if (spec/valid? ::specs/nabab-graph description)
(throw (ex-info "description doesn't give a valid nabab graph"
{:spec/explanation (spec/explain ::specs/nabab-graph description)}))
description)))
(defn shutdown!
[implementation]
(close! (:nabab/publisher implementation)))
Namespace nabab.viz
provides two very simple functions,
graph-edges
and graph-nodes
to easily export your naba graph into
your favourite graph visualisation tool.
(ns nabab.viz)
Here is the transducers which process transitions and derive edges from them.
(def xf-edges
(comp (map (fn retrieve-edge-name [[transition-name transition]]
(assoc transition :edge/name transition-name)))
(mapcat (fn retrieve-edge-source [transition]
(->> (:nabab/doc-published-topics transition)
keys
(map (fn [node]
(assoc transition :edge/from node))))))
(mapcat (fn retrieve-edge-target [transition]
(->> (:edge/from transition)
(get (:nabab/doc-published-topics transition))
(map (fn [node]
(assoc transition :edge/to node))))))
(map #(select-keys % #{:edge/name
:edge/from
:edge/to}))))
If you need a refresher about transducers, I highly recommend these authors in addition to the official Clojure litterature:
A transducer is a function that takes one reducing function and returns another. A reducing function is a function that has the structure of what you’d pass to reduce. It is a function which accepts an element of input and a previous reduction, and returns a new reduction.
(defn xf-expand
[expanding-fn]
(fn [rf]
(fn ([] (rf))
([result] (rf result))
([result el] (let [els (expanding-fn el)]
(reduce rf result els))))))
(sequence (xf-expand range) (range 5))
;; => (0 0 1 0 1 2 0 1 2 3)
(sequence (mapcat range) (range 5))
;; => (0 0 1 0 1 2 0 1 2 3)
(eduction (xf-expand range) (xf-expand range) (range 5))
;; => (0 0 0 1 0 0 1 0 1 2)
(defn graph-edges
[description]
(->> description
:nabab/transitions
(eduction xf-edges)))
(defn graph-nodes
[description]
(->> (graph-edges description)
(mapcat (comp concat
(juxt :edge/from
:edge/to)))
set))