Skip to content

Commit

Permalink
abort_source: subscription: allow calling on_abort explicitly
Browse files Browse the repository at this point in the history
Today, when calling `subscribe` after abort has alredy
been requested on an `abort_source` we return a default-
initialized subscription that signifies that subscription
failed.

With this change, `subscribe` would return an unlinked
subscription, that still holds the required callback.
`bool(subscription)` stil elaluates to `false` in the
same way, since the subscription is unlinked, but,
it supports `on_abort`, that calls the subscribed
callback, at a later time, the same way as if
abort is requested at a later time.

Thise allows the users of this interface to
implement a unified abort path that can either be called
by the abort_source, if subscribe happened before
abort was requested, or called by the user `on_abort`
call, if abort weas never requested, or if it was requested
before `subscribe` was called.

Signed-off-by: Benny Halevy <[email protected]>
  • Loading branch information
bhalevy committed Jul 9, 2024
1 parent ded56dc commit 0ca51ec
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 6 deletions.
18 changes: 12 additions & 6 deletions include/seastar/core/abort_source.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,31 @@ public:
class subscription : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
friend class abort_source;

subscription_callback_type _target;
subscription_callback_type _target = noop_handler;

explicit subscription(abort_source& as, subscription_callback_type target)
: _target(std::move(target)) {
if (!as.abort_requested()) {
as._subscriptions.push_back(*this);
}
}

struct naive_cb_tag {}; // to disambiguate constructors
explicit subscription(naive_cb_tag, abort_source& as, naive_subscription_callback_type naive_cb)
: _target([cb = std::move(naive_cb)] (const std::optional<std::exception_ptr>&) noexcept { cb(); }) {
if (!as.abort_requested()) {
as._subscriptions.push_back(*this);
}
}

public:
static void noop_handler(const std::optional<std::exception_ptr>&) noexcept {}

/// the subscribed callback is called at most once
void on_abort(const std::optional<std::exception_ptr>& ex) noexcept {
_target(ex);
unlink();
auto target = std::exchange(_target, noop_handler);
target(ex);
}

public:
Expand Down Expand Up @@ -119,7 +129,6 @@ private:
auto subs = std::move(_subscriptions);
while (!subs.empty()) {
subscription& s = subs.front();
s.unlink();
s.on_abort(ex);
}
}
Expand All @@ -140,9 +149,6 @@ public:
std::is_nothrow_invocable_r_v<void, Func>)
[[nodiscard]]
optimized_optional<subscription> subscribe(Func&& f) {
if (abort_requested()) {
return { };
}
if constexpr (std::is_invocable_v<Func, std::exception_ptr>) {
return { subscription(*this, std::forward<Func>(f)) };
} else {
Expand Down
70 changes: 70 additions & 0 deletions tests/unit/abort_source_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,73 @@ SEASTAR_THREAD_TEST_CASE(test_request_abort_twice) {
as.request_abort();
BOOST_REQUIRE_THROW(as.check(), std::runtime_error);
}

SEASTAR_THREAD_TEST_CASE(test_on_abort_call_after_abort) {
std::exception_ptr signalled_ex;
auto as = abort_source();
auto sub = as.subscribe([&] (const std::optional<std::exception_ptr>& ex) noexcept {
BOOST_REQUIRE(!signalled_ex);
signalled_ex = *ex;
});
BOOST_REQUIRE_EQUAL(bool(sub), true);
BOOST_REQUIRE(signalled_ex == nullptr);

// on_abort should trigger the subscribed callback
as.request_abort_ex(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE_EQUAL(bool(sub), false);
BOOST_REQUIRE(signalled_ex != nullptr);
BOOST_REQUIRE_THROW(std::rethrow_exception(signalled_ex), std::runtime_error);

// on_abort is single-shot
signalled_ex = nullptr;
sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE(signalled_ex == nullptr);
}

SEASTAR_THREAD_TEST_CASE(test_on_abort_call_before_abort) {
std::exception_ptr signalled_ex;
auto as = abort_source();
auto sub = as.subscribe([&] (const std::optional<std::exception_ptr>& ex) noexcept {
BOOST_REQUIRE(!signalled_ex);
signalled_ex = *ex;
});
BOOST_REQUIRE_EQUAL(bool(sub), true);
BOOST_REQUIRE(signalled_ex == nullptr);

// on_abort should trigger the subscribed callback
sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE_EQUAL(bool(sub), false);
BOOST_REQUIRE(signalled_ex != nullptr);
BOOST_REQUIRE_THROW(std::rethrow_exception(signalled_ex), std::runtime_error);

// subscription is single-shot
signalled_ex = nullptr;
as.request_abort_ex(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE(signalled_ex == nullptr);
}

SEASTAR_THREAD_TEST_CASE(test_subscribe_aborted_source) {
std::exception_ptr signalled_ex;
auto as = abort_source();
as.request_abort();
auto sub = as.subscribe([&] (const std::optional<std::exception_ptr>& ex) noexcept {
BOOST_REQUIRE(!signalled_ex);
signalled_ex = *ex;
});

// subscription is expected to evaluate to false
// if abort_source was already aborted
BOOST_REQUIRE_EQUAL(bool(sub), false);
BOOST_REQUIRE(signalled_ex == nullptr);

// on_abort should trigger the subscribed callback
// if abort_source was already aborted
sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE(signalled_ex != nullptr);
BOOST_REQUIRE_THROW(std::rethrow_exception(signalled_ex), std::runtime_error);

// on_abort is single-shot
signalled_ex = nullptr;
sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE(signalled_ex == nullptr);
}

0 comments on commit 0ca51ec

Please sign in to comment.