From 76066d6c11f8bf1fa7fd5761d62696a1b9222c7e Mon Sep 17 00:00:00 2001 From: Bruce Merry Date: Thu, 3 Aug 2023 10:20:20 +0200 Subject: [PATCH 1/2] Defer deleting readers until stream destruction Fixes #230 --- doc/dev-recv-destruction.rst | 5 +++++ include/spead2/recv_stream.h | 20 +++++++++++++++++--- src/recv_stream.cpp | 5 +++-- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/doc/dev-recv-destruction.rst b/doc/dev-recv-destruction.rst index 4c3999784..b09424459 100644 --- a/doc/dev-recv-destruction.rst +++ b/doc/dev-recv-destruction.rst @@ -58,3 +58,8 @@ encapsulates this workflow in its provides the facilities to move the shared pointer along a linear chain of completion handlers so that the reference count does not need to be adjusted. + +Readers are only destroyed when the stream is destroyed. This ensures that the +reader's destructor is called from a user's thread (which in Python bindings, +will hold the GIL). To handle more immediate cleanup when a stream is stopped, +readers may override :cpp:func:`reader::stop`. diff --git a/include/spead2/recv_stream.h b/include/spead2/recv_stream.h index 08b2f037c..d73c2cb9f 100644 --- a/include/spead2/recv_stream.h +++ b/include/spead2/recv_stream.h @@ -803,13 +803,18 @@ class stream_base * - The queue mutex is taken * - the stream stops * - the reader mutex is taken - * - destruction - * - the stream is destroyed + * - @ref stop is called + * - The stream is destroyed + * - destruction of the reader + * + * @ref stop may be called from any thread (either user call via + * @ref stream::stop, or I/O thread for a network stop). The destructor is + * always called from @ref stream::stop (possibly via the destructor). * * Destruction must ensure that any pending asynchronous operations are * handled. Since destruction may happen on a separate thread to the one * running in-flight handlers, care must be taken not to access the stream or - * the reader after the stream is stopped. In many cases this can be + * the reader after the stream is destroyed. In many cases this can be * facilitated using @ref bind_handler, which will keep the stream alive * and locked for the duration of the bound handler. */ @@ -923,6 +928,15 @@ class reader * should be given when the consumer is applying back-pressure. */ virtual bool lossy() const; + + /** + * Release resources. + * + * This may be called from any thread, so resources that can only be + * safely released from particular threads should be cleaned up in the + * destructor instead. + */ + virtual void stop() {} }; /** diff --git a/src/recv_stream.cpp b/src/recv_stream.cpp index 1946ff48e..326298381 100644 --- a/src/recv_stream.cpp +++ b/src/recv_stream.cpp @@ -649,8 +649,9 @@ void stream::stop_received() { stream_base::stop_received(); std::lock_guard lock(reader_mutex); - readers.clear(); - /* This ensures that once we clear out the readers, any future call to + for (const auto &r : readers) + r->stop(); + /* This ensures that once we stop the readers, any future call to * emplace_reader will silently be ignored. This avoids issues if there * is a race between the user calling emplace_reader and a stop packet * in the stream. From d551b235e210d359d7d60ac4ce4d0feaf0abc5b6 Mon Sep 17 00:00:00 2001 From: Bruce Merry Date: Thu, 3 Aug 2023 11:31:45 +0200 Subject: [PATCH 2/2] Re-introduce stop in some reader subclasses Bring back the implementations from spead2 3.11, so that OS resources will still be freed as soon as the stream stops. --- include/spead2/recv_inproc.h | 1 + include/spead2/recv_tcp.h | 2 +- include/spead2/recv_udp.h | 2 ++ include/spead2/recv_udp_ibv.h | 2 ++ src/recv_inproc.cpp | 5 +++++ src/recv_tcp.cpp | 13 +++++++++++++ src/recv_udp.cpp | 10 ++++++++++ src/recv_udp_ibv.cpp | 6 ++++++ 8 files changed, 40 insertions(+), 1 deletion(-) diff --git a/include/spead2/recv_inproc.h b/include/spead2/recv_inproc.h index a44af7ae1..b39340847 100644 --- a/include/spead2/recv_inproc.h +++ b/include/spead2/recv_inproc.h @@ -55,6 +55,7 @@ class inproc_reader : public reader stream &owner, std::shared_ptr queue); + virtual void stop() override; virtual bool lossy() const override; }; diff --git a/include/spead2/recv_tcp.h b/include/spead2/recv_tcp.h index b1167504c..7224b0233 100644 --- a/include/spead2/recv_tcp.h +++ b/include/spead2/recv_tcp.h @@ -151,8 +151,8 @@ class tcp_reader : public reader boost::asio::ip::tcp::acceptor &&acceptor, std::size_t max_size = default_max_size); + virtual void stop() override; virtual bool lossy() const override; - }; } // namespace recv diff --git a/include/spead2/recv_udp.h b/include/spead2/recv_udp.h index c9164bee1..5261dcb85 100644 --- a/include/spead2/recv_udp.h +++ b/include/spead2/recv_udp.h @@ -171,6 +171,8 @@ class udp_reader : public udp_reader_base stream &owner, boost::asio::ip::udp::socket &&socket, std::size_t max_size = default_max_size); + + virtual void stop() override; }; /** diff --git a/include/spead2/recv_udp_ibv.h b/include/spead2/recv_udp_ibv.h index 91b5b88c7..a4c6df6e1 100644 --- a/include/spead2/recv_udp_ibv.h +++ b/include/spead2/recv_udp_ibv.h @@ -125,6 +125,8 @@ class udp_ibv_reader_core : public udp_reader_base udp_ibv_reader_core( stream &owner, const udp_ibv_config &config); + + virtual void stop() override; }; /** diff --git a/src/recv_inproc.cpp b/src/recv_inproc.cpp index 6a64733e7..614ee5d5d 100644 --- a/src/recv_inproc.cpp +++ b/src/recv_inproc.cpp @@ -95,6 +95,11 @@ void inproc_reader::enqueue(handler_context ctx) bind_handler(std::move(ctx), std::bind(&inproc_reader::packet_handler, this, _1, _2, _3, _4))); } +void inproc_reader::stop() +{ + data_sem_wrapper.close(); +} + bool inproc_reader::lossy() const { return false; diff --git a/src/recv_tcp.cpp b/src/recv_tcp.cpp index 869304f0d..0c0c0fa9b 100644 --- a/src/recv_tcp.cpp +++ b/src/recv_tcp.cpp @@ -252,6 +252,19 @@ void tcp_reader::enqueue_receive(handler_context ctx) bind_handler(std::move(ctx), std::bind(&tcp_reader::packet_handler, this, _1, _2, _3, _4))); } +void tcp_reader::stop() +{ + /* asio guarantees that closing a socket will cancel any pending + * operations on it. + * Don't put any logging here: it could be running in a shutdown + * path where it is no longer safe to do so. + */ + if (peer.is_open()) + peer.close(); + if (acceptor.is_open()) + acceptor.close(); +} + bool tcp_reader::lossy() const { return false; diff --git a/src/recv_udp.cpp b/src/recv_udp.cpp index 1b18ff70d..a32e29daf 100644 --- a/src/recv_udp.cpp +++ b/src/recv_udp.cpp @@ -232,6 +232,16 @@ void udp_reader::enqueue_receive(handler_context ctx) bind_handler(std::move(ctx), std::bind(&udp_reader::packet_handler, this, _1, _2, _3, _4))); } +void udp_reader::stop() +{ + /* asio guarantees that closing a socket will cancel any pending + * operations on it. + * Don't put any logging here: it could be running in a shutdown + * path where it is no longer safe to do so. + */ + socket.close(); +} + ///////////////////////////////////////////////////////////////////////////// static bool ibv_override; diff --git a/src/recv_udp_ibv.cpp b/src/recv_udp_ibv.cpp index 1c3134dae..0625e928b 100644 --- a/src/recv_udp_ibv.cpp +++ b/src/recv_udp_ibv.cpp @@ -118,6 +118,12 @@ void udp_ibv_reader_core::join_groups( } } +void udp_ibv_reader_core::stop() +{ + if (comp_channel) + comp_channel_wrapper.close(); +} + } // namespace detail static std::size_t compute_n_slots(const rdma_cm_id_t &cm_id, std::size_t buffer_size,