Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Scheduling Policies #19

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/reactor-cpp/action.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public:
[[nodiscard]] auto inline min_delay() const noexcept -> Duration { return min_delay_; }

friend class Reaction;
friend class Scheduler;
template<class SchedulingPolicy> friend class Scheduler;
};

template <class T> class Action : public BaseAction {
Expand Down Expand Up @@ -154,6 +154,6 @@ public:

} // namespace reactor

#include "impl/action_impl.hh"
#include "reactor-cpp/impl/action_impl.hh"

#endif // REACTOR_CPP_ACTION_HH
1 change: 1 addition & 0 deletions include/reactor-cpp/assert.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ constexpr bool runtime_assertion = true;
#include "environment.hh"

#include <cassert>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down
60 changes: 60 additions & 0 deletions include/reactor-cpp/base_scheduler.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2022 TU Dresden
* All rights reserved.
*
* Authors:
* Christian Menard
*/

#ifndef REACTOR_CPP_BASE_SCHEDULER_HH
#define REACTOR_CPP_BASE_SCHEDULER_HH

#include <atomic>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>

#include "reactor-cpp/environment.hh"
#include "reactor-cpp/fwd.hh"
#include "reactor-cpp/logical_time.hh"

namespace reactor {

using EventMap = std::map<BaseAction*, std::function<void(void)>>;

class BaseScheduler {
protected:
Environment* environment_; // NOLINT

LogicalTime logical_time_{}; // NOLINT
const bool using_workers_; // NOLINT

std::mutex scheduling_mutex_; // NOLINT
std::unique_lock<std::mutex> scheduling_lock_{scheduling_mutex_, std::defer_lock}; // NOLINT
std::condition_variable cv_schedule_; // NOLINT

std::mutex lock_event_queue_; // NOLINT
std::map<Tag, EventMap> event_queue_; // NOLINT

public:
BaseScheduler(Environment* env);
virtual ~BaseScheduler() = default;
BaseScheduler(const BaseScheduler&) = delete;
BaseScheduler(BaseScheduler&&) = delete;
auto operator=(const BaseScheduler&) -> BaseScheduler& = delete;
auto operator=(BaseScheduler&&) -> BaseScheduler& = delete;

[[nodiscard]] inline auto logical_time() const noexcept -> const auto& { return logical_time_; }
void schedule_sync(const Tag& tag, BaseAction* action, std::function<void(void)> pre_handler);
void schedule_async(const Tag& tag, BaseAction* action, std::function<void(void)> pre_handler);

virtual void set_port(BasePort* port) = 0;

void inline lock() noexcept { scheduling_lock_.lock(); }
void inline unlock() noexcept { scheduling_lock_.unlock(); }
};

} // namespace reactor

#endif // REACTOR_CPP_BASE_SCHEDULER_HH
86 changes: 86 additions & 0 deletions include/reactor-cpp/default_scheduling_policy.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (C) 2022 TU Dresden
* All rights reserved.
*
* Authors:
* Christian Menard
*/

#ifndef REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH
#define REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH

#include <cstddef>
#include <vector>

#include "reactor-cpp/fwd.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/semaphore.hh"

namespace reactor {

class DefaultSchedulingPolicy {
Scheduler<DefaultSchedulingPolicy>& scheduler_;
Environment& environment_;
std::size_t identity_counter{0};

class ReadyQueue {
private:
std::vector<Reaction*> queue_{};
std::atomic<std::ptrdiff_t> size_{0};
Semaphore sem_{0};
std::ptrdiff_t waiting_workers_{0};
const unsigned int num_workers_;

public:
explicit ReadyQueue(unsigned num_workers)
: num_workers_(num_workers) {}

/**
* Retrieve a ready reaction from the queue.
*
* This method may be called concurrently. In case the queue is empty, the
* method blocks and waits until a ready reaction becomes available.
*/
auto pop() -> Reaction*;

/**
* Fill the queue up with ready reactions.
*
* This method assumes that the internal queue is empty. It moves all
* reactions from the provided `ready_reactions` vector to the internal
* queue, leaving `ready_reactions` empty.
*
* Note that this method is not thread-safe. The caller needs to ensure that
* no other thread will try to read from the queue during this operation.
*/
void fill_up(std::vector<Reaction*>& ready_reactions);
};

ReadyQueue ready_queue_;

std::vector<std::vector<Reaction*>> reaction_queue_;
unsigned int reaction_queue_pos_{std::numeric_limits<unsigned>::max()};

std::atomic<std::ptrdiff_t> reactions_to_process_{0};
std::vector<std::vector<Reaction*>> triggered_reactions_;

bool continue_execution_{true};

void schedule() noexcept;
void terminate_all_workers();
auto schedule_ready_reactions() -> bool;

public:
DefaultSchedulingPolicy(Scheduler<DefaultSchedulingPolicy>& scheduler, Environment& env);

void init();
auto create_worker() -> Worker<DefaultSchedulingPolicy>;
void worker_function(const Worker<DefaultSchedulingPolicy>& worker);

void trigger_reaction_from_next(Reaction* reaction);
void trigger_reaction_from_set_port(Reaction* reaction);
};

} // namespace reactor

#endif // REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH
25 changes: 12 additions & 13 deletions include/reactor-cpp/environment.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
#ifndef REACTOR_CPP_ENVIRONMENT_HH
#define REACTOR_CPP_ENVIRONMENT_HH

#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "reactor.hh"
#include "scheduler.hh"
#include "reactor-cpp/default_scheduling_policy.hh"
#include "reactor-cpp/fwd.hh"
#include "reactor-cpp/reactor.hh"

namespace reactor {

Expand All @@ -38,7 +42,7 @@ private:
std::set<Reaction*> reactions_{};
std::vector<Dependency> dependencies_{};

Scheduler scheduler_;
std::unique_ptr<Scheduler<DefaultSchedulingPolicy>> scheduler_;
Phase phase_{Phase::Construction};
TimePoint start_time_{};

Expand All @@ -47,11 +51,7 @@ private:

public:
explicit Environment(unsigned int num_workers, bool run_forever = default_run_forever,
bool fast_fwd_execution = default_fast_fwd_execution)
: num_workers_(num_workers)
, run_forever_(run_forever)
, fast_fwd_execution_(fast_fwd_execution)
, scheduler_(this) {}
bool fast_fwd_execution = default_fast_fwd_execution);

void register_reactor(Reactor* reactor);
void assemble();
Expand All @@ -70,14 +70,13 @@ public:

[[nodiscard]] auto top_level_reactors() const noexcept -> const auto& { return top_level_reactors_; }
[[nodiscard]] auto phase() const noexcept -> Phase { return phase_; }
[[nodiscard]] auto scheduler() const noexcept -> const Scheduler* { return &scheduler_; }
[[nodiscard]] auto scheduler() const noexcept -> const BaseScheduler&;
[[nodiscard]] auto scheduler() noexcept -> BaseScheduler&;

auto scheduler() noexcept -> Scheduler* { return &scheduler_; }

[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); }
[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime&;
[[nodiscard]] auto start_time() const noexcept -> const TimePoint& { return start_time_; }

static auto physical_time() noexcept -> TimePoint { return get_physical_time(); }
[[nodiscard]] static auto physical_time() noexcept -> TimePoint { return get_physical_time(); }

[[nodiscard]] auto num_workers() const noexcept -> unsigned int { return num_workers_; }
[[nodiscard]] auto fast_fwd_execution() const noexcept -> bool { return fast_fwd_execution_; }
Expand Down
6 changes: 5 additions & 1 deletion include/reactor-cpp/fwd.hh
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ class BasePort;
class Environment;
class Reaction;
class Reactor;
class Scheduler;
class Tag;

class BaseScheduler;
class DefaultSchedulingPolicy;
template <class SchedulingPolicy> class Worker;
template <class SchedulingPolicy> class Scheduler;

template <class T> class Action;
template <class T> class Port;

Expand Down
33 changes: 17 additions & 16 deletions include/reactor-cpp/impl/action_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,41 @@
#ifndef REACTOR_CPP_IMPL_ACTION_IMPL_HH
#define REACTOR_CPP_IMPL_ACTION_IMPL_HH

#include "../assert.hh"
#include "../environment.hh"
#include "reactor-cpp/assert.hh"
#include "reactor-cpp/base_scheduler.hh"
#include "reactor-cpp/environment.hh"

namespace reactor {

template <class T> template <class Dur> void Action<T>::schedule(const ImmutableValuePtr<T>& value_ptr, Dur delay) {
Duration time_delay = std::chrono::duration_cast<Duration>(delay); // NOLINT
Duration time_delay = std::chrono::duration_cast<Duration>(delay);
reactor::validate(time_delay >= Duration::zero(), "Schedule cannot be called with a negative delay!");
reactor::validate(value_ptr != nullptr, "Actions may not be scheduled with a nullptr value!");
auto* scheduler = environment()->scheduler(); // NOLINT
auto setup = [value_ptr, this]() { this->value_ptr_ = std::move(value_ptr); }; // NOLINT
auto& scheduler = environment()->scheduler();
auto setup = [value_ptr, this]() { this->value_ptr_ = std::move(value_ptr); };
if (is_logical()) {
time_delay += this->min_delay();
auto tag = Tag::from_logical_time(scheduler->logical_time()).delay(time_delay); // NOLINT
scheduler->schedule_sync(tag, this, setup);
auto tag = Tag::from_logical_time(scheduler.logical_time()).delay(time_delay);
scheduler.schedule_sync(tag, this, setup);
} else {
auto tag = Tag::from_physical_time(get_physical_time() + time_delay); // NOLINT
scheduler->schedule_async(tag, this, setup);
auto tag = Tag::from_physical_time(get_physical_time() + time_delay);
scheduler.schedule_async(tag, this, setup);
}
}

template <class Dur> void Action<void>::schedule(Dur delay) {
auto time_delay = std::chrono::duration_cast<Duration>(delay); // NOLINT
auto time_delay = std::chrono::duration_cast<Duration>(delay);
reactor::validate(time_delay >= Duration::zero(), "Schedule cannot be called with a negative delay!");
auto* scheduler = environment()->scheduler(); // NOLINT
auto setup = [this]() { this->present_ = true; }; // NOLINT
auto& scheduler = environment()->scheduler();
auto setup = [this]() { this->present_ = true; };
if (is_logical()) {
time_delay += this->min_delay();
auto tag = Tag::from_logical_time(scheduler->logical_time()).delay(time_delay); // NOLINT
scheduler->schedule_sync(tag, this, setup);
auto tag = Tag::from_logical_time(scheduler.logical_time()).delay(time_delay);
scheduler.schedule_sync(tag, this, setup);
} else {
// physical action
auto tag = Tag::from_physical_time(get_physical_time() + time_delay); // NOLINT
scheduler->schedule_async(tag, this, setup);
auto tag = Tag::from_physical_time(get_physical_time() + time_delay);
scheduler.schedule_async(tag, this, setup);
}
}

Expand Down
11 changes: 7 additions & 4 deletions include/reactor-cpp/impl/port_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
#ifndef REACTOR_CPP_IMPL_PORT_IMPL_HH
#define REACTOR_CPP_IMPL_PORT_IMPL_HH

#include "../assert.hh"
#include "../environment.hh"
#include "reactor-cpp/port.hh"

#include "reactor-cpp/assert.hh"
#include "reactor-cpp/base_scheduler.hh"
#include "reactor-cpp/environment.hh"

namespace reactor {

Expand All @@ -31,9 +34,9 @@ template <class T> void Port<T>::set(const ImmutableValuePtr<T>& value_ptr) {
reactor::validate(!has_inward_binding(), "set() may only be called on ports that do not have an inward "
"binding!");
reactor::validate(value_ptr != nullptr, "Ports may not be set to nullptr!");
auto scheduler = environment()->scheduler();
auto& scheduler = environment()->scheduler();
this->value_ptr_ = std::move(value_ptr);
scheduler->set_port(this);
scheduler.set_port(this);
}

template <class T> auto Port<T>::get() const noexcept -> const ImmutableValuePtr<T>& {
Expand Down
Loading