Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer deleting readers until stream destruction #232

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/dev-recv-destruction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,8 @@ encapsulates this workflow in its
provides the facilities to move the shared pointer along a linear chain of
completion handlers so that the reference count does not need to be
adjusted.

Readers are only destroyed when the stream is destroyed. This ensures that the
reader's destructor is called from a user's thread (which in Python bindings,
will hold the GIL). To handle more immediate cleanup when a stream is stopped,
readers may override :cpp:func:`reader::stop`.
1 change: 1 addition & 0 deletions include/spead2/recv_inproc.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class inproc_reader : public reader
stream &owner,
std::shared_ptr<inproc_queue> queue);

virtual void stop() override;
virtual bool lossy() const override;
};

Expand Down
20 changes: 17 additions & 3 deletions include/spead2/recv_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -803,13 +803,18 @@ class stream_base
* - The queue mutex is taken
* - the stream stops
* - the reader mutex is taken
* - destruction
* - the stream is destroyed
* - @ref stop is called
* - The stream is destroyed
* - destruction of the reader
*
* @ref stop may be called from any thread (either user call via
* @ref stream::stop, or I/O thread for a network stop). The destructor is
* always called from @ref stream::stop (possibly via the destructor).
*
* Destruction must ensure that any pending asynchronous operations are
* handled. Since destruction may happen on a separate thread to the one
* running in-flight handlers, care must be taken not to access the stream or
* the reader after the stream is stopped. In many cases this can be
* the reader after the stream is destroyed. In many cases this can be
* facilitated using @ref bind_handler, which will keep the stream alive
* and locked for the duration of the bound handler.
*/
Expand Down Expand Up @@ -923,6 +928,15 @@ class reader
* should be given when the consumer is applying back-pressure.
*/
virtual bool lossy() const;

/**
* Release resources.
*
* This may be called from any thread, so resources that can only be
* safely released from particular threads should be cleaned up in the
* destructor instead.
*/
virtual void stop() {}
};

/**
Expand Down
2 changes: 1 addition & 1 deletion include/spead2/recv_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ class tcp_reader : public reader
boost::asio::ip::tcp::acceptor &&acceptor,
std::size_t max_size = default_max_size);

virtual void stop() override;
virtual bool lossy() const override;

};

} // namespace recv
Expand Down
2 changes: 2 additions & 0 deletions include/spead2/recv_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class udp_reader : public udp_reader_base
stream &owner,
boost::asio::ip::udp::socket &&socket,
std::size_t max_size = default_max_size);

virtual void stop() override;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions include/spead2/recv_udp_ibv.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class udp_ibv_reader_core : public udp_reader_base
udp_ibv_reader_core(
stream &owner,
const udp_ibv_config &config);

virtual void stop() override;
};

/**
Expand Down
5 changes: 5 additions & 0 deletions src/recv_inproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ void inproc_reader::enqueue(handler_context ctx)
bind_handler(std::move(ctx), std::bind(&inproc_reader::packet_handler, this, _1, _2, _3, _4)));
}

void inproc_reader::stop()
{
data_sem_wrapper.close();
}

bool inproc_reader::lossy() const
{
return false;
Expand Down
5 changes: 3 additions & 2 deletions src/recv_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,9 @@ void stream::stop_received()
{
stream_base::stop_received();
std::lock_guard<std::mutex> lock(reader_mutex);
readers.clear();
/* This ensures that once we clear out the readers, any future call to
for (const auto &r : readers)
r->stop();
/* This ensures that once we stop the readers, any future call to
* emplace_reader will silently be ignored. This avoids issues if there
* is a race between the user calling emplace_reader and a stop packet
* in the stream.
Expand Down
13 changes: 13 additions & 0 deletions src/recv_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,19 @@ void tcp_reader::enqueue_receive(handler_context ctx)
bind_handler(std::move(ctx), std::bind(&tcp_reader::packet_handler, this, _1, _2, _3, _4)));
}

void tcp_reader::stop()
{
/* asio guarantees that closing a socket will cancel any pending
* operations on it.
* Don't put any logging here: it could be running in a shutdown
* path where it is no longer safe to do so.
*/
if (peer.is_open())
peer.close();
if (acceptor.is_open())
acceptor.close();
}

bool tcp_reader::lossy() const
{
return false;
Expand Down
10 changes: 10 additions & 0 deletions src/recv_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ void udp_reader::enqueue_receive(handler_context ctx)
bind_handler(std::move(ctx), std::bind(&udp_reader::packet_handler, this, _1, _2, _3, _4)));
}

void udp_reader::stop()
{
/* asio guarantees that closing a socket will cancel any pending
* operations on it.
* Don't put any logging here: it could be running in a shutdown
* path where it is no longer safe to do so.
*/
socket.close();
}

/////////////////////////////////////////////////////////////////////////////

static bool ibv_override;
Expand Down
6 changes: 6 additions & 0 deletions src/recv_udp_ibv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ void udp_ibv_reader_core::join_groups(
}
}

void udp_ibv_reader_core::stop()
{
if (comp_channel)
comp_channel_wrapper.close();
}

} // namespace detail

static std::size_t compute_n_slots(const rdma_cm_id_t &cm_id, std::size_t buffer_size,
Expand Down