From 6d3546667554078a59650f534af7bbf9f67e552b Mon Sep 17 00:00:00 2001 From: rstein Date: Wed, 11 Oct 2023 14:48:20 +0200 Subject: [PATCH] refactored merged_node -> MergedBlock -> MergedGraph ... and moved its implementation to Graph.hpp since its semantic behaviour is closer to a compile-time merged Graph than a Block. * missing features to be addressed: * handling of tag forwarding * handling of sub-block settings * topology of internal graph (blocks+connections) --- core/include/gnuradio-4.0/Block.hpp | 287 ------------------------ core/include/gnuradio-4.0/Graph.hpp | 333 ++++++++++++++++++++++++++++ 2 files changed, 333 insertions(+), 287 deletions(-) diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index 485ddd7d6..f0e0a4f2b 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -1229,293 +1229,6 @@ blockDescription() noexcept { return ret; } -template -concept SourceBlockLike = traits::block::can_processOne and traits::block::template output_port_types::size > 0; - -static_assert(not SourceBlockLike); - -template -concept SinkBlockLike = traits::block::can_processOne and traits::block::template input_port_types::size > 0; - -static_assert(not SinkBlockLike); - -template -class MergedBlock : public Block, meta::concat, meta::remove_at>>, - meta::concat>, typename traits::block::output_ports>> { - static std::atomic_size_t _unique_id_counter; - -public: - const std::size_t unique_id = _unique_id_counter++; - const std::string unique_name = fmt::format("MergedBlock<{}:{},{}:{}>#{}", gr::meta::type_name(), OutId, gr::meta::type_name(), InId, unique_id); - -private: - // copy-paste from above, keep in sync - using base = Block, meta::concat, meta::remove_at>>, - meta::concat>, typename traits::block::output_ports>>; - - Left left; - Right right; - - // merged_work_chunk_size, that's what friends are for - friend base; - - template - friend class MergedBlock; - - // returns the minimum of all internal max_samples port template parameters - static constexpr std::size_t - merged_work_chunk_size() noexcept { - constexpr std::size_t left_size = []() { - if constexpr (requires { - { Left::merged_work_chunk_size() } -> std::same_as; - }) { - return Left::merged_work_chunk_size(); - } else { - return std::dynamic_extent; - } - }(); - constexpr std::size_t right_size = []() { - if constexpr (requires { - { Right::merged_work_chunk_size() } -> std::same_as; - }) { - return Right::merged_work_chunk_size(); - } else { - return std::dynamic_extent; - } - }(); - return std::min({ traits::block::input_ports::template apply::value, traits::block::output_ports::template apply::value, - left_size, right_size }); - } - - template - constexpr auto - apply_left(std::size_t offset, auto &&input_tuple) noexcept { - return [&](std::index_sequence) { - return invokeProcessOneWithOrWithoutOffset(left, offset, std::get(std::forward(input_tuple))...); - }(std::make_index_sequence()); - } - - template - constexpr auto - apply_right(std::size_t offset, auto &&input_tuple, auto &&tmp) noexcept { - return [&](std::index_sequence, std::index_sequence) { - constexpr std::size_t first_offset = traits::block::input_port_types::size; - constexpr std::size_t second_offset = traits::block::input_port_types::size + sizeof...(Is); - static_assert(second_offset + sizeof...(Js) == std::tuple_size_v>); - return invokeProcessOneWithOrWithoutOffset(right, offset, std::get(std::forward(input_tuple))..., std::forward(tmp), - std::get(input_tuple)...); - 
}(std::make_index_sequence(), std::make_index_sequence()); - } - -public: - using TInputPortTypes = typename traits::block::input_port_types; - using TOutputPortTypes = typename traits::block::output_port_types; - using TReturnType = typename traits::block::return_type; - - constexpr MergedBlock(Left l, Right r) : left(std::move(l)), right(std::move(r)) {} - - // if the left node (source) implements available_samples (a customization point), then pass the call through - friend constexpr std::size_t - available_samples(const MergedBlock &self) noexcept - requires requires(const Left &l) { - { available_samples(l) } -> std::same_as; - } - { - return available_samples(self.left); - } - - template - requires traits::block::can_processOne_simd and traits::block::can_processOne_simd - constexpr meta::simdize>> - processOne(std::size_t offset, const Ts &...inputs) { - static_assert(traits::block::output_port_types::size == 1, "TODO: SIMD for multiple output ports not implemented yet"); - return apply_right::size() - InId - 1>(offset, std::tie(inputs...), - apply_left::size()>(offset, std::tie(inputs...))); - } - - constexpr auto - processOne_simd(std::size_t offset, auto N) - requires traits::block::can_processOne_simd - { - if constexpr (requires(Left &l) { - { l.processOne_simd(offset, N) }; - }) { - return invokeProcessOneWithOrWithoutOffset(right, offset, left.processOne_simd(offset, N)); - } else if constexpr (requires(Left &l) { - { l.processOne_simd(N) }; - }) { - return invokeProcessOneWithOrWithoutOffset(right, offset, left.processOne_simd(N)); - } else { - using LeftResult = typename traits::block::return_type; - using V = meta::simdize; - alignas(stdx::memory_alignment_v) LeftResult tmp[V::size()]; - for (std::size_t i = 0; i < V::size(); ++i) { - tmp[i] = invokeProcessOneWithOrWithoutOffset(left, offset + i); - } - return invokeProcessOneWithOrWithoutOffset(right, offset, V(tmp, stdx::vector_aligned)); - } - } - - template - // Nicer error messages for the following would be good, but not at the expense of breaking can_processOne_simd. - requires(TInputPortTypes::template are_equal...>) - constexpr TReturnType - processOne(std::size_t offset, Ts &&...inputs) { - // if (sizeof...(Ts) == 0) we could call `return processOne_simd(integral_constant)`. But if - // the caller expects to process *one* sample (no inputs for the caller to explicitly - // request simd), and we process more, we risk inconsistencies. 
- if constexpr (traits::block::output_port_types::size == 1) { - // only the result from the right node needs to be returned - return apply_right::size() - InId - 1>(offset, std::forward_as_tuple(std::forward(inputs)...), - apply_left::size()>(offset, std::forward_as_tuple( - std::forward(inputs)...))); - - } else { - // left produces a tuple - auto left_out = apply_left::size()>(offset, std::forward_as_tuple(std::forward(inputs)...)); - auto right_out = apply_right::size() - InId - 1>(offset, std::forward_as_tuple(std::forward(inputs)...), - std::move(std::get(left_out))); - - if constexpr (traits::block::output_port_types::size == 2 && traits::block::output_port_types::size == 1) { - return std::make_tuple(std::move(std::get(left_out)), std::move(right_out)); - - } else if constexpr (traits::block::output_port_types::size == 2) { - return std::tuple_cat(std::make_tuple(std::move(std::get(left_out))), std::move(right_out)); - - } else if constexpr (traits::block::output_port_types::size == 1) { - return [&](std::index_sequence, std::index_sequence) { - return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(right_out)); - }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>()); - - } else { - return [&](std::index_sequence, std::index_sequence, std::index_sequence) { - return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(std::get(right_out)...)); - }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>(), std::make_index_sequence()); - } - } - } // end:: processOne - - work::Result - work(std::size_t requested_work) noexcept { - return base::work(requested_work); - } -}; - -template -inline std::atomic_size_t MergedBlock::_unique_id_counter{ 0_UZ }; - -/** - * This methods can merge simple blocks that are defined via a single `auto processOne(..)` producing a - * new `merged` node, bypassing the dynamic run-time buffers. - * Since the merged node can be highly optimised during compile-time, it's execution performance is usually orders - * of magnitude more efficient than executing a cascade of the same constituent blocks. See the benchmarks for details. - * This function uses the connect-by-port-ID API. - * - * Example: - * @code - * // declare flow-graph: 2 x in -> adder -> scale-by-2 -> scale-by-minus1 -> output - * auto merged = merge_by_index<0, 0>(scale(), merge_by_index<0, 0>(scale(), adder())); - * - * // execute graph - * std::array a = { 1, 2, 3, 4 }; - * std::array b = { 10, 10, 10, 10 }; - * - * int r = 0; - * for (std::size_t i = 0; i < 4; ++i) { - * r += merged.processOne(a[i], b[i]); - * } - * @endcode - */ -template -constexpr auto -mergeByIndex(A &&a, B &&b) -> MergedBlock, std::remove_cvref_t, OutId, InId> { - if constexpr (!std::is_same_v>::template at, - typename traits::block::input_port_types>::template at>) { - gr::meta::print_types, typename traits::block::output_port_types>, std::integral_constant, - typename traits::block::output_port_types>::template at, - - gr::meta::message_type<"INPUT_PORTS_ARE:">, typename traits::block::input_port_types>, std::integral_constant, - typename traits::block::input_port_types>::template at>{}; - } - return { std::forward(a), std::forward(b) }; -} - -/** - * This methods can merge simple blocks that are defined via a single `auto processOne(..)` producing a - * new `merged` node, bypassing the dynamic run-time buffers. 
- * Since the merged node can be highly optimised during compile-time, it's execution performance is usually orders - * of magnitude more efficient than executing a cascade of the same constituent blocks. See the benchmarks for details. - * This function uses the connect-by-port-name API. - * - * Example: - * @code - * // declare flow-graph: 2 x in -> adder -> scale-by-2 -> output - * auto merged = merge<"scaled", "addend1">(scale(), adder()); - * - * // execute graph - * std::array a = { 1, 2, 3, 4 }; - * std::array b = { 10, 10, 10, 10 }; - * - * int r = 0; - * for (std::size_t i = 0; i < 4; ++i) { - * r += merged.processOne(a[i], b[i]); - * } - * @endcode - */ -template -constexpr auto -merge(A &&a, B &&b) { - constexpr int OutIdUnchecked = meta::indexForName>(); - constexpr int InIdUnchecked = meta::indexForName>(); - static_assert(OutIdUnchecked != -1); - static_assert(InIdUnchecked != -1); - constexpr auto OutId = static_cast(OutIdUnchecked); - constexpr auto InId = static_cast(InIdUnchecked); - static_assert(std::same_as>::template at, - typename traits::block::input_port_types>::template at>, - "Port types do not match"); - return MergedBlock, std::remove_cvref_t, OutId, InId>{ std::forward(a), std::forward(b) }; -} - -#if !DISABLE_SIMD -namespace test { // TODO: move to dedicated tests - -struct copy : public Block { - PortIn in; - PortOut out; - -public: - template V> - [[nodiscard]] constexpr V - processOne(const V &a) const noexcept { - return a; - } -}; -} // namespace test -#endif -} // namespace gr - -#if !DISABLE_SIMD -ENABLE_REFLECTION(gr::test::copy, in, out); -#endif - -namespace gr { - -#if !DISABLE_SIMD -namespace test { -static_assert(traits::block::input_port_types::size() == 1); -static_assert(std::same_as, float>); -static_assert(traits::block::can_processOne_scalar); -static_assert(traits::block::can_processOne_simd); -static_assert(traits::block::can_processOne_scalar_with_offset(copy(), copy()))>); -static_assert(traits::block::can_processOne_simd_with_offset(copy(), copy()))>); -static_assert(SourceBlockLike); -static_assert(SinkBlockLike); -static_assert(SourceBlockLike(copy(), copy()))>); -static_assert(SinkBlockLike(copy(), copy()))>); -} // namespace test -#endif - namespace detail { template struct BlockParameters { diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index 35281850f..97c08aec8 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -649,6 +649,339 @@ struct Graph { } }; +/*******************************************************************************************************/ +/**************************** begin of SIMD-Merged Graph Implementation ********************************/ +/*******************************************************************************************************/ + +/** + * Concepts and class for Merging Blocks to Sub-Graph Functionality + * + * This code provides a way to merge blocks of processing units in a flow-graph for efficient computation. + * The merging occurs at compile-time, enabling the execution performance to be much better than running + * each constituent block individually. + * + * Concepts: + * - `SourceBlockLike`: Represents a source block with processing capability and at least one output port. + * - `SinkBlockLike`: Represents a sink block with processing capability and at least one input port. 
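+ *
+ * Illustrative sketch (a hypothetical `Copy` block, analogous to `gr::test::copy` defined further below):
+ * a minimal single-in/single-out block defined via `processOne()` models both concepts:
+ * @code
+ * struct Copy : public Block<Copy> {
+ *     PortIn<float>  in;
+ *     PortOut<float> out;
+ *     [[nodiscard]] constexpr float processOne(float a) const noexcept { return a; }
+ * };
+ * // after ENABLE_REFLECTION(Copy, in, out):
+ * // static_assert(SourceBlockLike<Copy> and SinkBlockLike<Copy>);
+ * @endcode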
+ *
+ * Key Features:
+ * - `MergedGraph` class: Combines a source and sink block into a new unit, connecting them via specified
+ *   output and input port indices.
+ * - The merged block can be efficiently optimized at compile-time.
+ * - Each `MergedGraph` instance has a unique ID and name, aiding in debugging and identification.
+ * - The merging works seamlessly for blocks that have single or multiple output ports.
+ * - It allows for SIMD optimizations if the constituent blocks support it.
+ *
+ * Utility Functions:
+ * - `mergeByIndex()`: A utility function to merge two blocks based on specified port indices.
+ *   It checks if the output port of the source block and the input port of the sink block have matching types.
+ *
+ * Examples:
+ * This enables you to create a flow-graph where you merge blocks to create optimized processing paths.
+ * Example usage can be found in the documentation of `mergeByIndex()`.
+ *
+ * Dependencies:
+ * - Relies on various traits and meta-programming utilities for type introspection and compile-time checks.
+ *
+ * Note:
+ * - The implementation of the actual processing logic (e.g., `processOne()`, `processOne_simd()`, etc.)
+ *   and their SIMD variants is specific to the logic and capabilities of the blocks being merged.
+ *
+ * Limitations:
+ * - Currently, SIMD support for multiple output ports is not implemented.
+ */
+
+template
+concept SourceBlockLike = traits::block::can_processOne and traits::block::template output_port_types::size > 0;
+
+static_assert(not SourceBlockLike);
+
+template
+concept SinkBlockLike = traits::block::can_processOne and traits::block::template input_port_types::size > 0;
+
+static_assert(not SinkBlockLike);
+
+template
+class MergedGraph : public Block, meta::concat, meta::remove_at>>,
+                                  meta::concat>, typename traits::block::output_ports>> {
+    static std::atomic_size_t _unique_id_counter;
+
+public:
+    const std::size_t unique_id   = _unique_id_counter++;
+    const std::string unique_name = fmt::format("MergedGraph<{}:{},{}:{}>#{}", gr::meta::type_name(), OutId, gr::meta::type_name(), InId, unique_id);
+
+private:
+    // copy-paste from above, keep in sync
+    using base = Block, meta::concat, meta::remove_at>>,
+                       meta::concat>, typename traits::block::output_ports>>;
+
+    Left  left;
+    Right right;
+
+    // merged_work_chunk_size, that's what friends are for
+    friend base;
+
+    template
+    friend class MergedGraph;
+
+    // returns the minimum of all internal max_samples port template parameters
+    static constexpr std::size_t
+    merged_work_chunk_size() noexcept {
+        constexpr std::size_t left_size = []() {
+            if constexpr (requires {
+                              { Left::merged_work_chunk_size() } -> std::same_as;
+                          }) {
+                return Left::merged_work_chunk_size();
+            } else {
+                return std::dynamic_extent;
+            }
+        }();
+        constexpr std::size_t right_size = []() {
+            if constexpr (requires {
+                              { Right::merged_work_chunk_size() } -> std::same_as;
+                          }) {
+                return Right::merged_work_chunk_size();
+            } else {
+                return std::dynamic_extent;
+            }
+        }();
+        return std::min({ traits::block::input_ports::template apply::value, traits::block::output_ports::template apply::value,
+                          left_size, right_size });
+    }
+
+    template
+    constexpr auto
+    apply_left(std::size_t offset, auto &&input_tuple) noexcept {
+        return [&](std::index_sequence) {
+            return invokeProcessOneWithOrWithoutOffset(left, offset, std::get(std::forward(input_tuple))...);
+        }(std::make_index_sequence());
+    }
+
+    template
+    constexpr auto
+    apply_right(std::size_t offset, auto &&input_tuple, auto &&tmp) noexcept {
+        return
+               [&](std::index_sequence, std::index_sequence) {
+            constexpr std::size_t first_offset  = traits::block::input_port_types::size;
+            constexpr std::size_t second_offset = traits::block::input_port_types::size + sizeof...(Is);
+            static_assert(second_offset + sizeof...(Js) == std::tuple_size_v>);
+            return invokeProcessOneWithOrWithoutOffset(right, offset, std::get(std::forward(input_tuple))..., std::forward(tmp),
+                                                       std::get(input_tuple)...);
+        }(std::make_index_sequence(), std::make_index_sequence());
+    }
+
+public:
+    using TInputPortTypes  = typename traits::block::input_port_types;
+    using TOutputPortTypes = typename traits::block::output_port_types;
+    using TReturnType      = typename traits::block::return_type;
+
+    constexpr MergedGraph(Left l, Right r) : left(std::move(l)), right(std::move(r)) {}
+
+    // if the left node (source) implements available_samples (a customization point), then pass the call through
+    friend constexpr std::size_t
+    available_samples(const MergedGraph &self) noexcept
+        requires requires(const Left &l) {
+            { available_samples(l) } -> std::same_as;
+        }
+    {
+        return available_samples(self.left);
+    }
+
+    template
+        requires traits::block::can_processOne_simd and traits::block::can_processOne_simd
+    constexpr meta::simdize>>
+    processOne(std::size_t offset, const Ts &...inputs) {
+        static_assert(traits::block::output_port_types::size == 1, "TODO: SIMD for multiple output ports not implemented yet");
+        return apply_right::size() - InId - 1>(offset, std::tie(inputs...),
+                                               apply_left::size()>(offset, std::tie(inputs...)));
+    }
+
+    constexpr auto
+    processOne_simd(std::size_t offset, auto N)
+        requires traits::block::can_processOne_simd
+    {
+        if constexpr (requires(Left &l) {
+                          { l.processOne_simd(offset, N) };
+                      }) {
+            return invokeProcessOneWithOrWithoutOffset(right, offset, left.processOne_simd(offset, N));
+        } else if constexpr (requires(Left &l) {
+                                 { l.processOne_simd(N) };
+                             }) {
+            return invokeProcessOneWithOrWithoutOffset(right, offset, left.processOne_simd(N));
+        } else {
+            using LeftResult = typename traits::block::return_type;
+            using V          = meta::simdize;
+            alignas(stdx::memory_alignment_v) LeftResult tmp[V::size()];
+            for (std::size_t i = 0; i < V::size(); ++i) {
+                tmp[i] = invokeProcessOneWithOrWithoutOffset(left, offset + i);
+            }
+            return invokeProcessOneWithOrWithoutOffset(right, offset, V(tmp, stdx::vector_aligned));
+        }
+    }
+
+    template
+    // Nicer error messages for the following would be good, but not at the expense of breaking can_processOne_simd.
+        requires(TInputPortTypes::template are_equal...>)
+    constexpr TReturnType
+    processOne(std::size_t offset, Ts &&...inputs) {
+        // if (sizeof...(Ts) == 0) we could call `return processOne_simd(integral_constant)`. But if
+        // the caller expects to process *one* sample (no inputs for the caller to explicitly
+        // request simd), and we process more, we risk inconsistencies.
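+        // Worked sketch (hypothetical blocks, mirroring the merge<"scaled", "addend1">(scale(), adder())
+        // example further down): with Left = scale (1 input, 1 output) and Right = adder (2 inputs, one of
+        // them fed by scale's output), the merged graph exposes two input ports (scale's input plus adder's
+        // remaining addend), so that
+        //     merged.processOne(offset, a, b);
+        // computes the equivalent of adder(scale(a), b): apply_left() forwards the first Left-input-count
+        // arguments to `left`, and apply_right() splices the intermediate result `tmp` into position InId
+        // of `right`'s argument list.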
+        if constexpr (traits::block::output_port_types::size == 1) {
+            // only the result from the right node needs to be returned
+            return apply_right::size() - InId - 1>(offset, std::forward_as_tuple(std::forward(inputs)...),
+                                                   apply_left::size()>(offset, std::forward_as_tuple(
+                                                                                       std::forward(inputs)...)));
+
+        } else {
+            // left produces a tuple
+            auto left_out  = apply_left::size()>(offset, std::forward_as_tuple(std::forward(inputs)...));
+            auto right_out = apply_right::size() - InId - 1>(offset, std::forward_as_tuple(std::forward(inputs)...),
+                                                             std::move(std::get(left_out)));
+
+            if constexpr (traits::block::output_port_types::size == 2 && traits::block::output_port_types::size == 1) {
+                return std::make_tuple(std::move(std::get(left_out)), std::move(right_out));
+
+            } else if constexpr (traits::block::output_port_types::size == 2) {
+                return std::tuple_cat(std::make_tuple(std::move(std::get(left_out))), std::move(right_out));
+
+            } else if constexpr (traits::block::output_port_types::size == 1) {
+                return [&](std::index_sequence, std::index_sequence) {
+                    return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(right_out));
+                }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>());
+
+            } else {
+                return [&](std::index_sequence, std::index_sequence, std::index_sequence) {
+                    return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(std::get(right_out)...));
+                }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>(), std::make_index_sequence());
+            }
+        }
+    } // end:: processOne
+
+    work::Result
+    work(std::size_t requested_work) noexcept {
+        return base::work(requested_work);
+    }
+};
+
+template
+inline std::atomic_size_t MergedGraph::_unique_id_counter{ 0_UZ };
+
+/**
+ * This method can merge simple blocks that are defined via a single `auto processOne(..)`, producing a
+ * new `merged` node, bypassing the dynamic run-time buffers.
+ * Since the merged node can be highly optimised at compile time, its execution performance is usually orders
+ * of magnitude more efficient than executing a cascade of the same constituent blocks. See the benchmarks for details.
+ * This function uses the connect-by-port-ID API.
+ *
+ * Example:
+ * @code
+ * // declare flow-graph: 2 x in -> adder -> scale-by-2 -> scale-by-minus1 -> output
+ * auto merged = merge_by_index<0, 0>(scale(), merge_by_index<0, 0>(scale(), adder()));
+ *
+ * // execute graph
+ * std::array a = { 1, 2, 3, 4 };
+ * std::array b = { 10, 10, 10, 10 };
+ *
+ * int r = 0;
+ * for (std::size_t i = 0; i < 4; ++i) {
+ *     r += merged.processOne(a[i], b[i]);
+ * }
+ * @endcode
+ */
+template
+constexpr auto
+mergeByIndex(A &&a, B &&b) -> MergedGraph, std::remove_cvref_t, OutId, InId> {
+    if constexpr (!std::is_same_v>::template at,
+                                 typename traits::block::input_port_types>::template at>) {
+        gr::meta::print_types, typename traits::block::output_port_types>, std::integral_constant,
+                             typename traits::block::output_port_types>::template at,
+
+                             gr::meta::message_type<"INPUT_PORTS_ARE:">, typename traits::block::input_port_types>, std::integral_constant,
+                             typename traits::block::input_port_types>::template at>{};
+    }
+    return { std::forward(a), std::forward(b) };
+}
+
+/**
+ * This method can merge simple blocks that are defined via a single `auto processOne(..)`, producing a
+ * new `merged` node, bypassing the dynamic run-time buffers.
+ * Since the merged node can be highly optimised at compile time, its execution performance is usually orders
+ * of magnitude more efficient than executing a cascade of the same constituent blocks. See the benchmarks for details.
+ * This function uses the connect-by-port-name API.
+ *
+ * Example:
+ * @code
+ * // declare flow-graph: 2 x in -> adder -> scale-by-2 -> output
+ * auto merged = merge<"scaled", "addend1">(scale(), adder());
+ *
+ * // execute graph
+ * std::array a = { 1, 2, 3, 4 };
+ * std::array b = { 10, 10, 10, 10 };
+ *
+ * int r = 0;
+ * for (std::size_t i = 0; i < 4; ++i) {
+ *     r += merged.processOne(a[i], b[i]);
+ * }
+ * @endcode
+ */
+template
+constexpr auto
+merge(A &&a, B &&b) {
+    constexpr int OutIdUnchecked = meta::indexForName>();
+    constexpr int InIdUnchecked  = meta::indexForName>();
+    static_assert(OutIdUnchecked != -1);
+    static_assert(InIdUnchecked != -1);
+    constexpr auto OutId = static_cast(OutIdUnchecked);
+    constexpr auto InId  = static_cast(InIdUnchecked);
+    static_assert(std::same_as>::template at,
+                               typename traits::block::input_port_types>::template at>,
+                  "Port types do not match");
+    return MergedGraph, std::remove_cvref_t, OutId, InId>{ std::forward(a), std::forward(b) };
+}
+
+#if !DISABLE_SIMD
+namespace test { // TODO: move to dedicated tests
+
+struct copy : public Block {
+    PortIn  in;
+    PortOut out;
+
+public:
+    template V>
+    [[nodiscard]] constexpr V
+    processOne(const V &a) const noexcept {
+        return a;
+    }
+};
+} // namespace test
+#endif
+} // namespace gr
+
+#if !DISABLE_SIMD
+ENABLE_REFLECTION(gr::test::copy, in, out);
+#endif
+
+namespace gr {
+
+#if !DISABLE_SIMD
+namespace test {
+static_assert(traits::block::input_port_types::size() == 1);
+static_assert(std::same_as, float>);
+static_assert(traits::block::can_processOne_scalar);
+static_assert(traits::block::can_processOne_simd);
+static_assert(traits::block::can_processOne_scalar_with_offset(copy(), copy()))>);
+static_assert(traits::block::can_processOne_simd_with_offset(copy(), copy()))>);
+static_assert(SourceBlockLike);
+static_assert(SinkBlockLike);
+static_assert(SourceBlockLike(copy(), copy()))>);
+static_assert(SinkBlockLike(copy(), copy()))>);
+} // namespace test
+#endif
+
+/*******************************************************************************************************/
+/**************************** end of SIMD-Merged Graph Implementation **********************************/
+/*******************************************************************************************************/
+
 // TODO: add nicer enum formatter
 inline std::ostream &
 operator<<(std::ostream &os, const ConnectionResult &value) {
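/*
 * Self-contained sketch of the merge-by-name example above, spelled out with explicit template arguments.
 * `Scale` and `Adder` are hypothetical stand-ins for the `scale()`/`adder()` helpers referenced in the
 * docstrings (they are not defined in this file); the port/reflection style follows `gr::test::copy` above.
 * @code
 * struct Scale : public gr::Block<Scale> {
 *     gr::PortIn<int>  original;
 *     gr::PortOut<int> scaled;
 *     [[nodiscard]] constexpr int processOne(int a) const noexcept { return 2 * a; }
 * };
 * ENABLE_REFLECTION(Scale, original, scaled);
 *
 * struct Adder : public gr::Block<Adder> {
 *     gr::PortIn<int>  addend0;
 *     gr::PortIn<int>  addend1;
 *     gr::PortOut<int> sum;
 *     [[nodiscard]] constexpr int processOne(int a, int b) const noexcept { return a + b; }
 * };
 * ENABLE_REFLECTION(Adder, addend0, addend1, sum);
 *
 * // connect Scale's "scaled" output to Adder's "addend1" input -> compile-time MergedGraph
 * auto merged = gr::merge<"scaled", "addend1">(Scale(), Adder());
 *
 * std::array<int, 4> a = { 1, 2, 3, 4 };
 * std::array<int, 4> b = { 10, 10, 10, 10 };
 * int r = 0;
 * for (std::size_t i = 0; i < 4; ++i) {
 *     r += merged.processOne(a[i], b[i]); // equivalent to Adder(Scale(a[i]), b[i])
 * }
 * @endcode
 */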