Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disconnect from server when exceeded timeout #28

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions mqtt/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ function client_mt:__init(args)

-- state
self.first_connect = true -- contains true to perform one network connection attempt after client creation
self.send_time = 0 -- time of the last network send from client side
self._last_in_time = 0 -- time of the last network received from client side

-- packet creation/parse functions according version
if not a.version then
Expand Down Expand Up @@ -566,8 +566,11 @@ function client_mt:disconnect(rc, properties, user_properties)
assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")

self.is_disconnecting = true

-- check connection is alive
if not self.connection then
self.is_disconnecting = false
return false, "network connection is not opened"
end

Expand All @@ -585,11 +588,13 @@ function client_mt:disconnect(rc, properties, user_properties)
err = "failed to send DISCONNECT: "..err
self:handle("error", err, self)
self:close_connection("error")
self.is_disconnecting = false
return false, err
end

-- now close connection
self:close_connection("connection closed by client")
self.is_disconnecting = false

return true
end
Expand Down Expand Up @@ -650,7 +655,7 @@ function client_mt:close_connection(reason)
-- check connection is still closed (self.connection may be re-created in "close" handler)
if not self.connection then
-- remove from ioloop
if self.ioloop and not args.reconnect then
if self.ioloop and (self.is_disconnecting or not args.reconnect) then
self.ioloop:remove(self)
end
end
Expand Down Expand Up @@ -780,6 +785,7 @@ function client_mt:send_connect()

-- reset last packet id
self._last_packet_id = nil
self._last_in_time = os_time()

return true
end
Expand Down Expand Up @@ -906,8 +912,16 @@ function client_mt:_ioloop_iteration()

if ok then
-- send PINGREQ if keep_alive interval is reached
if os_time() - self.send_time >= args.keep_alive then
self:send_pingreq()
if os_time() - self._last_in_time >= args.keep_alive then
if not self._ping_t then
self._ping_t = os_time()
self._last_in_time = os_time()
ok, err = self:send_pingreq()
else
err = "client has exceeded timeout, disconnecting."
self:handle("error", err, self)
self:close_connection("error")
end
end
end

Expand All @@ -917,7 +931,7 @@ function client_mt:_ioloop_iteration()
if self.first_connect then
self.first_connect = false
self:start_connecting()
elseif args.reconnect then
elseif args.reconnect and not self.is_disconnecting then
if args.reconnect == true then
self:start_connecting()
else
Expand Down Expand Up @@ -962,6 +976,9 @@ function client_mt:_io_iteration(recv)

-- check for communication error
if packet == false then
if self.is_disconnecting then
return true
end
if err == "closed" then
self:close_connection("connection closed by broker")
return false, err
Expand Down Expand Up @@ -1004,8 +1021,7 @@ function client_mt:_io_iteration(recv)
-- handle packet according its type
local ptype = packet.type
if ptype == packet_type.PINGRESP then -- luacheck: ignore
-- PINGREQ answer, nothing to do
-- TODO: break the connectin in absence of this packet in some timeout
self._ping_t = nil -- mark ping transfer flag to nil
elseif ptype == packet_type.SUBACK then
self:handle("subscribe", packet, self)
elseif ptype == packet_type.UNSUBACK then
Expand Down Expand Up @@ -1156,7 +1172,6 @@ function client_mt:_send_packet(packet)
return false, "connector.send failed: "..err
end
end
self.send_time = os_time()
return true
end

Expand All @@ -1171,6 +1186,7 @@ function client_mt:_receive_packet()
if not packet then
return false, err
end
self._last_in_time = os_time()
return packet
end

Expand Down