Skip to content

Commit

Permalink
[rd-cpp] Fixed signals to release resources from lambda closures on l…
Browse files Browse the repository at this point in the history
…ifetime termination. Fixed SequentialLifetimes to not keep lifetime callback after SequentialLifetimes destroyed.
  • Loading branch information
mirasrael committed Jan 15, 2024
1 parent 0f72fda commit 22eecd0
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 45 deletions.
13 changes: 13 additions & 0 deletions rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ Lifetime const& Lifetime::Eternal()
return ETERNAL;
}


Lifetime const& Lifetime::Terminated()
{
static Lifetime TERMINATED = []
{
Lifetime lifetime;
lifetime->terminate();
return lifetime;
}();

return TERMINATED;
}

bool operator==(Lifetime const& lw1, Lifetime const& lw2)
{
return lw1.ptr == lw2.ptr;
Expand Down
3 changes: 3 additions & 0 deletions rd-cpp/src/rd_core_cpp/src/main/lifetime/Lifetime.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ class RD_CORE_API Lifetime final
std::shared_ptr<LifetimeImpl> ptr;

public:
typedef LifetimeImpl::counter_t counter_t;

static Lifetime const& Eternal();
static Lifetime const& Terminated();

// region ctor/dtor

Expand Down
2 changes: 1 addition & 1 deletion rd-cpp/src/rd_core_cpp/src/main/lifetime/LifetimeImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class RD_CORE_API LifetimeImpl final
{
public:
friend class LifetimeDefinition;

friend class SequentialLifetimes;
friend class Lifetime;

using counter_t = int32_t;
Expand Down
22 changes: 11 additions & 11 deletions rd-cpp/src/rd_core_cpp/src/main/lifetime/SequentialLifetimes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,32 @@

namespace rd
{
SequentialLifetimes::SequentialLifetimes(Lifetime parent_lifetime) : parent_lifetime(std::move(parent_lifetime))
SequentialLifetimes::SequentialLifetimes(const Lifetime& parent_lifetime) : parent_lifetime(parent_lifetime)
{
this->parent_lifetime->add_action([this] { set_current_lifetime(LifetimeDefinition::get_shared_eternal()); });
}

Lifetime SequentialLifetimes::next()
{
std::shared_ptr<LifetimeDefinition> new_def = std::make_shared<LifetimeDefinition>(parent_lifetime);
set_current_lifetime(new_def);
return current_def->lifetime;
Lifetime new_lifetime = parent_lifetime.create_nested();
set_current_lifetime(new_lifetime);
return new_lifetime;
}

void SequentialLifetimes::terminate_current()
{
set_current_lifetime(LifetimeDefinition::get_shared_eternal());
set_current_lifetime(Lifetime::Terminated());
}

bool SequentialLifetimes::is_terminated() const
{
return current_def->is_eternal() || current_def->is_terminated();
return current_lifetime->is_terminated();
}

void SequentialLifetimes::set_current_lifetime(std::shared_ptr<LifetimeDefinition> new_def)
void SequentialLifetimes::set_current_lifetime(const Lifetime& lifetime)
{
std::shared_ptr<LifetimeDefinition> prev = current_def;
current_def = new_def;
prev->terminate();
const Lifetime prev = current_lifetime;
current_lifetime = lifetime;
if (!prev->is_terminated())
prev->terminate();
}
} // namespace rd
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace rd
{
class RD_CORE_API SequentialLifetimes
{
private:
std::shared_ptr<LifetimeDefinition> current_def = LifetimeDefinition::get_shared_eternal();
Lifetime parent_lifetime;
Lifetime current_lifetime = Lifetime::Terminated();
void set_current_lifetime(const Lifetime& lifetime);

public:
// region ctor/dtor
Expand All @@ -28,16 +28,14 @@ class RD_CORE_API SequentialLifetimes

SequentialLifetimes& operator=(SequentialLifetimes&&) = delete;

explicit SequentialLifetimes(Lifetime parent_lifetime);
explicit SequentialLifetimes(const Lifetime& parent_lifetime);
// endregion

Lifetime next();

void terminate_current();

bool is_terminated() const;

void set_current_lifetime(std::shared_ptr<LifetimeDefinition> new_def);
};
} // namespace rd

Expand Down
84 changes: 56 additions & 28 deletions rd-cpp/src/rd_core_cpp/src/main/reactive/base/SignalX.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

#include "interfaces.h"
#include "SignalCookie.h"
#include "lifetime/LifetimeDefinition.h"

#include <lifetime/Lifetime.h>
#include <util/core_util.h>

#include <utility>
#include <functional>
#include <atomic>
#include <list>

namespace rd
{
Expand All @@ -22,67 +24,93 @@ class Signal final : public ISignal<T>
private:
using WT = typename ISignal<T>::WT;

class Event
struct Event
{
using F = std::function<void(T const&)>;
private:
std::function<void(T const&)> action;
Lifetime lifetime;
F action;
std::atomic_int8_t state;

constexpr static int8_t ACTIVE = 0;
constexpr static int8_t FIRING = 1;
constexpr static int8_t TERMINATED = 2;
public:
// region ctor/dtor
Event() = delete;

template <typename F>
Event(F&& action, Lifetime lifetime) : action(std::forward<F>(action)), lifetime(lifetime)
explicit Event(const Lifetime& lifetime, F&& action) : lifetime(lifetime), action(std::forward<F>(action)), state(ACTIVE)
{
}

Event(Event&&) = default;
// endregion
Event& operator=(Event&& other) = default;

bool is_alive() const
bool operator()(T const& arg)
{
return !lifetime->is_terminated();
}
if (lifetime->is_terminated())
return false;

void execute_if_alive(T const& value) const
{
if (is_alive())
auto expected_state = ACTIVE;
// set firing flag to prevent action destruction during action firing
// skip action if it isn't active (lifetime was terminated)
if (!state.compare_exchange_strong(expected_state, FIRING))
return false;

expected_state = FIRING;
try
{
action(value);
action(arg);
return state.compare_exchange_strong(expected_state, ACTIVE);
}
catch (...)
{
if (!state.compare_exchange_strong(expected_state, ACTIVE))
action = nullptr;
throw;
}
}

void terminate()
{
const auto old_state = state.exchange(TERMINATED);
// release action immediatelly if it isn't firing right now
if (old_state == ACTIVE)
action = nullptr;
lifetime = Lifetime::Terminated();
}
};

using counter_t = int32_t;
using listeners_t = std::map<counter_t, Event>;
using listeners_t = std::vector<std::shared_ptr<Event>>;

mutable counter_t advise_id = 0;
mutable listeners_t listeners, priority_listeners;

static void cleanup(listeners_t& queue)
{
util::erase_if(queue, [](Event const& e) -> bool { return !e.is_alive(); });
}

void fire_impl(T const& value, listeners_t& queue) const
{
for (auto const& p : queue)
auto it = queue.begin();
auto end = queue.end();
auto alive_it = it;
while (it != end)
{
auto const& event = p.second;
event.execute_if_alive(value);
if (it->get()->operator()(value))
{
if (alive_it != it)
*alive_it = std::move(*it);
++alive_it;
}
++it;
}
cleanup(queue);
if (alive_it != end)
queue.erase(alive_it, end);
}

template <typename F>
void advise0(const Lifetime& lifetime, F&& handler, listeners_t& queue) const
{
if (lifetime->is_terminated())
return;
counter_t id = advise_id /*.load()*/;
queue.emplace(id, Event(std::forward<F>(handler), lifetime));
++advise_id;
auto event_ptr = std::make_shared<Event>(lifetime, std::forward<F>(handler));
lifetime->add_action([event_ptr] { event_ptr->terminate(); });
queue.push_back(std::move(event_ptr));
}

public:
Expand Down
53 changes: 53 additions & 0 deletions rd-cpp/src/rd_framework_cpp/src/test/cases/RdSignalTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,58 @@ TEST_F(RdFrameworkTestBase, signal_move)
RdSignal<int> signal1;
RdSignal<int> signal2(std::move(signal1));

AfterTest();
}

TEST_F(RdFrameworkTestBase, signal_release_resources)
{
RdSignal<int> signal;
statics(signal, 1);

bindStatic(serverProtocol.get(), signal, static_name);

EXPECT_NO_THROW(
auto ptr = std::make_shared<int>(0);
{
const LifetimeDefinition def;
signal.advise(def.lifetime, [ptr](auto const& value) { *ptr = value; });
}
EXPECT_TRUE(ptr.unique()) << "Signal should release reference to ptr from lambda.";
signal.fire(42);
EXPECT_EQ(*ptr, 0) << "Signal shouldn't impact ptr value after lifetime termination.";
);

AfterTest();
}

TEST_F(RdFrameworkTestBase, signal_release_resources_from_handler)
{
RdSignal<int> signal;
statics(signal, 1);
bindStatic(serverProtocol.get(), signal, static_name);

auto ptr = std::make_shared<int>(0);
{
struct Payload
{
LifetimeDefinition def;
std::shared_ptr<int> ptr;
};
auto payload = std::make_shared<Payload>(Payload{LifetimeDefinition(), ptr});
signal.advise(payload->def.lifetime, [payload](auto const& value)
{
payload->def.terminate();
*(payload->ptr) = value;
});
// only lambda keeps payload now, it also keeps def reference preventing it from auto-terminating on out-of-scope.
// instead from callback we terminate payload which then should successfully complete callback and release all resources
// effectively destructing Payload and releasing ptr reference.
}
signal.fire(42);
EXPECT_EQ(*ptr, 42);
EXPECT_TRUE(ptr.unique()) << "Signal should release reference to ptr from lambda.";
signal.fire(24);
EXPECT_EQ(*ptr, 42) << "Signal shouldn't impact ptr value after lifetime termination.";

AfterTest();
}

0 comments on commit 22eecd0

Please sign in to comment.