From e9e5037ddbaa3526c87d267f5c8bace9a6387c2a Mon Sep 17 00:00:00 2001 From: Thijs Schreijer Date: Tue, 9 Nov 2021 12:32:57 +0100 Subject: [PATCH] chore(docs) update docs --- LICENSE | 4 +- README.md | 133 ++------- docs/config.ld | 26 +- docs_topics/01-installation.md | 15 + docs_topics/02-dependencies.md | 31 ++ docs_topics/03-lua_versions.md | 9 + docs_topics/04-mqtt_versions.md | 8 + docs_topics/05-connectors.md | 30 ++ examples/openresty/README.md | 3 +- .../{luamqtt-example.lua => openresty.lua} | 20 +- examples/openresty/conf/nginx.conf | 2 +- mqtt/client.lua | 281 ++++++++++-------- mqtt/connector/copas.lua | 26 +- mqtt/connector/init.lua | 25 +- mqtt/connector/luasocket.lua | 23 +- mqtt/connector/nginx.lua | 27 +- mqtt/init.lua | 69 ++--- mqtt/ioloop.lua | 122 ++++---- mqtt/loop/copas.lua | 12 +- mqtt/loop/init.lua | 28 +- mqtt/loop/ioloop.lua | 10 + mqtt/loop/nginx.lua | 14 +- 22 files changed, 553 insertions(+), 365 deletions(-) create mode 100644 docs_topics/01-installation.md create mode 100644 docs_topics/02-dependencies.md create mode 100644 docs_topics/03-lua_versions.md create mode 100644 docs_topics/04-mqtt_versions.md create mode 100644 docs_topics/05-connectors.md rename examples/openresty/app/{luamqtt-example.lua => openresty.lua} (85%) diff --git a/LICENSE b/LICENSE index 1bd5a40..0233293 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -MIT License +# MIT License -Copyright (c) 2018 Alexander Kiranov +Copyright (c) 2018-2021 Alexander Kiranov Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 04a8e4e..dfcf1f3 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -# luamqtt - Pure-lua MQTT v3.1.1 and v5.0 client +# luamqtt + +Pure-lua MQTT v3.1.1 and v5.0 client ![luamqtt logo](./logo.svg) @@ -17,7 +19,7 @@ This library is written in **pure-lua** to provide maximum portability. * Full MQTT v3.1.1 client-side support * Full MQTT v5.0 client-side support -* Several long-living MQTT clients in one script thanks to ioloop +* Support for Copas, OpenResty/Nginx, and an included lightweight ioloop. # Documentation @@ -31,116 +33,6 @@ See [flespi forum thread](https://forum.flespi.com/d/97-luamqtt-mqtt-client-writ [https://github.com/xHasKx/luamqtt](https://github.com/xHasKx/luamqtt) -# Dependencies - -The only main dependency is a [**luasocket**](https://luarocks.org/modules/luasocket/luasocket) to establishing TCP connection to the MQTT broker. Install it like this: - -```sh -luarocks install luasocket -``` - -On Lua 5.1 it also depends on [**LuaBitOp**](http://bitop.luajit.org/) (**bit**) library to perform bitwise operations. -It's not listed in package dependencies, please install it manually like this: - -```sh -luarocks install luabitop -``` - -## luasec (SSL/TLS) - -To establish secure network connection (SSL/TSL) to MQTT broker -you also need [**luasec**](https://github.com/brunoos/luasec) module, please install it manually like this: - -```sh -luarocks install luasec -``` - -This stage is optional and may be skipped if you don't need the secure network connection (e.g. broker is located in your local network). - -# Lua versions - -It's tested to work on Debian 9 GNU/Linux with Lua versions: -* Lua 5.1 ... Lua 5.3 (**i.e. any modern Lua version**) -* LuaJIT 2.0.0 ... LuaJIT 2.1.0 beta3 -* It may also work on other Lua versions without any guarantees - -Also I've successfully run it under **Windows** and it was ok, but installing luarock-modules may be a non-trivial task on this OS. - -# Installation - -As the luamqtt is almost zero-dependency you have to install required Lua libraries by yourself, before using the luamqtt library: - -```sh -luarocks install luasocket # optional if you will use your own connectors (see below) -luarocks install luabitop # you don't need this for lua 5.3 -luarocks install luasec # you don't need this if you don't want to use SSL connections -``` - -Then you may install the luamqtt library itself: - -```sh -luarocks install luamqtt -``` - -[LuaRocks page](http://luarocks.org/modules/xhaskx/luamqtt) - -# Examples - -Here is a short version of [`examples/simple.lua`](examples/simple.lua): - -```lua --- load mqtt library -local mqtt = require("mqtt") - --- create MQTT client, flespi tokens info: https://flespi.com/kb/tokens-access-keys-to-flespi-platform -local client = mqtt.client{ uri = "mqtt.flespi.io", username = os.getenv("FLESPI_TOKEN"), clean = true } - --- assign MQTT client event handlers -client:on{ - connect = function(connack) - if connack.rc ~= 0 then - print("connection to broker failed:", connack:reason_string(), connack) - return - end - - -- connection established, now subscribe to test topic and publish a message after - assert(client:subscribe{ topic="luamqtt/#", qos=1, callback=function() - assert(client:publish{ topic = "luamqtt/simpletest", payload = "hello" }) - end}) - end, - - message = function(msg) - assert(client:acknowledge(msg)) - - -- receive one message and disconnect - print("received message", msg) - client:disconnect() - end, -} - --- run ioloop for client -mqtt.run_ioloop(client) -``` - -More examples placed in [`examples/`](examples/) directory. Also checkout tests in [`tests/spec/mqtt-client.lua`](tests/spec/mqtt-client.lua) - -Also you can learn MQTT protocol by reading [`tests/spec/protocol4-make.lua`](tests/spec/protocol4-make.lua) and [`tests/spec/protocol4-parse.lua`](tests/spec/protocol4-parse.lua) tests - -# Connectors - -Connector is a network connection layer for luamqtt. There is a three standard connectors included: - -* [`luasocket`](mqtt/luasocket.lua) -* [`luasocket_ssl`](mqtt/luasocket_ssl.lua) -* [`ngxsocket`](mqtt/ngxsocket.lua) - for using in [openresty environment](examples/openresty) - -The `luasocket` or `luasocket_ssl` connector will be used by default, if not specified, according `secure=true/false` option per MQTT client. - -In simple terms, connector is a set of functions to establish a network stream (TCP connection usually) and send/receive data through it. -Every MQTT client instance may have their own connector. - -And it's very simple to implement your own connector to make luamqtt works in your environment. - # Bugs & contributing Please [file a GitHub issue](https://github.com/xHasKx/luamqtt/issues) if you found any bug. @@ -149,10 +41,21 @@ And of course, any contribution are welcome! # Tests -To run tests in this git repo you need [**busted**](https://luarocks.org/modules/olivine-labs/busted): +To run tests in this git repo you need [**busted**](https://luarocks.org/modules/olivine-labs/busted) as well as some dependencies: + +Prepare: +```sh +luarocks install busted +luarocks install luacov +luarocks install luasocket +luarocks install luasec +luarocks install copas +luarocks install lualogging +``` +Running the tests: ```sh -busted -e 'package.path="./?/init.lua;./?.lua;"..package.path' tests/spec/*.lua +busted ``` There is a script to run all tests for all supported lua versions, using [hererocks](https://github.com/mpeterv/hererocks): @@ -169,7 +72,7 @@ To collect code coverage stats - install luacov using luarocks and then execute: ```sh # collect stats during tests -busted -v -e 'package.path="./?/init.lua;./?.lua;"..package.path;require("luacov.runner")(".luacov")' tests/spec/*.lua +busted --coverage # generate report into luacov.report.out file luacov diff --git a/docs/config.ld b/docs/config.ld index e6b394e..8d97a61 100644 --- a/docs/config.ld +++ b/docs/config.ld @@ -1,7 +1,21 @@ -- usage: -- execute `ldoc .` in this docs directory -file = {"../mqtt/init.lua", "../mqtt/client.lua", "../mqtt/ioloop.lua"} +file = { + "../mqtt/init.lua", + "../mqtt/client.lua", + "../mqtt/ioloop.lua", + + "../mqtt/loop/init.lua", + "../mqtt/loop/copas.lua", + "../mqtt/loop/nginx.lua", + "../mqtt/loop/ioloop.lua", + + "../mqtt/connector/init.lua", + "../mqtt/connector/copas.lua", + "../mqtt/connector/nginx.lua", + "../mqtt/connector/luasocket.lua", +} project = "luamqtt" package = "mqtt" dir = "." @@ -12,11 +26,17 @@ full_description = "Source code: https://github.com/xHasKx/luamqtt" examples = { "../examples/simple.lua", - "../examples/sync.lua", "../examples/mqtt5-simple.lua", + "../examples/copas.lua", + "../examples/openresty/app/openresty.lua", } -topics = {"../README.md", "../LICENSE"} +use_markdown_titles = true +topics = { + "../README.md", + "../LICENSE", + "../docs_topics/", +} format = "markdown" plain = true diff --git a/docs_topics/01-installation.md b/docs_topics/01-installation.md new file mode 100644 index 0000000..94f0cd9 --- /dev/null +++ b/docs_topics/01-installation.md @@ -0,0 +1,15 @@ +# Installation + +As luamqtt is almost zero-dependency you have to install any optional Lua libraries by +yourself, before using the luamqtt library. + +When installing using [LuaRocks](http://luarocks.org/modules/xhaskx/luamqtt), the +LuaSocket dependency will automatically be installed as well, as it is a listed dependency +in the rockspec. + + luarocks install luamqtt + +To install from source clone the repo and make sure the `./mqtt/` folder is in your +Lua search path. + +Check the [dependencies](./02-dependencies.md.html) on how (and when) to install those. diff --git a/docs_topics/02-dependencies.md b/docs_topics/02-dependencies.md new file mode 100644 index 0000000..aede65e --- /dev/null +++ b/docs_topics/02-dependencies.md @@ -0,0 +1,31 @@ +# Dependencies + +The dependencies differ slightly based on the environment you use, and the requirements you have: + +* [**luasocket**](https://luarocks.org/modules/luasocket/luasocket) to establish TCP connections to the MQTT broker. + This is a listed dependency in the luamqtt rockspec, so it will automatically be installed if you use LuaRocks to + install luamqtt. To install it manually: + luarocks install luasocket + +* [**copas**](https://github.com/keplerproject/copas) module for asynchoneous IO. Copas is an advanced co-routine + scheduler with far more features than the included `ioloop`. For anything more than a few devices, or for devices which + require network IO beyond mqtt alone, Copas is the better alternative. Copas is also pure-Lua, but has parallel network + IO (as opposed to sequential network IO in `ioloop`), and has features like; threads, timers, locks, semaphores, and + non-blocking clients for http(s), (s)ftp, and smtp. + luarocks install copas + +* [**luasec**](https://github.com/brunoos/luasec) module for SSL/TLS based connections. This is optional and may be + skipped if you don't need secure network connections (e.g. broker is located in your local network). It's not listed + in package dependencies, please install it manually like this: + luarocks install luasec + +* [**LuaBitOp**](http://bitop.luajit.org/) library to perform bitwise operations, which is required only on + Lua 5.1. It's not listed in package dependencies, please install it manually like this: + luarocks install luabitop + +* [**LuaLogging**](https://github.com/lunarmodules/lualogging/) to enable logging by the MQTT client. This is optional + but highly recommended for long running clients. This is a great debugging aid when developing your clients. Also when + using OpenResty as your runtime, you'll definitely want to use this, see + [openresty.lua](https://xhaskx.github.io/luamqtt/examples/openresty.lua.html) for an example. + It's not listed in package dependencies, please install it manually like this: + luarocks install lualogging diff --git a/docs_topics/03-lua_versions.md b/docs_topics/03-lua_versions.md new file mode 100644 index 0000000..3674f37 --- /dev/null +++ b/docs_topics/03-lua_versions.md @@ -0,0 +1,9 @@ +# Lua versions + +It's tested to work on Debian 9 GNU/Linux with Lua versions: + +* Lua 5.1 ... Lua 5.3 (**i.e. any modern Lua version**) +* LuaJIT 2.0.0 ... LuaJIT 2.1.0 beta3 +* It may also work on other Lua versions without any guarantees + +Also has run under **Windows** and it was ok, but installing luarocks-modules may be a non-trivial task on this OS. diff --git a/docs_topics/04-mqtt_versions.md b/docs_topics/04-mqtt_versions.md new file mode 100644 index 0000000..db10c1b --- /dev/null +++ b/docs_topics/04-mqtt_versions.md @@ -0,0 +1,8 @@ +# MQTT versions + +Currently supported versions: + +* [MQTT v3.1.1 protocol](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html) version. +* [MQTT v5.0 protocol](http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html) version. + +Both protocols have full control packets support. diff --git a/docs_topics/05-connectors.md b/docs_topics/05-connectors.md new file mode 100644 index 0000000..9023962 --- /dev/null +++ b/docs_topics/05-connectors.md @@ -0,0 +1,30 @@ +# Connectors + +A connector is a network connection layer for luamqtt. This ensures clean separation between the socket +implementation and the client/protocol implementation. + +By default luamqtt ships with connectors for `ioloop`, `copas`, and `nginx`. It will auto-detect which +one to use using the `mqtt.loop` module. + +## building your own + +If you have a different socket implementation you can write your own connector. + +There are 2 base-classes `mqtt.connector.base.buffered-base` and `mqtt.connector.base.non-buffered-base` +to build on, which to pick depends on the environment. + +The main question is what event/io loop mechanism does your implementation have? + +* a single main (co)routione that runs, and doesn't yield when doing network IO. In this case + you should use the `buffered_base` and read on sockets with a `0` timeout. Check the + `mqtt.connector.luasocket` implementation for an example (this is what `ioloop` uses). + +* multiple co-routines that run within a scheduler, and doing non-blocking network IO (receive/send + will implicitly yield control to the scheduler so it will run other tasks until the socket is ready). + This is what Copas and Nginx do, and it requires the `non_buffered_base`. + +The main thing to look for when checking out the existing implementations is the network timeout settings, +and the returned `signals`. + + + diff --git a/examples/openresty/README.md b/examples/openresty/README.md index 1d8e659..6479473 100644 --- a/examples/openresty/README.md +++ b/examples/openresty/README.md @@ -24,5 +24,6 @@ timer itself will go in an endless loop to do the keepalives. # Files * [conf/nginx.conf](conf/nginx.conf): configuration for the nginx daemon to run lua scripts -* [app/luamqtt-example.lua](app/luamqtt-example.lua): example lua script maintaining connection +* [app/openresty.lua](app/openresty.lua): example lua script maintaining connection +* [mqtt/loop/nginx.lua](mqtt/loop/nginx.lua): how to add a client in an Nginx environment * `start.sh`, `stop.sh`, `quit.sh`, `restart.sh`: optional scripts to manage the OpenResty instance diff --git a/examples/openresty/app/luamqtt-example.lua b/examples/openresty/app/openresty.lua similarity index 85% rename from examples/openresty/app/luamqtt-example.lua rename to examples/openresty/app/openresty.lua index 8138e6c..baec0f4 100644 --- a/examples/openresty/app/luamqtt-example.lua +++ b/examples/openresty/app/openresty.lua @@ -33,14 +33,18 @@ local client = mqtt.client{ end -- subscribe to test topic and publish message after it - assert(self:subscribe{ topic="luamqtt/#", qos=1, callback=function() - -- publish test message - assert(self:publish{ - topic = "luamqtt/simpletest", - payload = "hello", - qos = 1 - }) - end}) + assert(self:subscribe { + topic = "luamqtt/#", + qos = 1, + callback = function() + -- publish test message + assert(self:publish{ + topic = "luamqtt/simpletest", + payload = "hello", + qos = 1 + }) + end + }) end, message = function(msg, self) diff --git a/examples/openresty/conf/nginx.conf b/examples/openresty/conf/nginx.conf index 7b4dfdb..6dc6fdd 100644 --- a/examples/openresty/conf/nginx.conf +++ b/examples/openresty/conf/nginx.conf @@ -24,6 +24,6 @@ stream { resolver 8.8.8.8; # the code file to execute - init_worker_by_lua_file "app/luamqtt-example.lua"; + init_worker_by_lua_file "app/openresty.lua"; } diff --git a/mqtt/client.lua b/mqtt/client.lua index 3e70db8..8016ade 100644 --- a/mqtt/client.lua +++ b/mqtt/client.lua @@ -1,40 +1,6 @@ ---- MQTT client module --- @module mqtt.client --- @alias client -local client = {} - --- event names: - --- "error": function(errmsg, client_object, [packet]) --- on errors --- optional packet: only if received CONNACK.rc ~= 0 when connecting - --- "close": function(connection_object, client_object) --- upon closing the connection --- connection object will have .close_reason (string) - --- "shutdown": function(client_object) --- upon shutting down the client (diconnecting an no more reconnects) - --- "connect": function(packet, client_object) --- upon a succesful connect, after receiving the CONNACK packet from the broker --- ???? => on a refused connect; if received CONNACK.rc ~= 0 when connecting - --- "subscribe": function(packet, client_object) --- upon a succesful subscription, after receiving the SUBACK packet from the broker - --- "unsubscribe": function(packet, client_object) --- upon a succesful unsubscription, after receiving the UNSUBACK packet from the broker - --- "message": function(packet, client_object) --- upon receiving a PUBLISH packet from the broker - --- "acknowledge": function(packet, client_object) --- upon receiving PUBACK --- upon receiving PUBREC (event fires after sending PUBREL) - --- "auth": function(packet, client_object) --- upon receiving an AUTH packet +--- This class contains the MQTT client implementation. +-- @classmod Client +local _M = {} ------- @@ -79,39 +45,48 @@ local log = require "mqtt.log" ------- --- MQTT client instance metatable --- @type client_mt -local client_mt = {} -client_mt.__index = client_mt +local Client = {} +Client.__index = Client ---- Create and initialize MQTT client instance +--- Create and initialize MQTT client instance. Typically this is not called directly, +-- but through `Client.create`. -- @tparam table opts MQTT client creation options table --- @tparam string opts.uri MQTT broker uri to connect. --- Expecting '[mqtt[s]://][username[:password]@]hostname[:port]' format. Any option specifically added to the options +-- @tparam string opts.uri MQTT broker uri to connect. Expected format: +--
`[mqtt[s]://][username[:password]@]hostname[:port]` +--
Any option specifically added to the options -- table will take precedence over the option specified in this uri. --- @tparam[opt] string opts.protocol either 'mqtt' or 'mqtts' +-- @tparam boolean opts.clean clean session start flag +-- @tparam[opt] string opts.protocol either `"mqtt"` or `"mqtts"` -- @tparam[opt] string opts.username username for authorization on MQTT broker -- @tparam[opt] string opts.password password for authorization on MQTT broker; not acceptable in absence of username --- @tparam[opt] string opts.hostnamne hostname of the MQTT broker to connect to --- @tparam[opt] int opts.port port number to connect to on the MQTT broker, defaults to 1883 port for plain or 8883 for secure network connections --- @tparam string opts.clean clean session start flag --- @tparam[opt=4] number opts.version MQTT protocol version to use, either 4 (for MQTT v3.1.1) or 5 (for MQTT v5.0). --- Also you may use special values mqtt.v311 or mqtt.v50 for this field. +-- @tparam[opt] string opts.host hostname of the MQTT broker to connect to +-- @tparam[opt] int opts.port port number to connect to on the MQTT broker, defaults to `1883` port for plain or `8883` for secure network connections +-- @tparam[opt=4] number opts.version MQTT protocol version to use, either `4` (for MQTT v3.1.1) or `5` (for MQTT v5.0). +-- Also you may use special values `mqtt.v311` or `mqtt.v50` for this field. -- @tparam[opt] string opts.id MQTT client ID, will be generated by luamqtt library if absent --- @tparam[opt=false] boolean,table opts.secure use secure network connection, provided by luasec lua module; --- set to true to select default params: { mode="client", protocol="any", verify="none", options={ "all","no_sslv2","no_sslv3","no_tlsv1" } --- or set to luasec-compatible table, for example with cafile="...", certificate="...", key="..." --- @tparam[opt] table opts.will will message table with required fields { topic="...", payload="..." } --- and optional fields { qos=1...3, retain=true/false } --- @tparam[opt=60] number opts.keep_alive time interval for client to send PINGREQ packets to the server when network connection is inactive +-- @tparam[opt=false] boolean,table opts.secure use secure network connection, provided by the lua module set in `opts.ssl_module`. +-- Set to true to select default parameters, check individual `mqtt.connectors` for supported options. +-- @tparam[opt] table opts.will will message table with required fields `{ topic="...", payload="..." }` +-- and optional fields `{ qos=0...2, retain=true/false }` +-- @tparam[opt=60] number opts.keep_alive time interval (in seconds) for client to send PINGREQ packets to the server when network connection is inactive -- @tparam[opt=false] boolean opts.reconnect force created MQTT client to reconnect on connection close. -- Set to number value to provide reconnect timeout in seconds. --- It's not recommended to use values < 3. See also `client_mt:shutdown`. +-- It's not recommended to use values `< 3`. See also `Client:shutdown`. -- @tparam[opt] table opts.connector connector table to open and send/receive packets over network connection. --- default is require("mqtt.connector") which tries to auto-detect. --- @tparam[opt="ssl"] string opts.ssl_module module name for the luasec-compatible ssl module, default is "ssl" +-- default is `require("mqtt.connector")` which tries to auto-detect. See `mqtt.connector`. +-- @tparam[opt="ssl"] string opts.ssl_module module name for the luasec-compatible ssl module, default is `"ssl"` -- may be used in some non-standard lua environments with own luasec-compatible ssl module --- @treturn client_mt MQTT client instance table -function client_mt:__init(opts) +-- @tparam[opt] table opts.on List of event-handlers. See `Client:on` for the format. +-- @treturn Client MQTT client instance table +-- @usage +-- local Client = require "mqtt.client" +-- +-- local my_client = Client.create { +-- uri = "mqtts://broker.host.com", +-- clean = true, +-- version = mqtt.v50, +-- } +function Client:__init(opts) if not luamqtt_VERSION then luamqtt_VERSION = require("mqtt")._VERSION end @@ -199,7 +174,7 @@ function client_mt:__init(opts) -- validate connection properties local test_conn = setmetatable({ uri = opts.uri }, a.connector) - client_mt._parse_connection_opts(a, test_conn) + Client._parse_connection_opts(a, test_conn) test_conn:validate() -- will table content check @@ -263,9 +238,47 @@ function client_mt:__init(opts) log:info("MQTT client '%s' created", a.id) end ---- Add functions as handlers of given events --- @param ... (event_name, function) or { event1 = func1, event2 = func2 } table -function client_mt:on(...) +--- Add functions as handlers of given events. +-- @tparam table events MQTT client creation options table +-- @tparam function events.connect `function(connack_packet, client_obj)`
+-- After a connect attempt, after receiving the CONNACK packet from the broker. +-- check `connack_packet.rc == 0` for a succesful connect. +-- @tparam functon events.error `function(errmsg, client_obj [, packet])`
+-- on errors, optional `packet` is only provided if the +-- received `CONNACK.rc ~= 0` when connecting. +-- @tparam functon events.close `function(connection_obj, client_obj)`
+-- upon closing the connection. `connection_obj.close_reason` +-- (string) will hold the close reason. +-- @tparam functon events.shutdown `function(client_obj)`
+-- upon shutting down the client (diconnecting an no more reconnects). +-- @tparam functon events.subscribe `function(suback_packet, client_obj)`
+-- upon a succesful subscription, after receiving the SUBACK packet from the broker +-- @tparam functon events.unsubscribe `function(unsuback_packet, client_obj)`
+-- upon a succesful unsubscription, after receiving the UNSUBACK packet from the broker +-- @tparam functon events.message `function(publish_packet, client_obj)`
+-- upon receiving a PUBLISH packet from the broker +-- @tparam functon events.acknowledge `function(ack_packet, client_obj)`
+-- upon receiving a PUBACK or PUBREC packet from the broker +-- @tparam functon events.auth `function(auth_packet, client_obj)`
+-- upon receiving an AUTH packet +-- @usage +-- client:on { +-- connect = function(pck, self) +-- if pck.rc ~= 0 then +-- return -- connection failed +-- end +-- -- succesfully connected +-- end, +-- message = function(pck, self) +-- -- handle received message +-- end, +-- } +-- +-- -- an alternative way to add individual handlers; +-- client:on("message", function(pck, self) +-- -- handle received message +-- end) +function Client:on(...) local nargs = select("#", ...) local events if nargs == 2 then @@ -296,10 +309,22 @@ local function remove_item(list, item) end end ---- Remove given function handler for specified event +--- Remove given function handler for specified event. -- @tparam string event event name to remove handler -- @tparam function func handler function to remove -function client_mt:off(event, func) +-- @usage +-- local handler = function(pck, self) +-- -- handle received message +-- end +-- +-- -- add event handler +-- client:on { +-- message = handler +-- } +-- +-- -- remove it again +-- client:off("message", handler) +function Client:off(event, func) local handlers = self.handlers[event] if not handlers then error("invalid event '"..tostring(event).."' to handle") @@ -327,7 +352,7 @@ end -- @tparam[opt] table opts.user_properties for MQTT v5.0 only: user properties for subscribe operation -- @tparam[opt] function opts.callback callback function to be called when subscription is acknowledged by broker -- @return packet id on success or false and error message on failure -function client_mt:subscribe(opts) +function Client:subscribe(opts) -- fetch and validate opts assert(type(opts) == "table", "expecting opts to be a table") assert(type(opts.topic) == "string", "expecting opts.topic to be a string") @@ -396,9 +421,9 @@ end -- @tparam string opts.topic topic to unsubscribe -- @tparam[opt] table opts.properties properties for unsubscribe operation -- @tparam[opt] table opts.user_properties user properties for unsubscribe operation --- @tparam[opt] function opts.callback callback function to be called when subscription will be removed on broker +-- @tparam[opt] function opts.callback callback function to be called when the unsubscre is acknowledged by the broker -- @return packet id on success or false and error message on failure -function client_mt:unsubscribe(opts) +function Client:unsubscribe(opts) -- fetch and validate opts assert(type(opts) == "table", "expecting opts to be a table") assert(type(opts.topic) == "string", "expecting opts.topic to be a string") @@ -460,9 +485,9 @@ end -- @tparam[opt=false] boolean opts.dup dup message publication flag -- @tparam[opt] table opts.properties properties for publishing message -- @tparam[opt] table opts.user_properties user properties for publishing message --- @tparam[opt] function opts.callback callback to call when publihsed message has been acknowledged by the broker +-- @tparam[opt] function opts.callback callback to call when published message has been acknowledged by the broker -- @return true or packet id on success or false and error message on failure -function client_mt:publish(opts) +function Client:publish(opts) -- fetch and validate opts assert(type(opts) == "table", "expecting opts to be a table") assert(type(opts.topic) == "string", "expecting opts.topic to be a string") @@ -532,7 +557,7 @@ end -- @tparam[opt] table properties properties for PUBACK/PUBREC packets -- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets -- @return true on success or false and error message on failure -function client_mt:acknowledge(msg, rc, properties, user_properties) +function Client:acknowledge(msg, rc, properties, user_properties) assert(type(msg) == "table" and msg.type == packet_type.PUBLISH, "expecting msg to be a publish packet") assert(rc == nil or type(rc) == "number", "expecting rc to be a number") assert(properties == nil or type(properties) == "table", "expecting properties to be a table") @@ -602,12 +627,14 @@ function client_mt:acknowledge(msg, rc, properties, user_properties) return true end ---- Send DISCONNECT packet to the broker and close the connection +--- Send DISCONNECT packet to the broker and close the connection. +-- Note: if the client is set to automatically reconnect, it will do so. If you +-- want to disconnect and NOT reconnect, use `Client:shutdown`. -- @tparam[opt=0] number rc The Disconnect Reason Code value from MQTT v5.0 protocol -- @tparam[opt] table properties properties for PUBACK/PUBREC packets -- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets -- @return true on success or false and error message on failure -function client_mt:disconnect(rc, properties, user_properties) +function Client:disconnect(rc, properties, user_properties) -- validate opts assert(rc == nil or type(rc) == "number", "expecting rc to be a number") assert(properties == nil or type(properties) == "table", "expecting properties to be a table") @@ -644,15 +671,17 @@ function client_mt:disconnect(rc, properties, user_properties) return true end ---- Shutsdown the client. --- Disconnects if still connected, and disables reconnecting. --- Raises the "shutdown" event --- @param see `client_mt:disconnect`. -function client_mt:shutdown(rc, properties, user_properties) +--- Shuts the client down. +-- Disconnects if still connected, and disables reconnecting. If the client is +-- added to an ioloop, this will prevent an automatic reconnect. +-- Raises the "shutdown" event. +-- @param ... see `Client:disconnect` +-- @return `true` +function Client:shutdown(...) log:debug("client '%s' shutting down", self.opts.id) self.first_connect = false self.opts.reconnect = false - self:disconnect(rc, properties, user_properties) + self:disconnect(...) self:handle("shutdown", self) return true end @@ -662,7 +691,7 @@ end -- @tparam[opt] table properties properties for PUBACK/PUBREC packets -- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets -- @return true on success or false and error message on failure -function client_mt:auth(rc, properties, user_properties) +function Client:auth(rc, properties, user_properties) -- validate opts assert(rc == nil or type(rc) == "number", "expecting rc to be a number") assert(properties == nil or type(properties) == "table", "expecting properties to be a table") @@ -699,7 +728,7 @@ end --- Immediately close established network connection, without graceful session finishing with DISCONNECT packet -- @tparam[opt] string reason the reasong string of connection close -function client_mt:close_connection(reason) +function Client:close_connection(reason) assert(not reason or type(reason) == "string", "expecting reason to be a string") local conn = self.connection if not conn then @@ -720,7 +749,7 @@ end --- Start connecting to broker -- @return true on success or false and error message on failure -function client_mt:start_connecting() +function Client:start_connecting() -- open network connection local ok, err = self:open_connection() if not ok then @@ -743,7 +772,7 @@ end --- Send PINGREQ packet -- @return true on success or false and error message on failure -function client_mt:send_pingreq() +function Client:send_pingreq() -- check connection is alive if not self.connection then return false, "network connection is not opened" @@ -774,7 +803,7 @@ end --- Open network connection to the broker -- @return true on success or false and error message on failure -function client_mt:open_connection() +function Client:open_connection() if self.connection then return true end @@ -788,7 +817,7 @@ function client_mt:open_connection() wait_for_pubrec = {}, -- a table with packet_id of parially acknowledged sent packets in QoS 2 exchange process wait_for_pubrel = {}, -- a table with packet_id of parially acknowledged received packets in QoS 2 exchange process }, connector) - client_mt._parse_connection_opts(opts, conn) + Client._parse_connection_opts(opts, conn) log:info("client '%s' connecting to broker '%s' (using: %s)", self.opts.id, opts.uri, conn.type or "unknown") @@ -812,7 +841,7 @@ end --- Send CONNECT packet into opened network connection -- @return true on success or false and error message on failure -function client_mt:send_connect() +function Client:send_connect() -- check connection is alive if not self.connection then return false, "network connection is not opened" @@ -852,8 +881,11 @@ function client_mt:send_connect() end --- Checks last message send, and sends a PINGREQ if necessary. --- Use this function to check and send keep-alives when using an external event loop. --- @return time till next keep_alive, in case of errors (eg. not connected) the second return value is an error string +-- Use this function to check and send keep-alives when using an external event loop. When using the +-- included modules to add clients (see `mqtt.loop`), this will be taken care of automatically. +-- @treturn[1] number time till next keep_alive (in seconds) +-- @treturn[2] number time till next keep_alive (in seconds) +-- @treturn[2] string in case of errors (eg. not connected) the second return value is an error string -- @usage -- -- example using a Copas event loop to send and check keep-alives -- copas.addthread(function() @@ -864,7 +896,7 @@ end -- copas.sleep(my_client:check_keep_alive()) -- end -- end) -function client_mt:check_keep_alive() +function Client:check_keep_alive() local interval = self.opts.keep_alive if not self.connection then return interval, "network connection is not opened" @@ -902,7 +934,7 @@ end -- Send PUBREL acknowledge packet - second phase of QoS 2 exchange -- Returns true on success or false and error message on failure -function client_mt:acknowledge_pubrel(packet_id) +function Client:acknowledge_pubrel(packet_id) -- check connection is alive if not self.connection then return false, "network connection is not opened" @@ -928,7 +960,7 @@ end -- Send PUBCOMP acknowledge packet - last phase of QoS 2 exchange -- Returns true on success or false and error message on failure -function client_mt:acknowledge_pubcomp(packet_id) +function Client:acknowledge_pubcomp(packet_id) -- check connection is alive if not self.connection then return false, "network connection is not opened" @@ -953,18 +985,18 @@ function client_mt:acknowledge_pubcomp(packet_id) end -- Call specified event handlers -function client_mt:handle(event, ...) +function Client:handle(event, ...) local handlers = self.handlers[event] if not handlers then error("invalid event '"..tostring(event).."' to handle") end - self._handling[event] = true -- protecting self.handlers[event] table from modifications by client_mt:off() when iterating + self._handling[event] = true -- protecting self.handlers[event] table from modifications by Client:off() when iterating for _, handler in ipairs(handlers) do handler(...) end self._handling[event] = nil - -- process handlers removing, scheduled by client_mt:off() + -- process handlers removing, scheduled by Client:off() local to_remove = self._to_remove_handlers[event] if to_remove then for _, func in ipairs(to_remove) do @@ -977,7 +1009,7 @@ end -- Internal methods -- Assign next packet id for given packet creation opts -function client_mt:_assign_packet_id(pargs) +function Client:_assign_packet_id(pargs) if not pargs.packet_id then if packet_id_required(pargs) then self._last_packet_id = next_packet_id(self._last_packet_id) @@ -987,7 +1019,7 @@ function client_mt:_assign_packet_id(pargs) end -- Handle a single received packet -function client_mt:handle_received_packet(packet) +function Client:handle_received_packet(packet) local conn = self.connection local err @@ -1107,22 +1139,22 @@ do --- Performs a single IO loop step. -- It will connect if not connected, will re-connect if set to. - -- This should be called repeatedly in a loop. + -- This should be called repeatedly in a loop. When using the included modules to + -- add clients (see `mqtt.loop`), this will be taken care of automatically. -- -- The return value is the time after which this method must be called again. - -- It can be called sooner, but shouldn't be called later. Return values: - -- - -- - `0`; a packet was succesfully handled, so retry immediately, no delays, - -- in case additional data is waiting to be read on the socket. - -- - `>0`; The reconnect timer needs a delay before it can retry (calling - -- sooner is not a problem, it will only reconnect when the delay - -- has actually passed) - -- - `-1`; the socket read timed out, so it is idle. This return code is only - -- returned with buffered connectors (luasocket), never for yielding sockets - -- (Copas or OpenResty) - -- - -- @return time after which to retry or nil+error - function client_mt:step() + -- It can be called sooner, but shouldn't be called later. + -- @return[1] `-1`: the socket read timed out, so it is idle. This return code is only + -- returned with buffered connectors (luasocket), never for yielding sockets + -- (Copas or OpenResty) + -- @return[2] `0`: a packet was succesfully handled, so retry immediately, no delays, + -- in case additional data is waiting to be read on the socket. + -- @return[3] `>0`: The reconnect timer needs a delay before it can retry (calling + -- sooner is not a problem, it will only reconnect when the delay + -- has actually passed) + -- @return[4] nil + -- @return[4] error message + function Client:step() local conn = self.connection local reconnect = self.opts.reconnect @@ -1161,7 +1193,7 @@ end -- Fill given connection table with host and port according given opts -- uri: mqtt[s]://[username][:password]@host.domain[:port] -function client_mt._parse_connection_opts(opts, conn) +function Client._parse_connection_opts(opts, conn) local uri = assert(conn.uri) -- protocol @@ -1263,7 +1295,7 @@ function client_mt._parse_connection_opts(opts, conn) end -- Send given packet to opened network connection -function client_mt:_send_packet(packet) +function Client:_send_packet(packet) local conn = self.connection if not conn then return false, "network connection is not opened" @@ -1283,7 +1315,7 @@ function client_mt:_send_packet(packet) end -- Receive one packet from established network connection -function client_mt:_receive_packet() +function Client:_receive_packet() local conn = self.connection if not conn then return false, "network connection is not opened" @@ -1313,12 +1345,12 @@ function client_mt:_receive_packet() end -- Represent MQTT client as string -function client_mt:__tostring() +function Client:__tostring() return str_format("mqtt.client{id=%q}", tostring(self.opts.id)) end -- Garbage collection handler -function client_mt:__gc() +function Client:__gc() -- close network connection if it's available, without sending DISCONNECT packet if self.connection then self:close_connection("garbage") @@ -1329,12 +1361,13 @@ end -- @section exported --- Create, initialize and return new MQTT client instance --- @param ... see arguments of client_mt:__init(opts) --- @see client_mt:__init --- @treturn client_mt MQTT client instance -function client.create(...) - local cl = setmetatable({}, client_mt) - cl:__init(...) +-- @name client.create +-- @param ... see arguments of `Client:__init` +-- @see Client:__init +-- @treturn Client MQTT client instance +function _M.create(opts) + local cl = setmetatable({}, Client) + cl:__init(opts) return cl end @@ -1342,10 +1375,10 @@ end if _G._TEST then -- export functions for test purposes (different name!) - client.__parse_connection_opts = client_mt._parse_connection_opts + _M.__parse_connection_opts = Client._parse_connection_opts end -- export module table -return client +return _M -- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/mqtt/connector/copas.lua b/mqtt/connector/copas.lua index 1369f1a..83453af 100644 --- a/mqtt/connector/copas.lua +++ b/mqtt/connector/copas.lua @@ -1,7 +1,27 @@ --- DOC: https://keplerproject.github.io/copas/ --- NOTE: you will need to install copas like this: luarocks install copas +--- Copas based connector. +-- +-- Copas is an advanced coroutine scheduler in pure-Lua. It uses LuaSocket +-- under the hood, but in a non-blocking way. It also uses LuaSec for TLS +-- based connections (like the `mqtt.connector.luasocket` one). And hence uses +-- the same defaults for the `secure` option when creating the `client`. +-- +-- Caveats: +-- +-- * the `client` option `ssl_module` is not supported by the Copas connector, +-- It will always use the module named `ssl`. +-- +-- * multiple threads cannot send simultaneously (simple scenarios will just +-- work) +-- +-- * since the client creates a long lived connection for reading, it returns +-- upon receiving a packet, to call an event handler. The handler must return +-- swiftly, since while the handler runs the socket will not be reading. +-- Any task that might take longer than a few milliseconds should be off +-- loaded to another thread. +-- +-- NOTE: you will need to install copas like this: `luarocks install copas`. +-- @module mqtt.connector.copas --- module table local super = require "mqtt.connector.base.non_buffered_base" local connector = setmetatable({}, super) connector.__index = connector diff --git a/mqtt/connector/init.lua b/mqtt/connector/init.lua index 826e587..20db5fa 100644 --- a/mqtt/connector/init.lua +++ b/mqtt/connector/init.lua @@ -1,6 +1,25 @@ ---- auto detect the connector to use. --- This is based on a.o. libraries already loaded, so 'require' this --- module as late as possible (after the other modules) +--- Auto detect the connector to use. +-- The different environments require different socket implementations to work +-- properly. The 'connectors' are an abstraction to facilitate that without +-- having to modify the client itself. +-- +-- This module is will auto-detect the environment and return the proper +-- module from; +-- +-- * `mqtt.connector.nginx` for using the non-blocking OpenResty co-socket apis +-- +-- * `mqtt.connector.copas` for the non-blocking Copas wrapped sockets +-- +-- * `mqtt.connector.luasocket` for LuaSocket based sockets (blocking) +-- +-- Since the selection is based on a.o. packages loaded, make sure that in case +-- of using the `copas` scheduler, you require it before the `mqtt` modules. +-- +-- Since the `client` defaults to this module (`mqtt.connector`) there typically +-- is no need to use this directly. When implementing your own connectors, +-- the included connectors provide good examples of what to look out for. +-- @module mqtt.connector + local loops = setmetatable({ copas = "mqtt.connector.copas", nginx = "mqtt.connector.nginx", diff --git a/mqtt/connector/luasocket.lua b/mqtt/connector/luasocket.lua index 7f27702..6a25431 100644 --- a/mqtt/connector/luasocket.lua +++ b/mqtt/connector/luasocket.lua @@ -1,6 +1,25 @@ --- DOC: http://w3.impa.br/~diego/software/luasocket/tcp.html +--- LuaSocket (and LuaSec) based connector. +-- +-- This connector works with the blocking LuaSocket sockets. This connector uses +-- `LuaSec` for TLS connections. This is the connector used for the included +-- `mqtt.ioloop` scheduler. +-- +-- When using TLS / MQTTS connections, the `secure` option passed to the `client` +-- when creating it, can be the standard table of options as used by LuaSec +-- for creating a context. When omitted the defaults will be; +-- `{ mode="client", protocol="any", verify="none", +-- options={ "all", "no_sslv2", "no_sslv3", "no_tlsv1" } }` +-- +-- Caveats: +-- +-- * since the client creates a long lived connection for reading, it returns +-- upon receiving a packet, to call an event handler. The handler must return +-- swiftly, since while the handler runs the socket will not be reading. +-- Any task that might take longer than a few milliseconds should be off +-- loaded to another task. +-- +-- @module mqtt.connector.luasocket --- module table local super = require "mqtt.connector.base.buffered_base" local luasocket = setmetatable({}, super) luasocket.__index = luasocket diff --git a/mqtt/connector/nginx.lua b/mqtt/connector/nginx.lua index 61912fa..00bdadb 100644 --- a/mqtt/connector/nginx.lua +++ b/mqtt/connector/nginx.lua @@ -1,5 +1,30 @@ --- module table +--- Nginx OpenResty co-sockets based connector. +-- +-- This connector works with the non-blocking openresty sockets. Note that the +-- secure setting haven't been implemented yet. It will simply use defaults +-- when doing a TLS handshake. +-- +-- Caveats: +-- +-- * sockets cannot cross phase/context boundaries. So all client interaction +-- must be done from the timer context in which the client threads run. +-- +-- * multiple threads cannot send simultaneously (simple scenarios will just +-- work) +-- +-- * since the client creates a long lived connection for reading, it returns +-- upon receiving a packet, to call an event handler. The handler must return +-- swiftly, since while the handler runs the socket will not be reading. +-- Any task that might take longer than a few milliseconds should be off +-- loaded to another thread. +-- +-- * Nginx timers should be short lived because memory is only released after +-- the context is destroyed. In this case we're using the fro prolonged periods +-- of time, so be aware of this and implement client restarts if required. +-- -- thanks to @irimiab: https://github.com/xHasKx/luamqtt/issues/13 +-- @module mqtt.connector.nginx + local super = require "mqtt.connector.base.non_buffered_base" local ngxsocket = setmetatable({}, super) ngxsocket.__index = ngxsocket diff --git a/mqtt/init.lua b/mqtt/init.lua index fa2f36a..f81c9a6 100644 --- a/mqtt/init.lua +++ b/mqtt/init.lua @@ -13,11 +13,17 @@ CONVENTIONS: ]] ---- Module table +--- Module level constants -- @field v311 MQTT v3.1.1 protocol version constant -- @field v50 MQTT v5.0 protocol version constant -- @field _VERSION luamqtt version string -- @table mqtt +-- @usage +-- local client = mqtt.client { +-- uri = "mqtts://aladdin:soopersecret@mqttbroker.com", +-- clean = true, +-- version = mqtt.v50, -- specify constant for MQTT version +-- } local mqtt = { -- supported MQTT protocol versions v311 = 4, -- supported protocol version, MQTT v3.1.1 @@ -40,20 +46,24 @@ local ioloop = require("mqtt.ioloop") local ioloop_get = ioloop.get --- Create new MQTT client instance --- @param ... Same as for mqtt.client.create(...) --- @see mqtt.client.client_mt:__init +-- @param ... Same as for `Client.create`(...) +-- @see Client:__init function mqtt.client(...) return client_create(...) end ---- Returns default ioloop instance +--- Returns default `ioloop` instance. Shortcut to `Ioloop.get`. -- @function mqtt.get_ioloop +-- @see Ioloop.get mqtt.get_ioloop = ioloop_get ---- Run default ioloop for given MQTT clients or functions. --- @param ... MQTT clients or loop functions to add to ioloop --- @see mqtt.ioloop.get --- @see mqtt.ioloop.run_until_clients +--- Run default `ioloop` for given MQTT clients or functions. +-- Will not return until all clients/functions have exited. +-- @param ... MQTT clients or loop functions to add to ioloop, see `Ioloop:add` for details on functions. +-- @see Ioloop.get +-- @see Ioloop.run_until_clients +-- @usage +-- mqtt.run_ioloop(client1, client2, func1) function mqtt.run_ioloop(...) log:info("starting default ioloop instance") local loop = ioloop_get() @@ -64,27 +74,11 @@ function mqtt.run_ioloop(...) return loop:run_until_clients() end ---- Run synchronous input/output loop for only one given MQTT client. --- Provided client's connection will be opened. --- Client reconnect feature will not work, and keep_alive too. --- @param cl MQTT client instance to run -function mqtt.run_sync(cl) - local ok, err = cl:start_connecting() - if not ok then - return false, err - end - while cl.connection do - ok, err = cl:_sync_iteration() - if not ok then - return false, err - end - end -end - --- Validates a topic with wildcards. -- @param t (string) wildcard topic to validate -- @return topic, or false+error +-- @usage local t = assert(mqtt.validate_subscribe_topic("base/+/thermostat/#")) function mqtt.validate_subscribe_topic(t) if type(t) ~= "string" then return false, "not a string" @@ -122,6 +116,7 @@ end --- Validates a topic without wildcards. -- @param t (string) topic to validate -- @return topic, or false+error +-- @usage local t = assert(mqtt.validate_publish_topic("base/living/thermostat/setpoint")) function mqtt.validate_publish_topic(t) if type(t) ~= "string" then return false, "not a string" @@ -160,30 +155,22 @@ function mqtt.compile_topic_pattern(t) end --- Parses wildcards in a topic into a table. --- Options include: --- --- - `opts.topic`: the wild-carded topic to match against (optional if `opts.pattern` is given) --- --- - `opts.pattern`: the compiled pattern for the wild-carded topic (optional if `opts.topic` +-- @tparam topic string incoming topic string +-- @tparam table opts parsing options table +-- @tparam string opts.topic the wild-carded topic to match against (optional if `opts.pattern` is given) +-- @tparam string opts.pattern the compiled pattern for the wild-carded topic (optional if `opts.topic` -- is given). If not given then topic will be compiled and the result will be -- stored in this field for future use (cache). --- --- - `opts.keys`: (optional) array of field names. The order must be the same as the +-- @tparam array opts.keys array of field names. The order must be the same as the -- order of the wildcards in `topic` --- --- Returned tables: --- --- - `fields` table: the array part will have the values of the wildcards, in +-- @return[1] `fields` table: the array part will have the values of the wildcards, in -- the order they appeared. The hash part, will have the field names provided -- in `opts.keys`, with the values of the corresponding wildcard. If a `#` -- wildcard was used, that one will be the last in the table. --- --- - `varargs` table: will only be returned if the wildcard topic contained the +-- @return[1] `varargs` table: will only be returned if the wildcard topic contained the -- `#` wildcard. The returned table is an array, with all segments that were -- matched by the `#` wildcard. --- @param topic (string) incoming topic string (required) --- @param opts (table) with options (required) --- @return fields (table) + varargs (table or nil), or false+err on error. +-- @return[2] false+err on error, eg. topic didn't match or pattern was invalid. -- @usage -- local opts = { -- topic = "homes/+/+/#", diff --git a/mqtt/ioloop.lua b/mqtt/ioloop.lua index 5a11148..94b895f 100644 --- a/mqtt/ioloop.lua +++ b/mqtt/ioloop.lua @@ -1,33 +1,25 @@ ---- ioloop module --- @module mqtt.ioloop --- @alias ioloop - ---[[ - ioloop module - - In short: allowing you to work with several MQTT clients in one script, and allowing them to maintain - a long-term connection to broker, using PINGs. - - NOTE: this module will work only with MQTT clients using standard luasocket/luasocket_ssl connectors. - - In long: - Providing an IO loop instance dealing with efficient (as much as possible in limited lua IO) network communication - for several MQTT clients in the same OS thread. - The main idea is that you are creating an ioloop instance, then adding created and connected MQTT clients to it. - The ioloop instance is setting a non-blocking mode for sockets in MQTT clients and setting a small timeout - for their receive/send operations. Then ioloop is starting an endless loop trying to receive/send data for all added MQTT clients. - You may add more or remove some MQTT clients from the ioloop after it's created and started. - - Using that ioloop is allowing you to run a MQTT client for long time, through sending PINGREQ packets to broker - in keepAlive interval to maintain long-term connection. - - Also, any function can be added to the ioloop instance, and it will be called in the same endless loop over and over - alongside with added MQTT clients to provide you a piece of processor time to run your own logic (like running your own - network communications or any other thing good working in an io-loop) -]] - --- module table -local ioloop = {} +--- This class contains the ioloop implementation. +-- +-- In short: allowing you to work with several MQTT clients in one script, and allowing them to maintain +-- a long-term connection to broker, using PINGs. This is the bundled alternative to Copas and Nginx. +-- +-- NOTE: this module will work only with MQTT clients using the `connector.luasocket` connector. +-- +-- Providing an IO loop instance dealing with efficient (as much as possible in limited lua IO) network communication +-- for several MQTT clients in the same OS thread. +-- The main idea is that you are creating an ioloop instance, then adding MQTT clients to it. +-- Then ioloop is starting an endless loop trying to receive/send data for all added MQTT clients. +-- You may add more or remove some MQTT clients to/from the ioloop after it has been created and started. +-- +-- Using an ioloop is allowing you to run a MQTT client for long time, through sending PINGREQ packets to broker +-- in keepAlive interval to maintain long-term connection. +-- +-- Also, any function can be added to the ioloop instance, and it will be called in the same endless loop over and over +-- alongside with added MQTT clients to provide you a piece of processor time to run your own logic (like running your own +-- network communications or any other thing good working in an io-loop) +-- @classmod Ioloop + +local _M = {} -- load required stuff local log = require "mqtt.log" @@ -44,18 +36,17 @@ local math = require("math") local math_min = math.min --- ioloop instances metatable --- @type ioloop_mt -local ioloop_mt = {} -ioloop_mt.__index = ioloop_mt +local Ioloop = {} +Ioloop.__index = Ioloop ---- Initialize ioloop instance +--- Initialize ioloop instance. -- @tparam table opts ioloop creation options table -- @tparam[opt=0] number opts.sleep_min min sleep interval after each iteration -- @tparam[opt=0.002] number opts.sleep_step increase in sleep after every idle iteration -- @tparam[opt=0.030] number opts.sleep_max max sleep interval after each iteration -- @tparam[opt=luasocket.sleep] function opts.sleep_function custom sleep function to call after each iteration --- @treturn ioloop_mt ioloop instance -function ioloop_mt:__init(opts) +-- @treturn Ioloop ioloop instance +function Ioloop:__init(opts) log:debug("initializing ioloop instance '%s'", tostring(self)) opts = opts or {} opts.sleep_min = opts.sleep_min or 0 @@ -68,10 +59,32 @@ function ioloop_mt:__init(opts) self.running = false --ioloop running flag, used by MQTT clients which are adding after this ioloop started to run end ---- Add MQTT client or a loop function to the ioloop instance +--- Add MQTT client or a loop function to the ioloop instance. +-- When adding a function, the function should on each call return the time (in seconds) it wishes to sleep. The ioloop +-- will sleep after each iteration based on what clients/functions returned. So the function may be called sooner than +-- the requested time, but will not be called later. -- @tparam client_mt|function client MQTT client or a loop function to add to ioloop -- @return true on success or false and error message on failure -function ioloop_mt:add(client) +-- @usage +-- -- create a timer on a 1 second interval +-- local timer do +-- local interval = 1 +-- local next_call = socket.gettime() + interval +-- timer = function() +-- if next_call >= socket.gettime() then +-- +-- -- do stuff here +-- +-- next_call = socket.gettime() + interval +-- return interval +-- else +-- return next_call - socket.gettime() +-- end +-- end +-- end +-- +-- loop:add(timer) +function Ioloop:add(client) local clients = self.clients if clients[client] then if type(client) == "table" then @@ -108,7 +121,7 @@ end --- Remove MQTT client or a loop function from the ioloop instance -- @tparam client_mt|function client MQTT client or a loop function to remove from ioloop -- @return true on success or false and error message on failure -function ioloop_mt:remove(client) +function Ioloop:remove(client) local clients = self.clients if not clients[client] then if type(client) == "table" then @@ -142,7 +155,7 @@ end --- Perform one ioloop iteration. -- TODO: make this smarter do not wake-up functions or clients returning a longer -- sleep delay. Currently they will be tried earlier if another returns a smaller delay. -function ioloop_mt:iteration() +function Ioloop:iteration() local opts = self.opts local sleep = opts.sleep_max @@ -183,8 +196,10 @@ function ioloop_mt:iteration() end end ---- Run ioloop while there is at least one client/function in the ioloop -function ioloop_mt:run_until_clients() +--- Run the ioloop. +-- While there is at least one client/function in the ioloop it will continue +-- iterating. After all clients/functions are gone, it will return. +function Ioloop:run_until_clients() log:info("ioloop started with %d clients/functions", #self.clients) self.running = true @@ -196,32 +211,35 @@ function ioloop_mt:run_until_clients() log:info("ioloop finished with %d clients/functions", #self.clients) end -------- +--- Exported functions +-- @section exported + --- Create IO loop instance with given options --- @see ioloop_mt:__init --- @treturn ioloop_mt ioloop instance -local function ioloop_create(opts) - local inst = setmetatable({}, ioloop_mt) +-- @name ioloop.create +-- @see Ioloop:__init +-- @treturn Ioloop ioloop instance +function _M.create(opts) + local inst = setmetatable({}, Ioloop) inst:__init(opts) return inst end -ioloop.create = ioloop_create -- Default ioloop instance local ioloop_instance --- Returns default ioloop instance +-- @name ioloop.get -- @tparam[opt=true] boolean autocreate Automatically create ioloop instance -- @tparam[opt] table opts Arguments for creating ioloop instance --- @treturn ioloop_mt ioloop instance -function ioloop.get(autocreate, opts) +-- @treturn Ioloop ioloop instance +function _M.get(autocreate, opts) if autocreate == nil then autocreate = true end if autocreate and not ioloop_instance then log:info("auto-creating default ioloop instance") - ioloop_instance = ioloop_create(opts) + ioloop_instance = _M.create(opts) end return ioloop_instance end @@ -229,6 +247,6 @@ end ------- -- export module table -return ioloop +return _M -- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/mqtt/loop/copas.lua b/mqtt/loop/copas.lua index 9b58661..3491528 100644 --- a/mqtt/loop/copas.lua +++ b/mqtt/loop/copas.lua @@ -1,3 +1,8 @@ +--- Copas specific client handling module. +-- Typically this module is not used directly, but through `mqtt.loop` when +-- auto-detecting the environment. +-- @module mqtt.loop.copas + local copas = require "copas" local log = require "mqtt.log" @@ -7,9 +12,10 @@ local _M = {} --- Add MQTT client to the Copas scheduler. --- The client will automatically be removed after it exits. --- @tparam cl client to add to the Copas scheduler --- @return true on success or false and error message on failure +-- The client will automatically be removed after it exits. It will set up a +-- thread to call `Client:check_keep_alive`. +-- @param cl mqtt-client to add to the Copas scheduler +-- @return `true` on success or `false` and error message on failure function _M.add(cl) if client_registry[cl] then log:warn("MQTT client '%s' was already added to Copas", cl.opts.id) diff --git a/mqtt/loop/init.lua b/mqtt/loop/init.lua index e8976de..f7a4c3d 100644 --- a/mqtt/loop/init.lua +++ b/mqtt/loop/init.lua @@ -1,6 +1,28 @@ ---- auto detect the connector to use. --- This is based on a.o. libraries already loaded, so 'require' this --- module as late as possible (after the other modules) +--- Auto detect the IO loop to use. +-- Interacting with the supported IO loops (ioloop, copas, and nginx) requires +-- specific implementations to get it right. +-- This module is will auto-detect the environment and return the proper +-- module from; +-- +-- * `mqtt.loop.ioloop` +-- +-- * `mqtt.loop.copas` +-- +-- * `mqtt.loop.nginx` +-- +-- Since the selection is based on a.o. packages loaded, make sure that in case +-- of using the `copas` scheduler, you require it before the `mqtt` modules. +-- +-- @usage +-- --local copas = require "copas" -- only if you use Copas +-- local mqtt = require "mqtt" +-- local add_client = require("mqtt.loop").add -- returns a loop-specific function +-- +-- local client = mqtt.create { ... options ... } +-- add_client(client) -- works for ioloop, copas, and nginx +-- +-- @module mqtt.loop + local loops = setmetatable({ copas = "mqtt.loop.copas", nginx = "mqtt.loop.nginx", diff --git a/mqtt/loop/ioloop.lua b/mqtt/loop/ioloop.lua index 4b83038..d585651 100644 --- a/mqtt/loop/ioloop.lua +++ b/mqtt/loop/ioloop.lua @@ -1,7 +1,17 @@ +--- IOloop specific client handling module. +-- Typically this module is not used directly, but through `mqtt.loop` when +-- auto-detecting the environment. +-- @module mqtt.loop.ioloop + local _M = {} local mqtt = require "mqtt" +--- Add MQTT client to the integrated ioloop. +-- The client will automatically be removed after it exits. It will set up a +-- function to call `Client:check_keep_alive` in the ioloop. +-- @param client mqtt-client to add to the ioloop +-- @return `true` on success or `false` and error message on failure function _M.add(client) local default_loop = mqtt.get_ioloop() return default_loop:add(client) diff --git a/mqtt/loop/nginx.lua b/mqtt/loop/nginx.lua index bcea387..3a8a3d9 100644 --- a/mqtt/loop/nginx.lua +++ b/mqtt/loop/nginx.lua @@ -1,3 +1,7 @@ +--- Nginx specific client handling module. +-- Typically this module is not used directly, but through `mqtt.loop` when +-- auto-detecting the environment. +-- @module mqtt.loop.nginx local client_registry = {} @@ -5,9 +9,10 @@ local _M = {} --- Add MQTT client to the Nginx environment. --- The client will automatically be removed after it exits. --- @tparam cl client to add --- @return true on success or false and error message on failure +-- The client will automatically be removed after it exits. It will set up a +-- thread to call `Client:check_keep_alive`. +-- @param client mqtt-client to add to the Nginx environment +-- @return `true` on success or `false` and error message on failure function _M.add(client) if client_registry[client] then ngx.log(ngx.WARN, "MQTT client '%s' was already added to Nginx", client.opts.id) @@ -46,7 +51,10 @@ function _M.add(client) if not ok then ngx.log(ngx.CRIT, "Failed to start timer-context for device '", client.id,"': ", err) + return false, "timer failed: " .. err end + + return true end