diff --git a/include/reactor-cpp/action.hh b/include/reactor-cpp/action.hh index 129f6526..0793df7c 100644 --- a/include/reactor-cpp/action.hh +++ b/include/reactor-cpp/action.hh @@ -44,7 +44,7 @@ public: [[nodiscard]] auto inline min_delay() const noexcept -> Duration { return min_delay_; } friend class Reaction; - friend class Scheduler; + template friend class Scheduler; }; template class Action : public BaseAction { @@ -154,6 +154,6 @@ public: } // namespace reactor -#include "impl/action_impl.hh" +#include "reactor-cpp/impl/action_impl.hh" #endif // REACTOR_CPP_ACTION_HH diff --git a/include/reactor-cpp/assert.hh b/include/reactor-cpp/assert.hh index 424ebfdf..03529a38 100644 --- a/include/reactor-cpp/assert.hh +++ b/include/reactor-cpp/assert.hh @@ -24,6 +24,7 @@ constexpr bool runtime_assertion = true; #include "environment.hh" #include +#include #include #include #include diff --git a/include/reactor-cpp/base_scheduler.hh b/include/reactor-cpp/base_scheduler.hh new file mode 100644 index 00000000..0567a29c --- /dev/null +++ b/include/reactor-cpp/base_scheduler.hh @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2022 TU Dresden + * All rights reserved. + * + * Authors: + * Christian Menard + */ + +#ifndef REACTOR_CPP_BASE_SCHEDULER_HH +#define REACTOR_CPP_BASE_SCHEDULER_HH + +#include +#include +#include +#include +#include + +#include "reactor-cpp/environment.hh" +#include "reactor-cpp/fwd.hh" +#include "reactor-cpp/logical_time.hh" + +namespace reactor { + +using EventMap = std::map>; + +class BaseScheduler { +protected: + Environment* environment_; // NOLINT + + LogicalTime logical_time_{}; // NOLINT + const bool using_workers_; // NOLINT + + std::mutex scheduling_mutex_; // NOLINT + std::unique_lock scheduling_lock_{scheduling_mutex_, std::defer_lock}; // NOLINT + std::condition_variable cv_schedule_; // NOLINT + + std::mutex lock_event_queue_; // NOLINT + std::map event_queue_; // NOLINT + +public: + BaseScheduler(Environment* env); + virtual ~BaseScheduler() = default; + BaseScheduler(const BaseScheduler&) = delete; + BaseScheduler(BaseScheduler&&) = delete; + auto operator=(const BaseScheduler&) -> BaseScheduler& = delete; + auto operator=(BaseScheduler&&) -> BaseScheduler& = delete; + + [[nodiscard]] inline auto logical_time() const noexcept -> const auto& { return logical_time_; } + void schedule_sync(const Tag& tag, BaseAction* action, std::function pre_handler); + void schedule_async(const Tag& tag, BaseAction* action, std::function pre_handler); + + virtual void set_port(BasePort* port) = 0; + + void inline lock() noexcept { scheduling_lock_.lock(); } + void inline unlock() noexcept { scheduling_lock_.unlock(); } +}; + +} // namespace reactor + +#endif // REACTOR_CPP_BASE_SCHEDULER_HH diff --git a/include/reactor-cpp/default_scheduling_policy.hh b/include/reactor-cpp/default_scheduling_policy.hh new file mode 100644 index 00000000..e1ecc5cf --- /dev/null +++ b/include/reactor-cpp/default_scheduling_policy.hh @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2022 TU Dresden + * All rights reserved. + * + * Authors: + * Christian Menard + */ + +#ifndef REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH +#define REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH + +#include +#include + +#include "reactor-cpp/fwd.hh" +#include "reactor-cpp/reaction.hh" +#include "reactor-cpp/semaphore.hh" + +namespace reactor { + +class DefaultSchedulingPolicy { + Scheduler& scheduler_; + Environment& environment_; + std::size_t identity_counter{0}; + + class ReadyQueue { + private: + std::vector queue_{}; + std::atomic size_{0}; + Semaphore sem_{0}; + std::ptrdiff_t waiting_workers_{0}; + const unsigned int num_workers_; + + public: + explicit ReadyQueue(unsigned num_workers) + : num_workers_(num_workers) {} + + /** + * Retrieve a ready reaction from the queue. + * + * This method may be called concurrently. In case the queue is empty, the + * method blocks and waits until a ready reaction becomes available. + */ + auto pop() -> Reaction*; + + /** + * Fill the queue up with ready reactions. + * + * This method assumes that the internal queue is empty. It moves all + * reactions from the provided `ready_reactions` vector to the internal + * queue, leaving `ready_reactions` empty. + * + * Note that this method is not thread-safe. The caller needs to ensure that + * no other thread will try to read from the queue during this operation. + */ + void fill_up(std::vector& ready_reactions); + }; + + ReadyQueue ready_queue_; + + std::vector> reaction_queue_; + unsigned int reaction_queue_pos_{std::numeric_limits::max()}; + + std::atomic reactions_to_process_{0}; + std::vector> triggered_reactions_; + + bool continue_execution_{true}; + + void schedule() noexcept; + void terminate_all_workers(); + auto schedule_ready_reactions() -> bool; + +public: + DefaultSchedulingPolicy(Scheduler& scheduler, Environment& env); + + void init(); + auto create_worker() -> Worker; + void worker_function(const Worker& worker); + + void trigger_reaction_from_next(Reaction* reaction); + void trigger_reaction_from_set_port(Reaction* reaction); +}; + +} // namespace reactor + +#endif // REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH diff --git a/include/reactor-cpp/environment.hh b/include/reactor-cpp/environment.hh index 8679f26f..8973b5c3 100644 --- a/include/reactor-cpp/environment.hh +++ b/include/reactor-cpp/environment.hh @@ -9,12 +9,16 @@ #ifndef REACTOR_CPP_ENVIRONMENT_HH #define REACTOR_CPP_ENVIRONMENT_HH +#include #include #include +#include +#include #include -#include "reactor.hh" -#include "scheduler.hh" +#include "reactor-cpp/default_scheduling_policy.hh" +#include "reactor-cpp/fwd.hh" +#include "reactor-cpp/reactor.hh" namespace reactor { @@ -38,7 +42,7 @@ private: std::set reactions_{}; std::vector dependencies_{}; - Scheduler scheduler_; + std::unique_ptr> scheduler_; Phase phase_{Phase::Construction}; TimePoint start_time_{}; @@ -47,11 +51,7 @@ private: public: explicit Environment(unsigned int num_workers, bool run_forever = default_run_forever, - bool fast_fwd_execution = default_fast_fwd_execution) - : num_workers_(num_workers) - , run_forever_(run_forever) - , fast_fwd_execution_(fast_fwd_execution) - , scheduler_(this) {} + bool fast_fwd_execution = default_fast_fwd_execution); void register_reactor(Reactor* reactor); void assemble(); @@ -70,14 +70,13 @@ public: [[nodiscard]] auto top_level_reactors() const noexcept -> const auto& { return top_level_reactors_; } [[nodiscard]] auto phase() const noexcept -> Phase { return phase_; } - [[nodiscard]] auto scheduler() const noexcept -> const Scheduler* { return &scheduler_; } + [[nodiscard]] auto scheduler() const noexcept -> const BaseScheduler&; + [[nodiscard]] auto scheduler() noexcept -> BaseScheduler&; - auto scheduler() noexcept -> Scheduler* { return &scheduler_; } - - [[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); } + [[nodiscard]] auto logical_time() const noexcept -> const LogicalTime&; [[nodiscard]] auto start_time() const noexcept -> const TimePoint& { return start_time_; } - static auto physical_time() noexcept -> TimePoint { return get_physical_time(); } + [[nodiscard]] static auto physical_time() noexcept -> TimePoint { return get_physical_time(); } [[nodiscard]] auto num_workers() const noexcept -> unsigned int { return num_workers_; } [[nodiscard]] auto fast_fwd_execution() const noexcept -> bool { return fast_fwd_execution_; } diff --git a/include/reactor-cpp/fwd.hh b/include/reactor-cpp/fwd.hh index 52edced4..7494ac02 100644 --- a/include/reactor-cpp/fwd.hh +++ b/include/reactor-cpp/fwd.hh @@ -16,9 +16,13 @@ class BasePort; class Environment; class Reaction; class Reactor; -class Scheduler; class Tag; +class BaseScheduler; +class DefaultSchedulingPolicy; +template class Worker; +template class Scheduler; + template class Action; template class Port; diff --git a/include/reactor-cpp/impl/action_impl.hh b/include/reactor-cpp/impl/action_impl.hh index 3ed4fb12..c2dca9bd 100644 --- a/include/reactor-cpp/impl/action_impl.hh +++ b/include/reactor-cpp/impl/action_impl.hh @@ -9,40 +9,41 @@ #ifndef REACTOR_CPP_IMPL_ACTION_IMPL_HH #define REACTOR_CPP_IMPL_ACTION_IMPL_HH -#include "../assert.hh" -#include "../environment.hh" +#include "reactor-cpp/assert.hh" +#include "reactor-cpp/base_scheduler.hh" +#include "reactor-cpp/environment.hh" namespace reactor { template template void Action::schedule(const ImmutableValuePtr& value_ptr, Dur delay) { - Duration time_delay = std::chrono::duration_cast(delay); // NOLINT + Duration time_delay = std::chrono::duration_cast(delay); reactor::validate(time_delay >= Duration::zero(), "Schedule cannot be called with a negative delay!"); reactor::validate(value_ptr != nullptr, "Actions may not be scheduled with a nullptr value!"); - auto* scheduler = environment()->scheduler(); // NOLINT - auto setup = [value_ptr, this]() { this->value_ptr_ = std::move(value_ptr); }; // NOLINT + auto& scheduler = environment()->scheduler(); + auto setup = [value_ptr, this]() { this->value_ptr_ = std::move(value_ptr); }; if (is_logical()) { time_delay += this->min_delay(); - auto tag = Tag::from_logical_time(scheduler->logical_time()).delay(time_delay); // NOLINT - scheduler->schedule_sync(tag, this, setup); + auto tag = Tag::from_logical_time(scheduler.logical_time()).delay(time_delay); + scheduler.schedule_sync(tag, this, setup); } else { - auto tag = Tag::from_physical_time(get_physical_time() + time_delay); // NOLINT - scheduler->schedule_async(tag, this, setup); + auto tag = Tag::from_physical_time(get_physical_time() + time_delay); + scheduler.schedule_async(tag, this, setup); } } template void Action::schedule(Dur delay) { - auto time_delay = std::chrono::duration_cast(delay); // NOLINT + auto time_delay = std::chrono::duration_cast(delay); reactor::validate(time_delay >= Duration::zero(), "Schedule cannot be called with a negative delay!"); - auto* scheduler = environment()->scheduler(); // NOLINT - auto setup = [this]() { this->present_ = true; }; // NOLINT + auto& scheduler = environment()->scheduler(); + auto setup = [this]() { this->present_ = true; }; if (is_logical()) { time_delay += this->min_delay(); - auto tag = Tag::from_logical_time(scheduler->logical_time()).delay(time_delay); // NOLINT - scheduler->schedule_sync(tag, this, setup); + auto tag = Tag::from_logical_time(scheduler.logical_time()).delay(time_delay); + scheduler.schedule_sync(tag, this, setup); } else { // physical action - auto tag = Tag::from_physical_time(get_physical_time() + time_delay); // NOLINT - scheduler->schedule_async(tag, this, setup); + auto tag = Tag::from_physical_time(get_physical_time() + time_delay); + scheduler.schedule_async(tag, this, setup); } } diff --git a/include/reactor-cpp/impl/port_impl.hh b/include/reactor-cpp/impl/port_impl.hh index 0755aeea..6fac48cd 100644 --- a/include/reactor-cpp/impl/port_impl.hh +++ b/include/reactor-cpp/impl/port_impl.hh @@ -9,8 +9,11 @@ #ifndef REACTOR_CPP_IMPL_PORT_IMPL_HH #define REACTOR_CPP_IMPL_PORT_IMPL_HH -#include "../assert.hh" -#include "../environment.hh" +#include "reactor-cpp/port.hh" + +#include "reactor-cpp/assert.hh" +#include "reactor-cpp/base_scheduler.hh" +#include "reactor-cpp/environment.hh" namespace reactor { @@ -31,9 +34,9 @@ template void Port::set(const ImmutableValuePtr& value_ptr) { reactor::validate(!has_inward_binding(), "set() may only be called on ports that do not have an inward " "binding!"); reactor::validate(value_ptr != nullptr, "Ports may not be set to nullptr!"); - auto scheduler = environment()->scheduler(); + auto& scheduler = environment()->scheduler(); this->value_ptr_ = std::move(value_ptr); - scheduler->set_port(this); + scheduler.set_port(this); } template auto Port::get() const noexcept -> const ImmutableValuePtr& { diff --git a/include/reactor-cpp/impl/scheduler_impl.hh b/include/reactor-cpp/impl/scheduler_impl.hh new file mode 100644 index 00000000..327c8cb6 --- /dev/null +++ b/include/reactor-cpp/impl/scheduler_impl.hh @@ -0,0 +1,242 @@ +/* + * Copyright (C) 2022 TU Dresden + * All rights reserved. + * + * Authors: + * Christian Menard + */ + +#ifndef REACTOR_CPP_IMPL_SCHEDULER_IMPL_HH +#define REACTOR_CPP_IMPL_SCHEDULER_IMPL_HH + +#include "reactor-cpp/action.hh" +#include "reactor-cpp/assert.hh" +#include "reactor-cpp/fwd.hh" +#include "reactor-cpp/logging.hh" +#include "reactor-cpp/port.hh" +#include "reactor-cpp/reaction.hh" +#include "reactor-cpp/trace.hh" + +#include + +namespace reactor { + +template +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +thread_local std::size_t Worker::current_worker_id_{0}; + +template auto Worker::current_worker_id() -> std::size_t { + return current_worker_id_; +} + +template +Worker::Worker(Worker&& worker) // NOLINT(performance-noexcept-move-constructor) + : policy_{worker.policy_} + , identity_{worker.identity_} { + // Need to provide the move constructor in order to organize workers in a + // std::vector. However, moving is not save if the thread is already running, + // thus we throw an exception here if the worker is moved but the + // internal thread is already running. + + if (worker.thread_.joinable()) { + throw std::runtime_error{"Running workers cannot be moved!"}; + } +} + +template void Worker::work() const { + // initialize the current worker thread local variable + current_worker_id_ = identity_; + + log::Debug() << "(Worker " << identity_ << ") Starting"; + + policy_.worker_function(*this); + + log::Debug() << "(Worker " << identity_ << ") terminates"; +} + +template void Worker::execute_reaction(Reaction* reaction) const { + log::Debug() << "(Worker " << identity_ << ") " + << "execute reaction " << reaction->fqn(); + + tracepoint(reactor_cpp, reaction_execution_starts, id, reaction->fqn(), scheduler.logical_time()); + reaction->trigger(); + tracepoint(reactor_cpp, reaction_execution_finishes, id, reaction->fqn(), scheduler.logical_time()); +} + +template +Scheduler::Scheduler(Environment* env) + : BaseScheduler(env) + , policy_(*this, *env) {} + +template void Scheduler::start() { + log::Debug() << "Starting the scheduler..."; + + auto num_workers = environment_->num_workers(); + // initialize the reaction queue, set ports vector, and triggered reactions + // vector + set_ports_.resize(num_workers); + policy_.init(); + + // Initialize and start the workers. By resizing the workers vector first, + // we make sure that there is sufficient space for all the workers and non of + // them needs to be moved. This is important because a running worker may not + // be moved. + workers_.reserve(num_workers); + for (unsigned i = 0; i < num_workers; i++) { + workers_.emplace_back(policy_.create_worker()); + workers_.back().start(); + } + + // join all worker threads + for (auto& worker : workers_) { + worker.join(); + } +} + +// FIXME: Reduce complexity of this function +// NOLINTNEXTLINE(readability-function-cognitive-complexity) +template auto Scheduler::next() -> bool { + static EventMap events{}; + bool continue_execution{true}; + + // clean up before scheduling any new events + if (!events.empty()) { + // cleanup all triggered actions + for (auto& vec_ports : events) { + vec_ports.first->cleanup(); + } + // cleanup all set ports + for (auto& vec_ports : set_ports_) { + for (auto& port : vec_ports) { + port->cleanup(); + } + vec_ports.clear(); + } + events.clear(); + } + + { + std::unique_lock lock{scheduling_mutex_}; + + // shutdown if there are no more events in the queue + if (event_queue_.empty() && !stop_) { + if (environment_->run_forever()) { + // wait for a new asynchronous event + cv_schedule_.wait(lock, [this]() { return !event_queue_.empty() || stop_; }); + } else { + log::Debug() << "No more events in queue_. -> Terminate!"; + environment_->sync_shutdown(); + } + } + + while (events.empty()) { + if (stop_) { + continue_execution = false; + log::Debug() << "Shutting down the scheduler"; + Tag t_next = Tag::from_logical_time(logical_time_).delay(); + if (t_next == event_queue_.begin()->first) { + log::Debug() << "Schedule the last round of reactions including all " + "termination reactions"; + events = std::move(event_queue_.begin()->second); + event_queue_.erase(event_queue_.begin()); + log::Debug() << "advance logical time to tag [" << t_next.time_point() << ", " << t_next.micro_step() << "]"; + logical_time_.advance_to(t_next); + } else { + return continue_execution; + } + } else { + // collect events of the next tag + auto t_next = event_queue_.begin()->first; + + // synchronize with physical time if not in fast forward mode + if (!environment_->fast_fwd_execution()) { + // keep track of the current physical time in a static variable + static auto physical_time = TimePoint::min(); + + // If physical time is smaller than the next logical time point, + // then update the physical time. This step is small optimization to + // avoid calling get_physical_time() in every iteration as this + // would add a significant overhead. + if (physical_time < t_next.time_point()) { + physical_time = get_physical_time(); + } + + // If physical time is still smaller than the next logical time + // point, then wait until the next tag or until a new event is + // inserted asynchronously into the queue + if (physical_time < t_next.time_point()) { + auto status = cv_schedule_.wait_until(lock, t_next.time_point()); + // Start over if the event queue was modified + if (status == std::cv_status::no_timeout) { + continue; + } + // update physical time and continue otherwise + physical_time = t_next.time_point(); + } + } + + // retrieve all events with tag equal to current logical time from the + // queue + events = std::move(event_queue_.begin()->second); + event_queue_.erase(event_queue_.begin()); + + // advance logical time + log::Debug() << "advance logical time to tag [" << t_next.time_point() << ", " << t_next.micro_step() << "]"; + logical_time_.advance_to(t_next); + } + } + } // mutex schedule_ + + // execute all setup functions; this sets the values of the corresponding + // actions + for (auto& vec_reactor : events) { + auto& setup = vec_reactor.second; + if (setup != nullptr) { + setup(); + } + } + + log::Debug() << "events: " << events.size(); + for (auto& vec_reactor : events) { + log::Debug() << "Action " << vec_reactor.first->fqn(); + for (auto* reaction : vec_reactor.first->triggers()) { + // There is no need to acquire the mutex. At this point the scheduler + // should be the only thread accessing the reaction queue as none of the + // workers_ are running + log::Debug() << "(Scheduler) trigger reaction " << reaction->fqn() << " with index " << reaction->index(); + policy_.trigger_reaction_from_next(reaction); + } + } + + return continue_execution; +} + +template void Scheduler::set_port(BasePort* port) { + log::Debug() << "Set port " << port->fqn(); + + // We do not check here if port is already in the list. This means clean() + // could be called multiple times for a single port. However, calling + // clean() multiple time is not harmful and more efficient then checking if + set_ports_[Worker::current_worker_id()].push_back(port); + + // recursively search for triggered reactions + set_port_helper(port); +} + +template void Scheduler::set_port_helper(BasePort* port) { + for (auto* reaction : port->triggers()) { + policy_.trigger_reaction_from_set_port(reaction); + } + for (auto* binding : port->outward_bindings()) { + set_port_helper(binding); + } +} + +template void Scheduler::stop() { + stop_ = true; + cv_schedule_.notify_one(); +} + +} // namespace reactor + +#endif // REACTOR_CPP_IMPL_SCHEDULER_IMPL_HH diff --git a/include/reactor-cpp/logging.hh b/include/reactor-cpp/logging.hh index e27863fc..9c738238 100644 --- a/include/reactor-cpp/logging.hh +++ b/include/reactor-cpp/logging.hh @@ -9,8 +9,9 @@ #ifndef REACTOR_CPP_LOGGING_HH #define REACTOR_CPP_LOGGING_HH -#include "reactor-cpp/config.hh" //NOLINT +#include "reactor-cpp/config.hh" #include "reactor-cpp/time.hh" + #include #include #include diff --git a/include/reactor-cpp/port.hh b/include/reactor-cpp/port.hh index f2345532..b8dd8d8e 100644 --- a/include/reactor-cpp/port.hh +++ b/include/reactor-cpp/port.hh @@ -56,7 +56,7 @@ public: [[nodiscard]] inline auto antidependencies() const noexcept -> const auto& { return antidependencies_; } friend class Reaction; - friend class Scheduler; + template friend class Scheduler; }; template class Port : public BasePort { @@ -127,6 +127,6 @@ public: } // namespace reactor -#include "impl/port_impl.hh" +#include "reactor-cpp/impl/port_impl.hh" #endif // REACTOR_CPP_PORT_HH diff --git a/include/reactor-cpp/reactor-cpp.hh b/include/reactor-cpp/reactor-cpp.hh index 024597c0..56155a3c 100644 --- a/include/reactor-cpp/reactor-cpp.hh +++ b/include/reactor-cpp/reactor-cpp.hh @@ -17,6 +17,7 @@ #include "port.hh" #include "reaction.hh" #include "reactor.hh" +#include "scheduler.hh" #include "time.hh" #endif // REACTOR_CPP_REACTOR_CPP_HH diff --git a/include/reactor-cpp/scheduler.hh b/include/reactor-cpp/scheduler.hh index 652afb6b..924a6e26 100644 --- a/include/reactor-cpp/scheduler.hh +++ b/include/reactor-cpp/scheduler.hh @@ -10,6 +10,7 @@ #define REACTOR_CPP_SCHEDULER_HH #include +#include #include #include #include @@ -18,128 +19,78 @@ #include #include -#include "fwd.hh" -#include "logical_time.hh" -#include "semaphore.hh" +#include "reactor-cpp/base_scheduler.hh" +#include "reactor-cpp/fwd.hh" +#include "reactor-cpp/logical_time.hh" namespace reactor { -// forward declarations -class Scheduler; -class Worker; - -class Worker { // NOLINT -public: - Scheduler& scheduler_; - const unsigned int identity_{0}; +template class Worker { +private: + SchedulingPolicy& policy_; + const std::size_t identity_{0}; std::thread thread_{}; // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) - static thread_local const Worker* current_worker; + static thread_local std::size_t current_worker_id_; void work() const; void execute_reaction(Reaction* reaction) const; - Worker(Scheduler& scheduler, unsigned int identity) - : scheduler_{scheduler} +public: + Worker(SchedulingPolicy& policy, std::size_t identity) + : policy_{policy} , identity_{identity} {} Worker(Worker&& worker); // NOLINT(performance-noexcept-move-constructor) Worker(const Worker& worker) = delete; + ~Worker() = default; - void start_thread() { thread_ = std::thread(&Worker::work, this); } - void join_thread() { thread_.join(); } + auto operator=(const Worker& worker) -> Worker& = delete; + auto operator=(Worker&& worker) -> Worker& = delete; - static auto current_worker_id() -> unsigned { return current_worker->identity_; } -}; + void start() { thread_ = std::thread(&Worker::work, this); } + void join() { thread_.join(); } -class ReadyQueue { -private: - std::vector queue_{}; - std::atomic size_{0}; - Semaphore sem_{0}; - std::ptrdiff_t waiting_workers_{0}; - const unsigned int num_workers_; + [[nodiscard]] auto id() const -> std::size_t { return identity_; } -public: - explicit ReadyQueue(unsigned num_workers) - : num_workers_(num_workers) {} - - /** - * Retrieve a ready reaction from the queue. - * - * This method may be called concurrently. In case the queue is empty, the - * method blocks and waits until a ready reaction becomes available. - */ - auto pop() -> Reaction*; - - /** - * Fill the queue up with ready reactions. - * - * This method assumes that the internal queue is empty. It moves all - * reactions from the provided `ready_reactions` vector to the internal - * queue, leaving `ready_reactions` empty. - * - * Note that this method is not thread-safe. The caller needs to ensure that - * no other thread will try to read from the queue during this operation. - */ - void fill_up(std::vector& ready_reactions); -}; + static auto current_worker_id() -> std::size_t; -using EventMap = std::map>; + friend SchedulingPolicy; +}; -class Scheduler { // NOLINT +template class Scheduler : public BaseScheduler { private: - const bool using_workers_; - LogicalTime logical_time_{}; + using worker_t = Worker; - Environment* environment_; - std::vector workers_{}; + SchedulingPolicy policy_; - std::mutex scheduling_mutex_; - std::unique_lock scheduling_lock_{scheduling_mutex_, std::defer_lock}; - std::condition_variable cv_schedule_; - - std::mutex lock_event_queue_; - std::map event_queue_; + std::vector workers_{}; std::vector> set_ports_; - std::vector> triggered_reactions_; - - std::vector> reaction_queue_; - unsigned int reaction_queue_pos_{std::numeric_limits::max()}; - - ReadyQueue ready_queue_; - std::atomic reactions_to_process_{0}; // NOLINT std::atomic stop_{false}; - bool continue_execution_{true}; - void schedule() noexcept; - auto schedule_ready_reactions() -> bool; - void next(); - void terminate_all_workers(); + auto next() -> bool; void set_port_helper(BasePort* port); public: explicit Scheduler(Environment* env); - ~Scheduler(); - - void schedule_sync(const Tag& tag, BaseAction* action, std::function pre_handler); - void schedule_async(const Tag& tag, BaseAction* action, std::function pre_handler); - - void inline lock() noexcept { scheduling_lock_.lock(); } - void inline unlock() noexcept { scheduling_lock_.unlock(); } - - void set_port(BasePort* port); - - [[nodiscard]] inline auto logical_time() const noexcept -> const auto& { return logical_time_; } + Scheduler(Scheduler&&) = delete; + Scheduler(const Scheduler&) = delete; + ~Scheduler() override = default; + auto operator=(Scheduler&&) -> Scheduler& = delete; + auto operator=(const Scheduler&) -> Scheduler& = delete; void start(); void stop(); - friend Worker; + void set_port(BasePort* port) override; + + friend SchedulingPolicy; }; } // namespace reactor +#include "reactor-cpp/impl/scheduler_impl.hh" + #endif // REACTOR_CPP_SCHEDULER_HH diff --git a/include/reactor-cpp/value_ptr.hh b/include/reactor-cpp/value_ptr.hh index fcb0e20e..edc6f366 100644 --- a/include/reactor-cpp/value_ptr.hh +++ b/include/reactor-cpp/value_ptr.hh @@ -75,7 +75,6 @@ template auto make_mutable_value(Args&&... args) -> Mut } } - namespace detail { /** @@ -510,7 +509,6 @@ public: friend auto reactor::make_immutable_value(Args&&... args) -> reactor::ImmutableValuePtr; }; - // Comparison operators template @@ -529,16 +527,20 @@ template auto operator==(const MutableValuePtr& ptr1, const ImmutableValuePtr& ptr2) noexcept -> bool { return ptr1.get() == ptr2.get(); } -template auto operator==(const MutableValuePtr& ptr1, std::nullptr_t) noexcept -> bool { +template +auto operator==(const MutableValuePtr& ptr1, std::nullptr_t) noexcept -> bool { return ptr1.get() == nullptr; } -template auto operator==(std::nullptr_t, const MutableValuePtr& ptr2) noexcept -> bool { +template +auto operator==(std::nullptr_t, const MutableValuePtr& ptr2) noexcept -> bool { return ptr2.get() == nullptr; } -template auto operator==(const ImmutableValuePtr& ptr1, std::nullptr_t) noexcept -> bool { +template +auto operator==(const ImmutableValuePtr& ptr1, std::nullptr_t) noexcept -> bool { return ptr1.get() == nullptr; } -template auto operator==(std::nullptr_t, const ImmutableValuePtr& ptr1) noexcept -> bool { +template +auto operator==(std::nullptr_t, const ImmutableValuePtr& ptr1) noexcept -> bool { return ptr1.get() == nullptr; } @@ -551,10 +553,12 @@ template auto operator!=(const ImmutableValuePtr& ptr1, const ImmutableValuePtr& ptr2) -> bool { return ptr1.get() != ptr2.get(); } -template auto operator!=(const ImmutableValuePtr& ptr1, const MutableValuePtr& ptr2) -> bool { +template +auto operator!=(const ImmutableValuePtr& ptr1, const MutableValuePtr& ptr2) -> bool { return ptr1.get() != ptr2.get(); } -template auto operator!=(const MutableValuePtr& ptr1, const ImmutableValuePtr& ptr2) -> bool { +template +auto operator!=(const MutableValuePtr& ptr1, const ImmutableValuePtr& ptr2) -> bool { return ptr1.get() != ptr2.get(); } template auto operator!=(const MutableValuePtr& ptr1, std::nullptr_t) -> bool { @@ -570,7 +574,6 @@ template auto operator!=(std::nullptr_t, const ImmutableV return ptr1.get() != nullptr; } - } // namespace detail } // namespace reactor diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index f8b6c9a1..64a444f9 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -1,12 +1,13 @@ set(SOURCE_FILES action.cc assert.cc + base_scheduler.cc + default_scheduling_policy.cc environment.cc logical_time.cc port.cc reaction.cc reactor.cc - scheduler.cc time.cc ) diff --git a/lib/action.cc b/lib/action.cc index 87eb643c..97a48760 100644 --- a/lib/action.cc +++ b/lib/action.cc @@ -9,6 +9,7 @@ #include "reactor-cpp/action.hh" #include "reactor-cpp/assert.hh" +#include "reactor-cpp/base_scheduler.hh" #include "reactor-cpp/environment.hh" #include "reactor-cpp/reaction.hh" @@ -37,9 +38,9 @@ void BaseAction::register_scheduler(Reaction* reaction) { void Timer::startup() { Tag tag_zero = Tag::from_physical_time(environment()->start_time()); if (offset_ != Duration::zero()) { - environment()->scheduler()->schedule_sync(tag_zero.delay(offset_), this, nullptr); + environment()->scheduler().schedule_sync(tag_zero.delay(offset_), this, nullptr); } else { - environment()->scheduler()->schedule_sync(tag_zero, this, nullptr); + environment()->scheduler().schedule_sync(tag_zero, this, nullptr); } } @@ -48,13 +49,13 @@ void Timer::cleanup() { if (period_ != Duration::zero()) { Tag now = Tag::from_logical_time(environment()->logical_time()); Tag next = now.delay(period_); - environment()->scheduler()->schedule_sync(next, this, nullptr); + environment()->scheduler().schedule_sync(next, this, nullptr); } } void ShutdownAction::shutdown() { Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); - environment()->scheduler()->schedule_sync(tag, this, nullptr); + environment()->scheduler().schedule_sync(tag, this, nullptr); } } // namespace reactor diff --git a/lib/base_scheduler.cc b/lib/base_scheduler.cc new file mode 100644 index 00000000..b3730a92 --- /dev/null +++ b/lib/base_scheduler.cc @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2022 TU Dresden + * All rights reserved. + * + * Authors: + * Christian Menard + */ + +#include "reactor-cpp/base_scheduler.hh" + +#include "reactor-cpp/action.hh" +#include "reactor-cpp/assert.hh" +#include "reactor-cpp/environment.hh" +#include "reactor-cpp/fwd.hh" +#include "reactor-cpp/logging.hh" +#include "reactor-cpp/port.hh" +#include "reactor-cpp/reaction.hh" +#include "reactor-cpp/trace.hh" + +namespace reactor { + +BaseScheduler::BaseScheduler(Environment* env) + : environment_(env) + , using_workers_(env->num_workers() > 1) {} + +void BaseScheduler::schedule_sync(const Tag& tag, BaseAction* action, std::function pre_handler) { + reactor_assert(logical_time_ < tag); + // TODO verify that the action is indeed allowed to be scheduled by the + // current reaction + log::Debug() << "Schedule action " << action->fqn() << (action->is_logical() ? " synchronously " : " asynchronously ") + << " with tag [" << tag.time_point() << ", " << tag.micro_step() << "]"; + { + auto unique_lock = + using_workers_ ? std::unique_lock(lock_event_queue_) : std::unique_lock(); + + tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag); // NOLINT + + // create a new event map or retrieve the existing one + auto emplace_result = event_queue_.try_emplace(tag, EventMap()); + auto& event_map = emplace_result.first->second; + + // insert the new event + event_map[action] = std::move(pre_handler); + } +} + +void BaseScheduler::schedule_async(const Tag& tag, BaseAction* action, std::function pre_handler) { + std::lock_guard lock_guard(scheduling_mutex_); + schedule_sync(tag, action, std::move(pre_handler)); + cv_schedule_.notify_one(); +} +} // namespace reactor diff --git a/lib/default_scheduling_policy.cc b/lib/default_scheduling_policy.cc new file mode 100644 index 00000000..601211bb --- /dev/null +++ b/lib/default_scheduling_policy.cc @@ -0,0 +1,183 @@ +/* + * Copyright (C) 2022 TU Dresden + * All rights reserved. + * + * Authors: + * Christian Menard + */ + +#include "reactor-cpp/default_scheduling_policy.hh" + +#include "reactor-cpp/environment.hh" +#include "reactor-cpp/logging.hh" +#include "reactor-cpp/reaction.hh" +#include "reactor-cpp/scheduler.hh" +#include "reactor-cpp/trace.hh" + +namespace reactor { + +DefaultSchedulingPolicy::DefaultSchedulingPolicy(Scheduler& scheduler, Environment& env) + : scheduler_(scheduler) + , environment_(env) + , ready_queue_(env.num_workers()) {} + +void DefaultSchedulingPolicy::init() { + reaction_queue_.resize(environment_.max_reaction_index() + 1); + triggered_reactions_.resize(environment_.num_workers()); +} + +void DefaultSchedulingPolicy::worker_function(const Worker& worker) { + if (worker.id() == 0) { + log::Debug() << "(Worker 0) do the initial scheduling"; + schedule(); + } + + while (true) { + // wait for a ready reaction + auto* reaction = ready_queue_.pop(); + + // receiving a nullptr indicates that the worker should terminate + if (reaction == nullptr) { + break; + } + + // execute the reaction + worker.execute_reaction(reaction); + + // was this the very last reaction? + if (reactions_to_process_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + // Yes, then schedule. The atomic decrement above ensures that only one + // thread enters this block. + schedule(); + } + // continue otherwise + } +} + +auto DefaultSchedulingPolicy::create_worker() -> Worker { return {*this, identity_counter++}; } + +void DefaultSchedulingPolicy::schedule() noexcept { + bool found_ready_reactions = schedule_ready_reactions(); + + while (!found_ready_reactions) { + log::Debug() << "(DefaultSchedulingPolicy) call next()"; + continue_execution_ = scheduler_.next(); + reaction_queue_pos_ = 0; + + found_ready_reactions = schedule_ready_reactions(); + + if (!continue_execution_ && !found_ready_reactions) { + // let all workers know that they should terminate + terminate_all_workers(); + break; + } + } +} + +auto DefaultSchedulingPolicy::schedule_ready_reactions() -> bool { + // insert any triggered reactions_ into the reaction queue + for (auto& vec_reaction : triggered_reactions_) { + for (auto* reaction : vec_reaction) { + reaction_queue_[reaction->index()].push_back(reaction); + } + vec_reaction.clear(); + } + + log::Debug() << "(DefaultSchedulingPolicy) Scanning the reaction queue for ready reactions"; + + // continue iterating over the reaction queue + for (; reaction_queue_pos_ < reaction_queue_.size(); reaction_queue_pos_++) { + auto& reactions = reaction_queue_[reaction_queue_pos_]; + + // any ready reactions of current priority? + if (!reactions.empty()) { + log::Debug() << "(DefaultSchedulingPolicy) Process reactions of priority " << reaction_queue_pos_; + + // Make sure that any reaction is only executed once even if it + // was triggered multiple times. + std::sort(reactions.begin(), reactions.end()); + reactions.erase(std::unique(reactions.begin(), reactions.end()), reactions.end()); + + if constexpr (log::debug_enabled || tracing_enabled) { // NOLINT + for (auto* reaction : reactions) { + log::Debug() << "(DefaultSchedulingPolicy) Reaction " << reaction->fqn() << " is ready for execution"; + tracepoint(reactor_cpp, trigger_reaction, reaction->container()->fqn(), reaction->name(), logical_time_); + } + } + + reactions_to_process_.store(static_cast(reactions.size()), std::memory_order_release); + ready_queue_.fill_up(reactions); + + // break out of the loop and return + return true; + } + } + + log::Debug() << "(DefaultSchedulingPolicy) Reached end of reaction queue"; + return false; +} + +void DefaultSchedulingPolicy::terminate_all_workers() { + log::Debug() << "(DefaultSchedulingPolicy) Send termination signal to all workers"; + auto num_workers = environment_.num_workers(); + std::vector null_reactions{num_workers, nullptr}; + log::Debug() << null_reactions.size(); + ready_queue_.fill_up(null_reactions); +} + +void DefaultSchedulingPolicy::trigger_reaction_from_next(Reaction* reaction) { + reaction_queue_[reaction->index()].push_back(reaction); +} + +void DefaultSchedulingPolicy::trigger_reaction_from_set_port(Reaction* reaction) { + triggered_reactions_[Worker::current_worker_id()].push_back(reaction); +} + +auto DefaultSchedulingPolicy::ReadyQueue::pop() -> Reaction* { + auto old_size = size_.fetch_sub(1, std::memory_order_acq_rel); + + // If there is no ready reaction available, wait until there is one. + while (old_size <= 0) { + log::Debug() << "(Worker " << Worker::current_worker_id() << ") Wait for work"; + sem_.acquire(); + log::Debug() << "(Worker " << Worker::current_worker_id() << ") Waking up"; + old_size = size_.fetch_sub(1, std::memory_order_acq_rel); + // FIXME: Protect against underflow? + } + + auto pos = old_size - 1; + return queue_[pos]; +} + +void DefaultSchedulingPolicy::ReadyQueue::fill_up(std::vector& ready_reactions) { + // clear the internal queue and swap contents + queue_.clear(); + queue_.swap(ready_reactions); + + // update the atomic size counter and release the semaphore to wake up + // waiting worker threads + auto new_size = static_cast(queue_.size()); + auto old_size = size_.exchange(new_size, std::memory_order_acq_rel); + + // calculate how many workers to wake up. -old_size indicates the number of + // workers who started waiting since the last update. + // We want to wake up at most all the waiting workers. If we would release + // more, other workers that are out of work would not block when acquiring + // the semaphore. + // Also, we do not want to wake up more workers than there is work. new_size + // indicates the number of ready reactions. Since there is always at least + // one worker running running, new_size - running_workers indicates the + // number of additional workers needed to process all reactions. + waiting_workers_ += -old_size; + auto running_workers = num_workers_ - waiting_workers_; + auto workers_to_wakeup = std::min(waiting_workers_, new_size - running_workers); + + // wakeup other workers_ + if (workers_to_wakeup > 0) { + waiting_workers_ -= workers_to_wakeup; + log::Debug() << "Wakeup " << workers_to_wakeup << " workers"; + sem_.release(static_cast(workers_to_wakeup)); + } +} + +} // namespace reactor diff --git a/lib/environment.cc b/lib/environment.cc index 0429e7da..0a109116 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -14,12 +14,27 @@ #include "reactor-cpp/action.hh" #include "reactor-cpp/assert.hh" +#include "reactor-cpp/default_scheduling_policy.hh" #include "reactor-cpp/logging.hh" #include "reactor-cpp/port.hh" #include "reactor-cpp/reaction.hh" +#include "reactor-cpp/scheduler.hh" namespace reactor { +Environment::Environment(unsigned int num_workers, bool run_forever, bool fast_fwd_execution) + : num_workers_(num_workers) + , run_forever_(run_forever) + , fast_fwd_execution_(fast_fwd_execution) + , scheduler_(std::make_unique>(this)) {} + +[[nodiscard]] auto Environment::scheduler() const noexcept -> const BaseScheduler& { return *scheduler_; } +[[nodiscard]] auto Environment::scheduler() noexcept -> BaseScheduler& { return *scheduler_; } + +[[nodiscard]] auto Environment::logical_time() const noexcept -> const LogicalTime& { + return scheduler_->logical_time(); +} + void Environment::register_reactor(Reactor* reactor) { reactor_assert(reactor != nullptr); validate(this->phase() == Phase::Construction, "Reactors may only be registered during construction phase!"); @@ -97,13 +112,13 @@ void Environment::sync_shutdown() { } phase_ = Phase::Deconstruction; - scheduler_.stop(); + scheduler_->stop(); } void Environment::async_shutdown() { - scheduler_.lock(); + scheduler_->lock(); sync_shutdown(); - scheduler_.unlock(); + scheduler_->unlock(); } auto dot_name([[maybe_unused]] ReactorElement* reactor_element) -> std::string { @@ -227,7 +242,7 @@ auto Environment::startup() -> std::thread { // start processing events phase_ = Phase::Execution; - return std::thread([this]() { this->scheduler_.start(); }); + return std::thread([this]() { this->scheduler_->start(); }); } void Environment::dump_trigger_to_yaml(std::ofstream& yaml, const BaseAction& trigger) { diff --git a/lib/port.cc b/lib/port.cc index c626475b..16244757 100644 --- a/lib/port.cc +++ b/lib/port.cc @@ -9,6 +9,7 @@ #include "reactor-cpp/port.hh" #include "reactor-cpp/assert.hh" +#include "reactor-cpp/base_scheduler.hh" #include "reactor-cpp/environment.hh" #include "reactor-cpp/reaction.hh" @@ -28,7 +29,7 @@ void BasePort::base_bind_to(BasePort* port) { } else if (this->is_output() && port->is_input()) { validate(this->container()->container() == port->container()->container(), "An output port can only be bound to an input port if both ports " - "belong to reactors in the same hierarichal level"); + "belong to reactors in the same hierarchical level"); } else if (this->is_output() && port->is_output()) { validate(this->container()->container() == port->container(), "An output port A may only be bound to another output port B if A is " @@ -93,9 +94,9 @@ auto Port::typed_inward_binding() const noexcept -> Port* { void Port::set() { validate(!has_inward_binding(), "set() may only be called on a ports that do not have an inward " "binding!"); - auto* scheduler = environment()->scheduler(); + auto& scheduler = environment()->scheduler(); this->present_ = true; - scheduler->set_port(this); + scheduler.set_port(this); } auto Port::is_present() const noexcept -> bool { diff --git a/lib/reactor.cc b/lib/reactor.cc index b3bf0c09..85b3b9af 100644 --- a/lib/reactor.cc +++ b/lib/reactor.cc @@ -10,6 +10,7 @@ #include "reactor-cpp/action.hh" #include "reactor-cpp/assert.hh" +#include "reactor-cpp/base_scheduler.hh" #include "reactor-cpp/environment.hh" #include "reactor-cpp/logging.hh" #include "reactor-cpp/port.hh" @@ -30,7 +31,7 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ // completely constructed objects. Technically, the casts here return // invalid pointers as the objects they point to do not yet // exists. However, we are good as long as we only store the pointer and do - // not dereference it before construction is completeted. + // not de reference it before construction is completed. // It works, but maybe there is some nicer way of doing this... switch (type) { case Type::Action: @@ -154,15 +155,15 @@ void Reactor::shutdown() { auto Reactor::get_physical_time() noexcept -> TimePoint { return reactor::get_physical_time(); } auto Reactor::get_logical_time() const noexcept -> TimePoint { - return environment()->scheduler()->logical_time().time_point(); + return environment()->scheduler().logical_time().time_point(); } auto Reactor::get_microstep() const noexcept -> mstep_t { - return environment()->scheduler()->logical_time().micro_step(); + return environment()->scheduler().logical_time().micro_step(); } auto Reactor::get_tag() const noexcept -> Tag { - return Tag::from_logical_time(environment()->scheduler()->logical_time()); + return Tag::from_logical_time(environment()->scheduler().logical_time()); } auto Reactor::get_elapsed_logical_time() const noexcept -> Duration { diff --git a/lib/scheduler.cc b/lib/scheduler.cc deleted file mode 100644 index 1e9b541e..00000000 --- a/lib/scheduler.cc +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Copyright (C) 2019 TU Dresden - * All rights reserved. - * - * Authors: - * Christian Menard - */ - -#include - -#include "reactor-cpp/scheduler.hh" - -#include "reactor-cpp/action.hh" -#include "reactor-cpp/assert.hh" -#include "reactor-cpp/environment.hh" -#include "reactor-cpp/logging.hh" -#include "reactor-cpp/port.hh" -#include "reactor-cpp/reaction.hh" -#include "reactor-cpp/trace.hh" - -namespace reactor { - -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -thread_local const Worker* Worker::current_worker = nullptr; - -Worker::Worker(Worker&& work) // NOLINT(performance-noexcept-move-constructor) - : scheduler_{work.scheduler_} - , identity_{work.identity_} { - // Need to provide the move constructor in order to organize workers in a - // std::vector. However, moving is not save if the thread is already running, - // thus we throw an exception here if the worker is moved but the - // internal thread is already running. - - if (work.thread_.joinable()) { - throw std::runtime_error{"Running workers cannot be moved!"}; - } -} - -void Worker::work() const { - // initialize the current worker thread local variable - current_worker = this; - - log::Debug() << "(Worker " << this->identity_ << ") Starting"; - - if (identity_ == 0) { - log::Debug() << "(Worker 0) do the initial scheduling"; - scheduler_.schedule(); - } - - while (true) { - // wait for a ready reaction - auto* reaction = scheduler_.ready_queue_.pop(); - - // receiving a nullptr indicates that the worker should terminate - if (reaction == nullptr) { - break; - } - - // execute the reaction - execute_reaction(reaction); - - // was this the very last reaction? - if (scheduler_.reactions_to_process_.fetch_sub(1, std::memory_order_acq_rel) == 1) { - // Yes, then schedule. The atomic decrement above ensures that only one - // thread enters this block. - scheduler_.schedule(); - } - // continue otherwise - } - - log::Debug() << "(Worker " << identity_ << ") terminates"; -} - -void Worker::execute_reaction(Reaction* reaction) const { - log::Debug() << "(Worker " << identity_ << ") " - << "execute reaction " << reaction->fqn(); - - tracepoint(reactor_cpp, reaction_execution_starts, id, reaction->fqn(), scheduler.logical_time()); - reaction->trigger(); - tracepoint(reactor_cpp, reaction_execution_finishes, id, reaction->fqn(), scheduler.logical_time()); -} - -void Scheduler::schedule() noexcept { - bool found_ready_reactions = schedule_ready_reactions(); - - while (!found_ready_reactions) { - log::Debug() << "(Scheduler) call next()"; - next(); - reaction_queue_pos_ = 0; - - found_ready_reactions = schedule_ready_reactions(); - - if (!continue_execution_ && !found_ready_reactions) { - // let all workers know that they should terminate - terminate_all_workers(); - break; - } - } -} - -auto ReadyQueue::pop() -> Reaction* { - auto old_size = size_.fetch_sub(1, std::memory_order_acq_rel); - - // If there is no ready reaction available, wait until there is one. - while (old_size <= 0) { - log::Debug() << "(Worker " << Worker::current_worker_id() << ") Wait for work"; - sem_.acquire(); - log::Debug() << "(Worker " << Worker::current_worker_id() << ") Waking up"; - old_size = size_.fetch_sub(1, std::memory_order_acq_rel); - // FIXME: Protect against underflow? - } - - auto pos = old_size - 1; - return queue_[pos]; -} - -void ReadyQueue::fill_up(std::vector& ready_reactions) { - // clear the internal queue and swap contents - queue_.clear(); - queue_.swap(ready_reactions); - - // update the atomic size counter and release the semaphore to wake up - // waiting worker threads - auto new_size = static_cast(queue_.size()); - auto old_size = size_.exchange(new_size, std::memory_order_acq_rel); - - // calculate how many workers to wake up. -old_size indicates the number of - // workers who started waiting since the last update. - // We want to wake up at most all the waiting workers. If we would release - // more, other workers that are out of work would not block when acquiring - // the semaphore. - // Also, we do not want to wake up more workers than there is work. new_size - // indicates the number of ready reactions. Since there is always at least - // one worker running running, new_size - running_workers indicates the - // number of additional workers needed to process all reactions. - waiting_workers_ += -old_size; - auto running_workers = num_workers_ - waiting_workers_; - auto workers_to_wakeup = std::min(waiting_workers_, new_size - running_workers); - - // wakeup other workers_ - if (workers_to_wakeup > 0) { - waiting_workers_ -= workers_to_wakeup; - log::Debug() << "Wakeup " << workers_to_wakeup << " workers"; - sem_.release(static_cast(workers_to_wakeup)); - } -} - -void Scheduler::terminate_all_workers() { - log::Debug() << "(Scheduler) Send termination signal to all workers"; - auto num_workers = environment_->num_workers(); - std::vector null_reactions{num_workers, nullptr}; - log::Debug() << null_reactions.size(); - ready_queue_.fill_up(null_reactions); -} - -auto Scheduler::schedule_ready_reactions() -> bool { - // insert any triggered reactions_ into the reaction queue - for (auto& vec_reaction : triggered_reactions_) { - for (auto* reaction : vec_reaction) { - reaction_queue_[reaction->index()].push_back(reaction); - } - vec_reaction.clear(); - } - - log::Debug() << "(Scheduler) Scanning the reaction queue for ready reactions"; - - // continue iterating over the reaction queue - for (; reaction_queue_pos_ < reaction_queue_.size(); reaction_queue_pos_++) { - auto& reactions = reaction_queue_[reaction_queue_pos_]; - - // any ready reactions of current priority? - if (!reactions.empty()) { - log::Debug() << "(Scheduler) Process reactions of priority " << reaction_queue_pos_; - - // Make sure that any reaction is only executed once even if it - // was triggered multiple times. - std::sort(reactions.begin(), reactions.end()); - reactions.erase(std::unique(reactions.begin(), reactions.end()), reactions.end()); - - if constexpr (log::debug_enabled || tracing_enabled) { // NOLINT - for (auto* reaction : reactions) { - log::Debug() << "(Scheduler) Reaction " << reaction->fqn() << " is ready for execution"; - tracepoint(reactor_cpp, trigger_reaction, reaction->container()->fqn(), reaction->name(), logical_time_); - } - } - - reactions_to_process_.store(static_cast(reactions.size()), std::memory_order_release); - ready_queue_.fill_up(reactions); - - // break out of the loop and return - return true; - } - } - - log::Debug() << "(Scheduler) Reached end of reaction queue"; - return false; -} - -void Scheduler::start() { - log::Debug() << "Starting the scheduler..."; - - auto num_workers = environment_->num_workers(); - // initialize the reaction queue, set ports vector, and triggered reactions - // vector - reaction_queue_.resize(environment_->max_reaction_index() + 1); - set_ports_.resize(num_workers); - triggered_reactions_.resize(num_workers); - - // Initialize and start the workers. By resizing the workers vector first, - // we make sure that there is sufficient space for all the workers and non of - // them needs to be moved. This is important because a running worker may not - // be moved. - workers_.reserve(num_workers); - for (unsigned i = 0; i < num_workers; i++) { - workers_.emplace_back(*this, i); - workers_.back().start_thread(); - } - - // join all worker threads - for (auto& worker : workers_) { - worker.join_thread(); - } -} - -void Scheduler::next() { // NOLINT - static EventMap events{}; - - // clean up before scheduling any new events - if (!events.empty()) { - // cleanup all triggered actions - for (auto& vec_ports : events) { - vec_ports.first->cleanup(); - } - // cleanup all set ports - for (auto& vec_ports : set_ports_) { - for (auto& port : vec_ports) { - port->cleanup(); - } - vec_ports.clear(); - } - events.clear(); - } - - { - std::unique_lock lock{scheduling_mutex_}; - - // shutdown if there are no more events in the queue - if (event_queue_.empty() && !stop_) { - if (environment_->run_forever()) { - // wait for a new asynchronous event - cv_schedule_.wait(lock, [this]() { return !event_queue_.empty() || stop_; }); - } else { - log::Debug() << "No more events in queue_. -> Terminate!"; - environment_->sync_shutdown(); - } - } - - while (events.empty()) { - if (stop_) { - continue_execution_ = false; - log::Debug() << "Shutting down the scheduler"; - Tag t_next = Tag::from_logical_time(logical_time_).delay(); - if (t_next == event_queue_.begin()->first) { - log::Debug() << "Schedule the last round of reactions including all " - "termination reactions"; - events = std::move(event_queue_.begin()->second); - event_queue_.erase(event_queue_.begin()); - log::Debug() << "advance logical time to tag [" << t_next.time_point() << ", " << t_next.micro_step() << "]"; - logical_time_.advance_to(t_next); - } else { - return; - } - } else { - // collect events of the next tag - auto t_next = event_queue_.begin()->first; - - // synchronize with physical time if not in fast forward mode - if (!environment_->fast_fwd_execution()) { - // keep track of the current physical time in a static variable - static auto physical_time = TimePoint::min(); - - // If physical time is smaller than the next logical time point, - // then update the physical time. This step is small optimization to - // avoid calling get_physical_time() in every iteration as this - // would add a significant overhead. - if (physical_time < t_next.time_point()) { - physical_time = get_physical_time(); - } - - // If physical time is still smaller than the next logical time - // point, then wait until the next tag or until a new event is - // inserted asynchronously into the queue - if (physical_time < t_next.time_point()) { - auto status = cv_schedule_.wait_until(lock, t_next.time_point()); - // Start over if the event queue was modified - if (status == std::cv_status::no_timeout) { - continue; - } - // update physical time and continue otherwise - physical_time = t_next.time_point(); - } - } - - // retrieve all events with tag equal to current logical time from the - // queue - events = std::move(event_queue_.begin()->second); - event_queue_.erase(event_queue_.begin()); - - // advance logical time - log::Debug() << "advance logical time to tag [" << t_next.time_point() << ", " << t_next.micro_step() << "]"; - logical_time_.advance_to(t_next); - } - } - } // mutex schedule_ - - // execute all setup functions; this sets the values of the corresponding - // actions - for (auto& vec_reactor : events) { - auto& setup = vec_reactor.second; - if (setup != nullptr) { - setup(); - } - } - - log::Debug() << "events: " << events.size(); - for (auto& vec_reactor : events) { - log::Debug() << "Action " << vec_reactor.first->fqn(); - for (auto* reaction : vec_reactor.first->triggers()) { - // There is no need to acquire the mutex. At this point the scheduler - // should be the only thread accessing the reaction queue as none of the - // workers_ are running - log::Debug() << "insert reaction " << reaction->fqn() << " with index " << reaction->index(); - reaction_queue_[reaction->index()].push_back(reaction); - } - } -} - -Scheduler::Scheduler(Environment* env) - : using_workers_(env->num_workers() > 1) - , environment_(env) - , ready_queue_(env->num_workers()) {} - -Scheduler::~Scheduler() = default; - -void Scheduler::schedule_sync(const Tag& tag, BaseAction* action, std::function pre_handler) { - reactor_assert(logical_time_ < tag); - // TODO verify that the action is indeed allowed to be scheduled by the - // current reaction - log::Debug() << "Schedule action " << action->fqn() << (action->is_logical() ? " synchronously " : " asynchronously ") - << " with tag [" << tag.time_point() << ", " << tag.micro_step() << "]"; - { - auto unique_lock = - using_workers_ ? std::unique_lock(lock_event_queue_) : std::unique_lock(); - - tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag); // NOLINT - - // create a new event map or retrieve the existing one - auto emplace_result = event_queue_.try_emplace(tag, EventMap()); - auto& event_map = emplace_result.first->second; - - // insert the new event - event_map[action] = std::move(pre_handler); - } -} - -void Scheduler::schedule_async(const Tag& tag, BaseAction* action, std::function pre_handler) { - std::lock_guard lock_guard(scheduling_mutex_); - schedule_sync(tag, action, std::move(pre_handler)); - cv_schedule_.notify_one(); -} - -void Scheduler::set_port(BasePort* port) { - log::Debug() << "Set port " << port->fqn(); - - // We do not check here if port is already in the list. This means clean() - // could be called multiple times for a single port. However, calling - // clean() multiple time is not harmful and more efficient then checking if - set_ports_[Worker::current_worker_id()].push_back(port); - - // recursively search for triggered reactions - set_port_helper(port); -} - -void Scheduler::set_port_helper(BasePort* port) { - for (auto* reaction : port->triggers()) { - triggered_reactions_[Worker::current_worker_id()].push_back(reaction); - } - for (auto* binding : port->outward_bindings()) { - set_port_helper(binding); - } -} - -void Scheduler::stop() { - stop_ = true; - cv_schedule_.notify_one(); -} - -} // namespace reactor