Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libboostasio: close connection on timeout #463

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 123 additions & 79 deletions include/amqpcpp/libboostasio.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/**
* LibBoostAsio.h
*
* Implementation for the AMQP::TcpHandler for boost::asio. You can use this class
* instead of a AMQP::TcpHandler class, just pass the boost asio service to the
* Implementation for the AMQP::TcpHandler for boost::asio. You can use this class
* instead of a AMQP::TcpHandler class, just pass the boost asio service to the
* constructor and you're all set. See tests/libboostasio.cpp for example.
*
* Watch out: this class was not implemented or reviewed by the original author of
* Watch out: this class was not implemented or reviewed by the original author of
* AMQP-CPP. However, we do get a lot of questions and issues from users of this class,
* so we cannot guarantee its quality. If you run into such issues too, it might be
* better to implement your own handler that interact with boost.
Expand Down Expand Up @@ -62,12 +62,6 @@ class LibBoostAsioHandler : public virtual TcpHandler
{
private:

/**
* The boost asio io_context which is responsible for detecting events.
* @var class boost::asio::io_context&
*/
boost::asio::io_context & _iocontext;

using strand_weak_ptr = std::weak_ptr<boost::asio::io_context::strand>;

/**
Expand All @@ -84,10 +78,16 @@ class LibBoostAsioHandler : public virtual TcpHandler
boost::asio::posix::stream_descriptor _socket;

/**
* The boost asynchronous deadline timer.
* The boost asynchronous deadline timer for heartbeats.
* @var class boost::asio::deadline_timer
*/
boost::asio::deadline_timer _heartbeat_timer;

/**
* The boost asynchronous deadline timer for read event timeouts.
* @var class boost::asio::deadline_timer
*/
boost::asio::deadline_timer _timer;
boost::asio::deadline_timer _expire_timer;

/**
* A boolean that indicates if the watcher is monitoring for read events.
Expand All @@ -113,28 +113,38 @@ class LibBoostAsioHandler : public virtual TcpHandler
*/
bool _write_pending{false};

using handler_cb = boost::function<void(boost::system::error_code,std::size_t)>;
using io_handler = boost::function<void(const boost::system::error_code&, const std::size_t)>;
using timer_handler = boost::function<void(boost::system::error_code)>;
/**
* The timeout in seconds between each heartbeat to be sent.
* @var _timeout 0 if no timeout is agreed.
*/
uint16_t _heartbeat_interval{0};

/**
* The timeout in seconds negotiated with the server.
* @var _timeout 0 if no timeout is agreed.
*/
uint16_t _expire_timeout{0};

using handler = boost::function<void(boost::system::error_code)>;
using io_handler = boost::function<void(boost::system::error_code, const std::size_t)>;

/**
* Builds a io handler callback that executes the io callback in a strand.
* @param io_handler The handler callback to dispatch
* @return handler_cb A function wrapping the execution of the handler function in a io_context::strand.
* @return io_handler A function wrapping the execution of the handler function in a io_context::strand.
*/
handler_cb get_dispatch_wrapper(io_handler fn)
io_handler get_dispatch_wrapper(const handler& fn)
{
const strand_weak_ptr wpstrand = _wpstrand;

return [fn, wpstrand](const boost::system::error_code &ec, const std::size_t bytes_transferred)
return [fn, wpstrand] (const boost::system::error_code &ec, const std::size_t transfered_bytes)
{
const strand_shared_ptr strand = wpstrand.lock();
if (!strand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0});
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
return;
}
boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec, bytes_transferred));
boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec));
};
}

Expand All @@ -144,12 +154,11 @@ class LibBoostAsioHandler : public virtual TcpHandler
* @param fd The file descripter being watched.
* @return handler callback
*/
handler_cb get_read_handler(TcpConnection *const connection, const int fd)
io_handler get_read_handler(TcpConnection *const connection, const int fd)
{
auto fn = boost::bind(&Watcher::read_handler,
this,
boost::placeholders::_1,
boost::placeholders::_2,
PTR_FROM_THIS(Watcher),
connection,
fd);
Expand All @@ -162,58 +171,56 @@ class LibBoostAsioHandler : public virtual TcpHandler
* @param fd The file descripter being watched.
* @return handler callback
*/
handler_cb get_write_handler(TcpConnection *const connection, const int fd)
io_handler get_write_handler(TcpConnection *const connection, const int fd)
{
auto fn = boost::bind(&Watcher::write_handler,
this,
boost::placeholders::_1,
boost::placeholders::_2,
PTR_FROM_THIS(Watcher),
connection,
fd);
return get_dispatch_wrapper(fn);
}

/**
* Binds and returns a lamba function handler for the io operation.
* Binds and returns a lamba function handler for the heartbeat operation.
* @param connection The connection being watched.
* @param timeout The file descripter being watched.
* @return handler callback
*/
timer_handler get_timer_handler(TcpConnection *const connection, const uint16_t timeout)
handler get_heartbeat_handler(TcpConnection *const connection)
{
const auto fn = boost::bind(&Watcher::timeout_handler,
this,
boost::placeholders::_1,
PTR_FROM_THIS(Watcher),
connection,
timeout);

const strand_weak_ptr wpstrand = _wpstrand;
const auto fn = boost::bind(&Watcher::heartbeat_handler,
this,
boost::placeholders::_1,
PTR_FROM_THIS(Watcher),
connection);
return boost::bind(get_dispatch_wrapper(fn), boost::placeholders::_1, 0);
}

return [fn, wpstrand](const boost::system::error_code &ec)
{
const strand_shared_ptr strand = wpstrand.lock();
if (!strand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
return;
}
boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec));
};
/**
* Binds and returns a lamba function handler for the expire operation.
* @param connection The connection being watched.
* @return handler callback
*/
handler get_expire_handler(TcpConnection *const connection)
{
const auto fn = boost::bind(&Watcher::expire_handler,
this,
boost::placeholders::_1,
PTR_FROM_THIS(Watcher),
connection);
return boost::bind(get_dispatch_wrapper(fn), boost::placeholders::_1, 0);
}

/**
* Handler method that is called by boost's io_context when the socket pumps a read event.
* @param ec The status of the callback.
* @param bytes_transferred The number of bytes transferred.
* @param awpWatcher A weak pointer to this object.
* @param connection The connection being watched.
* @param fd The file descriptor being watched.
* @note The handler will get called if a read is cancelled.
*/
void read_handler(const boost::system::error_code &ec,
const std::size_t bytes_transferred,
const std::weak_ptr<Watcher> awpWatcher,
TcpConnection *const connection,
const int fd)
Expand All @@ -227,6 +234,14 @@ class LibBoostAsioHandler : public virtual TcpHandler

if ((!ec || ec == boost::asio::error::would_block) && _read)
{
// if the server is readable, we have some extra time before it expires, the expire time
// is set to 1.5 * _timeout to close the connection when the third heartbeat is about to be sent
if (_expire_timeout > 0)
{
_expire_timer.expires_from_now(boost::posix_time::seconds(_expire_timeout));
_expire_timer.async_wait(get_expire_handler(connection));
}

connection->process(fd, AMQP::readable);

_read_pending = true;
Expand All @@ -240,14 +255,12 @@ class LibBoostAsioHandler : public virtual TcpHandler
/**
* Handler method that is called by boost's io_context when the socket pumps a write event.
* @param ec The status of the callback.
* @param bytes_transferred The number of bytes transferred.
* @param awpWatcher A weak pointer to this object.
* @param connection The connection being watched.
* @param fd The file descriptor being watched.
* @note The handler will get called if a write is cancelled.
*/
void write_handler(const boost::system::error_code ec,
const std::size_t bytes_transferred,
const std::weak_ptr<Watcher> awpWatcher,
TcpConnection *const connection,
const int fd)
Expand All @@ -274,14 +287,13 @@ class LibBoostAsioHandler : public virtual TcpHandler
/**
* Callback method that is called by libev when the timer expires
* @param ec error code returned from loop
* @param loop The loop in which the event was triggered
* @param awpWatcher A weak pointer to this object.
* @param connection
* @param timeout
*/
void timeout_handler(const boost::system::error_code &ec,
std::weak_ptr<Watcher> awpThis,
TcpConnection *const connection,
const uint16_t timeout)
void heartbeat_handler(const boost::system::error_code &ec,
std::weak_ptr<Watcher> awpThis,
TcpConnection *const connection)
{
// Resolve any potential problems with dangling pointers
// (remember we are using async).
Expand All @@ -296,11 +308,33 @@ class LibBoostAsioHandler : public virtual TcpHandler
connection->heartbeat();
}

// Reschedule the timer for the future:
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
// Reschedule the timer and register handler
_heartbeat_timer.expires_from_now(boost::posix_time::seconds(_heartbeat_interval));
_heartbeat_timer.async_wait(get_heartbeat_handler(connection));
}
}

// Posts the timer event
_timer.async_wait(get_timer_handler(connection, timeout));
/**
* Callback method that is called by libev when the timer expires
* @param ec error code returned from loop
* @param awpWatcher A weak pointer to this object.
* @param connection
*/
void expire_handler(const boost::system::error_code &ec,
std::weak_ptr<Watcher> awpThis,
TcpConnection *const connection)
{
// Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Watcher> apTimer = awpThis.lock();
if (!apTimer) { return; }
if (!ec)
{
if (connection)
{
// this is a connection timeout, close the connection from our side too
connection->close(true);
}
}
}

Expand All @@ -315,10 +349,10 @@ class LibBoostAsioHandler : public virtual TcpHandler
Watcher(boost::asio::io_context &io_context,
const strand_weak_ptr wpstrand,
const int fd) :
_iocontext(io_context),
_wpstrand(wpstrand),
_socket(io_context),
_timer(io_context)
_heartbeat_timer(io_context),
_expire_timer(io_context)
{
_socket.assign(fd);

Expand All @@ -341,7 +375,7 @@ class LibBoostAsioHandler : public virtual TcpHandler
_read = false;
_write = false;
_socket.release();
stop_timer();
stop_timers();
}

/**
Expand Down Expand Up @@ -378,29 +412,41 @@ class LibBoostAsioHandler : public virtual TcpHandler
}

/**
* Change the expire time
* Change the expire and heartbeat timers
* @param connection
* @param timeout
*/
void set_timer(TcpConnection *connection, uint16_t timeout)
void set_timers(TcpConnection *connection, uint16_t heartbeat_timeout)
{
// stop timer in case it was already set
stop_timer();
stop_timers();

// timeout not applicable
if (heartbeat_timeout == 0) { return; }

// Reschedule the timer for the future:
_timer.expires_from_now(boost::posix_time::seconds(timeout));
// save new interval
_heartbeat_interval = heartbeat_timeout * 0.5;

// Posts the timer event
_timer.async_wait(get_timer_handler(connection, timeout));
// Reschedule the timer and register handler
_heartbeat_timer.expires_from_now(boost::posix_time::seconds(_heartbeat_interval));
_heartbeat_timer.async_wait(get_heartbeat_handler(connection));

// save new timeout - increase the agreed timeout to be a bit permissive
_expire_timeout = heartbeat_timeout * 1.5;

// Reschedule the timer and register handler
_expire_timer.expires_from_now(boost::posix_time::seconds(_expire_timeout));
_expire_timer.async_wait(get_expire_handler(connection));
}

/**
* Stop the timer
*/
void stop_timer()
void stop_timers()
{
// do nothing if it was never set
_timer.cancel();
_heartbeat_timer.cancel();
_expire_timer.cancel();
}
};

Expand Down Expand Up @@ -446,7 +492,6 @@ class LibBoostAsioHandler : public virtual TcpHandler
// construct a new pair (watcher/timer), and put it in the map
const std::shared_ptr<Watcher> apWatcher =
std::make_shared<Watcher>(_iocontext, _strand, fd);

_watchers[fd] = apWatcher;

// explicitly set the events to monitor
Expand All @@ -468,24 +513,24 @@ class LibBoostAsioHandler : public virtual TcpHandler
/**
* Method that is called when the heartbeat frequency is negotiated between the server and the client.
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
* @param timeout The suggested timeout from the server
* @return uint16_t The timeout to use
*/
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t timeout) override
{
// skip if no heartbeats are needed
if (interval == 0) return 0;
if (timeout == 0) return 0;

const auto fd = connection->fileno();

auto iter = _watchers.find(fd);
if (iter == _watchers.end()) return 0;

// set the timer
iter->second->set_timer(connection, interval);
iter->second->set_timers(connection, timeout);

// we agree with the interval
return interval;
// we agree with the timeout
return timeout;
Comment on lines +516 to +533
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This renaming is to achieve consistency with libev.h.

Although "interval" is the most used term in the library, I feel that calling "interval" wouldn't be intuitive to read "heartbeat interval = interval / 2". So I agree more with the "timeout" term chosen in libev.h

}

public:
Expand All @@ -504,7 +549,6 @@ class LibBoostAsioHandler : public virtual TcpHandler
explicit LibBoostAsioHandler(boost::asio::io_context &io_context) :
_iocontext(io_context),
_strand(std::make_shared<boost::asio::io_context::strand>(_iocontext))
//_timer(std::make_shared<Timer>(_iocontext,_strand))
{

}
Expand Down