Skip to content

Commit

Permalink
ConsumableInputPortRange Tag API (#422)
Browse files Browse the repository at this point in the history
* Port: Tag consumption API
* Port: implement a ranges::view based tag interface to tags

Signed-off-by: Alexander Krimm <[email protected]>
  • Loading branch information
wirew0rm authored Oct 1, 2024
1 parent dad548e commit 1aeeb6a
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 120 deletions.
19 changes: 11 additions & 8 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,19 +735,22 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
* @param untilOffset defaults to 0, if bigger merges all tags from samples 0...untilOffset for each port before merging
* them
*/
constexpr void updateInputAndOutputTags(std::size_t /*untilOffset*/ = 0UZ) noexcept {
for_each_port(
[this]<PortLike TPort>(TPort& input_port) noexcept {
constexpr void updateInputAndOutputTags(auto& inputSpans, std::size_t /*untilOffset*/ = 0UZ) noexcept {
gr::meta::tuple_for_each(
[this](const auto& inputSpanOrVector) noexcept {
auto mergeSrcMapInto = [](const property_map& sourceMap, property_map& destinationMap) {
assert(&sourceMap != &destinationMap);
for (const auto& [key, value] : sourceMap) {
destinationMap.insert_or_assign(key, value);
}
};

mergeSrcMapInto(input_port.getMergedTag().map, _mergedInputTag.map);
if constexpr (InputSpan<std::remove_cvref_t<decltype(inputSpanOrVector)>>) {
mergeSrcMapInto(inputSpanOrVector.getMergedTag(0).map, _mergedInputTag.map);
} else {
std::ranges::for_each(inputSpanOrVector, [this, &mergeSrcMapInto](auto& inputSpan) { mergeSrcMapInto(inputSpan.getMergedTag(0).map, _mergedInputTag.map); });
}
},
inputPorts<PortType::STREAM>(&self()));
inputSpans);

if (!mergedInputTag().map.empty()) {
settings().autoUpdate(mergedInputTag()); // apply tags as new settings if matching
Expand Down Expand Up @@ -1464,8 +1467,8 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
const bool isEosTagPresent = nextEosTag <= 0 || nextEosTagSkipBefore < minSyncIn || nextEosTagSkipBefore < input_chunk_size || output_chunk_size * (nextEosTagSkipBefore / input_chunk_size) < minSyncOut;

if (inputSkipBefore > 0) { // consume samples on sync ports that need to be consumed due to the stride
updateInputAndOutputTags(inputSkipBefore); // apply all tags in the skipped data range
const auto inputSpans = prepareStreams(inputPorts<PortType::STREAM>(&self()), inputSkipBefore); // only way to consume is via the ConsumableSpan now
updateInputAndOutputTags(inputSpans, inputSkipBefore); // apply all tags in the skipped data range
consumeReaders(inputSkipBefore, inputSpans);
}
// return if there is no work to be performed // todo: add eos policy
Expand Down Expand Up @@ -1495,7 +1498,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
return {requestedWork, 0UZ, INSUFFICIENT_OUTPUT_ITEMS};
}

updateInputAndOutputTags();
updateInputAndOutputTags(inputSpans);
applyChangedSettings();

copyCachedOutputTags(outputSpans);
Expand Down
16 changes: 10 additions & 6 deletions core/include/gnuradio-4.0/BlockTraits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,14 @@ struct DummyConsumableSpan {
static_assert(ConsumableSpan<DummyConsumableSpan<int>>);

template<typename T>
struct DummyConsumablePortSpan: public DummyConsumableSpan<T> {
DummyConsumableSpan<gr::Tag> tags{};
struct DummyInputSpan: public DummyConsumableSpan<T> {
DummyConsumableSpan<gr::Tag> rawTags{};
auto tags() { return std::views::empty<std::pair<Tag::signed_index_type, const property_map&>>; }
[[nodiscard]] inline Tag getMergedTag(gr::Tag::signed_index_type /*untilLocalIndex*/) const { return {}; }
void consumeTags(gr::Tag::signed_index_type /*untilLocalIndex*/) { }
};
static_assert(ConsumablePortSpan<DummyConsumablePortSpan<int>>);
static_assert(ConsumableSpan<DummyInputSpan<int>>);
static_assert(InputSpan<DummyInputSpan<int>>);

template<typename T>
struct DummyPublishableSpan {
Expand Down Expand Up @@ -377,7 +381,7 @@ constexpr auto* port_to_processBulk_argument_helper() {
if constexpr (isVectorOfSpansReturned) {
return static_cast<std::span<std::span<const typename Port::value_type::value_type>>*>(nullptr);
} else {
return static_cast<std::span<DummyConsumablePortSpan<typename Port::value_type::value_type>>*>(nullptr);
return static_cast<std::span<DummyInputSpan<const typename Port::value_type::value_type>>*>(nullptr);
}
} else if constexpr (Port::value_type::kIsOutput) {
if constexpr (isVectorOfSpansReturned) {
Expand All @@ -389,7 +393,7 @@ constexpr auto* port_to_processBulk_argument_helper() {

} else { // single port
if constexpr (Port::kIsInput) {
return static_cast<DummyConsumablePortSpan<typename Port::value_type>*>(nullptr);
return static_cast<DummyInputSpan<const typename Port::value_type>*>(nullptr);
} else if constexpr (Port::kIsOutput) {
return static_cast<DummyPublishablePortSpan<typename Port::value_type>*>(nullptr);
}
Expand Down Expand Up @@ -431,7 +435,7 @@ concept can_processBulk = can_processBulk_helper<TBlock, detail::port_to_process
* must be std::span<T> and *not* a type satisfying PublishableSpan<T>.
*/
template<typename TDerived, std::size_t I>
concept processBulk_requires_ith_output_as_span = can_processBulk<TDerived> && (I < traits::block::stream_output_port_types<TDerived>::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types<detail::DummyConsumablePortSpan, traits::block::stream_input_port_types<TDerived>>::template apply<std::tuple> inputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::dynamic_span, detail::DummyPublishablePortSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> outputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::nothing_you_ever_wanted, detail::DummyPublishablePortSpan, traits::block::stream_output_port_types<TDerived>>::template apply<std::tuple> bad_outputs) {
concept processBulk_requires_ith_output_as_span = can_processBulk<TDerived> && (I < traits::block::stream_output_port_types<TDerived>::size) && (I >= 0) && requires(TDerived& d, typename meta::transform_types<detail::DummyInputSpan, traits::block::stream_input_port_types < TDerived>>::template apply<std::tuple> inputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::dynamic_span, detail::DummyPublishablePortSpan, traits::block::stream_output_port_types < TDerived>>::template apply<std::tuple> outputs, typename meta::transform_conditional<decltype([](auto j) { return j == I; }), detail::nothing_you_ever_wanted, detail::DummyPublishablePortSpan, traits::block::stream_output_port_types < TDerived>>::template apply<std::tuple> bad_outputs) {
{ detail::can_processBulk_invoke_test(d, inputs, outputs, std::make_index_sequence<stream_input_port_types<TDerived>::size>(), std::make_index_sequence<stream_output_port_types<TDerived>::size>()) } -> std::same_as<work::Status>;
// TODO: Is this check redundant?
not requires { []<std::size_t... InIdx, std::size_t... OutIdx>(std::index_sequence<InIdx...>, std::index_sequence<OutIdx...>) -> decltype(d.processBulk(std::get<InIdx>(inputs)..., std::get<OutIdx>(bad_outputs)...)) { return {}; }(std::make_index_sequence<traits::block::stream_input_port_types<TDerived>::size>(), std::make_index_sequence<traits::block::stream_output_port_types<TDerived>::size>()); };
Expand Down
144 changes: 71 additions & 73 deletions core/include/gnuradio-4.0/Port.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,27 @@ namespace gr {
*/
struct Async {};

/**
* @brief API for access to the input samples and tags from the process_bulk function.
*
* This concept is used for the input parameters of the process_bulk function to allow:
* - access to the samples (via conversion to std::span)
* - consumption of samples. By default all available samples get consumed.
* - access to tags
* - via a range::view returned by `tag()` which returns the index relative to first sample in this span. The index can be negative for tags that were not consumed before.
* - via rawTags, which gives access to the bare ConsumableSpan<Tag>
* - consumption of tags. By default the tags belonging to up to and including the first sample get consumed. This can be manually changed by calling consumeTags with the index of a stream sample.
* - get a merged tag which contains the data of all tags belonging to up to and including the first sample. Optionally this can be changed to merge all tags until a supplied local stream index.
*/
template<typename T>
concept ConsumablePortSpan = ConsumableSpan<T> && requires(T span) {
requires ConsumableSpan<std::remove_cvref_t<decltype(span.tags)>>;
{ *span.tags.begin() } -> std::same_as<const gr::Tag&>;
concept InputSpan = requires(T span, gr::Tag::signed_index_type n) {
{ span } -> std::ranges::contiguous_range;
{ span.consume(0) };
{ span.rawTags };
requires ConsumableSpan<std::remove_cvref_t<decltype(span.rawTags)>> && std::same_as<gr::Tag, std::ranges::range_value_t<decltype(span.rawTags)>>;
{ span.tags() } -> std::ranges::range;
{ span.consumeTags(n) };
{ span.getMergedTag(n) } -> std::same_as<gr::Tag>;
};

template<typename T>
Expand Down Expand Up @@ -351,33 +368,68 @@ struct Port {
using ReaderSpanType = decltype(std::declval<ReaderType>().template get<spanReleasePolicy>());

template<SpanReleasePolicy spanReleasePolicy>
struct ConsumablePortInputRange : public ReaderSpanType<spanReleasePolicy> {
TagReaderSpanType tags;
struct PortInputSpan : public ReaderSpanType<spanReleasePolicy> {
TagReaderSpanType rawTags;
Tag::signed_index_type streamIndex;

ConsumablePortInputRange(std::size_t nSamples, ReaderType& streamReader, TagReaderSpanType _tags, Tag::signed_index_type _currentStreamOffset) : ReaderSpanType<spanReleasePolicy>(streamReader.template get<spanReleasePolicy>(nSamples)), tags(_tags), streamIndex{_currentStreamOffset} {};
PortInputSpan(std::size_t nSamples, ReaderType& reader, TagReaderType& tagReader) : ReaderSpanType<spanReleasePolicy>(reader.template get<spanReleasePolicy>(nSamples)), rawTags(getTags(static_cast<gr::Tag::signed_index_type>(nSamples), tagReader, reader.position())), streamIndex{reader.position()} {};

~ConsumablePortInputRange() {
~PortInputSpan() override {
if (ReaderSpanType<spanReleasePolicy>::instanceCount() == 1UZ) { // has to be one, because the parent destructor which decrements it to zero is only called afterward
if (tags.isConsumeRequested()) { // the user has already manually consumed tags
if (rawTags.isConsumeRequested()) { // the user has already manually consumed tags
return;
}
if ((ReaderSpanType<spanReleasePolicy>::isConsumeRequested() && ReaderSpanType<spanReleasePolicy>::nRequestedSamplesToConsume() == 0) || this->empty()) {
return; // no samples to be consumed -> do not consume any tags
}
std::size_t tagsToConsume = 0;
for (auto& t : tags) {
if (t.index <= streamIndex) {
tagsToConsume++;
} else {
break;
consumeTags(0); // consume all tags including the one on the first sample
}
}

[[nodiscard]] auto tags() {
return std::views::transform(rawTags, [this](auto &tag) { return std::make_pair(std::max(tag.index, 0l) - streamIndex, std::ref(tag.map)); });
}

void consumeTags(gr::Tag::signed_index_type untilLocalIndex) {
std::size_t tagsToConsume = static_cast<std::size_t>(std::ranges::count_if(
rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index <= streamIndex + untilLocalIndex; }),
[](auto /*v*/) {return true;} ));
std::ignore = rawTags.tryConsume(tagsToConsume);
}

[[nodiscard]] inline Tag getMergedTag(gr::Tag::signed_index_type untilLocalIndex = 0) const {
auto mergeSrcMapInto = [](const property_map& sourceMap, property_map& destinationMap) {
assert(&sourceMap != &destinationMap);
for (const auto& [key, value] : sourceMap) {
destinationMap.insert_or_assign(key, value);
}
};
Tag result{-1, {}};
std::ranges::for_each(rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index <= streamIndex + untilLocalIndex; }),
[&mergeSrcMapInto, &result](const Tag& tag) { mergeSrcMapInto(tag.map, result.map); });
return result;
}

private:
auto getTags(gr::Tag::signed_index_type nSamples, TagReaderType& reader, gr::Tag::signed_index_type _currentStreamOffset) {
std::size_t nTagsProcessed = 0UZ;
bool properTagDistance = false;
for (const Tag& tag : reader.get(reader.available())) {
const bool tagIsWithinRange = (tag.index != -1) && tag.index < _currentStreamOffset + nSamples;
if ((!properTagDistance && tag.index < 0) || tagIsWithinRange) { // 'index == -1' wildcard Tag index -> process unconditionally
nTagsProcessed++;
if (tagIsWithinRange) { // detected regular Tag position, ignore and stop at further wildcard Tags
properTagDistance = true;
}
} else {
break; // Tag is wildcard (index == -1) after a regular or newer than the present reading position (+ offset)
}
std::ignore = tags.tryConsume(tagsToConsume); // this is a default fallback: tags may be both explicitly and implicitly consumed
}
return reader.get(nTagsProcessed);
}
}; // end of ConsumablePortInputRange
static_assert(ConsumablePortSpan<ConsumablePortInputRange<gr::SpanReleasePolicy::ProcessAll>>);
static_assert(ConsumableSpan<PortInputSpan<gr::SpanReleasePolicy::ProcessAll>>);
static_assert(InputSpan<PortInputSpan<gr::SpanReleasePolicy::ProcessAll>>);

template<SpanReleasePolicy spanReleasePolicy>
using WriterSpanType = decltype(std::declval<WriterType>().template reserve<spanReleasePolicy>(1UZ));
Expand Down Expand Up @@ -435,9 +487,6 @@ struct Port {

static_assert(PublishablePortSpan<PublishablePortOutputRange<gr::SpanReleasePolicy::ProcessAll, PublishableSpanReservePolicy::Reserve>>);

std::span<const Tag> tags; // Range of tags for the currently processed stream range; only used in input ports
Tag::signed_index_type streamIndex{}; // Absolute offset of the first sample in the currently processed stream span; only used in input ports

private:
IoType _ioHandler = newIoHandler();
TagIoType _tagIoHandler = newTagIoHandler();
Expand Down Expand Up @@ -645,13 +694,10 @@ struct Port {
}

template<SpanReleasePolicy spanReleasePolicy>
ConsumablePortInputRange<spanReleasePolicy> get(std::size_t nSamples)
PortInputSpan<spanReleasePolicy> get(std::size_t nSamples)
requires(kIsInput)
{
auto taggedStream = ConsumablePortInputRange<spanReleasePolicy>(nSamples, streamReader(), getTags(static_cast<gr::Tag::signed_index_type>(nSamples)), streamReader().position());
tags = std::span(taggedStream.tags.data(), taggedStream.tags.size());
streamIndex = streamReader().position();
return taggedStream;
return PortInputSpan<spanReleasePolicy>(nSamples, streamReader(), tagReader());
}

template<SpanReleasePolicy spanReleasePolicy>
Expand All @@ -668,52 +714,6 @@ struct Port {
return PublishablePortOutputRange<spanReleasePolicy, PublishableSpanReservePolicy::TryReserve>(nSamples, streamWriter(), tagWriter(), streamWriter().position());
}

/**
* @return get all (incl. past unconsumed) tags () until the read-position + optional offset
*/
inline constexpr ConsumableSpan auto getTags(Tag::signed_index_type untilOffset = 0) noexcept
requires(kIsInput)
{
const auto readPos = streamReader().position();
const auto inputTags = tagReader().get(); // N.B. returns all old/available/pending tags
std::size_t nTagsProcessed = 0UZ;
bool properTagDistance = false;

for (const Tag& tag : inputTags) {
const auto relativeTagPosition = (tag.index - readPos); // w.r.t. present stream reader position
const bool tagIsWithinRange = (tag.index != -1) && relativeTagPosition < untilOffset;
if ((!properTagDistance && tag.index < 0) || tagIsWithinRange) { // 'index == -1' wildcard Tag index -> process unconditionally
nTagsProcessed++;
if (tagIsWithinRange) { // detected regular Tag position, ignore and stop at further wildcard Tags
properTagDistance = true;
}
} else {
break; // Tag is wildcard (index == -1) after a regular or newer than the present reading position (+ offset)
}
}
return tagReader().get(nTagsProcessed);
}

inline const Tag getMergedTag() {
Tag result{};

auto mergeSrcMapInto = [](const property_map& sourceMap, property_map& destinationMap) {
assert(&sourceMap != &destinationMap);
for (const auto& [key, value] : sourceMap) {
destinationMap.insert_or_assign(key, value);
}
};

result.index = -1;
std::ranges::for_each(tags, [&mergeSrcMapInto, &result, this](const Tag& tag) {
if (tag.index <= streamIndex) {
mergeSrcMapInto(tag.map, result.map);
}
});

return result;
}

inline constexpr void publishTag(property_map&& tag_data, Tag::signed_index_type tagOffset = -1) noexcept
requires(kIsOutput)
{
Expand Down Expand Up @@ -977,9 +977,7 @@ class DynamicPort {
DynamicPort(DynamicPort&& arg) = default;
DynamicPort& operator=(DynamicPort&& arg) = delete;

// TODO: The lifetime of ports is a problem here, if we keep
// a reference to the port in DynamicPort, the port object
// can not be reallocated
// TODO: The lifetime of ports is a problem here, if we keep a reference to the port in DynamicPort, the port object/ can not be reallocated
template<PortLike T>
explicit constexpr DynamicPort(T& arg, non_owned_reference_tag) noexcept : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{std::make_unique<PortWrapper<T, false>>(arg)} {}

Expand Down
Loading

0 comments on commit 1aeeb6a

Please sign in to comment.