Skip to content

Commit

Permalink
refactored merged_node -> MergedBlock -> MergedGraph
Browse files Browse the repository at this point in the history
... and moved its implementation to Graph.hpp since its semantic behaviour is closer to a compile-time merged Graph than a Block.

* missing features to be addressed:
  * handling of tag forwarding
  * handling of sub-block settings
  * topology of internal graph (blocks+connections)
  • Loading branch information
RalphSteinhagen authored and wirew0rm committed Oct 11, 2023
1 parent 87ff686 commit 6d35466
Show file tree
Hide file tree
Showing 2 changed files with 333 additions and 287 deletions.
287 changes: 0 additions & 287 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1229,293 +1229,6 @@ blockDescription() noexcept {
return ret;
}

/**
 * Block types that may serve as the upstream ("left") half of a merged block: the type has to fulfil
 * `traits::block::can_processOne` and must declare at least one output port type.
 */
template<typename TBlock>
concept SourceBlockLike = traits::block::can_processOne<TBlock> && (traits::block::output_port_types<TBlock>::size > 0);

// sanity check: a fundamental type must never qualify as a source block
static_assert(!SourceBlockLike<int>);

/**
 * Block types that may serve as the downstream ("right") half of a merged block: the type has to fulfil
 * `traits::block::can_processOne` and must declare at least one input port type.
 */
template<typename TBlock>
concept SinkBlockLike = traits::block::can_processOne<TBlock> && (traits::block::input_port_types<TBlock>::size > 0);

// sanity check: a fundamental type must never qualify as a sink block
static_assert(!SinkBlockLike<int>);

/**
 * Compile-time fusion of two blocks: output port `OutId` of `Left` is fed directly into input port `InId` of
 * `Right` inside `processOne(..)`, i.e. the intermediate sample is passed on as a plain value.
 *
 * Ports exposed by the merged block:
 *   - inputs:  all input ports of `Left`, followed by the input ports of `Right` without port `InId`
 *   - outputs: the output ports of `Left` without port `OutId`, followed by all output ports of `Right`
 *
 * @tparam Left  upstream block, must satisfy `SourceBlockLike`
 * @tparam Right downstream block, must satisfy `SinkBlockLike`
 * @tparam OutId index of the `Left` output port that is consumed internally
 * @tparam InId  index of the `Right` input port that is fed internally
 */
template<SourceBlockLike Left, SinkBlockLike Right, std::size_t OutId, std::size_t InId>
class MergedBlock : public Block<MergedBlock<Left, Right, OutId, InId>, meta::concat<typename traits::block::input_ports<Left>, meta::remove_at<InId, typename traits::block::input_ports<Right>>>,
                                 meta::concat<meta::remove_at<OutId, typename traits::block::output_ports<Left>>, typename traits::block::output_ports<Right>>> {
    // instance counter; being a static member of a class template there is one counter per specialisation
    static std::atomic_size_t _unique_id_counter;

public:
    // N.B. declaration order matters: unique_name is formatted from unique_id
    const std::size_t unique_id   = _unique_id_counter++;
    const std::string unique_name = fmt::format("MergedBlock<{}:{},{}:{}>#{}", gr::meta::type_name<Left>(), OutId, gr::meta::type_name<Right>(), InId, unique_id);

private:
    // copy-paste from above, keep in sync
    using base = Block<MergedBlock<Left, Right, OutId, InId>, meta::concat<typename traits::block::input_ports<Left>, meta::remove_at<InId, typename traits::block::input_ports<Right>>>,
                       meta::concat<meta::remove_at<OutId, typename traits::block::output_ports<Left>>, typename traits::block::output_ports<Right>>>;

    Left  left;
    Right right;

    // merged_work_chunk_size, that's what friends are for
    friend base;

    // nested merges need access to each other's private merged_work_chunk_size()
    template<SourceBlockLike, SinkBlockLike, std::size_t, std::size_t>
    friend class MergedBlock;

    // returns the minimum of all internal max_samples port template parameters
    // (std::dynamic_extent is used for a constituent that does not provide merged_work_chunk_size() itself)
    static constexpr std::size_t
    merged_work_chunk_size() noexcept {
        constexpr std::size_t left_size = []() {
            if constexpr (requires {
                              { Left::merged_work_chunk_size() } -> std::same_as<std::size_t>;
                          }) {
                return Left::merged_work_chunk_size();
            } else {
                return std::dynamic_extent;
            }
        }();
        constexpr std::size_t right_size = []() {
            if constexpr (requires {
                              { Right::merged_work_chunk_size() } -> std::same_as<std::size_t>;
                          }) {
                return Right::merged_work_chunk_size();
            } else {
                return std::dynamic_extent;
            }
        }();
        return std::min({ traits::block::input_ports<Right>::template apply<traits::port::max_samples>::value, traits::block::output_ports<Left>::template apply<traits::port::max_samples>::value,
                          left_size, right_size });
    }

    // invokes `left` with the first I elements of `input_tuple`
    template<std::size_t I>
    constexpr auto
    apply_left(std::size_t offset, auto &&input_tuple) noexcept {
        return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
            return invokeProcessOneWithOrWithoutOffset(left, offset, std::get<Is>(std::forward<decltype(input_tuple)>(input_tuple))...);
        }(std::make_index_sequence<I>());
    }

    // invokes `right` with: I external inputs (those located before InId), then `tmp` (the value produced by
    // `left`) in place of port InId, then the remaining J external inputs
    template<std::size_t I, std::size_t J>
    constexpr auto
    apply_right(std::size_t offset, auto &&input_tuple, auto &&tmp) noexcept {
        return [&]<std::size_t... Is, std::size_t... Js>(std::index_sequence<Is...>, std::index_sequence<Js...>) {
            // the inputs belonging to `right` are stored behind the inputs of `left`
            constexpr std::size_t first_offset  = traits::block::input_port_types<Left>::size;
            constexpr std::size_t second_offset = traits::block::input_port_types<Left>::size + sizeof...(Is);
            static_assert(second_offset + sizeof...(Js) == std::tuple_size_v<std::remove_cvref_t<decltype(input_tuple)>>);
            return invokeProcessOneWithOrWithoutOffset(right, offset, std::get<first_offset + Is>(std::forward<decltype(input_tuple)>(input_tuple))..., std::forward<decltype(tmp)>(tmp),
                                                       std::get<second_offset + Js>(input_tuple)...);
        }(std::make_index_sequence<I>(), std::make_index_sequence<J>());
    }

public:
    using TInputPortTypes  = typename traits::block::input_port_types<base>;
    using TOutputPortTypes = typename traits::block::output_port_types<base>;
    using TReturnType      = typename traits::block::return_type<base>;

    constexpr MergedBlock(Left l, Right r) : left(std::move(l)), right(std::move(r)) {}

    // if the left node (source) implements available_samples (a customization point), then pass the call through
    friend constexpr std::size_t
    available_samples(const MergedBlock &self) noexcept
        requires requires(const Left &l) {
            { available_samples(l) } -> std::same_as<std::size_t>;
        }
    {
        return available_samples(self.left);
    }

    // SIMD overload: all external inputs are SIMD vectors; only available if both constituents can process SIMD data
    template<meta::any_simd... Ts>
        requires traits::block::can_processOne_simd<Left> and traits::block::can_processOne_simd<Right>
    constexpr meta::simdize<TReturnType, meta::simdize_size_v<std::tuple<Ts...>>>
    processOne(std::size_t offset, const Ts &...inputs) {
        static_assert(traits::block::output_port_types<Left>::size == 1, "TODO: SIMD for multiple output ports not implemented yet");
        return apply_right<InId, traits::block::input_port_types<Right>::size() - InId - 1>(offset, std::tie(inputs...),
                                                                                             apply_left<traits::block::input_port_types<Left>::size()>(offset, std::tie(inputs...)));
    }

    // SIMD processing without external inputs: forwards `left.processOne_simd(..)` (with or without offset) if
    // available, otherwise invokes `left` once per SIMD lane and packs the results into a vector of width N
    constexpr auto
    processOne_simd(std::size_t offset, auto N)
        requires traits::block::can_processOne_simd<Right>
    {
        if constexpr (requires(Left &l) {
                          { l.processOne_simd(offset, N) };
                      }) {
            return invokeProcessOneWithOrWithoutOffset(right, offset, left.processOne_simd(offset, N));
        } else if constexpr (requires(Left &l) {
                                 { l.processOne_simd(N) };
                             }) {
            return invokeProcessOneWithOrWithoutOffset(right, offset, left.processOne_simd(N));
        } else {
            using LeftResult = typename traits::block::return_type<Left>;
            using V          = meta::simdize<LeftResult, N>;
            alignas(stdx::memory_alignment_v<V>) LeftResult tmp[V::size()];
            for (std::size_t i = 0; i < V::size(); ++i) {
                tmp[i] = invokeProcessOneWithOrWithoutOffset(left, offset + i);
            }
            return invokeProcessOneWithOrWithoutOffset(right, offset, V(tmp, stdx::vector_aligned));
        }
    }

    // scalar overload: `inputs` are the external inputs in the order given by TInputPortTypes
    template<typename... Ts>
        // Nicer error messages for the following would be good, but not at the expense of breaking can_processOne_simd.
        requires(TInputPortTypes::template are_equal<std::remove_cvref_t<Ts>...>)
    constexpr TReturnType
    processOne(std::size_t offset, Ts &&...inputs) {
        // if (sizeof...(Ts) == 0) we could call `return processOne_simd(integral_constant<size_t, width>)`. But if
        // the caller expects to process *one* sample (no inputs for the caller to explicitly
        // request simd), and we process more, we risk inconsistencies.
        if constexpr (traits::block::output_port_types<Left>::size == 1) {
            // only the result from the right node needs to be returned
            return apply_right<InId, traits::block::input_port_types<Right>::size() - InId - 1>(offset, std::forward_as_tuple(std::forward<Ts>(inputs)...),
                                                                                                 apply_left<traits::block::input_port_types<Left>::size()>(offset, std::forward_as_tuple(
                                                                                                                                                                           std::forward<Ts>(inputs)...)));

        } else {
            // left produces a tuple
            auto left_out  = apply_left<traits::block::input_port_types<Left>::size()>(offset, std::forward_as_tuple(std::forward<Ts>(inputs)...));
            auto right_out = apply_right<InId, traits::block::input_port_types<Right>::size() - InId - 1>(offset, std::forward_as_tuple(std::forward<Ts>(inputs)...),
                                                                                                           std::move(std::get<OutId>(left_out)));

            if constexpr (traits::block::output_port_types<Left>::size == 2 && traits::block::output_port_types<Right>::size == 1) {
                // with exactly two left outputs the one that is not consumed has index `OutId ^ 1`
                return std::make_tuple(std::move(std::get<OutId ^ 1>(left_out)), std::move(right_out));

            } else if constexpr (traits::block::output_port_types<Left>::size == 2) {
                return std::tuple_cat(std::make_tuple(std::move(std::get<OutId ^ 1>(left_out))), std::move(right_out));

            } else if constexpr (traits::block::output_port_types<Right>::size == 1) {
                // Is: left outputs before OutId, Js: left outputs after OutId
                return [&]<std::size_t... Is, std::size_t... Js>(std::index_sequence<Is...>, std::index_sequence<Js...>) {
                    return std::make_tuple(std::move(std::get<Is>(left_out))..., std::move(std::get<OutId + 1 + Js>(left_out))..., std::move(right_out));
                }(std::make_index_sequence<OutId>(), std::make_index_sequence<traits::block::output_port_types<Left>::size - OutId - 1>());

            } else {
                // Is: left outputs before OutId, Js: left outputs after OutId, Ks: all right outputs
                return [&]<std::size_t... Is, std::size_t... Js, std::size_t... Ks>(std::index_sequence<Is...>, std::index_sequence<Js...>, std::index_sequence<Ks...>) {
                    // N.B. every element is moved individually: the pack expansion belongs *outside* of std::move(..)
                    return std::make_tuple(std::move(std::get<Is>(left_out))..., std::move(std::get<OutId + 1 + Js>(left_out))..., std::move(std::get<Ks>(right_out))...);
                }(std::make_index_sequence<OutId>(), std::make_index_sequence<traits::block::output_port_types<Left>::size - OutId - 1>(),
                  std::make_index_sequence<traits::block::output_port_types<Right>::size>());
            }
        }
    } // end:: processOne

    // delegates to the work(..) implementation of the Block base class
    work::Result
    work(std::size_t requested_work) noexcept {
        return base::work(requested_work);
    }
};

// out-of-class definition of the per-specialisation instance counter, instance IDs start at 0
template<SourceBlockLike Left, SinkBlockLike Right, std::size_t OutId, std::size_t InId>
inline std::atomic_size_t MergedBlock<Left, Right, OutId, InId>::_unique_id_counter{ 0_UZ };

/**
 * Merges two simple blocks that are each defined via a single `auto processOne(..)` into a new `merged` block,
 * bypassing the dynamic run-time buffers.
 * Since the merged block can be highly optimised during compile-time, its execution performance is usually orders
 * of magnitude more efficient than executing a cascade of the same constituent blocks. See the benchmarks for details.
 * This function uses the connect-by-port-ID API.
 *
 * Example:
 * @code
 * // declare flow-graph: 2 x in -> adder -> scale-by-2 -> scale-by-minus1 -> output
 * auto merged = mergeByIndex<0, 0>(mergeByIndex<0, 0>(adder<int>(), scale<int, 2>()), scale<int, -1>());
 *
 * // execute graph
 * std::array<int, 4> a = { 1, 2, 3, 4 };
 * std::array<int, 4> b = { 10, 10, 10, 10 };
 *
 * int r = 0;
 * for (std::size_t i = 0; i < 4; ++i) {
 *     r += merged.processOne(i, a[i], b[i]); // first argument is the sample offset
 * }
 * @endcode
 *
 * @tparam OutId index of the output port of `a` that is connected
 * @tparam InId  index of the input port of `b` that is connected
 * @param a upstream (source) block
 * @param b downstream (sink) block
 * @return `MergedBlock` holding (copies of / moved-from) `a` and `b`
 */
template<std::size_t OutId, std::size_t InId, SourceBlockLike A, SinkBlockLike B>
constexpr auto
mergeByIndex(A &&a, B &&b) -> MergedBlock<std::remove_cvref_t<A>, std::remove_cvref_t<B>, OutId, InId> {
    if constexpr (!std::is_same_v<typename traits::block::output_port_types<std::remove_cvref_t<A>>::template at<OutId>,
                                  typename traits::block::input_port_types<std::remove_cvref_t<B>>::template at<InId>>) {
        // type mismatch between the two connected ports: instantiate print_types with the port lists of both
        // sides (output ports of A, input ports of B)
        // NOTE(review): print_types is presumably a helper that makes the compiler list the given types -- confirm
        gr::meta::print_types<gr::meta::message_type<"OUTPUT_PORTS_ARE:">, typename traits::block::output_port_types<std::remove_cvref_t<A>>, std::integral_constant<int, OutId>,
                              typename traits::block::output_port_types<std::remove_cvref_t<A>>::template at<OutId>,

                              gr::meta::message_type<"INPUT_PORTS_ARE:">, typename traits::block::input_port_types<std::remove_cvref_t<B>>, std::integral_constant<int, InId>,
                              typename traits::block::input_port_types<std::remove_cvref_t<B>>::template at<InId>>{};
    }
    return { std::forward<A>(a), std::forward<B>(b) };
}

/**
 * Merges two simple blocks that are each defined via a single `auto processOne(..)` into a new `merged` block,
 * bypassing the dynamic run-time buffers.
 * Since the merged block can be highly optimised during compile-time, its execution performance is usually orders
 * of magnitude more efficient than executing a cascade of the same constituent blocks. See the benchmarks for details.
 * This function uses the connect-by-port-name API.
 *
 * Example:
 * @code
 * // declare flow-graph: in -> scale-by-2 -> adder ('addend1'), the second adder input remains an external input
 * auto merged = merge<"scaled", "addend1">(scale<int, 2>(), adder<int>());
 *
 * // execute graph
 * std::array<int, 4> a = { 1, 2, 3, 4 };
 * std::array<int, 4> b = { 10, 10, 10, 10 };
 *
 * int r = 0;
 * for (std::size_t i = 0; i < 4; ++i) {
 *     r += merged.processOne(i, a[i], b[i]); // first argument is the sample offset
 * }
 * @endcode
 *
 * @tparam OutName name of the output port of `a` that is connected
 * @tparam InName  name of the input port of `b` that is connected
 * @param a upstream (source) block
 * @param b downstream (sink) block
 * @return `MergedBlock` holding (copies of / moved-from) `a` and `b`
 */
template<fixed_string OutName, fixed_string InName, SourceBlockLike A, SinkBlockLike B>
constexpr auto
merge(A &&a, B &&b) {
    // A and B are deduced as reference types for lvalue arguments -> always query the traits with the decayed types
    using Left  = std::remove_cvref_t<A>;
    using Right = std::remove_cvref_t<B>;

    constexpr int OutIdUnchecked = meta::indexForName<OutName, typename traits::block::output_ports<Left>>();
    constexpr int InIdUnchecked  = meta::indexForName<InName, typename traits::block::input_ports<Right>>();
    static_assert(OutIdUnchecked != -1, "no output port with the requested name in the source block");
    static_assert(InIdUnchecked != -1, "no input port with the requested name in the sink block");
    constexpr auto OutId = static_cast<std::size_t>(OutIdUnchecked);
    constexpr auto InId  = static_cast<std::size_t>(InIdUnchecked);
    static_assert(std::same_as<typename traits::block::output_port_types<Left>::template at<OutId>, typename traits::block::input_port_types<Right>::template at<InId>>,
                  "Port types do not match");
    return MergedBlock<Left, Right, OutId, InId>{ std::forward<A>(a), std::forward<B>(b) };
}

#if !DISABLE_SIMD
namespace test { // TODO: move to dedicated tests

/// Minimal pass-through block with one `float` input and one `float` output port.
struct copy : public Block<copy> {
    PortIn<float>  in;
    PortOut<float> out;

    /// Hands the received value back unchanged; `V` is constrained by `meta::t_or_simd<float>`.
    template<meta::t_or_simd<float> V>
    [[nodiscard]] constexpr V
    processOne(const V &sample) const noexcept {
        return sample;
    }
};
} // namespace test
#endif
} // namespace gr

#if !DISABLE_SIMD
// Passes the `in` and `out` members of gr::test::copy to the project's ENABLE_REFLECTION macro.
// NOTE(review): the invocation sits between the closing and the re-opening of `namespace gr` -- presumably the
// macro has to be expanded outside of that namespace; confirm against the macro definition.
ENABLE_REFLECTION(gr::test::copy, in, out);
#endif

namespace gr {

#if !DISABLE_SIMD
namespace test {
static_assert(traits::block::input_port_types<copy>::size() == 1);
static_assert(std::same_as<traits::block::return_type<copy>, float>);
static_assert(traits::block::can_processOne_scalar<copy>);
static_assert(traits::block::can_processOne_simd<copy>);
static_assert(traits::block::can_processOne_scalar_with_offset<decltype(mergeByIndex<0, 0>(copy(), copy()))>);
static_assert(traits::block::can_processOne_simd_with_offset<decltype(mergeByIndex<0, 0>(copy(), copy()))>);
static_assert(SourceBlockLike<copy>);
static_assert(SinkBlockLike<copy>);
static_assert(SourceBlockLike<decltype(mergeByIndex<0, 0>(copy(), copy()))>);
static_assert(SinkBlockLike<decltype(mergeByIndex<0, 0>(copy(), copy()))>);
} // namespace test
#endif

namespace detail {
template<typename... Types>
struct BlockParameters {
Expand Down
Loading

0 comments on commit 6d35466

Please sign in to comment.