From 87ff686497aa6643290a518c070dab5ced31fe00 Mon Sep 17 00:00:00 2001 From: rstein Date: Wed, 11 Oct 2023 14:25:23 +0200 Subject: [PATCH] renamed WorkReturnStatus -> work::Status & WorkReturn -> work::Result ... hope this makes it a bit easier in terms of naming. --- .../gnuradio-4.0/basic/clock_source.hpp | 10 +- .../gnuradio-4.0/basic/common_blocks.hpp | 6 +- .../include/gnuradio-4.0/basic/data_sink.hpp | 4 +- .../include/gnuradio-4.0/basic/selector.hpp | 6 +- blocks/basic/test/qa_selector.cpp | 6 +- .../include/gnuradio-4.0/fourier/fft.hpp | 4 +- blocks/fourier/test/qa_fourier.cpp | 8 +- .../gnuradio-4.0/testing/bm_test_helper.hpp | 10 +- .../gnuradio-4.0/testing/tag_monitors.hpp | 12 +- core/benchmarks/bm_fft.cpp | 4 +- core/benchmarks/bm_node_api.cpp | 34 ++-- core/include/gnuradio-4.0/Block.hpp | 151 +++++++++--------- core/include/gnuradio-4.0/BlockTraits.hpp | 8 +- core/include/gnuradio-4.0/Graph.hpp | 4 +- core/include/gnuradio-4.0/Scheduler.hpp | 46 +++--- core/test/plugins/good_base_plugin.cpp | 8 +- core/test/qa_Block.cpp | 4 +- core/test/qa_DynamicBlock.cpp | 4 +- core/test/qa_DynamicPort.cpp | 6 +- core/test/qa_HierBlock.cpp | 10 +- core/test/qa_scheduler.cpp | 4 +- core/test/qa_settings.cpp | 4 +- 22 files changed, 176 insertions(+), 177 deletions(-) diff --git a/blocks/basic/include/gnuradio-4.0/basic/clock_source.hpp b/blocks/basic/include/gnuradio-4.0/basic/clock_source.hpp index 1e191e16e..3b0bd3085 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/clock_source.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/clock_source.hpp @@ -60,7 +60,7 @@ ClockSource Documentation -- add here while (this->ioThreadShallRun.load()) { std::this_thread::sleep_until(nextTimePoint); // invoke and execute work function from user-provided thread - if (this->invokeWork() == WorkReturnStatus::DONE) { + if (this->invokeWork() == work::Status::DONE) { break; } else { retry = 2; @@ -89,11 +89,11 @@ ClockSource Documentation -- add here nextTimePoint = ClockSourceType::now(); } - WorkReturnStatus + work::Status processBulk(PublishableSpan auto &output) noexcept { if (n_samples_max > 0 && n_samples_produced >= n_samples_max) { output.publish(0_UZ); - return WorkReturnStatus::DONE; + return work::Status::DONE; } if constexpr (useIoThread) { // using scheduler-graph provided user thread @@ -103,7 +103,7 @@ ClockSource Documentation -- add here const auto writableSamples = static_cast(output.size()); if (writableSamples < chunk_size) { output.publish(0_UZ); - return WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS; + return work::Status::INSUFFICIENT_OUTPUT_ITEMS; } const std::uint32_t remaining_samples = n_samples_max - n_samples_produced; @@ -139,7 +139,7 @@ ClockSource Documentation -- add here nextTimePoint += std::chrono::microseconds(static_cast(static_cast(updatePeriod.count()) * ratio)); } - return WorkReturnStatus::OK; + return work::Status::OK; } }; diff --git a/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp b/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp index a83ee1a68..f0fb58001 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp @@ -141,7 +141,7 @@ class multi_adder : public gr::BlockModel { } // TODO: integrate with Block::work - gr::WorkReturn + gr::work::Result work(std::size_t requested_work) override { // TODO: Rewrite with ranges once we can use them std::size_t available_samples = -1_UZ; @@ -153,7 +153,7 @@ class multi_adder : public gr::BlockModel { } if (available_samples == 0) { - return { requested_work, 0_UZ, gr::WorkReturnStatus::OK }; + return { requested_work, 0_UZ, gr::work::Status::OK }; } std::vector> readers; @@ -174,7 +174,7 @@ class multi_adder : public gr::BlockModel { for (auto &input_port [[maybe_unused]] : _input_ports) { assert(available_samples == input_port.streamReader().consume(available_samples)); } - return { requested_work, available_samples, gr::WorkReturnStatus::OK }; + return { requested_work, available_samples, gr::work::Status::OK }; } void * diff --git a/blocks/basic/include/gnuradio-4.0/basic/data_sink.hpp b/blocks/basic/include/gnuradio-4.0/basic/data_sink.hpp index 405fc8eb4..c3081eb23 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/data_sink.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/data_sink.hpp @@ -474,7 +474,7 @@ class data_sink : public Block> { } } - [[nodiscard]] WorkReturnStatus + [[nodiscard]] work::Status processBulk(std::span in_data) noexcept { std::optional tagData; if (this->input_tags_present()) { @@ -500,7 +500,7 @@ class data_sink : public Block> { } } - return WorkReturnStatus::OK; + return work::Status::OK; } private: diff --git a/blocks/basic/include/gnuradio-4.0/basic/selector.hpp b/blocks/basic/include/gnuradio-4.0/basic/selector.hpp index a4070bbd8..cca9653e2 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/selector.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/selector.hpp @@ -108,7 +108,7 @@ struct Selector : Block, SelectorDoc> { using input_reader_t = typename PortIn::ReaderType; using output_writer_t = typename PortOut::WriterType; - gr::WorkReturnStatus + gr::work::Status processBulk(select_reader_t *select, // const std::vector &ins, monitor_writer_t *monOut, // @@ -120,7 +120,7 @@ struct Selector : Block, SelectorDoc> { // make the implicit consume all available behaviour explicit std::for_each(ins.begin(), ins.end(), [](auto *input) { std::ignore = input->consume(input->available()); }); } - return WorkReturnStatus::OK; + return work::Status::OK; } std::set used_inputs; @@ -199,7 +199,7 @@ struct Selector : Block, SelectorDoc> { } } - return WorkReturnStatus::OK; + return work::Status::OK; } }; } // namespace gr::basic diff --git a/blocks/basic/test/qa_selector.cpp b/blocks/basic/test/qa_selector.cpp index 58c13f7a5..848484397 100644 --- a/blocks/basic/test/qa_selector.cpp +++ b/blocks/basic/test/qa_selector.cpp @@ -28,7 +28,7 @@ struct repeated_source : public gr::Block> { } } - gr::WorkReturn + gr::work::Result work(std::size_t requested_work) { if (values_next == values.cend()) { values_next = values.cbegin(); @@ -50,10 +50,10 @@ struct repeated_source : public gr::Block> { values_next++; - return { requested_work, 1UL, gr::WorkReturnStatus::OK }; + return { requested_work, 1UL, gr::work::Status::OK }; } else { // TODO: Investigate what schedulers do when there is an event written, but we return DONE - return { requested_work, 1UL, gr::WorkReturnStatus::DONE }; + return { requested_work, 1UL, gr::work::Status::DONE }; } } }; diff --git a/blocks/fourier/include/gnuradio-4.0/fourier/fft.hpp b/blocks/fourier/include/gnuradio-4.0/fourier/fft.hpp index 1e7bc1081..3b015b99e 100644 --- a/blocks/fourier/include/gnuradio-4.0/fourier/fft.hpp +++ b/blocks/fourier/include/gnuradio-4.0/fourier/fft.hpp @@ -130,7 +130,7 @@ On the choice of window (mathematically aka. apodisation) functions _phaseSpectrum.resize(computeHalfSpectrum ? newSize : (newSize / 2), 0); } - [[nodiscard]] constexpr WorkReturnStatus + [[nodiscard]] constexpr work::Status processBulk(std::span input, std::span output) { if constexpr (std::is_same_v) { std::copy_n(input.begin(), fftSize, _inData.begin()); @@ -158,7 +158,7 @@ On the choice of window (mathematically aka. apodisation) functions static_assert(!std::is_same_v> && "FFT output type not (yet) implemented"); } - return WorkReturnStatus::OK; + return work::Status::OK; } constexpr U diff --git a/blocks/fourier/test/qa_fourier.cpp b/blocks/fourier/test/qa_fourier.cpp index b60fa2563..55650c0c8 100644 --- a/blocks/fourier/test/qa_fourier.cpp +++ b/blocks/fourier/test/qa_fourier.cpp @@ -170,7 +170,7 @@ const boost::ut::suite<"Fourier Transforms"> fftTests = [] { std::ignore = fftBlock.settings().apply_staged_parameters(); const auto signal{ generateSinSample(t.N, t.sample_rate, t.frequency, t.amplitude) }; std::vector resultingDataSets(1); - expect(gr::WorkReturnStatus::OK == fftBlock.processBulk(signal, resultingDataSets)); + expect(gr::work::Status::OK == fftBlock.processBulk(signal, resultingDataSets)); const auto peakIndex{ static_cast(std::distance(fftBlock._magnitudeSpectrum.begin(), @@ -224,7 +224,7 @@ const boost::ut::suite<"Fourier Transforms"> fftTests = [] { expectedPeakAmplitude = 1.; } std::vector resultingDataSets(1); - expect(gr::WorkReturnStatus::OK == fftBlock.processBulk(signal, resultingDataSets)); + expect(gr::work::Status::OK == fftBlock.processBulk(signal, resultingDataSets)); const auto peakIndex{ static_cast(std::distance(fftBlock._magnitudeSpectrum.begin(), std::ranges::max_element(fftBlock._magnitudeSpectrum))) }; const auto peakAmplitude{ fftBlock._magnitudeSpectrum[peakIndex] }; @@ -252,7 +252,7 @@ const boost::ut::suite<"Fourier Transforms"> fftTests = [] { std::vector v{ OutType() }; std::span outSpan(v); - expect(gr::WorkReturnStatus::OK == fftBlock.processBulk(signal, outSpan)); + expect(gr::work::Status::OK == fftBlock.processBulk(signal, outSpan)); equalDataset(fftBlock, v[0], sample_rate); } | typesWithAlgoToTest; @@ -338,7 +338,7 @@ const boost::ut::suite<"Fourier Transforms"> fftTests = [] { std::iota(signal.begin(), signal.end(), 1.); } std::vector resultingDataSets(1); - expect(gr::WorkReturnStatus::OK == fftBlock.processBulk(signal, resultingDataSets)); + expect(gr::work::Status::OK == fftBlock.processBulk(signal, resultingDataSets)); expect(eq(fftBlock.fftSize, N)) << fmt::format("<{}> equal fft size", type_name()); expect(eq(fftBlock._window.size(), N)) << fmt::format("<{}> equal window vector size", type_name()); diff --git a/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp b/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp index 1987d21d5..04fc359ac 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/bm_test_helper.hpp @@ -49,7 +49,7 @@ class source : public gr::Block> { return x; } - gr::WorkReturn + gr::work::Result work(std::size_t requested_work) { const std::size_t n_to_publish = _n_samples_max - n_samples_produced; if (n_to_publish > 0) { @@ -62,7 +62,7 @@ class source : public gr::Block> { if constexpr (use_bulk_operation) { std::size_t n_write = std::clamp(n_to_publish, 0UL, std::min(writer.available(), port.max_buffer_size())); if (n_write == 0_UZ) { - return { requested_work, 0_UZ, gr::WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS }; + return { requested_work, 0_UZ, gr::work::Status::INSUFFICIENT_INPUT_ITEMS }; } writer.publish( // @@ -75,14 +75,14 @@ class source : public gr::Block> { } else { auto [data, token] = writer.get(1); if (data.size() == 0_UZ) { - return { requested_work, 0_UZ, gr::WorkReturnStatus::ERROR }; + return { requested_work, 0_UZ, gr::work::Status::ERROR }; } data[0] = processOne(); writer.publish(token, 1); } - return { requested_work, 1_UZ, gr::WorkReturnStatus::OK }; + return { requested_work, 1_UZ, gr::work::Status::OK }; } else { - return { requested_work, 0_UZ, gr::WorkReturnStatus::DONE }; + return { requested_work, 0_UZ, gr::work::Status::DONE }; } } }; diff --git a/blocks/testing/include/gnuradio-4.0/testing/tag_monitors.hpp b/blocks/testing/include/gnuradio-4.0/testing/tag_monitors.hpp index 434868142..ec01c6b40 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/tag_monitors.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/tag_monitors.hpp @@ -119,7 +119,7 @@ struct TagSource : public Block> { return static_cast(0); } - WorkReturnStatus + work::Status processBulk(std::span output) noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_BULK) { @@ -133,7 +133,7 @@ struct TagSource : public Block> { } n_samples_produced += static_cast(output.size()); - return n_samples_produced < n_samples_max ? WorkReturnStatus::OK : WorkReturnStatus::DONE; + return n_samples_produced < n_samples_max ? work::Status::OK : work::Status::DONE; } }; @@ -158,7 +158,7 @@ struct TagMonitor : public Block> { return input; } - constexpr WorkReturnStatus + constexpr work::Status processBulk(std::span input, std::span output) noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_BULK) { @@ -172,7 +172,7 @@ struct TagMonitor : public Block> { n_samples_produced += static_cast(input.size()); std::memcpy(output.data(), input.data(), input.size() * sizeof(T)); - return WorkReturnStatus::OK; + return work::Status::OK; } }; @@ -204,7 +204,7 @@ struct TagSink : public Block> { } // template V> - constexpr WorkReturnStatus + constexpr work::Status processBulk(std::span input) noexcept requires(UseProcessOne == ProcessFunction::USE_PROCESS_BULK) { @@ -220,7 +220,7 @@ struct TagSink : public Block> { n_samples_produced += static_cast(input.size()); timeLastSample = ClockSourceType::now(); - return WorkReturnStatus::OK; + return work::Status::OK; } float diff --git a/core/benchmarks/bm_fft.cpp b/core/benchmarks/bm_fft.cpp index 51964a1a9..6d65b351b 100644 --- a/core/benchmarks/bm_fft.cpp +++ b/core/benchmarks/bm_fft.cpp @@ -83,7 +83,7 @@ testFFT() { std::vector> resultingDataSets(1); ::benchmark::benchmark(fmt::format("{} - fftw", type_name())) = [&fft1, &signal, &resultingDataSets] { - expect(gr::WorkReturnStatus::OK == fft1.processBulk(signal, resultingDataSets)); + expect(gr::work::Status::OK == fft1.processBulk(signal, resultingDataSets)); }; } { @@ -92,7 +92,7 @@ testFFT() { std::vector> resultingDataSets(1); ::benchmark::benchmark(fmt::format("{} - fft", type_name())) = [&fft1, &signal, &resultingDataSets] { - expect(gr::WorkReturnStatus::OK == fft1.processBulk(signal, resultingDataSets)); + expect(gr::work::Status::OK == fft1.processBulk(signal, resultingDataSets)); }; } diff --git a/core/benchmarks/bm_node_api.cpp b/core/benchmarks/bm_node_api.cpp index d68f9216a..f1a72b34d 100644 --- a/core/benchmarks/bm_node_api.cpp +++ b/core/benchmarks/bm_node_api.cpp @@ -80,7 +80,7 @@ class math_bulk_op : public gr::Block, gr::PortInNamed input, std::span output) const noexcept { // classic for-loop for (std::size_t i = 0; i < input.size(); i++) { @@ -93,7 +93,7 @@ class math_bulk_op : public gr::Block, gr::PortInNamed, gr::PortI explicit gen_operation_SIMD(T value, std::string name_ = gr::this_source_location()) : _value(value) { this->name = name_; } - gr::WorkReturn + gr::work::Result work(std::size_t requested_work) noexcept { auto &out_port = outputPort<0>(this); auto &in_port = inputPort<0>(this); @@ -199,9 +199,9 @@ class gen_operation_SIMD : public gr::Block, gr::PortI const auto n_readable = std::min(reader.available(), in_port.max_buffer_size()); const auto n_writable = std::min(writer.available(), out_port.max_buffer_size()); if (n_readable == 0) { - return { requested_work, 0UL, gr::WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS }; + return { requested_work, 0UL, gr::work::Status::INSUFFICIENT_INPUT_ITEMS }; } else if (n_writable == 0) { - return { requested_work, 0UL, gr::WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS }; + return { requested_work, 0UL, gr::work::Status::INSUFFICIENT_OUTPUT_ITEMS }; } const std::size_t n_to_publish = std::min(n_readable, n_writable); @@ -250,9 +250,9 @@ class gen_operation_SIMD : public gr::Block, gr::PortI n_to_publish); if (!reader.consume(n_to_publish)) { - return { requested_work, n_to_publish, gr::WorkReturnStatus::ERROR }; + return { requested_work, n_to_publish, gr::work::Status::ERROR }; } - return { requested_work, n_to_publish, gr::WorkReturnStatus::OK }; + return { requested_work, n_to_publish, gr::work::Status::OK }; } }; @@ -280,7 +280,7 @@ class copy : public gr::Block, gr::PortInName constexpr static std::size_t simd_size = std::max(from_simd_size, to_simd_size); public: - gr::WorkReturnStatus + gr::work::Status work() noexcept { using namespace stdx; auto &out_port = outputPort<"out">(this); @@ -353,9 +353,9 @@ class convert : public gr::Block, gr::PortInName const auto n_readable = std::min(reader.available(), in_port.max_buffer_size()); const auto n_writable = std::min(writer.available(), out_port.max_buffer_size()); if (n_readable < to_simd_size) { - return gr::WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS; + return gr::work::Status::INSUFFICIENT_INPUT_ITEMS; } else if (n_writable < from_simd_size) { - return gr::WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS; + return gr::work::Status::INSUFFICIENT_OUTPUT_ITEMS; } const auto n_readable_scalars = n_readable * from_simd_size; const auto n_writable_scalars = n_writable * to_simd_size; @@ -364,7 +364,7 @@ class convert : public gr::Block, gr::PortInName const auto objects_to_write = stdx::is_simd_v ? n_simd_to_convert : scalars_to_convert; const auto objects_to_read = stdx::is_simd_v ? n_simd_to_convert : scalars_to_convert; - auto return_value = gr::WorkReturnStatus::OK; + auto return_value = gr::work::Status::OK; writer.publish( // [&](std::span output) { const auto input = reader.get(); @@ -380,7 +380,7 @@ class convert : public gr::Block, gr::PortInName } } if (!reader.consume(objects_to_read)) { - return_value = gr::WorkReturnStatus::ERROR; + return_value = gr::work::Status::ERROR; return; } }, diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index 419392105..485ddd7d6 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -57,22 +57,6 @@ invokeProcessOneWithOrWithoutOffset(T &node, std::size_t offset, const Us &...in return node.processOne(inputs...); } -// TODO: inline in work namespace and rename to Status -enum class WorkReturnStatus { - ERROR = -100, /// error occurred in the work function - INSUFFICIENT_OUTPUT_ITEMS = -3, /// work requires a larger output buffer to produce output - INSUFFICIENT_INPUT_ITEMS = -2, /// work requires a larger input buffer to produce output - DONE = -1, /// this block has completed its processing and the flowgraph should be done - OK = 0, /// work call was successful and return values in i/o structs are valid -}; - -// TODO: inline in work namespace and rename WorkReturn to something better 'WorkResult'? -struct WorkReturn { - std::size_t requested_work = std::numeric_limits::max(); - std::size_t performed_work = 0; - WorkReturnStatus status = WorkReturnStatus::OK; -}; - template [[nodiscard]] constexpr auto & inputPort(Self *self) noexcept { @@ -123,27 +107,9 @@ outputPorts(Self *self) noexcept { return [self](std::index_sequence) { return std::tie(outputPort(self)...); }(std::make_index_sequence::size>()); } -template -concept BlockLike = requires(T t, std::size_t requested_work) { - { t.unique_name } -> std::same_as; - { unwrap_if_wrapped_t{} } -> std::same_as; - { unwrap_if_wrapped_t{} } -> std::same_as; - { t.description } noexcept -> std::same_as; - - { t.isBlocking() } noexcept -> std::same_as; - - { t.settings() } -> std::same_as; - { t.work(requested_work) } -> std::same_as; - - // N.B. TODO discuss these requirements - requires !std::is_copy_constructible_v; - requires !std::is_copy_assignable_v; - // requires !std::is_move_constructible_v; - // requires !std::is_move_assignable_v; -}; +namespace work { -namespace detail { -class WorkCounter { +class Counter { std::atomic_uint64_t encodedCounter{ static_cast(std::numeric_limits::max()) << 32 }; public: @@ -185,7 +151,40 @@ class WorkCounter { return { static_cast(workRequested), static_cast(workDone) }; } }; -} // namespace detail + +enum class Status { + ERROR = -100, /// error occurred in the work function + INSUFFICIENT_OUTPUT_ITEMS = -3, /// work requires a larger output buffer to produce output + INSUFFICIENT_INPUT_ITEMS = -2, /// work requires a larger input buffer to produce output + DONE = -1, /// this block has completed its processing and the flowgraph should be done + OK = 0, /// work call was successful and return values in i/o structs are valid +}; + +struct Result { + std::size_t requested_work = std::numeric_limits::max(); + std::size_t performed_work = 0; + Status status = Status::OK; +}; +} // namespace work + +template +concept BlockLike = requires(T t, std::size_t requested_work) { + { t.unique_name } -> std::same_as; + { unwrap_if_wrapped_t{} } -> std::same_as; + { unwrap_if_wrapped_t{} } -> std::same_as; + { t.description } noexcept -> std::same_as; + + { t.isBlocking() } noexcept -> std::same_as; + + { t.settings() } -> std::same_as; + { t.work(requested_work) } -> std::same_as; + + // N.B. TODO discuss these requirements + requires !std::is_copy_constructible_v; + requires !std::is_copy_assignable_v; + // requires !std::is_move_constructible_v; + // requires !std::is_move_assignable_v; +}; template concept HasProcessOneFunction = traits::block::can_processOne; @@ -269,9 +268,9 @@ static_assert(PublishableSpan>); * const auto n_readable = std::min(reader.available(), in_port.max_buffer_size()); * const auto n_writable = std::min(writer.available(), out_port.max_buffer_size()); * if (n_readable == 0) { - * return { 0, gr::WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS }; + * return { 0, gr::work::Status::INSUFFICIENT_INPUT_ITEMS }; * } else if (n_writable == 0) { - * return { 0, gr::WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS }; + * return { 0, gr::work::Status::INSUFFICIENT_OUTPUT_ITEMS }; * } * const std::size_t n_to_publish = std::min(n_readable, n_writable); // N.B. here enforcing N_input == N_output * @@ -283,9 +282,9 @@ static_assert(PublishableSpan>); * }, n_to_publish); * * if (!reader.consume(n_to_publish)) { - * return { n_to_publish, gr::WorkReturnStatus::ERROR }; + * return { n_to_publish, gr::work::Status::ERROR }; * } - * return { n_to_publish, gr::WorkReturnStatus::OK }; + * return { n_to_publish, gr::work::Status::OK }; * } * @endcode *
  • case 4: Python -> map to cases 1-3 and/or dedicated callback (to-be-implemented) @@ -314,8 +313,8 @@ struct Block : protected std::tuple { alignas(hardware_destructive_interference_size) std::atomic_uint32_t ioThreadRunning{ 0_UZ }; alignas(hardware_destructive_interference_size) std::atomic_bool ioThreadShallRun{ false }; alignas(hardware_destructive_interference_size) std::atomic ioRequestedWork{ std::numeric_limits::max() }; - alignas(hardware_destructive_interference_size) detail::WorkCounter ioWorkDone{}; - alignas(hardware_destructive_interference_size) std::atomic ioLastWorkStatus{ WorkReturnStatus::OK }; + alignas(hardware_destructive_interference_size) work::Counter ioWorkDone{}; + alignas(hardware_destructive_interference_size) std::atomic ioLastWorkStatus{ work::Status::OK }; alignas(hardware_destructive_interference_size) std::shared_ptr progress = std::make_shared(); alignas(hardware_destructive_interference_size) std::shared_ptr ioThreadPool = std::make_shared( "block_thread_pool", gr::thread_pool::TaskType::IO_BOUND, 2_UZ, std::numeric_limits::max()); @@ -750,9 +749,9 @@ struct Block : protected std::tuple { * @brief * @return struct { std::size_t produced_work, work_return_t} */ - WorkReturn + work::Result workInternal(std::size_t requested_work) { - using gr::WorkReturnStatus; + using gr::work::Status; using TInputTypes = traits::block::input_port_types; using TOutputTypes = traits::block::output_port_types; @@ -789,17 +788,17 @@ struct Block : protected std::tuple { std::size_t max_buffer = ports_status.out_available; const std::make_signed_t available_samples = self().available_samples(self()); if (available_samples < 0 && max_buffer > 0) { - return { requested_work, 0_UZ, WorkReturnStatus::DONE }; + return { requested_work, 0_UZ, work::Status::DONE }; } if (available_samples == 0) { - return { requested_work, 0_UZ, WorkReturnStatus::OK }; + return { requested_work, 0_UZ, work::Status::OK }; } std::size_t samples_to_process = std::max(0UL, std::min(static_cast(available_samples), max_buffer)); if (not ports_status.enoughSamplesForOutputPorts(samples_to_process)) { - return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS }; + return { requested_work, 0_UZ, work::Status::INSUFFICIENT_INPUT_ITEMS }; } if (samples_to_process == 0) { - return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS }; + return { requested_work, 0_UZ, work::Status::INSUFFICIENT_OUTPUT_ITEMS }; } ports_status.in_samples = std::min(samples_to_process, requested_work); ports_status.out_samples = ports_status.in_samples; @@ -810,13 +809,13 @@ struct Block : protected std::tuple { // the (source) node wants to determine the number of samples to process std::size_t samples_to_process = available_samples(self()); if (samples_to_process == 0) { - return { requested_work, 0_UZ, WorkReturnStatus::OK }; + return { requested_work, 0_UZ, work::Status::OK }; } if (not ports_status.enoughSamplesForOutputPorts(samples_to_process)) { - return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS }; + return { requested_work, 0_UZ, work::Status::INSUFFICIENT_INPUT_ITEMS }; } if (not ports_status.spaceAvailableOnOutputPorts(samples_to_process)) { - return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS }; + return { requested_work, 0_UZ, work::Status::INSUFFICIENT_OUTPUT_ITEMS }; } ports_status.in_samples = std::min(samples_to_process, requested_work); ports_status.out_samples = ports_status.in_samples; @@ -834,7 +833,7 @@ struct Block : protected std::tuple { // derive value from output buffer size std::size_t samples_to_process = std::min(ports_status.out_available, ports_status.out_max_samples); if (not ports_status.enoughSamplesForOutputPorts(samples_to_process)) { - return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS }; + return { requested_work, 0_UZ, work::Status::INSUFFICIENT_OUTPUT_ITEMS }; } ports_status.in_samples = std::min(samples_to_process, requested_work); ports_status.out_samples = ports_status.in_samples; @@ -846,7 +845,7 @@ struct Block : protected std::tuple { ports_status.out_samples = ports_status.in_samples; if (ports_status.has_sync_input_ports && ports_status.in_available == 0) { - return { requested_work, 0_UZ, ports_status.in_at_least_one_port_has_data ? WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS : WorkReturnStatus::DONE }; + return { requested_work, 0_UZ, ports_status.in_at_least_one_port_has_data ? work::Status::INSUFFICIENT_INPUT_ITEMS : work::Status::DONE }; } if constexpr (Resampling::kEnabled) { @@ -857,7 +856,7 @@ struct Block : protected std::tuple { || (static_cast(ports_status.in_max_samples) * ratio < static_cast(ports_status.out_min_samples)); assert(!is_ill_defined); if (is_ill_defined) { - return { requested_work, 0_UZ, WorkReturnStatus::ERROR }; + return { requested_work, 0_UZ, work::Status::ERROR }; } ports_status.in_samples = static_cast(ports_status.in_samples / denominator) * denominator; // remove reminder @@ -872,7 +871,7 @@ struct Block : protected std::tuple { const std::size_t in_max_samples = static_cast(static_cast(out_max_limit) / ratio); std::size_t in_max_wo_reminder = (in_max_samples / denominator) * denominator; - if (ports_status.in_samples < in_min_wo_reminder) return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS }; + if (ports_status.in_samples < in_min_wo_reminder) return { requested_work, 0_UZ, work::Status::INSUFFICIENT_INPUT_ITEMS }; ports_status.in_samples = std::clamp(ports_status.in_samples, in_min_wo_reminder, in_max_wo_reminder); ports_status.out_samples = numerator * (ports_status.in_samples / denominator); } @@ -881,10 +880,10 @@ struct Block : protected std::tuple { // TODO: special case for ports_status.in_samples == 0 ? if (not ports_status.enoughSamplesForOutputPorts(ports_status.out_samples)) { - return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS }; + return { requested_work, 0_UZ, work::Status::INSUFFICIENT_INPUT_ITEMS }; } if (not ports_status.spaceAvailableOnOutputPorts(ports_status.out_samples)) { - return { requested_work, 0_UZ, WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS }; + return { requested_work, 0_UZ, work::Status::INSUFFICIENT_OUTPUT_ITEMS }; } } @@ -983,7 +982,7 @@ struct Block : protected std::tuple { } const bool success = consume_readers(self(), n_samples_to_consume); forward_tags(); - return { requested_work, n_samples_to_consume, success ? WorkReturnStatus::OK : WorkReturnStatus::ERROR }; + return { requested_work, n_samples_to_consume, success ? work::Status::OK : work::Status::ERROR }; } } } @@ -1033,14 +1032,14 @@ struct Block : protected std::tuple { if constexpr (HasProcessBulkFunction) { // cannot use std::apply because it requires tuple_cat(input_spans, writers_tuple). The latter doesn't work because writers_tuple isn't copyable. - const WorkReturnStatus ret = [&](std::index_sequence, std::index_sequence) { + const work::Status ret = [&](std::index_sequence, std::index_sequence) { return self().processBulk(std::get(input_spans)..., std::get(writers_tuple)...); }(std::make_index_sequence::size>(), std::make_index_sequence::size>()); write_to_outputs(ports_status.out_samples, writers_tuple); const bool success = consume_readers(self(), n_samples_to_consume); forward_tags(); - return { requested_work, ports_status.in_samples, success ? ret : WorkReturnStatus::ERROR }; + return { requested_work, ports_status.in_samples, success ? ret : work::Status::ERROR }; } else if constexpr (HasProcessOneFunction) { if (ports_status.in_samples != ports_status.out_samples) @@ -1087,26 +1086,26 @@ struct Block : protected std::tuple { } #endif forward_tags(); - return { requested_work, ports_status.in_samples, success ? WorkReturnStatus::OK : WorkReturnStatus::ERROR }; + return { requested_work, ports_status.in_samples, success ? work::Status::OK : work::Status::ERROR }; } // processOne(...) handling // else { // static_assert(gr::meta::always_false, "neither processBulk(...) nor processOne(...) implemented"); // } - return { requested_work, 0_UZ, WorkReturnStatus::ERROR }; + return { requested_work, 0_UZ, work::Status::ERROR }; } // end: work_return_t work_internal() noexcept { ..} public: - WorkReturnStatus + work::Status invokeWork() requires(blockingIO) { auto [work_requested, work_done, last_status] = workInternal(ioRequestedWork.load(std::memory_order_relaxed)); ioWorkDone.increment(work_requested, work_done); - if (auto [incWorkRequested, incWorkDone] = ioWorkDone.get(); last_status == WorkReturnStatus::DONE && incWorkDone > 0) { + if (auto [incWorkRequested, incWorkDone] = ioWorkDone.get(); last_status == work::Status::DONE && incWorkDone > 0) { // finished local iteration but need to report more work to be done until // external scheduler loop acknowledged all samples being processed // via the 'ioWorkDone.getAndReset()' call - last_status = WorkReturnStatus::OK; + last_status = work::Status::OK; } ioLastWorkStatus.exchange(last_status, std::memory_order_relaxed); @@ -1122,7 +1121,7 @@ struct Block : protected std::tuple { * requested_work limit as an affine relation with the returned performed_work. * @return { requested_work, performed_work, status} */ - WorkReturn + work::Result work(std::size_t requested_work = std::numeric_limits::max()) noexcept { if constexpr (blockingIO) { constexpr bool useIoThread = std::disjunction_v, Arguments>...>; @@ -1137,7 +1136,7 @@ struct Block : protected std::tuple { #endif for (int retryCount = 2; ioThreadShallRun && retryCount > 0; retryCount--) { while (ioThreadShallRun && retryCount) { - if (invokeWork() == WorkReturnStatus::DONE) { + if (invokeWork() == work::Status::DONE) { break; } else { // processed data before shutting down wait (see below) and retry (here: once) @@ -1160,13 +1159,13 @@ struct Block : protected std::tuple { // let user call '' explicitly } } - const WorkReturnStatus lastStatus = ioLastWorkStatus.exchange(WorkReturnStatus::OK, std::memory_order_relaxed); + const work::Status lastStatus = ioLastWorkStatus.exchange(work::Status::OK, std::memory_order_relaxed); const auto &[accumulatedRequestedWork, performedWork] = ioWorkDone.getAndReset(); // TODO: this is just "working" solution for deadlock with emscripten, need to be investigated further #if defined(__EMSCRIPTEN__) std::this_thread::sleep_for(std::chrono::nanoseconds(1)); #endif - return { accumulatedRequestedWork, performedWork, performedWork > 0 ? WorkReturnStatus::OK : lastStatus }; + return { accumulatedRequestedWork, performedWork, performedWork > 0 ? work::Status::OK : lastStatus }; } else { return workInternal(requested_work); } @@ -1396,7 +1395,7 @@ class MergedBlock : public Block, meta::co } } // end:: processOne - WorkReturn + work::Result work(std::size_t requested_work) noexcept { return base::work(requested_work); } @@ -1548,7 +1547,7 @@ struct RegisterBlock { #ifdef FMT_FORMAT_H_ template<> -struct fmt::formatter { +struct fmt::formatter { static constexpr auto parse(const format_parse_context &ctx) { const auto it = ctx.begin(); @@ -1558,8 +1557,8 @@ struct fmt::formatter { template auto - format(const gr::WorkReturnStatus &status, FormatContext &ctx) { - using enum gr::WorkReturnStatus; + format(const gr::work::Status &status, FormatContext &ctx) { + using enum gr::work::Status; switch (status) { case ERROR: return fmt::format_to(ctx.out(), "ERROR"); case INSUFFICIENT_OUTPUT_ITEMS: return fmt::format_to(ctx.out(), "INSUFFICIENT_OUTPUT_ITEMS"); @@ -1573,7 +1572,7 @@ struct fmt::formatter { }; template<> -struct fmt::formatter { +struct fmt::formatter { static constexpr auto parse(const format_parse_context &ctx) { const auto it = ctx.begin(); @@ -1583,7 +1582,7 @@ struct fmt::formatter { template auto - format(const gr::WorkReturn &work_return, FormatContext &ctx) { + format(const gr::work::Result &work_return, FormatContext &ctx) { return fmt::format_to(ctx.out(), "requested_work: {}, performed_work: {}, status: {}", work_return.requested_work, work_return.performed_work, fmt::format("{}", work_return.status)); } }; diff --git a/core/include/gnuradio-4.0/BlockTraits.hpp b/core/include/gnuradio-4.0/BlockTraits.hpp index 7ed0a618c..ca3e3d831 100644 --- a/core/include/gnuradio-4.0/BlockTraits.hpp +++ b/core/include/gnuradio-4.0/BlockTraits.hpp @@ -9,8 +9,8 @@ #include -namespace gr { -enum class WorkReturnStatus; +namespace gr::work { +enum class Status; } namespace gr::traits::block { @@ -345,7 +345,7 @@ concept can_processBulk = requires(TBlock &n, typename meta::transform_types_nes typename meta::transform_types_nested>::tuple_type outputs) { { detail::can_processBulk_invoke_test(n, inputs, outputs, std::make_index_sequence::size>(), std::make_index_sequence::size>()) - } -> std::same_as; + } -> std::same_as; }; /** @@ -367,7 +367,7 @@ concept processBulk_requires_ith_output_as_span = requires(TDerived std::index_sequence) -> decltype(d.processBulk(std::get(inputs)..., std::get(outputs)...)) { return {}; }(std::make_index_sequence::size>(), std::make_index_sequence::size>()) - } -> std::same_as; + } -> std::same_as; not requires { [](std::index_sequence, std::index_sequence) -> decltype(d.processBulk(std::get(inputs)..., std::get(bad_outputs)...)) { diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index c3c398d34..35281850f 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -159,7 +159,7 @@ class BlockModel { settings() const = 0; - [[nodiscard]] virtual WorkReturn + [[nodiscard]] virtual work::Result work(std::size_t requested_work) = 0; @@ -260,7 +260,7 @@ class BlockWrapper : public BlockModel { return blockRef().init(progress, ioThreadPool); } - [[nodiscard]] constexpr WorkReturn + [[nodiscard]] constexpr work::Result work(std::size_t requested_work = std::numeric_limits::max()) override { return blockRef().work(requested_work); } diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index 558d7fccc..7cbbed0a5 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -128,7 +128,7 @@ class SchedulerBase { } void - runOnPool(const std::vector> &jobs, const std::function &)> work_function) { + runOnPool(const std::vector> &jobs, const std::function &)> work_function) { [[maybe_unused]] const auto pe = _profiler_handler.startCompleteEvent("scheduler_base.runOnPool"); _progress = 0; _running_threads = jobs.size(); @@ -138,14 +138,14 @@ class SchedulerBase { } void - poolWorker(const std::function &work, std::size_t n_batches) { + poolWorker(const std::function &work, std::size_t n_batches) { auto &profiler_handler = _profiler.forThisThread(); uint32_t done = 0; uint32_t progress_count = 0; while (done < n_batches && !_stop_requested) { auto pe = profiler_handler.startCompleteEvent("scheduler_base.work"); - bool something_happened = work().status == WorkReturnStatus::OK; + bool something_happened = work().status == work::Status::OK; pe.finish(); uint64_t progress_local, progress_new; if (something_happened) { // something happened in this thread => increase progress and reset done count @@ -211,7 +211,7 @@ class Simple : public SchedulerBase { } template - WorkReturn + work::Result workOnce(const std::span &blocks) { constexpr std::size_t requested_work = std::numeric_limits::max(); bool something_happened = false; @@ -219,11 +219,11 @@ class Simple : public SchedulerBase { for (auto ¤tBlock : blocks) { auto result = currentBlock->work(requested_work); performed_work += result.performed_work; - if (result.status == WorkReturnStatus::ERROR) { - return { requested_work, performed_work, WorkReturnStatus::ERROR }; - } else if (result.status == WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS || result.status == WorkReturnStatus::DONE) { + if (result.status == work::Status::ERROR) { + return { requested_work, performed_work, work::Status::ERROR }; + } else if (result.status == work::Status::INSUFFICIENT_INPUT_ITEMS || result.status == work::Status::DONE) { // nothing - } else if (result.status == WorkReturnStatus::OK || result.status == WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS) { + } else if (result.status == work::Status::OK || result.status == work::Status::INSUFFICIENT_OUTPUT_ITEMS) { something_happened = true; } if (currentBlock->isBlocking()) { // work-around for `DONE` issue when running with multithreaded BlockingIO blocks -> TODO: needs a better solution on a global scope @@ -232,7 +232,7 @@ class Simple : public SchedulerBase { something_happened |= std::accumulate(available_input_samples.begin(), available_input_samples.end(), 0_UZ) > 0_UZ; } } - return { requested_work, performed_work, something_happened ? WorkReturnStatus::OK : WorkReturnStatus::DONE }; + return { requested_work, performed_work, something_happened ? work::Status::OK : work::Status::DONE }; } // todo: could be moved to base class, but would make `start()` virtual or require CRTP @@ -262,12 +262,12 @@ class Simple : public SchedulerBase { } if constexpr (executionPolicy == singleThreaded) { this->_state = RUNNING; - WorkReturn result; - auto blocklist = std::span{ this->_graph.blocks() }; + work::Result result; + auto blocklist = std::span{ this->_graph.blocks() }; do { result = workOnce(blocklist); - } while (result.status == WorkReturnStatus::OK); - if (result.status == WorkReturnStatus::ERROR) { + } while (result.status == work::Status::OK); + if (result.status == work::Status::ERROR) { this->_state = ERROR; } else { this->_state = STOPPED; @@ -350,7 +350,7 @@ class BreadthFirst : public SchedulerBase { } template - WorkReturn + work::Result workOnce(const std::span &blocks) { constexpr std::size_t requested_work = std::numeric_limits::max(); bool something_happened = false; @@ -359,11 +359,11 @@ class BreadthFirst : public SchedulerBase { for (auto ¤tBlock : blocks) { auto result = currentBlock->work(requested_work); performed_work += result.performed_work; - if (result.status == WorkReturnStatus::ERROR) { - return { requested_work, performed_work, WorkReturnStatus::ERROR }; - } else if (result.status == WorkReturnStatus::INSUFFICIENT_INPUT_ITEMS || result.status == WorkReturnStatus::DONE) { + if (result.status == work::Status::ERROR) { + return { requested_work, performed_work, work::Status::ERROR }; + } else if (result.status == work::Status::INSUFFICIENT_INPUT_ITEMS || result.status == work::Status::DONE) { // nothing - } else if (result.status == WorkReturnStatus::OK || result.status == WorkReturnStatus::INSUFFICIENT_OUTPUT_ITEMS) { + } else if (result.status == work::Status::OK || result.status == work::Status::INSUFFICIENT_OUTPUT_ITEMS) { something_happened = true; } @@ -374,7 +374,7 @@ class BreadthFirst : public SchedulerBase { } } - return { requested_work, performed_work, something_happened ? WorkReturnStatus::OK : WorkReturnStatus::DONE }; + return { requested_work, performed_work, something_happened ? work::Status::OK : work::Status::DONE }; } void @@ -401,10 +401,10 @@ class BreadthFirst : public SchedulerBase { } if constexpr (executionPolicy == singleThreaded) { this->_state = RUNNING; - WorkReturn result; - auto blocklist = std::span{ this->_blocklist }; - while ((result = workOnce(blocklist)).status == WorkReturnStatus::OK) { - if (result.status == WorkReturnStatus::ERROR) { + work::Result result; + auto blocklist = std::span{ this->_blocklist }; + while ((result = workOnce(blocklist)).status == work::Status::OK) { + if (result.status == work::Status::ERROR) { this->_state = ERROR; return; } diff --git a/core/test/plugins/good_base_plugin.cpp b/core/test/plugins/good_base_plugin.cpp index 4fe535772..28a7189ad 100644 --- a/core/test/plugins/good_base_plugin.cpp +++ b/core/test/plugins/good_base_plugin.cpp @@ -53,11 +53,11 @@ class fixed_source : public grg::Block, grg::PortOutNamed(this); @@ -68,11 +68,11 @@ class fixed_source : public grg::Block, grg::PortOutNamed, gr::ResamplingRatio<>, gr: ProcessStatus status{}; bool write_to_vector{ false }; - gr::WorkReturnStatus + gr::work::Status processBulk(std::span input, std::span output) noexcept { status.n_inputs = input.size(); status.n_outputs = output.size(); @@ -96,7 +96,7 @@ struct IntDecBlock : public gr::Block, gr::ResamplingRatio<>, gr: status.total_out += output.size(); if (write_to_vector) status.in_vector.insert(status.in_vector.end(), input.begin(), input.end()); - return gr::WorkReturnStatus::OK; + return gr::work::Status::OK; } }; diff --git a/core/test/qa_DynamicBlock.cpp b/core/test/qa_DynamicBlock.cpp index 8535ef9a8..086e68423 100644 --- a/core/test/qa_DynamicBlock.cpp +++ b/core/test/qa_DynamicBlock.cpp @@ -11,7 +11,7 @@ template struct fixed_source : public gr::Block, gr::PortOutNamed> { T value = 1; - gr::WorkReturn + gr::work::Result work(std::size_t requested_work) { using namespace gr::literals; auto &port = gr::outputPort<0>(this); @@ -21,7 +21,7 @@ struct fixed_source : public gr::Block, gr::PortOutNamed> { grg::PortOut value; std::size_t _counter = 0; - gr::WorkReturn + gr::work::Result work(std::size_t requested_work) { if (_counter < count) { _counter++; @@ -88,9 +88,9 @@ class repeater_source : public grg::Block> { data[0] = val; data.publish(1); - return { requested_work, 1_UZ, gr::WorkReturnStatus::OK }; + return { requested_work, 1_UZ, gr::work::Status::OK }; } else { - return { requested_work, 0_UZ, gr::WorkReturnStatus::DONE }; + return { requested_work, 0_UZ, gr::work::Status::DONE }; } } }; diff --git a/core/test/qa_HierBlock.cpp b/core/test/qa_HierBlock.cpp index cb316f89b..8020e0fe6 100644 --- a/core/test/qa_HierBlock.cpp +++ b/core/test/qa_HierBlock.cpp @@ -96,10 +96,10 @@ class HierBlock : public grg::BlockModel { return 0UL; } - grg::WorkReturn + grg::work::Result work(std::size_t requested_work) override { _scheduler.runAndWait(); - return { requested_work, requested_work, gr::WorkReturnStatus::DONE }; + return { requested_work, requested_work, gr::work::Status::DONE }; } void * @@ -141,7 +141,7 @@ struct fixed_source : public grg::Block> { T value = 1; - grg::WorkReturn + grg::work::Result work(std::size_t requested_work) { if (remaining_events_count != 0) { using namespace gr::literals; @@ -156,10 +156,10 @@ struct fixed_source : public grg::Block> { } value += 1; - return { requested_work, 1UL, grg::WorkReturnStatus::OK }; + return { requested_work, 1UL, grg::work::Status::OK }; } else { // TODO: Investigate what schedulers do when there is an event written, but we return DONE - return { requested_work, 1UL, grg::WorkReturnStatus::DONE }; + return { requested_work, 1UL, grg::work::Status::DONE }; } } }; diff --git a/core/test/qa_scheduler.cpp b/core/test/qa_scheduler.cpp index 45e1cfe70..777c1abfb 100644 --- a/core/test/qa_scheduler.cpp +++ b/core/test/qa_scheduler.cpp @@ -65,14 +65,14 @@ class expect_sink : public gr::Block, gr::PortInNamed ~expect_sink() { boost::ut::expect(boost::ut::that % _count == N); } - [[nodiscard]] gr::WorkReturnStatus + [[nodiscard]] gr::work::Status processBulk(std::span input) noexcept { _tracer.trace(this->name); for (auto data : input) { _checker(_count, data); _count++; } - return gr::WorkReturnStatus::OK; + return gr::work::Status::OK; } constexpr void diff --git a/core/test/qa_settings.cpp b/core/test/qa_settings.cpp index 57903ecea..c4d062621 100644 --- a/core/test/qa_settings.cpp +++ b/core/test/qa_settings.cpp @@ -185,7 +185,7 @@ struct Decimate : public Block, SupportedTypes input, std::span output) noexcept { assert(this->numerator == std::size_t(1) && "block implements only basic decimation"); assert(this->denominator != std::size_t(0) && "denominator must be non-zero"); @@ -203,7 +203,7 @@ struct Decimate : public Block, SupportedTypes