Skip to content

Commit

Permalink
fix(deps) bump to Copas 4
Browse files Browse the repository at this point in the history
  • Loading branch information
Tieske committed Aug 6, 2022
1 parent 6e6c28b commit 10080da
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
2 changes: 1 addition & 1 deletion mqtt/connector/base/non_buffered_base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ non_buffered.signal_closed = {} -- remote closed the connection

--- Validate connection options.
function non_buffered:shutdown() -- luacheck: ignore
error("method 'validate' on connector wasn't implemented")
error("method 'validate' on connector wasn't implemented") --TODO: comments and text doesn't match name
end

--- Clears consumed bytes.
Expand Down
2 changes: 2 additions & 0 deletions mqtt/connector/copas.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ end
function connector:connect()
self:validate()
local sock = copas.wrap(socket.tcp(), self.secure_params)
copas.setsocketname("mqtt@"..self.host..":"..self.port, sock)

sock:settimeouts(self.timeout, self.timeout, -1) -- no timout on reading

local ok, err = sock:connect(self.host, self.port)
Expand Down
13 changes: 7 additions & 6 deletions mqtt/loop/copas.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ function _M.add(cl)

do -- make mqtt device async for incoming packets
local handle_received_packet = cl.handle_received_packet

local count = 0
-- replace packet handler; create a new thread for each packet received
cl.handle_received_packet = function(mqttdevice, packet)
copas.addthread(handle_received_packet, mqttdevice, packet)
count = count + 1
copas.addnamedthread(handle_received_packet, cl.opts.id..":receive_"..count, mqttdevice, packet)
return true
end
end

-- add keep-alive timer
local timer = copas.addthread(function()
local timer = copas.addnamedthread(function()
while client_registry[cl] do
copas.sleep(cl:check_keep_alive())
end
end)
end, cl.opts.id .. ":keep_alive")

-- add client to connect and listen
copas.addthread(function()
copas.addnamedthread(function()
while client_registry[cl] do
local timeout = cl:step()
if not timeout then
Expand All @@ -54,7 +55,7 @@ function _M.add(cl)
end
end
end
end)
end, cl.opts.id .. ":listener")

return true
end
Expand Down

0 comments on commit 10080da

Please sign in to comment.