Skip to content
/ erlbus Public

Simple, Distributed and Scalable PubSub Message Bus written in Erlang

License

Notifications You must be signed in to change notification settings

cabol/erlbus

Repository files navigation

Erlbus

Message/Event Bus written in Erlang.

CI

The PubSub core is a clone of the original, remarkable, and proven Phoenix PubSub Layer, but re-written in Erlang.

A new way to build soft real-time and high scalable messaging-based applications, not centralized but distributed!

See the online documentation.

Introduction

Erlbus is a simple and lightweight library/tool to build messaging-based applications.

Erlbus PubSub implementation was taken from Phoenix Framework, which provides an amazing, scalable and proven PubSub solution. In addition to this, Erlbus provides an usable and simpler interface on top of this implementation.

You can read more about the PubSub implementation HERE.

Installation

Erlang

In your rebar.config:

{deps, [
  {ebus, "0.3.0", {pkg, erlbus}}
]}.

Elixir

In your mix.exs:

def deps do
  [
    {:ebus, "~> 0.3", hex: :erlbus}
  ]
end

Getting Started

Assuming you have a working Erlang installation (18 or later), building erlbus should be as simple as:

$ git clone https://github.com/cabol/erlbus.git
$ cd erlbus
$ make

Quick Start Example

Start an Erlang console with erlbus running:

make shell

Once into the erlang console:

% subscribe the current shell process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% publish a message
ebus:pub("foo", {foo, "hi"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"}]

% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"}]

% unsubscribe self
ebus:unsub(self(), "foo").
ok

% publish other message
ebus:pub("foo", {foo, "hello"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]

% check received message for self (last message didn't arrive)
ebus_proc:messages(self()).
[{foo,"hi"}]

% check subscribers (only Pid should be in the returned list)
ebus:subscribers("foo").
[<0.57.0>]

% check topics
ebus:topics().
[<<"foo">>]

% subscribe self to other topic
ebus:sub(self(), "bar").
ok

% check topics
ebus:topics().
[<<"bar">>,<<"foo">>]

% publish other message
ebus:pub("bar", {bar, "hi bar"}).
ok

% check received message for Pid (last message didn't arrive)
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]

% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"},{bar,"hi bar"}]

Note:

You may have noticed that is not necessary additional steps/calls to create/delete a topic, this is automatically handled by ebus, so you don't worry about it!

Now, let's make it more fun, start two Erlang consoles, first one:

erl -name [email protected] -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

The second one:

erl -name [email protected] -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

Then what we need to do is put these Erlang nodes in cluster, so from any of them send a ping to the other:

% From node1 ping node2
net_adm:ping('[email protected]').
pong

Excellent, we have both nodes in cluster, thanks to the beauty of Distributed Erlang. So, let's repeat the above exercise but now in two nodes.

In the node1 create a handler and subscription to some topic:

% create a callback fun to use ebus_proc utility
CB1 = fun(Msg) ->
  io:format("CB1: ~p~n", [Msg])
end
#Fun<erl_eval.6.54118792>

% other callback but receiving additional arguments,
% which may be used when message arrives
CB2 = fun(Msg, Args) ->
  io:format("CB2: Msg: ~p, Args: ~p~n", [Msg, Args])
end.
#Fun<erl_eval.12.54118792>

% use ebus_proc utility to spawn a handler
H1 = ebus_proc:spawn_handler(CB1).
<0.70.0>
H2 = ebus_proc:spawn_handler(CB2, ["any_ctx"]).
<0.72.0>

% subscribe handlers
ebus:sub(H1, "foo").
ok
ebus:sub(H2, "foo").
ok

Repeat the same thing above in node2.

Once you have handlers subscribed to the same channel in both nodes, publish some messages from any node:

% publish message
ebus:pub("foo", {foo, "again"}).
CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"
ok

And in the other node you will see those messages have arrived too:

CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"

Let's check subscribers, so from any Erlang console:

% returns local and remote subscribers
ebus:subscribers("foo").
[<7023.67.0>,<7023.69.0>,<0.70.0>,<0.72.0>]

You can also check the tests for more examples about using ebus.

So far, so good! Let's continue!

Point-To-Point Example

The great thing here is that you don't need something special to implement a point-to-point behavior. It is as simple as this:

ebus:dispatch("topic1", #{payload => "M1"}).

Dispatch function gets the subscribers and then picks one of them to send the message out. You can provide a dispatch function to pick up a subscriber, otherwise, a default function is provided (picks a subscriber random).

Dispatch function comes in 3 different flavors:

  • ebus:dispatch/2: receives the topic and the message.
  • ebus:dispatch/3: receives the topic, message and a list of options.
  • ebus:dispatch/4: same as previous but receives as 1st argument the name of the server, which is placed by default in the other functions.

Dispatch options are:

  • {scope, local | global}: allows you to choose if you want to pick a local subscriber o any. Default value: local.
  • {dispatch_fun, fun(([term()]) -> term())}: function to pick up a subscriber. If it isn't provided, a default random function is provided.

To see how this function is implemented go HERE.

Let's see an example:

% subscribe local process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% check that we have two subscribers
ebus:subscribers("foo").
[<0.57.0>,<0.38.0>]

% now dispatch a message (default dispatch fun and scope)
ebus:dispatch("foo", #{payload => foo}).
ok

% check that only one subscriber received the message
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[]

% dispatch with options
Fun = fun([H | _]) -> H end.
#Fun<erl_eval.6.54118792>
ebus:dispatch("foo", <<"M1">>, [{scope, global}, {dispatch_fun, Fun}]).
ok

% check again
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[<<"M1">>]

Extremely easy isn't?

Distributed Erlbus

Erlbus is distributed by nature, it doesn't require any additional/magical thing.

Once you have an Erlang cluster, messages are broadcasted using PG2, which is the default PubSub adapter. Remember, it's a Phoenix PubSub clone, so the architecture and design it's the same.

Phoenix Channels are supported on PubSub layer, which is the core. Take a look at this blog post.

Important links

Running Tests

make test

Building docs

make docs

Note: Once you run previous command, a new folder doc is created, and you'll have a pretty nice HTML documentation.

Change Log

All notable changes to this project will be documented in the CHANGELOG.md.

Copyright and License

Original work Copyright (c) 2014 Chris McCord

Modified work Copyright (c) 2016 Carlos Andres Bolaños

Erlbus source code is licensed under the MIT License.

NOTE: Pub/Sub implementation was taken from Phoenix Framework.