Skip to content

Commit

Permalink
Port: implement a ranges::view based tag interface to tags
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Krimm <[email protected]>
  • Loading branch information
wirew0rm committed Sep 27, 2024
1 parent 4e3d0e9 commit 60a47b6
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 67 deletions.
6 changes: 3 additions & 3 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,10 +813,10 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
destinationMap.insert_or_assign(key, value);
}
};
if constexpr (ConsumablePortSpan<std::remove_cvref_t<decltype(inputSpanOrVector)>>) {
mergeSrcMapInto(inputSpanOrVector.getMergedTag().map, _mergedInputTag.map);
if constexpr (InputSpan<std::remove_cvref_t<decltype(inputSpanOrVector)>>) {
mergeSrcMapInto(inputSpanOrVector.getMergedTag(0).map, _mergedInputTag.map);
} else {
std::ranges::for_each(inputSpanOrVector, [this, &mergeSrcMapInto](auto& inputSpan) { mergeSrcMapInto(inputSpan.getMergedTag().map, _mergedInputTag.map); });
std::ranges::for_each(inputSpanOrVector, [this, &mergeSrcMapInto](auto& inputSpan) { mergeSrcMapInto(inputSpan.getMergedTag(0).map, _mergedInputTag.map); });
}
},
inputSpans);
Expand Down
16 changes: 10 additions & 6 deletions core/include/gnuradio-4.0/BlockTraits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,14 @@ struct DummyConsumableSpan {
static_assert(ConsumableSpan<DummyConsumableSpan<int>>);

template<typename T>
struct DummyConsumablePortSpan: public DummyConsumableSpan<T> {
DummyConsumableSpan<gr::Tag> tags{};
struct DummyInputSpan: public DummyConsumableSpan<T> {
DummyConsumableSpan<gr::Tag> rawTags{};
auto tags() { return std::views::empty<std::pair<Tag::signed_index_type, const property_map&>>; }
[[nodiscard]] inline Tag getMergedTag(gr::Tag::signed_index_type /*untilLocalIndex*/) const { return {}; }
void consumeTags(gr::Tag::signed_index_type /*untilLocalIndex*/) { }
};
static_assert(ConsumablePortSpan<DummyConsumablePortSpan<int>>);
static_assert(ConsumableSpan<DummyInputSpan<int>>);
static_assert(InputSpan<DummyInputSpan<int>>);

template<typename T>
struct DummyPublishableSpan {
Expand Down Expand Up @@ -386,7 +390,7 @@ constexpr auto* port_to_processBulk_argument_helper() {
if constexpr (isVectorOfSpansReturned) {
return static_cast<std::span<std::span<const typename Port::value_type::value_type>>*>(nullptr);
} else {
return static_cast<std::span<DummyConsumablePortSpan<typename Port::value_type::value_type>>*>(nullptr);
return static_cast<std::span<DummyInputSpan<typename Port::value_type::value_type>>*>(nullptr);
}
} else if constexpr (Port::value_type::kIsOutput) {
if constexpr (isVectorOfSpansReturned) {
Expand All @@ -398,7 +402,7 @@ constexpr auto* port_to_processBulk_argument_helper() {

} else { // single port
if constexpr (Port::kIsInput) {
return static_cast<DummyConsumablePortSpan<typename Port::value_type>*>(nullptr);
return static_cast<DummyInputSpan<typename Port::value_type>*>(nullptr);
} else if constexpr (Port::kIsOutput) {
return static_cast<DummyPublishableSpan<typename Port::value_type>*>(nullptr);
}
Expand Down Expand Up @@ -440,7 +444,7 @@ concept can_processBulk = can_processBulk_helper<TBlock, detail::port_to_process
* must be std::span<T> and *not* a type satisfying PublishableSpan<T>.
*/
template<typename TDerived, std::size_t I>
concept processBulk_requires_ith_output_as_span = can_processBulk<TDerived> && (I < traits::block::stream_output_port_types<TDerived>::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types<detail::DummyConsumablePortSpan, traits::block::stream_input_port_types<TDerived>>::template apply<std::tuple> inputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::dynamic_span, detail::DummyPublishableSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> outputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::nothing_you_ever_wanted, detail::DummyPublishableSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> bad_outputs) {
concept processBulk_requires_ith_output_as_span = can_processBulk<TDerived> && (I < traits::block::stream_output_port_types<TDerived>::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types<detail::DummyInputSpan, traits::block::stream_input_port_types < TDerived>>::template apply<std::tuple> inputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::dynamic_span, detail::DummyPublishableSpan, traits::block::stream_output_port_types < TDerived>>::template apply<std::tuple> outputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::nothing_you_ever_wanted, detail::DummyPublishableSpan, traits::block::stream_output_port_types < TDerived>>::template apply<std::tuple> bad_outputs) {
{ detail::can_processBulk_invoke_test(d, inputs, outputs, std::make_index_sequence<stream_input_port_types<TDerived>::size>(), std::make_index_sequence<stream_output_port_types<TDerived>::size>()) } -> std::same_as<work::Status>;
// TODO: Is this check redundant?
not requires { []<std::size_t... InIdx, std::size_t... OutIdx>(std::index_sequence<InIdx...>, std::index_sequence<OutIdx...>) -> decltype(d.processBulk(std::get<InIdx>(inputs)..., std::get<OutIdx>(bad_outputs)...)) { return {}; }(std::make_index_sequence<traits::block::stream_input_port_types<TDerived>::size>(), std::make_index_sequence<traits::block::stream_output_port_types<TDerived>::size>()); };
Expand Down
71 changes: 41 additions & 30 deletions core/include/gnuradio-4.0/Port.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,27 @@ namespace gr {
*/
struct Async {};

/**
* @brief API for access to the input samples and tags from the process_bulk function.
*
* This concept is used for the input parameters of the process_bulk function to allow:
* - access to the samples (via conversion to std::span)
* - consumption of samples. By default all available samples get consumed.
* - access to tags
* - via a range::view returned by `tag()` which returns the index relative to first sample in this span. The index can be negative for tags that were not consumed before.
* - via rawTags, which gives access to the bare ConsumableSpan<Tag>
* - consumption of tags. By default the tags belonging to up to and including the first sample get consumed. This can be manually changed by calling consumeTags with the index of a stream sample.
* - get a merged tag which contains the data of all tags belonging to up to and including the first sample. Optionally this can be changed to merge all tags until a supplied local stream index.
*/
template<typename T>
concept ConsumablePortSpan = ConsumableSpan<T> && requires(T span) {
requires ConsumableSpan<std::remove_cvref_t<decltype(span.tags)>>;
{ *span.tags.begin() } -> std::same_as<const gr::Tag&>;
concept InputSpan = requires(T span, gr::Tag::signed_index_type n) {
{ span } -> std::ranges::contiguous_range;
{ span.consume(0) };
{ span.rawTags };
requires ConsumableSpan<std::remove_cvref_t<decltype(span.rawTags)>> && std::same_as<gr::Tag, std::ranges::range_value_t<decltype(span.rawTags)>>;
{ span.tags() } -> std::ranges::range;
{ span.consumeTags(n) };
{ span.getMergedTag(n) } -> std::same_as<gr::Tag>;
};

/**
Expand Down Expand Up @@ -343,15 +360,15 @@ struct Port {
using ReaderSpanType = decltype(std::declval<ReaderType>().template get<spanReleasePolicy>());

template<SpanReleasePolicy spanReleasePolicy>
struct ConsumablePortInputRange : public ReaderSpanType<spanReleasePolicy> {
TagReaderSpanType tags;
struct PortInputSpan : public ReaderSpanType<spanReleasePolicy> {
TagReaderSpanType rawTags;
Tag::signed_index_type streamIndex;

ConsumablePortInputRange(std::size_t nSamples, ReaderType& reader, TagReaderType& tagReader) : ReaderSpanType<spanReleasePolicy>(reader.template get<spanReleasePolicy>(nSamples)), tags(getTags(static_cast<gr::Tag::signed_index_type>(nSamples), tagReader, reader.position())), streamIndex{reader.position()} {};
PortInputSpan(std::size_t nSamples, ReaderType& reader, TagReaderType& tagReader) : ReaderSpanType<spanReleasePolicy>(reader.template get<spanReleasePolicy>(nSamples)), rawTags(getTags(static_cast<gr::Tag::signed_index_type>(nSamples), tagReader, reader.position())), streamIndex{reader.position()} {};

~ConsumablePortInputRange() override {
~PortInputSpan() override {
if (ReaderSpanType<spanReleasePolicy>::instanceCount() == 1UZ) { // has to be one, because the parent destructor which decrements it to zero is only called afterward
if (tags.isConsumeRequested()) { // the user has already manually consumed tags
if (rawTags.isConsumeRequested()) { // the user has already manually consumed tags
return;
}
if ((ReaderSpanType<spanReleasePolicy>::isConsumeRequested() && ReaderSpanType<spanReleasePolicy>::getConsumeRequested() == 0) || this->empty()) {
Expand All @@ -361,34 +378,27 @@ struct Port {
}
}

[[nodiscard]] auto tags() {
return std::views::transform(rawTags, [this](auto &tag) { return std::make_pair(std::max(tag.index, 0l) - streamIndex, std::ref(tag.map)); });
}

void consumeTags(gr::Tag::signed_index_type untilLocalIndex) {
std::size_t tagsToConsume = 0;
for (auto& t : tags) {
if (t.index <= streamIndex + untilLocalIndex) {
tagsToConsume++;
} else {
break;
}
}
std::ignore = tags.tryConsume(tagsToConsume); // todo: allow to alternatively do tags.tryConsume(nSamples)
std::size_t tagsToConsume = static_cast<std::size_t>(std::ranges::count_if(
rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index <= streamIndex + untilLocalIndex; }),
[](auto /*v*/) {return true;} ));
std::ignore = rawTags.tryConsume(tagsToConsume);
}

[[nodiscard]] inline Tag getMergedTag() const {
Tag result{};
[[nodiscard]] inline Tag getMergedTag(gr::Tag::signed_index_type untilLocalIndex = 0) const {
auto mergeSrcMapInto = [](const property_map& sourceMap, property_map& destinationMap) {
assert(&sourceMap != &destinationMap);
for (const auto& [key, value] : sourceMap) {
destinationMap.insert_or_assign(key, value);
}
};

result.index = -1;
std::ranges::for_each(tags, [&mergeSrcMapInto, &result, this](const Tag& tag) {
if (tag.index <= streamIndex) {
mergeSrcMapInto(tag.map, result.map);
}
});

Tag result{-1, {}};
std::ranges::for_each(rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index <= streamIndex + untilLocalIndex; }),
[&mergeSrcMapInto, &result](const Tag& tag) { mergeSrcMapInto(tag.map, result.map); });
return result;
}

Expand All @@ -411,7 +421,8 @@ struct Port {
}
};

static_assert(ConsumablePortSpan<ConsumablePortInputRange<gr::SpanReleasePolicy::ProcessAll>>);
static_assert(ConsumableSpan<PortInputSpan<gr::SpanReleasePolicy::ProcessAll>>);
static_assert(InputSpan<PortInputSpan<gr::SpanReleasePolicy::ProcessAll>>);

private:
IoType _ioHandler = newIoHandler();
Expand Down Expand Up @@ -620,10 +631,10 @@ struct Port {
}

template<SpanReleasePolicy spanReleasePolicy>
ConsumablePortInputRange<spanReleasePolicy> get(std::size_t nSamples)
PortInputSpan<spanReleasePolicy> get(std::size_t nSamples)
requires(kIsInput)
{
return ConsumablePortInputRange<spanReleasePolicy>(nSamples, streamReader(), tagReader());
return PortInputSpan<spanReleasePolicy>(nSamples, streamReader(), tagReader());
}

template<SpanReleasePolicy TSpanReleasePolicy>
Expand Down
10 changes: 5 additions & 5 deletions core/test/qa_Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,12 +852,12 @@ const boost::ut::suite<"Requested Work Tests"> _requestedWorkTests = [] {
expect(eq(ConnectionResult::SUCCESS, graph.connect<"out">(block).template to<"in">(sink)));

graph.reconnectAllEdges();
auto blockInit = [](auto& block) {
if (block.state() == lifecycle::State::IDLE) {
std::ignore = block.changeStateTo(lifecycle::State::INITIALISED);
auto blockInit = [](auto& blockToInit) {
if (blockToInit.state() == lifecycle::State::IDLE) {
std::ignore = blockToInit.changeStateTo(lifecycle::State::INITIALISED);
}
std::ignore = block.changeStateTo(lifecycle::State::RUNNING);
expect(block.state() == lifecycle::State::RUNNING);
std::ignore = blockToInit.changeStateTo(lifecycle::State::RUNNING);
expect(blockToInit.state() == lifecycle::State::RUNNING);
};
blockInit(src);
blockInit(block);
Expand Down
49 changes: 29 additions & 20 deletions core/test/qa_Port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,53 @@ const boost::ut::suite PortTests = [] {
auto writer = in.buffer().streamBuffer.new_writer();
auto tagWriter = in.buffer().tagBuffer.new_writer();
{ // put testdata into buffer
auto writeSpan = writer.tryReserve<SpanReleasePolicy::ProcessAll>(5);
auto writeSpan = writer.tryReserve<SpanReleasePolicy::ProcessAll>(6);
auto tagSpan = tagWriter.tryReserve(5);
expect(eq(writeSpan.size(), 5UZ));
expect(eq(writeSpan.size(), 6UZ));
expect(eq(tagSpan.size(), 5UZ));
tagSpan[0] = {-1, {{"id", "tag@-1"}, {"id0", true}}};
tagSpan[1] = {1, {{"id", "tag@101"}, {"id1", true}}};
tagSpan[2] = {2, {{"id", "tag@102"}, {"id2", true}}};
tagSpan[3] = {3, {{"id", "tag@103"}, {"id3", true}}};
tagSpan[4] = {4, {{"id", "tag@104"}, {"id4", true}}};
tagSpan[2] = {3, {{"id", "tag@103"}, {"id3", true}}};
tagSpan[3] = {4, {{"id", "tag@104"}, {"id4", true}}};
tagSpan[4] = {5, {{"id", "tag@105"}, {"id5", true}}};
std::iota(writeSpan.begin(), writeSpan.end(), 100);
tagSpan.publish(5); // this should not be necessary as the ProcessAll policy should publish automatically
writeSpan.publish(5); // this should not be necessary as the ProcessAll policy should publish automatically
writeSpan.publish(6); // this should not be necessary as the ProcessAll policy should publish automatically
}
{ // partial consume
auto data = in.get<SpanReleasePolicy::ProcessAll>(5);
// fmt::print("idx: {}: data: {}\ntags: {}\nmergedTag: {}\n", data.streamIndex, std::span(data), in.tags, in.getMergedTag());
expect(std::ranges::equal(data.tags, std::vector<gr::Tag>{{-1, {{"id", "tag@-1"}, {"id0", true}}}, {1, {{"id", "tag@101"}, {"id1", true}}}, {2, {{"id", "tag@102"}, {"id2", true}}}, {3, {{"id", "tag@103"}, {"id3", true}}}, {4, {{"id", "tag@104"}, {"id4", true}}}}));
expect(std::ranges::equal(data, std::views::iota(100) | std::views::take(5)));
auto data = in.get<SpanReleasePolicy::ProcessAll>(6);
expect(std::ranges::equal(data.rawTags, std::vector<gr::Tag>{{-1, {{"id", "tag@-1"}, {"id0", true}}}, {1, {{"id", "tag@101"}, {"id1", true}}}, {3, {{"id", "tag@103"}, {"id3", true}}}, {4, {{"id", "tag@104"}, {"id4", true}}}, {5, {{"id", "tag@105"}, {"id5", true}}}}));
expect(std::ranges::equal(data.tags(), std::vector{
std::make_pair(0, gr::property_map{{"id", "tag@-1"}, {"id0", true}}),
std::make_pair(1, gr::property_map{{"id", "tag@101"}, {"id1", true}}),
std::make_pair(3, gr::property_map{{"id", "tag@103"}, {"id3", true}}),
std::make_pair(4, gr::property_map{{"id", "tag@104"}, {"id4", true}}),
std::make_pair(5, gr::property_map{{"id", "tag@105"}, {"id5", true}})
}));
expect(std::ranges::equal(data, std::views::iota(100) | std::views::take(6)));
expect(data.getMergedTag() == gr::Tag{-1, {{"id", "tag@-1"}, {"id0", true}}});
expect(data.consume(2));
expect(data.consume(3));
}
{ // full consume
auto data = in.get<SpanReleasePolicy::ProcessAll>(2);
expect(std::ranges::equal(data.tags, std::vector<gr::Tag>{{1, {{"id", "tag@101"}, {"id1", true}}}, {2, {{"id", "tag@102"}, {"id2", true}}}, {3, {{"id", "tag@103"}, {"id3", true}}}}));
expect(std::ranges::equal(data, std::views::iota(100) | std::views::drop(2) | std::views::take(2)));
expect(data.getMergedTag() == gr::Tag{-1, {{"id", "tag@102"}, {"id1", true}, {"id2", true}}});
expect(std::ranges::equal(data.rawTags, std::vector<gr::Tag>{{1, {{"id", "tag@101"}, {"id1", true}}}, {3, {{"id", "tag@103"}, {"id3", true}}}, {4, {{"id", "tag@104"}, {"id4", true}}}}));
expect(std::ranges::equal(data.tags(), std::vector{std::make_pair(-2, gr::property_map{{"id", "tag@101"}, {"id1", true}}), std::make_pair(0, gr::property_map{{"id", "tag@103"}, {"id3", true}}), std::make_pair(1, gr::property_map{{"id", "tag@104"}, {"id4", true}})}));
expect(std::ranges::equal(data, std::views::iota(100) | std::views::drop(3) | std::views::take(2)));
expect(data.getMergedTag() == gr::Tag{-1, {{"id", "tag@103"}, {"id1", true}, {"id3", true}}});
}
{ // get empty range
auto data = in.get<SpanReleasePolicy::ProcessAll>(0);
expect(std::ranges::equal(data.tags, std::vector<gr::Tag>{{3, {{"id", "tag@103"}, {"id3", true}}}}));
expect(std::ranges::equal(data, std::vector<int>()));
expect(data.getMergedTag() == gr::Tag{-1, {{"id", "tag@103"}, {"id3", true}}});
expect(std::ranges::equal(data.rawTags, std::vector<gr::Tag>{{4, {{"id", "tag@104"}, {"id4", true}}}}));
expect(std::ranges::equal(data.tags(), std::vector{std::make_pair(-1, gr::property_map{{"id", "tag@104"}, {"id4", true}})}));
expect(std::ranges::equal(data, std::ranges::empty_view<int>()));
expect(data.getMergedTag() == gr::Tag{-1, {{"id", "tag@104"}, {"id4", true}}});
}
{ // get last sample
auto data = in.get<SpanReleasePolicy::ProcessAll>(1);
expect(std::ranges::equal(data.tags, std::vector<gr::Tag>{{3, {{"id", "tag@103"}, {"id3", true}}}, {4, {{"id", "tag@104"}, {"id4", true}}}}));
expect(std::ranges::equal(data, std::views::iota(100) | std::views::drop(4) | std::views::take(1)));
expect(data.getMergedTag() == gr::Tag{-1, {{"id", "tag@104"}, {"id3", true}, {"id4", true}}});
expect(std::ranges::equal(data.rawTags, std::vector<gr::Tag>{{4, {{"id", "tag@104"}, {"id4", true}}}, {5, {{"id", "tag@105"}, {"id5", true}}}}));
expect(std::ranges::equal(data.tags(), std::vector{std::make_pair(-1, gr::property_map{{"id", "tag@104"}, {"id4", true}}), std::make_pair(0, property_map {{"id", "tag@105"}, {"id5", true}})}));
expect(std::ranges::equal(data, std::views::iota(100) | std::views::drop(5) | std::views::take(1)));
expect(data.getMergedTag() == gr::Tag{-1, {{"id", "tag@105"}, {"id4", true}, {"id5", true}}});
}
};

Expand Down
Loading

0 comments on commit 60a47b6

Please sign in to comment.