From e6b212a41cdd00c58a027c54d908cbbd59c920f2 Mon Sep 17 00:00:00 2001 From: Thijs Schreijer Date: Mon, 1 Nov 2021 01:21:30 +0100 Subject: [PATCH] feat(ioloop) add incremental back-off for idle clients --- mqtt/ioloop.lua | 55 ++++++++++++++++------------ mqtt/luasocket.lua | 4 +- tests/spec/01-module-basics_spec.lua | 1 - 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/mqtt/ioloop.lua b/mqtt/ioloop.lua index 575f799..b6d8bbe 100644 --- a/mqtt/ioloop.lua +++ b/mqtt/ioloop.lua @@ -50,18 +50,21 @@ ioloop_mt.__index = ioloop_mt --- Initialize ioloop instance -- @tparam table opts ioloop creation options table --- @tparam[opt=0.005] number opts.timeout network operations timeout in seconds --- @tparam[opt=0] number opts.sleep sleep interval after each iteration +-- @tparam[opt=0] number opts.sleep_min min sleep interval after each iteration +-- @tparam[opt=0.002] number opts.sleep_step increase in sleep after every idle iteration +-- @tparam[opt=0.030] number opts.sleep_max max sleep interval after each iteration -- @tparam[opt] function opts.sleep_function custom sleep function to call after each iteration -- @treturn ioloop_mt ioloop instance function ioloop_mt:__init(opts) log:debug("initializing ioloop instance '%s'", tostring(self)) opts = opts or {} - opts.timeout = opts.timeout or 0.005 - opts.sleep = opts.sleep or 0 + opts.sleep_min = opts.sleep_min or 0 + opts.sleep_step = opts.sleep_step or 0.002 + opts.sleep_max = opts.sleep_max or 0.030 opts.sleep_function = opts.sleep_function or require("socket").sleep self.opts = opts self.clients = {} + self.timeouts = setmetatable({}, { __mode = "v" }) self.running = false --ioloop running flag, used by MQTT clients which are adding after this ioloop started to run end @@ -81,6 +84,7 @@ function ioloop_mt:add(client) end clients[#clients + 1] = client clients[client] = true + self.timeouts[client] = self.opts.sleep_min if type(client) == "table" then log:info("adding client '%s' to ioloop '%s'", client.opts.id, tostring(self)) @@ -136,36 +140,41 @@ function ioloop_mt:remove(client) end --- Perform one ioloop iteration. --- TODO: make this smarter do not wake-up functions or clients returned a longer --- sleep delay. Currently it's pretty much a busy loop. +-- TODO: make this smarter do not wake-up functions or clients returning a longer +-- sleep delay. Currently they will be tried earlier if another returns a smaller delay. function ioloop_mt:iteration() local opts = self.opts - local sleep = opts.sleep + local sleep = opts.sleep_max for _, client in ipairs(self.clients) do local t, err -- read data and handle events if type(client) ~= "function" then t, err = client:step() - if t == -1 then - -- no data read, client is idle - t = nil - elseif not t then - if not client.opts.reconnect then - -- error and not reconnecting, remove the client - log:error("client '%s' failed with '%s', will not re-connect", client.opts.id, err) - self:remove(client) - t = nil - else - -- error, but will reconnect - log:error("client '%s' failed with '%s', will try re-connecting", client.opts.id, err) - t = 0 -- try immediately - end + else + t = client() or opts.sleep_max + end + if t == -1 then + -- no data read, client is idle, step up timeout + t = math_min(self.timeouts[client] + opts.sleep_step, opts.sleep_max) + self.timeouts[client] = t + elseif not t then + -- an error from a client was returned + if not client.opts.reconnect then + -- error and not reconnecting, remove the client + log:fatal("client '%s' failed with '%s', will not re-connect", client.opts.id, err) + self:remove(client) + t = opts.sleep_max + else + -- error, but will reconnect + log:error("client '%s' failed with '%s', will try re-connecting", client.opts.id, err) + t = opts.sleep_min -- try asap end else - t = client() + -- a number of seconds was returned + t = math_min(t, opts.sleep_max) + self.timeouts[client] = opts.sleep_min end - t = t or opts.sleep sleep = math_min(sleep, t) end -- sleep a bit diff --git a/mqtt/luasocket.lua b/mqtt/luasocket.lua index 7a36b99..4cd24f6 100644 --- a/mqtt/luasocket.lua +++ b/mqtt/luasocket.lua @@ -57,9 +57,9 @@ end function luasocket:plain_receive(size) local sock = self.sock - sock:settimeout(0.010) -- TODO: setting to 0 fails??? it shouldn't - local data, err = sock:receive(size) + sock:settimeout(0) + local data, err = sock:receive(size) if data then return data end diff --git a/tests/spec/01-module-basics_spec.lua b/tests/spec/01-module-basics_spec.lua index 67324f5..8d205b6 100644 --- a/tests/spec/01-module-basics_spec.lua +++ b/tests/spec/01-module-basics_spec.lua @@ -249,7 +249,6 @@ describe("MQTT lua library component test:", function() assert.are.equal(2097152, protocol.parse_var_length_nonzero(make_read_func("80808001"))) assert.are.equal(268435455, protocol.parse_var_length_nonzero(make_read_func("FFFFFF7F"))) assert.is_false(protocol.parse_var_length_nonzero(make_read_func("FFFFFFFF"))) - end) it("protocol.next_packet_id", function()