diff --git a/bench/bm_case1.cpp b/bench/bm_case1.cpp index 405caa4c2..fc949cc07 100644 --- a/bench/bm_case1.cpp +++ b/bench/bm_case1.cpp @@ -82,7 +82,7 @@ class math_bulk_op : public fg::node, fg::IN input, std::span output) const noexcept { // classic for-loop for (std::size_t i = 0; i < input.size(); i++) { @@ -95,7 +95,7 @@ class math_bulk_op : public fg::node, fg::IN, fg::INname = name_; } fair::graph::work_return_t - work() noexcept { + work(std::size_t requested_work) noexcept { auto &out_port = output_port<0>(this); auto &in_port = input_port<0>(this); @@ -196,9 +196,9 @@ class gen_operation_SIMD : public fg::node, fg::IN, fg::IN, fg::IN(this); @@ -350,9 +350,9 @@ class convert : public fg::node, fg::IN, fg::IN ? n_simd_to_convert : scalars_to_convert; const auto objects_to_read = stdx::is_simd_v ? n_simd_to_convert : scalars_to_convert; - auto return_value = fair::graph::work_return_t::OK; + auto return_value = fair::graph::work_return_status_t::OK; writer.publish( // [&](std::span output) { const auto input = reader.get(); @@ -377,7 +377,7 @@ class convert : public fg::node, fg::IN::max()); } expect(eq(test::n_samples_produced, N_SAMPLES)) << "produced too many/few samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "consumed too many/few samples"; diff --git a/bench/bm_filter.cpp b/bench/bm_filter.cpp index 3f0bbe448..b6ecb2040 100644 --- a/bench/bm_filter.cpp +++ b/bench/bm_filter.cpp @@ -24,7 +24,7 @@ loop_over_work(auto &node) { test::n_samples_produced = 0LU; test::n_samples_consumed = 0LU; while (test::n_samples_consumed < N_SAMPLES) { - node.work(); + std::ignore = node.work(std::numeric_limits::max()); } expect(eq(test::n_samples_produced, N_SAMPLES)) << "produced too many/few samples"; expect(eq(test::n_samples_consumed, N_SAMPLES)) << "consumed too many/few samples"; @@ -92,7 +92,7 @@ inline const boost::ut::suite _constexpr_bm = [] { expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); - fair::graph::scheduler::simple sched{std::move(flow_graph)}; + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; "runtime src->sink overhead"_benchmark.repeat(N_SAMPLES) = [&sched]() { invoke_work(sched); }; } @@ -106,7 +106,7 @@ inline const boost::ut::suite _constexpr_bm = [] { expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect(src, &::test::source::out).to<"in">(filter))); expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(filter).to(sink, &::test::sink::in))); - fair::graph::scheduler::simple sched{std::move(flow_graph)}; + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; "runtime src->fir_filter->sink"_benchmark.repeat(N_SAMPLES) = [&sched]() { invoke_work(sched); }; } @@ -120,7 +120,7 @@ inline const boost::ut::suite _constexpr_bm = [] { expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect(src, &::test::source::out).to<"in">(filter))); expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(filter).to(sink, &::test::sink::in))); - fair::graph::scheduler::simple sched{std::move(flow_graph)}; + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; "runtime src->iir_filter->sink - direct-form I"_benchmark.repeat(N_SAMPLES) = [&sched]() { invoke_work(sched); }; } @@ -134,7 +134,7 @@ inline const boost::ut::suite _constexpr_bm = [] { expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect(src, &::test::source::out).to<"in">(filter))); expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(filter).to(sink, &::test::sink::in))); - fair::graph::scheduler::simple sched{std::move(flow_graph)}; + fair::graph::scheduler::simple sched{ std::move(flow_graph) }; "runtime src->iir_filter->sink - direct-form II"_benchmark.repeat(N_SAMPLES) = [&sched]() { invoke_work(sched); }; } diff --git a/bench/bm_test_helper.hpp b/bench/bm_test_helper.hpp index 705fd6d32..c725228ff 100644 --- a/bench/bm_test_helper.hpp +++ b/bench/bm_test_helper.hpp @@ -18,7 +18,7 @@ inline static std::size_t n_samples_produced = 0_UZ; template class source : public fg::node> { public: - uint64_t _n_samples_max; + uint64_t _n_samples_max; std::size_t _n_tag_offset; fg::OUT out; @@ -34,7 +34,7 @@ class source : public fg::node> { [[nodiscard]] constexpr auto process_one_simd(auto N) const noexcept -> fair::meta::simdize { n_samples_produced += N; - fair::meta::simdize x {}; + fair::meta::simdize x{}; benchmark::force_to_memory(x); return x; } @@ -48,7 +48,7 @@ class source : public fg::node> { } fair::graph::work_return_t - work() { + work(std::size_t requested_work) { const std::size_t n_to_publish = _n_samples_max - n_samples_produced; if (n_to_publish > 0) { auto &port = out; @@ -60,7 +60,7 @@ class source : public fg::node> { if constexpr (use_bulk_operation) { std::size_t n_write = std::clamp(n_to_publish, 0UL, std::min(writer.available(), port.max_buffer_size())); if (n_write == 0_UZ) { - return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS; + return { requested_work, 0_UZ, fair::graph::work_return_status_t::INSUFFICIENT_INPUT_ITEMS }; } writer.publish( // @@ -73,14 +73,14 @@ class source : public fg::node> { } else { auto [data, token] = writer.get(1); if (data.size() == 0_UZ) { - return fair::graph::work_return_t::ERROR; + return { requested_work, 0_UZ, fair::graph::work_return_status_t::ERROR }; } data[0] = process_one(); writer.publish(token, 1); } - return fair::graph::work_return_t::OK; + return { requested_work, 1_UZ, fair::graph::work_return_status_t::OK }; } else { - return fair::graph::work_return_t::DONE; + return { requested_work, 0_UZ, fair::graph::work_return_status_t::DONE }; } } }; diff --git a/include/data_sink.hpp b/include/data_sink.hpp index 85e9499f3..703b26033 100644 --- a/include/data_sink.hpp +++ b/include/data_sink.hpp @@ -473,7 +473,7 @@ class data_sink : public node> { } } - [[nodiscard]] work_return_t + [[nodiscard]] work_return_status_t process_bulk(std::span in_data) noexcept { std::optional tagData; if (this->input_tags_present()) { @@ -499,7 +499,7 @@ class data_sink : public node> { } } - return work_return_t::OK; + return work_return_status_t::OK; } private: @@ -561,7 +561,7 @@ class data_sink : public node> { void ensure_history_size(std::size_t new_size) { - const auto old_size = _history ? _history->capacity() : std::size_t{0}; + const auto old_size = _history ? _history->capacity() : std::size_t{ 0 }; if (new_size <= old_size) { return; } diff --git a/include/graph.hpp b/include/graph.hpp index 81ea64233..403fba82c 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -440,7 +440,8 @@ class node_model { = 0; [[nodiscard]] virtual work_return_t - work() = 0; + work(std::size_t requested_work) + = 0; [[nodiscard]] virtual void * raw() = 0; @@ -526,8 +527,8 @@ class node_wrapper : public node_model { } [[nodiscard]] constexpr work_return_t - work() override { - return node_ref().work(); + work(std::size_t requested_work = std::numeric_limits::max()) override { + return node_ref().work(requested_work); } [[nodiscard]] std::string_view @@ -645,9 +646,9 @@ class edge { class graph { private: - std::vector> _connection_definitions; - std::vector> _nodes; - std::vector _edges; + std::vector> _connection_definitions; + std::vector> _nodes; + std::vector _edges; template std::unique_ptr & @@ -777,11 +778,16 @@ class graph { connect(Source &source, Port Source::*member_ptr); public: - graph(graph&) = delete; - graph(graph&&) = default; - graph() = default; - graph &operator=(graph&) = delete; - graph &operator=(graph&&) = default; + graph(graph &) = delete; + graph(graph &&) = default; + graph() = default; + graph & + operator=(graph &) + = delete; + graph & + operator=(graph &&) + = default; + /** * @return a list of all blocks contained in this graph * N.B. some 'blocks' may be (sub-)graphs themselves @@ -869,7 +875,7 @@ class graph { return result; } - const std::vector> & + const std::vector> & connection_definitions() { return _connection_definitions; } diff --git a/include/node.hpp b/include/node.hpp index 6dcccb226..5849632fc 100644 --- a/include/node.hpp +++ b/include/node.hpp @@ -42,14 +42,18 @@ simdize_tuple_load_and_apply(auto width, const std::tuple &rngs, auto off }(std::make_index_sequence()); } -enum class work_return_t { +enum class work_return_status_t { ERROR = -100, /// error occurred in the work function INSUFFICIENT_OUTPUT_ITEMS = -3, /// work requires a larger output buffer to produce output INSUFFICIENT_INPUT_ITEMS = -2, /// work requires a larger input buffer to produce output DONE = -1, /// this block has completed its processing and the flowgraph should be done OK = 0, /// work call was successful and return values in i/o structs are valid - CALLBACK_INITIATED = 1, /// rather than blocking in the work function, the block will call back to the - /// parent interface when it is ready to be called again +}; + +struct work_return_t { + std::size_t requested_work = std::numeric_limits::max(); + std::size_t performed_work = 0; + work_return_status_t status = work_return_status_t::OK; }; template @@ -103,7 +107,7 @@ output_ports(Self *self) noexcept { } template -concept NodeType = requires(T t) { +concept NodeType = requires(T t, std::size_t requested_work) { { t.unique_name } -> std::same_as; { unwrap_if_wrapped_t{} } -> std::same_as; { unwrap_if_wrapped_t{} } -> std::same_as; @@ -112,7 +116,7 @@ concept NodeType = requires(T t) { { t.is_blocking() } noexcept -> std::same_as; { t.settings() } -> std::same_as; - { t.work() } -> std::same_as; + { t.work(requested_work) } -> std::same_as; // N.B. TODO discuss these requirements requires !std::is_copy_constructible_v; @@ -192,9 +196,9 @@ concept HasRequiredProcessFunction = HasProcessOneFunction != HasProces * const auto n_readable = std::min(reader.available(), in_port.max_buffer_size()); * const auto n_writable = std::min(writer.available(), out_port.max_buffer_size()); * if (n_readable == 0) { - * return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS; + * return { 0, fair::graph::work_return_status_t::INSUFFICIENT_INPUT_ITEMS }; * } else if (n_writable == 0) { - * return fair::graph::work_return_t::INSUFFICIENT_OUTPUT_ITEMS; + * return { 0, fair::graph::work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS }; * } * const std::size_t n_to_publish = std::min(n_readable, n_writable); // N.B. here enforcing N_input == N_output * @@ -206,9 +210,9 @@ concept HasRequiredProcessFunction = HasProcessOneFunction != HasProces * }, n_to_publish); * * if (!reader.consume(n_to_publish)) { - * return fair::graph::work_return_t::ERROR; + * return { n_to_publish, fair::graph::work_return_status_t::ERROR }; * } - * return fair::graph::work_return_t::OK; + * return { n_to_publish, fair::graph::work_return_status_t::OK }; * } * @endcode *
  • case 4: Python -> map to cases 1-3 and/or dedicated callback (to-be-implemented) @@ -480,9 +484,14 @@ class node : protected std::tuple { std::for_each(_tags_at_output.begin(), _tags_at_output.end(), [](tag_t &tag) { tag.reset(); }); } -public: +protected: + /** + * @brief + * @return struct { std::size_t produced_work, work_return_t} + */ work_return_t - work() noexcept { + work_internal(std::size_t requested_work) noexcept { + using fair::graph::work_return_status_t; using input_types = traits::node::input_port_types; using output_types = traits::node::output_port_types; @@ -500,17 +509,17 @@ class node : protected std::tuple { meta::tuple_for_each([&max_buffer](auto &&out) { max_buffer = std::min(max_buffer, out.streamWriter().available()); }, output_ports(&self())); const std::make_signed_t available_samples = self().available_samples(self()); if (available_samples < 0 && max_buffer > 0) { - return work_return_t::DONE; + return { requested_work, 0_UZ, work_return_status_t::DONE }; } if (available_samples == 0) { - return work_return_t::OK; + return { requested_work, 0_UZ, work_return_status_t::OK }; } samples_to_process = std::max(0UL, std::min(static_cast(available_samples), max_buffer)); if (not enough_samples_for_output_ports(samples_to_process)) { - return work_return_t::INSUFFICIENT_INPUT_ITEMS; + return { requested_work, 0_UZ, work_return_status_t::INSUFFICIENT_INPUT_ITEMS }; } if (samples_to_process == 0) { - return work_return_t::INSUFFICIENT_OUTPUT_ITEMS; + return { requested_work, 0_UZ, work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS }; } } else if constexpr (requires(const Derived &d) { { available_samples(d) } -> std::same_as; @@ -518,13 +527,13 @@ class node : protected std::tuple { // the (source) node wants to determine the number of samples to process samples_to_process = available_samples(self()); if (samples_to_process == 0) { - return work_return_t::OK; + return { requested_work, 0_UZ, work_return_status_t::OK }; } if (not enough_samples_for_output_ports(samples_to_process)) { - return work_return_t::INSUFFICIENT_INPUT_ITEMS; + return { requested_work, 0_UZ, work_return_status_t::INSUFFICIENT_INPUT_ITEMS }; } if (not space_available_on_output_ports(samples_to_process)) { - return work_return_t::INSUFFICIENT_OUTPUT_ITEMS; + return { requested_work, 0_UZ, work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS }; } } else if constexpr (is_sink_node) { // no input or output buffers, derive from internal "buffer sizes" (i.e. what the @@ -537,7 +546,7 @@ class node : protected std::tuple { // derive value from output buffer size samples_to_process = std::apply([&](const auto &...ports) { return std::min({ ports.streamWriter().available()..., ports.max_buffer_size()... }); }, output_ports(&self())); if (not enough_samples_for_output_ports(samples_to_process)) { - return work_return_t::INSUFFICIENT_OUTPUT_ITEMS; + return { requested_work, 0_UZ, work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS }; } // space_available_on_output_ports is true by construction of samples_to_process } @@ -545,15 +554,15 @@ class node : protected std::tuple { // Capturing structured bindings does not work in Clang... const auto [at_least_one_input_has_data, available_values_count, n_samples_until_next_tag_] = self().inputs_status(self()); if (available_values_count == 0) { - return at_least_one_input_has_data ? work_return_t::INSUFFICIENT_INPUT_ITEMS : work_return_t::DONE; + return { requested_work, 0_UZ, at_least_one_input_has_data ? work_return_status_t::INSUFFICIENT_INPUT_ITEMS : work_return_status_t::DONE }; } samples_to_process = available_values_count; n_samples_until_next_tag = n_samples_until_next_tag_; if (not enough_samples_for_output_ports(samples_to_process)) { - return work_return_t::INSUFFICIENT_INPUT_ITEMS; + return { requested_work, 0_UZ, work_return_status_t::INSUFFICIENT_INPUT_ITEMS }; } if (not space_available_on_output_ports(samples_to_process)) { - return work_return_t::INSUFFICIENT_OUTPUT_ITEMS; + return { requested_work, 0_UZ, work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS }; } } @@ -622,14 +631,17 @@ class node : protected std::tuple { // case sources: HW triggered vs. generating data per invocation (generators via Port::MIN) // case sinks: HW triggered vs. fixed-size consumer (may block/never finish for insufficient input data and fixed Port::MIN>0) + // clamp + samples_to_process = std::min(samples_to_process, requested_work); + if constexpr (HasProcessBulkFunction) { - const work_return_t ret = std::apply([this](auto... args) { return static_cast(this)->process_bulk(args...); }, - std::tuple_cat(input_spans, meta::tuple_transform([](auto &output_range) { return std::span(output_range); }, writers_tuple))); + const work_return_status_t ret = std::apply([this](auto... args) { return static_cast(this)->process_bulk(args...); }, + std::tuple_cat(input_spans, meta::tuple_transform([](auto &output_range) { return std::span(output_range); }, writers_tuple))); write_to_outputs(samples_to_process, writers_tuple); const bool success = consume_readers(self(), samples_to_process); forward_tags(); - return success ? ret : work_return_t::ERROR; + return { requested_work, samples_to_process, success ? ret : work_return_status_t::ERROR }; } else { // if constexpr (HasProcessOneFunction) { // TODO: nail down the required method parameters and return types // handle process_one(...) @@ -674,13 +686,32 @@ class node : protected std::tuple { } #endif forward_tags(); - return success ? work_return_t::OK : work_return_t::ERROR; + return { requested_work, samples_to_process, success ? work_return_status_t::OK : work_return_status_t::ERROR }; } // process_one(...) handling // else { // static_assert(fair::meta::always_false, "neither process_bulk(...) nor process_one(...) implemented"); // } - return work_return_t::ERROR; - } // end: work_return_t work() noexcept { ..} + return { requested_work, 0_UZ, work_return_status_t::ERROR }; + } // end: work_return_t work_internal() noexcept { ..} + +public: + /** + * @brief Process as many samples as available and compatible with the internal boundary requirements or limited by 'requested_work` + * + * @param requested_work: usually the processed number of input samples, but could be any other metric as long as + * requested_work limit as an affine relation with the returned performed_work. + * @return { requested_work, performed_work, status} + */ + work_return_t + work(std::size_t requested_work = std::numeric_limits::max()) noexcept { + constexpr bool is_blocking = node_template_parameters::template contains; + if constexpr (is_blocking) { + return work_internal(requested_work); + //static_assert(fair::meta::always_false, "not yet implemented"); + } else { + return work_internal(requested_work); + } + } }; template @@ -724,9 +755,9 @@ node_description() noexcept { ret += fmt::format("{}{:10} {:<20} - annotated info: {} unit: [{}] documentation: {}{}\n", RawType::visible() ? "" : "_", // type_name, - member_name, // + member_name, // RawType::description(), RawType::unit(), - RawType::documentation(), // + RawType::documentation(), // RawType::visible() ? "" : "_"); } else { const std::string type_name = refl::detail::get_type_name().str(); @@ -921,8 +952,8 @@ class merged_node : public node, meta::con } // end:: process_one work_return_t - work() noexcept { - return base::work(); + work(std::size_t requested_work) noexcept { + return base::work(requested_work); } }; diff --git a/include/scheduler.hpp b/include/scheduler.hpp index d8ca40429..6df2a1737 100644 --- a/include/scheduler.hpp +++ b/include/scheduler.hpp @@ -112,9 +112,7 @@ class scheduler_base { // // }); FMT_FALLTHROUGH; - case PAUSED: - _state = INITIALISED; - break; + case PAUSED: _state = INITIALISED; break; case SHUTTING_DOWN: case INITIALISED: case ERROR: break; @@ -135,7 +133,7 @@ class scheduler_base { uint32_t done = 0; uint32_t progress_count = 0; while (done < n_batches && !_stop_requested) { - bool something_happened = work() == work_return_t::OK; + bool something_happened = work().status == work_return_status_t::OK; uint64_t progress_local, progress_new; if (something_happened) { // something happened in this thread => increase progress and reset done count do { @@ -153,7 +151,7 @@ class scheduler_base { done = static_cast(progress_local & ((1ULL << 32) - 1)); if (progress_count == progress_count_old) { // nothing happened => increase done count progress_new = ((progress_count + 0ULL) << 32) + done + 1; - } else { // something happened in another thread => keep progress and done count and rerun this task without waiting + } else { // something happened in another thread => keep progress and done count and rerun this task without waiting progress_new = ((progress_count + 0ULL) << 32) + done; } } while (!_progress.compare_exchange_strong(progress_local, progress_new)); @@ -200,18 +198,21 @@ class simple : public scheduler_base { template work_return_t work_once(const std::span &nodes) { - bool something_happened = false; + constexpr std::size_t requested_work = std::numeric_limits::max(); + bool something_happened = false; + std::size_t performed_work = 0_UZ; for (auto ¤tNode : nodes) { - auto result = currentNode->work(); - if (result == work_return_t::ERROR) { - return work_return_t::ERROR; - } else if (result == work_return_t::INSUFFICIENT_INPUT_ITEMS || result == work_return_t::DONE) { + auto result = currentNode->work(requested_work); + performed_work += result.performed_work; + if (result.status == work_return_status_t::ERROR) { + return { requested_work, performed_work, work_return_status_t::ERROR }; + } else if (result.status == work_return_status_t::INSUFFICIENT_INPUT_ITEMS || result.status == work_return_status_t::DONE) { // nothing - } else if (result == work_return_t::OK || result == work_return_t::INSUFFICIENT_OUTPUT_ITEMS) { + } else if (result.status == work_return_status_t::OK || result.status == work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS) { something_happened = true; } } - return something_happened ? work_return_t::OK : work_return_t::DONE; + return { requested_work, performed_work, something_happened ? work_return_status_t::OK : work_return_status_t::DONE }; } // todo: could be moved to base class, but would make `start()` virtual or require CRTP @@ -224,23 +225,16 @@ class simple : public scheduler_base { void start() { - switch(this->_state) { - case IDLE: - this->init(); - break; - case STOPPED: - reset(); - break; - case PAUSED: - this->_state = INITIALISED; - break; - case INITIALISED: - case RUNNING: - case REQUESTED_PAUSE: - case REQUESTED_STOP: - case SHUTTING_DOWN: - case ERROR: - break; + switch (this->_state) { + case IDLE: this->init(); break; + case STOPPED: reset(); break; + case PAUSED: this->_state = INITIALISED; break; + case INITIALISED: + case RUNNING: + case REQUESTED_PAUSE: + case REQUESTED_STOP: + case SHUTTING_DOWN: + case ERROR: break; } if (this->_state != INITIALISED) { throw std::runtime_error("simple scheduler work(): graph not initialised"); @@ -249,8 +243,8 @@ class simple : public scheduler_base { this->_state = RUNNING; work_return_t result; auto nodelist = std::span{ this->_graph.blocks() }; - while ((result = work_once(nodelist)) == work_return_t::OK) { - if (result == work_return_t::ERROR) { + while ((result = work_once(nodelist)).status == work_return_status_t::OK) { + if (result.status == work_return_status_t::ERROR) { this->_state = ERROR; return; } @@ -334,18 +328,22 @@ class breadth_first : public scheduler_base { template work_return_t work_once(const std::span &nodes) { - bool something_happened = false; + constexpr std::size_t requested_work = std::numeric_limits::max(); + bool something_happened = false; + std::size_t performed_work = 0_UZ; + for (auto ¤tNode : nodes) { - auto result = currentNode->work(); - if (result == work_return_t::ERROR) { - return work_return_t::ERROR; - } else if (result == work_return_t::INSUFFICIENT_INPUT_ITEMS || result == work_return_t::DONE) { + auto result = currentNode->work(requested_work); + performed_work += result.performed_work; + if (result.status == work_return_status_t::ERROR) { + return { requested_work, performed_work, work_return_status_t::ERROR }; + } else if (result.status == work_return_status_t::INSUFFICIENT_INPUT_ITEMS || result.status == work_return_status_t::DONE) { // nothing - } else if (result == work_return_t::OK || result == work_return_t::INSUFFICIENT_OUTPUT_ITEMS) { + } else if (result.status == work_return_status_t::OK || result.status == work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS) { something_happened = true; } } - return something_happened ? work_return_t::OK : work_return_t::DONE; + return { requested_work, performed_work, something_happened ? work_return_status_t::OK : work_return_status_t::DONE }; } void @@ -356,23 +354,16 @@ class breadth_first : public scheduler_base { void start() { - switch(this->_state) { - case IDLE: - this->init(); - break; - case STOPPED: - reset(); - break; - case PAUSED: - this->_state = INITIALISED; - break; - case INITIALISED: - case RUNNING: - case REQUESTED_PAUSE: - case REQUESTED_STOP: - case SHUTTING_DOWN: - case ERROR: - break; + switch (this->_state) { + case IDLE: this->init(); break; + case STOPPED: reset(); break; + case PAUSED: this->_state = INITIALISED; break; + case INITIALISED: + case RUNNING: + case REQUESTED_PAUSE: + case REQUESTED_STOP: + case SHUTTING_DOWN: + case ERROR: break; } if (this->_state != INITIALISED) { throw std::runtime_error("simple scheduler work(): graph not initialised"); @@ -381,8 +372,8 @@ class breadth_first : public scheduler_base { this->_state = RUNNING; work_return_t result; auto nodelist = std::span{ this->_nodelist }; - while ((result = work_once(nodelist)) == work_return_t::OK) { - if (result == work_return_t::ERROR) { + while ((result = work_once(nodelist)).status == work_return_status_t::OK) { + if (result.status == work_return_status_t::ERROR) { this->_state = ERROR; return; } diff --git a/test/app_plugins_test.cpp b/test/app_plugins_test.cpp index 555fd1311..936acd964 100644 --- a/test/app_plugins_test.cpp +++ b/test/app_plugins_test.cpp @@ -98,11 +98,11 @@ main(int argc, char *argv[]) { assert(connection_4 == fg::connection_result_t::SUCCESS); for (std::size_t i = 0; i < repeats; ++i) { - std::ignore = node_source.work(); - std::ignore = node_multiply_1.work(); - std::ignore = node_multiply_2.work(); - std::ignore = node_counter.work(); - std::ignore = node_sink.work(); + std::ignore = node_source.work(std::numeric_limits::max()); + std::ignore = node_multiply_1.work(std::numeric_limits::max()); + std::ignore = node_multiply_2.work(std::numeric_limits::max()); + std::ignore = node_counter.work(std::numeric_limits::max()); + std::ignore = node_sink.work(std::numeric_limits::max()); } fmt::print("repeats {} event_count {}\n", repeats, builtin_counter::s_event_count); diff --git a/test/blocklib/core/unit-test/common_nodes.hpp b/test/blocklib/core/unit-test/common_nodes.hpp index ed5046092..341219fb8 100644 --- a/test/blocklib/core/unit-test/common_nodes.hpp +++ b/test/blocklib/core/unit-test/common_nodes.hpp @@ -61,8 +61,12 @@ ENABLE_REFLECTION_FOR_TEMPLATE(builtin_counter, in, out); // - use node::set_name instead of returning an empty name template class multi_adder : public fair::graph::node_model { + static std::atomic_size_t _unique_id_counter; + public: - int input_port_count; + int input_port_count; + const std::size_t unique_id = _unique_id_counter++; + const std::string unique_name_ = fmt::format("multi_adder#{}", unique_id); // TODO: resolve symbol duplication protected: using in_port_t = fair::graph::IN; @@ -71,11 +75,6 @@ class multi_adder : public fair::graph::node_model { std::list _input_ports; fair::graph::OUT _output_port; -private: - static std::atomic_size_t _unique_id_counter; - const std::size_t _unique_id = _unique_id_counter++; - const std::string _unique_name = fmt::format("multi_adder#{}", _unique_id); - protected: using setting_map = std::map>; std::string _name = "multi_adder"; @@ -118,7 +117,7 @@ class multi_adder : public fair::graph::node_model { [[nodiscard]] std::string_view name() const override { - return _unique_name; + return unique_name_; } std::string_view @@ -128,7 +127,7 @@ class multi_adder : public fair::graph::node_model { // TODO: integrate with node::work fair::graph::work_return_t - work() override { + work(std::size_t requested_work) override { // TODO: Rewrite with ranges once we can use them std::size_t available_samples = -1_UZ; for (const auto &input_port : _input_ports) { @@ -139,7 +138,7 @@ class multi_adder : public fair::graph::node_model { } if (available_samples == 0) { - return fair::graph::work_return_t::OK; + return { requested_work, 0_UZ, fair::graph::work_return_status_t::OK }; } std::vector> readers; @@ -160,7 +159,7 @@ class multi_adder : public fair::graph::node_model { for (auto &input_port [[maybe_unused]] : _input_ports) { assert(available_samples == input_port.streamReader().consume(available_samples)); } - return fair::graph::work_return_t::OK; + return { requested_work, available_samples, fair::graph::work_return_status_t::OK }; } void * @@ -188,15 +187,17 @@ class multi_adder : public fair::graph::node_model { [[nodiscard]] std::string_view unique_name() const override { - return _unique_name; + return unique_name_; } }; +// static_assert(fair::graph::NodeType>); + ENABLE_REFLECTION_FOR_TEMPLATE(multi_adder, input_port_count); template void -register_builtin_nodes(Registry *registry) { + register_builtin_nodes(Registry *registry) { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-variable" GP_REGISTER_NODE(registry, builtin_multiply, double, float); diff --git a/test/blocklib/core/unit-test/tag_monitors.hpp b/test/blocklib/core/unit-test/tag_monitors.hpp index 419d49225..63feef002 100644 --- a/test/blocklib/core/unit-test/tag_monitors.hpp +++ b/test/blocklib/core/unit-test/tag_monitors.hpp @@ -76,7 +76,7 @@ struct TagSource : public node> { return static_cast(0); } - work_return_t + work_return_status_t process_bulk(std::span output) noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_BULK) { @@ -90,7 +90,7 @@ struct TagSource : public node> { } n_samples_produced += output.size(); - return n_samples_produced < n_samples_max ? work_return_t::OK : work_return_t::DONE; + return n_samples_produced < n_samples_max ? work_return_status_t::OK : work_return_status_t::DONE; } }; @@ -118,7 +118,7 @@ struct TagMonitor : public node> { return input; } - constexpr work_return_t + constexpr work_return_status_t process_bulk(std::span input, std::span output) noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_BULK) { @@ -132,7 +132,7 @@ struct TagMonitor : public node> { n_samples_produced += input.size(); std::memcpy(output.data(), input.data(), input.size() * sizeof(T)); - return work_return_t::OK; + return work_return_status_t::OK; } }; @@ -164,7 +164,7 @@ struct TagSink : public node> { } // template V> - constexpr work_return_t + constexpr work_return_status_t process_bulk(std::span input) noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_BULK) { @@ -177,7 +177,7 @@ struct TagSink : public node> { n_samples_produced += input.size(); - return work_return_t::OK; + return work_return_status_t::OK; } }; diff --git a/test/plugins/good_base_plugin.cpp b/test/plugins/good_base_plugin.cpp index 61cbea4e4..f0931776a 100644 --- a/test/plugins/good_base_plugin.cpp +++ b/test/plugins/good_base_plugin.cpp @@ -54,10 +54,10 @@ class fixed_source : public fg::node, fg::OUT T value = 1; fg::work_return_t - work() { + work(std::size_t requested_work) { if (event_count == 0) { std::cerr << "fixed_source done\n"; - return fg::work_return_t::DONE; + return { requested_work, 0_UZ, fg::work_return_status_t::DONE }; } auto &port = fair::graph::output_port<0>(this); @@ -68,11 +68,11 @@ class fixed_source : public fg::node, fg::OUT value += 1; if (event_count == -1_UZ) { - return fg::work_return_t::OK; + return { requested_work, 1_UZ, fg::work_return_status_t::OK }; } event_count--; - return fg::work_return_t::OK; + return { requested_work, 1_UZ, fg::work_return_status_t::OK }; } }; } // namespace good diff --git a/test/qa_dynamic_node.cpp b/test/qa_dynamic_node.cpp index af4e07dce..3a96bcbe3 100644 --- a/test/qa_dynamic_node.cpp +++ b/test/qa_dynamic_node.cpp @@ -14,19 +14,21 @@ struct fixed_source : public fg::node, fg::OUT(this); + auto &port = fg::output_port<0>(this); auto &writer = port.streamWriter(); - auto data = writer.reserve_output_range(1_UZ); - data[0] = value; + auto data = writer.reserve_output_range(1_UZ); + data[0] = value; data.publish(1_UZ); value += 1; - return fg::work_return_t::OK; + return { requested_work, 1_UZ, fg::work_return_status_t::OK }; } }; +static_assert(fair::graph::NodeType>); + template struct cout_sink : public fg::node, fg::IN> { std::size_t remaining = 0; @@ -40,20 +42,22 @@ struct cout_sink : public fg::node, fg::IN> { } }; +static_assert(fair::graph::NodeType>); + ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (cout_sink), remaining); int main() { constexpr const std::size_t sources_count = 10; - constexpr const std::size_t events_count = 5; + constexpr const std::size_t events_count = 5; - fg::graph flow_graph; + fg::graph flow_graph; // Adder has sources_count inputs in total, but let's create // sources_count / 2 inputs on construction, and change the number // via settings auto &adder = flow_graph.add_node(std::make_unique>(sources_count / 2)); - auto &sink = flow_graph.make_node>({{"remaining", events_count}}); + auto &sink = flow_graph.make_node>({ { "remaining", events_count } }); // Function that adds a new source node to the graph, and connects // it to one of adder's ports @@ -71,9 +75,9 @@ main() { for (std::size_t i = 0; i < events_count; ++i) { for (auto *source : sources) { - source->work(); + source->work(std::numeric_limits::max()); } - std::ignore = adder.work(); - std::ignore = sink.work(); + std::ignore = adder.work(std::numeric_limits::max()); + std::ignore = sink.work(std::numeric_limits::max()); } } diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index 9b957a8dd..1e7add25c 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -68,6 +68,7 @@ class cout_sink : public fg::node> { } }; +static_assert(fair::graph::NodeType>); ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (cout_sink), sink); template @@ -77,7 +78,7 @@ class repeater_source : public fg::node> { std::size_t _counter = 0; fair::graph::work_return_t - work() { + work(std::size_t requested_work) { if (_counter < count) { _counter++; auto &writer = output_port<"value">(this).streamWriter(); @@ -85,13 +86,14 @@ class repeater_source : public fg::node> { data[0] = val; data.publish(1); - return fair::graph::work_return_t::OK; + return { requested_work, 1_UZ, fair::graph::work_return_status_t::OK }; } else { - return fair::graph::work_return_t::DONE; + return { requested_work, 0_UZ, fair::graph::work_return_status_t::DONE }; } } }; +static_assert(fair::graph::NodeType>); ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, T val, std::size_t count), (repeater_source), value); const boost::ut::suite PortApiTests = [] { @@ -113,7 +115,7 @@ const boost::ut::suite PortApiTests = [] { "PortBufferApi"_test = [] { OUT::max(), "out0"> output_port; - BufferWriter auto &writer = output_port.streamWriter(); + BufferWriter auto &writer = output_port.streamWriter(); // BufferWriter auto &tagWriter = output_port.tagWriter(); expect(ge(writer.available(), 32_UZ)); diff --git a/test/qa_hier_node.cpp b/test/qa_hier_node.cpp index 97fe8af22..a5e3d495b 100644 --- a/test/qa_hier_node.cpp +++ b/test/qa_hier_node.cpp @@ -83,9 +83,9 @@ class hier_node : public fg::node_model { } fg::work_return_t - work() override { + work(std::size_t requested_work) override { _scheduler.run_and_wait(); - return fair::graph::work_return_t::DONE; + return { requested_work, requested_work, fair::graph::work_return_status_t::DONE }; } void * @@ -127,7 +127,7 @@ struct fixed_source : public fg::node, fg::OUT(this); @@ -142,11 +142,10 @@ struct fixed_source : public fg::node, fg::OUT("custom pool", fair::thread_pool::CPU_BOUND, 2,2); // use custom pool to limit number of threads for emscripten + auto thread_pool = std::make_shared("custom pool", fair::thread_pool::CPU_BOUND, 2, 2); // use custom pool to limit number of threads for emscripten fg::scheduler::simple scheduler(make_graph(10), thread_pool); diff --git a/test/qa_plugins_test.cpp b/test/qa_plugins_test.cpp index 6742037e4..6521f6773 100644 --- a/test/qa_plugins_test.cpp +++ b/test/qa_plugins_test.cpp @@ -140,9 +140,9 @@ const boost::ut::suite BasicPluginNodesConnectionTests = [] { expect(connection_2 == fg::connection_result_t::SUCCESS); for (std::size_t i = 0; i < repeats; ++i) { - node_source->work(); - node_multiply->work(); - node_sink->work(); + std::ignore = node_source->work(std::numeric_limits::max()); + std::ignore = node_multiply->work(std::numeric_limits::max()); + std::ignore = node_sink->work(std::numeric_limits::max()); } }; @@ -155,7 +155,7 @@ const boost::ut::suite BasicPluginNodesConnectionTests = [] { // Instantiate a built-in node in a static way fair::graph::property_map node_multiply_1_params; node_multiply_1_params["factor"] = 2.0; - auto &node_multiply_1 = flow_graph.make_node>(node_multiply_1_params); + auto &node_multiply_1 = flow_graph.make_node>(node_multiply_1_params); // Instantiate a built-in node via the plugin loader auto &node_multiply_2 = context().loader.instantiate_in_graph(flow_graph, names::builtin_multiply, "double"); @@ -176,10 +176,10 @@ const boost::ut::suite BasicPluginNodesConnectionTests = [] { expect(connection_3 == fg::connection_result_t::SUCCESS); for (std::size_t i = 0; i < repeats; ++i) { - node_source.work(); - node_multiply_1.work(); - node_multiply_2.work(); - node_sink.work(); + std::ignore = node_source.work(std::numeric_limits::max()); + std::ignore = node_multiply_1.work(std::numeric_limits::max()); + std::ignore = node_multiply_2.work(std::numeric_limits::max()); + std::ignore = node_sink.work(std::numeric_limits::max()); } }; }; diff --git a/test/qa_scheduler.cpp b/test/qa_scheduler.cpp index 111db4f5c..65a64a285 100644 --- a/test/qa_scheduler.cpp +++ b/test/qa_scheduler.cpp @@ -54,6 +54,8 @@ class count_source : public fg::node, fg::OUT>); + template class expect_sink : public fg::node, fg::IN::max(), "in">> { tracer &_tracer; @@ -65,14 +67,14 @@ class expect_sink : public fg::node, fg::IN input) noexcept { _tracer.trace(this->name); for (auto data : input) { _checker(_count, data); _count++; } - return fg::work_return_t::OK; + return fg::work_return_status_t::OK; } constexpr void