Skip to content

Commit

Permalink
Merge pull request #232 from ska-sa/delay-freeing-readers
Browse files Browse the repository at this point in the history
Defer deleting readers until stream destruction
  • Loading branch information
bmerry authored Aug 7, 2023
2 parents 1938b1d + d551b23 commit 5f1dead
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 6 deletions.
5 changes: 5 additions & 0 deletions doc/dev-recv-destruction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,8 @@ encapsulates this workflow in its
provides the facilities to move the shared pointer along a linear chain of
completion handlers so that the reference count does not need to be
adjusted.

Readers are only destroyed when the stream is destroyed. This ensures that the
reader's destructor is called from a user's thread (which in Python bindings,
will hold the GIL). To handle more immediate cleanup when a stream is stopped,
readers may override :cpp:func:`reader::stop`.
1 change: 1 addition & 0 deletions include/spead2/recv_inproc.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class inproc_reader : public reader
stream &owner,
std::shared_ptr<inproc_queue> queue);

virtual void stop() override;
virtual bool lossy() const override;
};

Expand Down
20 changes: 17 additions & 3 deletions include/spead2/recv_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -803,13 +803,18 @@ class stream_base
* - The queue mutex is taken
* - the stream stops
* - the reader mutex is taken
* - destruction
* - the stream is destroyed
* - @ref stop is called
* - The stream is destroyed
* - destruction of the reader
*
* @ref stop may be called from any thread (either user call via
* @ref stream::stop, or I/O thread for a network stop). The destructor is
* always called from @ref stream::stop (possibly via the destructor).
*
* Destruction must ensure that any pending asynchronous operations are
* handled. Since destruction may happen on a separate thread to the one
* running in-flight handlers, care must be taken not to access the stream or
* the reader after the stream is stopped. In many cases this can be
* the reader after the stream is destroyed. In many cases this can be
* facilitated using @ref bind_handler, which will keep the stream alive
* and locked for the duration of the bound handler.
*/
Expand Down Expand Up @@ -923,6 +928,15 @@ class reader
* should be given when the consumer is applying back-pressure.
*/
virtual bool lossy() const;

/**
* Release resources.
*
* This may be called from any thread, so resources that can only be
* safely released from particular threads should be cleaned up in the
* destructor instead.
*/
virtual void stop() {}
};

/**
Expand Down
2 changes: 1 addition & 1 deletion include/spead2/recv_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ class tcp_reader : public reader
boost::asio::ip::tcp::acceptor &&acceptor,
std::size_t max_size = default_max_size);

virtual void stop() override;
virtual bool lossy() const override;

};

} // namespace recv
Expand Down
2 changes: 2 additions & 0 deletions include/spead2/recv_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class udp_reader : public udp_reader_base
stream &owner,
boost::asio::ip::udp::socket &&socket,
std::size_t max_size = default_max_size);

virtual void stop() override;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions include/spead2/recv_udp_ibv.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class udp_ibv_reader_core : public udp_reader_base
udp_ibv_reader_core(
stream &owner,
const udp_ibv_config &config);

virtual void stop() override;
};

/**
Expand Down
5 changes: 5 additions & 0 deletions src/recv_inproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ void inproc_reader::enqueue(handler_context ctx)
bind_handler(std::move(ctx), std::bind(&inproc_reader::packet_handler, this, _1, _2, _3, _4)));
}

void inproc_reader::stop()
{
data_sem_wrapper.close();
}

bool inproc_reader::lossy() const
{
return false;
Expand Down
5 changes: 3 additions & 2 deletions src/recv_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,9 @@ void stream::stop_received()
{
stream_base::stop_received();
std::lock_guard<std::mutex> lock(reader_mutex);
readers.clear();
/* This ensures that once we clear out the readers, any future call to
for (const auto &r : readers)
r->stop();
/* This ensures that once we stop the readers, any future call to
* emplace_reader will silently be ignored. This avoids issues if there
* is a race between the user calling emplace_reader and a stop packet
* in the stream.
Expand Down
13 changes: 13 additions & 0 deletions src/recv_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,19 @@ void tcp_reader::enqueue_receive(handler_context ctx)
bind_handler(std::move(ctx), std::bind(&tcp_reader::packet_handler, this, _1, _2, _3, _4)));
}

void tcp_reader::stop()
{
/* asio guarantees that closing a socket will cancel any pending
* operations on it.
* Don't put any logging here: it could be running in a shutdown
* path where it is no longer safe to do so.
*/
if (peer.is_open())
peer.close();
if (acceptor.is_open())
acceptor.close();
}

bool tcp_reader::lossy() const
{
return false;
Expand Down
10 changes: 10 additions & 0 deletions src/recv_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ void udp_reader::enqueue_receive(handler_context ctx)
bind_handler(std::move(ctx), std::bind(&udp_reader::packet_handler, this, _1, _2, _3, _4)));
}

void udp_reader::stop()
{
/* asio guarantees that closing a socket will cancel any pending
* operations on it.
* Don't put any logging here: it could be running in a shutdown
* path where it is no longer safe to do so.
*/
socket.close();
}

/////////////////////////////////////////////////////////////////////////////

static bool ibv_override;
Expand Down
6 changes: 6 additions & 0 deletions src/recv_udp_ibv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ void udp_ibv_reader_core::join_groups(
}
}

void udp_ibv_reader_core::stop()
{
if (comp_channel)
comp_channel_wrapper.close();
}

} // namespace detail

static std::size_t compute_n_slots(const rdma_cm_id_t &cm_id, std::size_t buffer_size,
Expand Down

0 comments on commit 5f1dead

Please sign in to comment.