From f9c2416a0b72b5eaff08772d356a22d5e291e8b4 Mon Sep 17 00:00:00 2001 From: Frank Osterfeld Date: Mon, 16 Oct 2023 10:38:14 +0200 Subject: [PATCH] Renamed data_sink -> DataSink, data_sink_registry -> DataSinkRegistry --- CMakeLists.txt | 4 +- .../basic/{data_sink.hpp => DataSink.hpp} | 530 +++++++++--------- blocks/basic/test/CMakeLists.txt | 2 +- .../{qa_data_sink.cpp => qa_DataSink.cpp} | 412 +++++++------- 4 files changed, 474 insertions(+), 474 deletions(-) rename blocks/basic/include/gnuradio-4.0/basic/{data_sink.hpp => DataSink.hpp} (56%) rename blocks/basic/test/{qa_data_sink.cpp => qa_DataSink.cpp} (56%) diff --git a/CMakeLists.txt b/CMakeLists.txt index c82fa20a5..154f27b44 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -153,13 +153,13 @@ if (ENABLE_TESTING AND UNIX AND NOT APPLE) NAME coverage EXECUTABLE ctest EXECUTABLE_ARGS "--output-on-failure" - DEPENDENCIES qa_buffer qa_data_sink qa_DynamicPort qa_DynamicBlock qa_HierBlock qa_filter qa_Settings qa_Tags qa_Scheduler qa_thread_pool qa_thread_affinity + DEPENDENCIES qa_buffer qa_DataSink qa_DynamicPort qa_DynamicBlock qa_HierBlock qa_filter qa_Settings qa_Tags qa_Scheduler qa_thread_pool qa_thread_affinity EXCLUDE "$CMAKE_BUILD_DIR/*") setup_target_for_coverage_gcovr_html( NAME coverage_html EXECUTABLE ctest EXECUTABLE_ARGS "--output-on-failure" - DEPENDENCIES qa_buffer qa_data_sink qa_DynamicPort qa_DynamicBlock qa_HierBlock qa_filter qa_Settings qa_Tags qa_Scheduler qa_thread_pool qa_thread_affinity + DEPENDENCIES qa_buffer qa_DataSink qa_DynamicPort qa_DynamicBlock qa_HierBlock qa_filter qa_Settings qa_Tags qa_Scheduler qa_thread_pool qa_thread_affinity EXCLUDE "$CMAKE_BUILD_DIR/*") endif () message("Building Tests and benchmarks.") diff --git a/blocks/basic/include/gnuradio-4.0/basic/data_sink.hpp b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp similarity index 56% rename from blocks/basic/include/gnuradio-4.0/basic/data_sink.hpp rename to blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp index dadd7e94c..47c1e6eba 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/data_sink.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp @@ -14,16 +14,16 @@ namespace gr::basic { -enum class blocking_mode { NonBlocking, Blocking }; +enum class BlockingMode { NonBlocking, Blocking }; -enum class trigger_match_result { +enum class TriggerMatchResult { Matching, ///< Start a new dataset NotMatching, ///< Finish dataset Ignore ///< Ignore tag }; template -class data_sink; +class DataSink; // Until clang-format can handle concepts // clang-format off @@ -35,7 +35,7 @@ concept DataSetCallback = std::invocable>; * Stream callback functions receive the span of data, with optional tags and reference to the sink. */ template -concept StreamCallback = std::invocable> || std::invocable, std::span> || std::invocable, std::span, const data_sink&>; +concept StreamCallback = std::invocable> || std::invocable, std::span> || std::invocable, std::span, const DataSink&>; /** * Used for testing whether a tag should trigger data acquisition. @@ -46,7 +46,7 @@ concept StreamCallback = std::invocable> || std::invocable * @code * auto matcher = [](const auto &tag) { * const auto is_trigger = ...check if tag is trigger...; - * return is_trigger ? trigger_match_result::Matching : trigger_match_result::Ignore; + * return is_trigger ? TriggerMatchResult::Matching : TriggerMatchResult::Ignore; * }; * @endcode * @@ -66,68 +66,68 @@ concept StreamCallback = std::invocable> || std::invocable * struct color_matcher { * matcher_result operator()(const Tag &tag) { * if (tag == green || tag == yellow) { - * return trigger_match_result::Matching; + * return TriggerMatchResult::Matching; * } * if (tag == red) { - * return trigger_match_result::NotMatching; + * return TriggerMatchResult::NotMatching; * } * - * return trigger_match_result::Ignore; + * return TriggerMatchResult::Ignore; * } * }; * @endcode * - * @see trigger_match_result + * @see TriggerMatchResult */ template concept TriggerMatcher = requires(T matcher, Tag tag) { - { matcher(tag) } -> std::convertible_to; + { matcher(tag) } -> std::convertible_to; }; // clang-format on -struct data_sink_query { +struct DataSinkQuery { std::optional _sink_name; std::optional _signal_name; - static data_sink_query - signal_name(std::string_view name) { + static DataSinkQuery + signalName(std::string_view name) { return { {}, std::string{ name } }; } - static data_sink_query - sink_name(std::string_view name) { + static DataSinkQuery + sinkName(std::string_view name) { return { std::string{ name }, {} }; } }; -class data_sink_registry { +class DataSinkRegistry { std::mutex _mutex; std::vector _sinks; public: // TODO this shouldn't be a singleton but associated with the flow graph (?) // TODO reconsider mutex usage when moving to the graph - static data_sink_registry & + static DataSinkRegistry & instance() { - static data_sink_registry s_instance; + static DataSinkRegistry s_instance; return s_instance; } template void - register_sink(data_sink *sink) { + registerSink(DataSink *sink) { std::lock_guard lg{ _mutex }; _sinks.push_back(sink); } template void - unregister_sink(data_sink *sink) { + unregisterSink(DataSink *sink) { std::lock_guard lg{ _mutex }; std::erase_if(_sinks, [sink](const std::any &v) { try { - return std::any_cast *>(v) == sink; + return std::any_cast *>(v) == sink; } catch (...) { return false; } @@ -135,99 +135,99 @@ class data_sink_registry { } template - std::shared_ptr::poller> - get_streaming_poller(const data_sink_query &query, blocking_mode block = blocking_mode::Blocking) { + std::shared_ptr::poller> + getStreamingPoller(const DataSinkQuery &query, BlockingMode block = BlockingMode::Blocking) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); - return sink ? sink->get_streaming_poller(block) : nullptr; + auto sink = findSink(query); + return sink ? sink->getStreamingPoller(block) : nullptr; } template - std::shared_ptr::dataset_poller> - get_trigger_poller(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) { + std::shared_ptr::DataSetPoller> + getTriggerPoller(const DataSinkQuery &query, M &&matcher, std::size_t preSamples, std::size_t postSamples, BlockingMode block = BlockingMode::Blocking) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); - return sink ? sink->get_trigger_poller(std::forward(matcher), pre_samples, post_samples, block) : nullptr; + auto sink = findSink(query); + return sink ? sink->getTriggerPoller(std::forward(matcher), preSamples, postSamples, block) : nullptr; } template - std::shared_ptr::dataset_poller> - get_multiplexed_poller(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) { + std::shared_ptr::DataSetPoller> + getMultiplexedPoller(const DataSinkQuery &query, M &&matcher, std::size_t maximumWindowSize, BlockingMode block = BlockingMode::Blocking) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); - return sink ? sink->get_multiplexed_poller(std::forward(matcher), maximum_window_size, block) : nullptr; + auto sink = findSink(query); + return sink ? sink->getMultiplexedPoller(std::forward(matcher), maximumWindowSize, block) : nullptr; } template - std::shared_ptr::dataset_poller> - get_snapshot_poller(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) { + std::shared_ptr::DataSetPoller> + getSnapshotPoller(const DataSinkQuery &query, M &&matcher, std::chrono::nanoseconds delay, BlockingMode block = BlockingMode::Blocking) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); - return sink ? sink->get_snapshot_poller(std::forward(matcher), delay, block) : nullptr; + auto sink = findSink(query); + return sink ? sink->getSnapshotPoller(std::forward(matcher), delay, block) : nullptr; } template Callback> bool - register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback &&callback) { + registerStreamingCallback(const DataSinkQuery &query, std::size_t maxChunkSize, Callback &&callback) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); + auto sink = findSink(query); if (!sink) { return false; } - sink->register_streaming_callback(max_chunk_size, std::forward(callback)); + sink->registerStreamingCallback(maxChunkSize, std::forward(callback)); return true; } template Callback, TriggerMatcher M> bool - register_trigger_callback(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) { + registerTriggerCallback(const DataSinkQuery &query, M &&matcher, std::size_t preSamples, std::size_t postSamples, Callback &&callback) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); + auto sink = findSink(query); if (!sink) { return false; } - sink->register_trigger_callback(std::forward(matcher), pre_samples, post_samples, std::forward(callback)); + sink->registerTriggerCallback(std::forward(matcher), preSamples, postSamples, std::forward(callback)); return true; } template Callback, TriggerMatcher M> bool - register_multiplexed_callback(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, Callback &&callback) { + registerMultiplexedCallback(const DataSinkQuery &query, M &&matcher, std::size_t maximumWindowSize, Callback &&callback) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); + auto sink = findSink(query); if (!sink) { return false; } - sink->register_multiplexed_callback(std::forward(matcher), maximum_window_size, std::forward(callback)); + sink->registerMultiplexedCallback(std::forward(matcher), maximumWindowSize, std::forward(callback)); return true; } template Callback, TriggerMatcher M> bool - register_snapshot_callback(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { + registerSnapshotCallback(const DataSinkQuery &query, M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { std::lock_guard lg{ _mutex }; - auto sink = find_sink(query); + auto sink = findSink(query); if (!sink) { return false; } - sink->register_snapshot_callback(std::forward(matcher), delay, std::forward(callback)); + sink->registerSnapshotCallback(std::forward(matcher), delay, std::forward(callback)); return true; } private: template - data_sink * - find_sink(const data_sink_query &query) { + DataSink * + findSink(const DataSinkQuery &query) { auto matches = [&query](const std::any &v) { try { - auto sink = std::any_cast *>(v); - const auto sink_name_matches = !query._sink_name || *query._sink_name == sink->name; - const auto signal_name_matches = !query._signal_name || *query._signal_name == sink->signal_name; - return sink_name_matches && signal_name_matches; + auto sink = std::any_cast *>(v); + const auto sinkNameMatches = !query._sink_name || *query._sink_name == sink->name; + const auto signalNameMatches = !query._signal_name || *query._signal_name == sink->signal_name; + return sinkNameMatches && signalNameMatches; } catch (...) { return false; } @@ -238,7 +238,7 @@ class data_sink_registry { return nullptr; } - return std::any_cast *>(*it); + return std::any_cast *>(*it); } }; @@ -303,14 +303,14 @@ get(const property_map &m, const std::string_view &key) { * @tparam T input sample type */ template -class data_sink : public Block> { - struct abstract_listener; +class DataSink : public Block> { + struct AbstractListener; - static constexpr std::size_t _listener_buffer_size = 65536; - std::deque> _listeners; - std::mutex _listener_mutex; - std::optional> _history; - bool _has_signal_info_from_settings = false; + static constexpr std::size_t _listener_buffer_size = 65536; + std::deque> _listeners; + std::mutex _listener_mutex; + std::optional> _history; + bool _has_signal_info_from_settings = false; public: Annotated, Unit<"Hz">> sample_rate = 1.f; @@ -341,19 +341,19 @@ class data_sink : public Block> { return false; } - const auto read_data = reader.get(available); + const auto readData = reader.get(available); if constexpr (requires { fnc(std::span(), std::span()); }) { - const auto tags = tag_reader.get(); - const auto it = std::find_if_not(tags.begin(), tags.end(), [until = static_cast(samples_read + available)](const auto &tag) { return tag.index < until; }); - auto relevant_tags = std::vector(tags.begin(), it); - for (auto &t : relevant_tags) { + const auto tags = tag_reader.get(); + const auto it = std::find_if_not(tags.begin(), tags.end(), [until = static_cast(samples_read + available)](const auto &tag) { return tag.index < until; }); + auto relevantTags = std::vector(tags.begin(), it); + for (auto &t : relevantTags) { t.index -= static_cast(samples_read); } - fnc(read_data, std::span(relevant_tags)); - std::ignore = tag_reader.consume(relevant_tags.size()); + fnc(readData, std::span(relevantTags)); + std::ignore = tag_reader.consume(relevantTags.size()); } else { std::ignore = tag_reader.consume(tag_reader.available()); - fnc(read_data); + fnc(readData); } std::ignore = reader.consume(available); @@ -362,7 +362,7 @@ class data_sink : public Block> { } }; - struct dataset_poller { + struct DataSetPoller { gr::CircularBuffer> buffer = gr::CircularBuffer>(_listener_buffer_size); decltype(buffer.new_reader()) reader = buffer.new_reader(); decltype(buffer.new_writer()) writer = buffer.new_writer(); @@ -377,92 +377,92 @@ class data_sink : public Block> { return false; } - const auto read_data = reader.get(available); - fnc(read_data); + const auto readData = reader.get(available); + fnc(readData); std::ignore = reader.consume(available); return true; } }; - data_sink() { data_sink_registry::instance().register_sink(this); } + DataSink() { DataSinkRegistry::instance().registerSink(this); } - ~data_sink() { + ~DataSink() { stop(); - data_sink_registry::instance().unregister_sink(this); + DataSinkRegistry::instance().unregisterSink(this); } void - settingsChanged(const property_map & /*old_settings*/, const property_map &new_settings) { - if (apply_signal_info(new_settings)) { + settingsChanged(const property_map & /*oldSettings*/, const property_map &newSettings) { + if (applySignalInfo(newSettings)) { _has_signal_info_from_settings = true; } } std::shared_ptr - get_streaming_poller(blocking_mode block_mode = blocking_mode::Blocking) { + getStreamingPoller(BlockingMode blockMode = BlockingMode::Blocking) { std::lock_guard lg(_listener_mutex); - const auto block = block_mode == blocking_mode::Blocking; + const auto block = blockMode == BlockingMode::Blocking; auto handler = std::make_shared(); - add_listener(std::make_unique>(handler, block, *this), block); + addListener(std::make_unique>(handler, block, *this), block); return handler; } template - std::shared_ptr - get_trigger_poller(M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) { - const auto block = block_mode == blocking_mode::Blocking; - auto handler = std::make_shared(); + std::shared_ptr + getTriggerPoller(M &&matcher, std::size_t preSamples, std::size_t postSamples, BlockingMode blockMode = BlockingMode::Blocking) { + const auto block = blockMode == BlockingMode::Blocking; + auto handler = std::make_shared(); std::lock_guard lg(_listener_mutex); - add_listener(std::make_unique>(std::forward(matcher), handler, pre_samples, post_samples, block), block); - ensure_history_size(pre_samples); + addListener(std::make_unique>(std::forward(matcher), handler, preSamples, postSamples, block), block); + ensureHistorySize(preSamples); return handler; } template - std::shared_ptr - get_multiplexed_poller(M &&matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) { + std::shared_ptr + getMultiplexedPoller(M &&matcher, std::size_t maximumWindowSize, BlockingMode blockMode = BlockingMode::Blocking) { std::lock_guard lg(_listener_mutex); - const auto block = block_mode == blocking_mode::Blocking; - auto handler = std::make_shared(); - add_listener(std::make_unique>(std::forward(matcher), maximum_window_size, handler, block), block); + const auto block = blockMode == BlockingMode::Blocking; + auto handler = std::make_shared(); + addListener(std::make_unique>(std::forward(matcher), maximumWindowSize, handler, block), block); return handler; } template - std::shared_ptr - get_snapshot_poller(M &&matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) { - const auto block = block_mode == blocking_mode::Blocking; - auto handler = std::make_shared(); + std::shared_ptr + getSnapshotPoller(M &&matcher, std::chrono::nanoseconds delay, BlockingMode blockMode = BlockingMode::Blocking) { + const auto block = blockMode == BlockingMode::Blocking; + auto handler = std::make_shared(); std::lock_guard lg(_listener_mutex); - add_listener(std::make_unique>(std::forward(matcher), delay, handler, block), block); + addListener(std::make_unique>(std::forward(matcher), delay, handler, block), block); return handler; } template Callback> void - register_streaming_callback(std::size_t max_chunk_size, Callback &&callback) { - add_listener(std::make_unique>(max_chunk_size, std::forward(callback), *this), false); + registerStreamingCallback(std::size_t maxChunkSize, Callback &&callback) { + addListener(std::make_unique>(maxChunkSize, std::forward(callback), *this), false); } template Callback> void - register_trigger_callback(M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) { - add_listener(std::make_unique>(std::forward(matcher), pre_samples, post_samples, std::forward(callback)), false); - ensure_history_size(pre_samples); + registerTriggerCallback(M &&matcher, std::size_t preSamples, std::size_t postSamples, Callback &&callback) { + addListener(std::make_unique>(std::forward(matcher), preSamples, postSamples, std::forward(callback)), false); + ensureHistorySize(preSamples); } template Callback> void - register_multiplexed_callback(M &&matcher, std::size_t maximum_window_size, Callback &&callback) { + registerMultiplexedCallback(M &&matcher, std::size_t maximumWindowSize, Callback &&callback) { std::lock_guard lg(_listener_mutex); - add_listener(std::make_unique>(std::forward(matcher), maximum_window_size, std::forward(callback)), false); + addListener(std::make_unique>(std::forward(matcher), maximumWindowSize, std::forward(callback)), false); } template Callback> void - register_snapshot_callback(M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { + registerSnapshotCallback(M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { std::lock_guard lg(_listener_mutex); - add_listener(std::make_unique>(std::forward(matcher), delay, std::forward(callback)), false); + addListener(std::make_unique>(std::forward(matcher), delay, std::forward(callback)), false); } // TODO this code should be called at the end of graph processing @@ -475,28 +475,28 @@ class data_sink : public Block> { } [[nodiscard]] work::Status - processBulk(std::span in_data) noexcept { + processBulk(std::span inData) noexcept { std::optional tagData; if (this->input_tags_present()) { assert(this->input_tags()[0].index == 0); tagData = this->input_tags()[0].map; // signal info from settings overrides info from tags if (!_has_signal_info_from_settings) { - apply_signal_info(this->input_tags()[0].map); + applySignalInfo(this->input_tags()[0].map); } } { std::lock_guard lg(_listener_mutex); // TODO review/profile if a lock-free data structure should be used here - const auto history_view = _history ? _history->get_span(0) : std::span(); + const auto historyView = _history ? _history->get_span(0) : std::span(); std::erase_if(_listeners, [](const auto &l) { return l->expired; }); for (auto &listener : _listeners) { - listener->process(history_view, in_data, tagData); + listener->process(historyView, inData, tagData); } if (_history) { // store potential pre-samples for triggers at the beginning of the next chunk - const auto to_write = std::min(in_data.size(), _history->capacity()); - _history->push_back_bulk(in_data.last(to_write)); + const auto toWrite = std::min(inData.size(), _history->capacity()); + _history->push_back_bulk(inData.last(toWrite)); } } @@ -505,7 +505,7 @@ class data_sink : public Block> { private: bool - apply_signal_info(const property_map &properties) { + applySignalInfo(const property_map &properties) { try { const auto rate_ = detail::get(properties, tag::SAMPLE_RATE.key()); const auto name_ = detail::get(properties, tag::SIGNAL_NAME.key()); @@ -532,15 +532,15 @@ class data_sink : public Block> { // forward to listeners if (rate_ || name_ || unit_ || min_ || max_) { - const auto dstempl = make_dataset_template(); + const auto dstempl = makeDataSetTemplate(); std::lock_guard lg{ _listener_mutex }; for (auto &l : _listeners) { if (rate_) { - l->apply_sample_rate(sample_rate); + l->applySampleRate(sample_rate); } if (name_ || unit_ || min_ || max_) { - l->set_dataset_template(dstempl); + l->setDataSetTemplate(dstempl); } } } @@ -552,7 +552,7 @@ class data_sink : public Block> { } DataSet - make_dataset_template() const { + makeDataSetTemplate() const { DataSet dstempl; dstempl.signal_names = { signal_name }; dstempl.signal_units = { signal_unit }; @@ -561,7 +561,7 @@ class data_sink : public Block> { } void - ensure_history_size(std::size_t new_size) { + ensureHistorySize(std::size_t new_size) { const auto old_size = _history ? _history->capacity() : std::size_t{ 0 }; if (new_size <= old_size) { return; @@ -580,9 +580,9 @@ class data_sink : public Block> { } void - add_listener(std::unique_ptr &&l, bool block) { - l->set_dataset_template(make_dataset_template()); - l->apply_sample_rate(sample_rate); + addListener(std::unique_ptr &&l, bool block) { + l->setDataSetTemplate(makeDataSetTemplate()); + l->applySampleRate(sample_rate); if (block) { _listeners.push_back(std::move(l)); } else { @@ -590,38 +590,38 @@ class data_sink : public Block> { } } - struct abstract_listener { - bool expired = false; + struct AbstractListener { + bool expired = false; - virtual ~abstract_listener() = default; + virtual ~AbstractListener() = default; void - set_expired() { + setExpired() { expired = true; } virtual void - apply_sample_rate(float /*sample_rate*/) {} + applySampleRate(float /*sample_rate*/) {} virtual void - set_dataset_template(DataSet) {} + setDataSetTemplate(DataSet) {} virtual void - process(std::span history, std::span data, std::optional tag_data0) + process(std::span history, std::span data, std::optional tagData0) = 0; virtual void stop() = 0; }; template - struct continuous_listener : public abstract_listener { - static constexpr auto has_callback = !std::is_same_v; - static constexpr auto callback_takes_tags = std::is_invocable_v, std::span> - || std::is_invocable_v, std::span, const data_sink &>; + struct ContinuousListener : public AbstractListener { + static constexpr auto hasCallback = !std::is_same_v; + static constexpr auto callbackTakesTags = std::is_invocable_v, std::span> + || std::is_invocable_v, std::span, const DataSink &>; - const data_sink &parent_sink; - bool block = false; - std::size_t samples_written = 0; + const DataSink &parent_sink; + bool block = false; + std::size_t samples_written = 0; // callback-only std::size_t buffer_fill = 0; @@ -634,13 +634,13 @@ class data_sink : public Block> { Callback callback; template - explicit continuous_listener(std::size_t max_chunk_size, CallbackFW &&c, const data_sink &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward(c) } {} + explicit ContinuousListener(std::size_t maxChunkSize, CallbackFW &&c, const DataSink &parent) : parent_sink(parent), buffer(maxChunkSize), callback{ std::forward(c) } {} - explicit continuous_listener(std::shared_ptr poller, bool do_block, const data_sink &parent) : parent_sink(parent), block(do_block), polling_handler{ std::move(poller) } {} + explicit ContinuousListener(std::shared_ptr poller, bool doBlock, const DataSink &parent) : parent_sink(parent), block(doBlock), polling_handler{ std::move(poller) } {} inline void - call_callback(std::span data, std::span tags) { - if constexpr (std::is_invocable_v, std::span, const data_sink &>) { + callCallback(std::span data, std::span tags) { + if constexpr (std::is_invocable_v, std::span, const DataSink &>) { callback(std::move(data), std::move(tags), parent_sink); } else if constexpr (std::is_invocable_v, std::span>) { callback(std::move(data), std::move(tags)); @@ -650,23 +650,23 @@ class data_sink : public Block> { } void - process(std::span, std::span data, std::optional tag_data0) override { + process(std::span, std::span data, std::optional tagData0) override { using namespace gr::detail; - if constexpr (has_callback) { + if constexpr (hasCallback) { // if there's pending data, fill buffer and send out if (buffer_fill > 0) { const auto n = std::min(data.size(), buffer.size() - buffer_fill); detail::copy_span(data.first(n), std::span(buffer).subspan(buffer_fill, n)); - if constexpr (callback_takes_tags) { - if (tag_data0) { - tag_buffer.push_back({ static_cast(buffer_fill), *tag_data0 }); - tag_data0.reset(); + if constexpr (callbackTakesTags) { + if (tagData0) { + tag_buffer.push_back({ static_cast(buffer_fill), *tagData0 }); + tagData0.reset(); } } buffer_fill += n; if (buffer_fill == buffer.size()) { - call_callback(std::span(buffer), std::span(tag_buffer)); + callCallback(std::span(buffer), std::span(tag_buffer)); samples_written += buffer.size(); buffer_fill = 0; tag_buffer.clear(); @@ -677,13 +677,13 @@ class data_sink : public Block> { // send out complete chunks directly while (data.size() >= buffer.size()) { - if constexpr (callback_takes_tags) { + if constexpr (callbackTakesTags) { std::vector tags; - if (tag_data0) { - tags.push_back({ 0, std::move(*tag_data0) }); - tag_data0.reset(); + if (tagData0) { + tags.push_back({ 0, std::move(*tagData0) }); + tagData0.reset(); } - call_callback(data.first(buffer.size()), std::span(tags)); + callCallback(data.first(buffer.size()), std::span(tags)); } else { callback(data.first(buffer.size())); } @@ -695,41 +695,41 @@ class data_sink : public Block> { if (!data.empty()) { detail::copy_span(data, std::span(buffer).first(data.size())); buffer_fill = data.size(); - if constexpr (callback_takes_tags) { - if (tag_data0) { - tag_buffer.push_back({ 0, std::move(*tag_data0) }); + if constexpr (callbackTakesTags) { + if (tagData0) { + tag_buffer.push_back({ 0, std::move(*tagData0) }); } } } } else { auto poller = polling_handler.lock(); if (!poller) { - this->set_expired(); + this->setExpired(); return; } - const auto to_write = block ? data.size() : std::min(data.size(), poller->writer.available()); + const auto toWrite = block ? data.size() : std::min(data.size(), poller->writer.available()); - if (to_write > 0) { - if (tag_data0) { + if (toWrite > 0) { + if (tagData0) { auto tw = poller->tag_writer.reserve_output_range(1); - tw[0] = { static_cast(samples_written), std::move(*tag_data0) }; + tw[0] = { static_cast(samples_written), std::move(*tagData0) }; tw.publish(1); } - auto write_data = poller->writer.reserve_output_range(to_write); - detail::copy_span(data.first(to_write), std::span(write_data)); - write_data.publish(write_data.size()); + auto writeData = poller->writer.reserve_output_range(toWrite); + detail::copy_span(data.first(toWrite), std::span(writeData)); + writeData.publish(writeData.size()); } - poller->drop_count += data.size() - to_write; - samples_written += to_write; + poller->drop_count += data.size() - toWrite; + samples_written += toWrite; } } void stop() override { - if constexpr (has_callback) { + if constexpr (hasCallback) { if (buffer_fill > 0) { - call_callback(std::span(buffer).first(buffer_fill), std::span(tag_buffer)); + callCallback(std::span(buffer).first(buffer_fill), std::span(tag_buffer)); tag_buffer.clear(); buffer_fill = 0; } @@ -747,50 +747,50 @@ class data_sink : public Block> { }; template - struct trigger_listener : public abstract_listener { - bool block = false; - std::size_t pre_samples = 0; - std::size_t post_samples = 0; + struct TriggerListener : public AbstractListener { + bool block = false; + std::size_t preSamples = 0; + std::size_t postSamples = 0; - DataSet dataset_template; - M trigger_matcher = {}; - std::deque pending_trigger_windows; // triggers that still didn't receive all their data - std::weak_ptr polling_handler = {}; + DataSet dataset_template; + M trigger_matcher = {}; + std::deque pending_trigger_windows; // triggers that still didn't receive all their data + std::weak_ptr polling_handler = {}; - Callback callback; + Callback callback; template - explicit trigger_listener(Matcher &&matcher, std::shared_ptr handler, std::size_t pre, std::size_t post, bool do_block) - : block(do_block), pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(handler) } {} + explicit TriggerListener(Matcher &&matcher, std::shared_ptr handler, std::size_t pre, std::size_t post, bool doBlock) + : block(doBlock), preSamples(pre), postSamples(post), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(handler) } {} template - explicit trigger_listener(Matcher &&matcher, std::size_t pre, std::size_t post, CallbackFW &&cb) - : pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), callback{ std::forward(cb) } {} + explicit TriggerListener(Matcher &&matcher, std::size_t pre, std::size_t post, CallbackFW &&cb) + : preSamples(pre), postSamples(post), trigger_matcher(std::forward(matcher)), callback{ std::forward(cb) } {} void - set_dataset_template(DataSet dst) override { + setDataSetTemplate(DataSet dst) override { dataset_template = std::move(dst); } inline void - publish_dataset(DataSet &&data) { + publishDataSet(DataSet &&data) { if constexpr (!std::is_same_v) { callback(std::move(data)); } else { auto poller = polling_handler.lock(); if (!poller) { - this->set_expired(); + this->setExpired(); return; } - auto write_data = poller->writer.reserve_output_range(1); + auto writeData = poller->writer.reserve_output_range(1); if (block) { - write_data[0] = std::move(data); - write_data.publish(1); + writeData[0] = std::move(data); + writeData.publish(1); } else { if (poller->writer.available() > 0) { - write_data[0] = std::move(data); - write_data.publish(1); + writeData[0] = std::move(data); + writeData.publish(1); } else { poller->drop_count++; } @@ -799,26 +799,26 @@ class data_sink : public Block> { } void - process(std::span history, std::span in_data, std::optional tag_data0) override { - if (tag_data0 && trigger_matcher(Tag{ 0, *tag_data0 }) == trigger_match_result::Matching) { + process(std::span history, std::span inData, std::optional tagData0) override { + if (tagData0 && trigger_matcher(Tag{ 0, *tagData0 }) == TriggerMatchResult::Matching) { DataSet dataset = dataset_template; - dataset.signal_values.reserve(pre_samples + post_samples); // TODO maybe make the circ. buffer smaller but preallocate these + dataset.signal_values.reserve(preSamples + postSamples); // TODO maybe make the circ. buffer smaller but preallocate these - const auto pre_sample_view = history.last(std::min(pre_samples, history.size())); - dataset.signal_values.insert(dataset.signal_values.end(), pre_sample_view.begin(), pre_sample_view.end()); + const auto preSampleView = history.last(std::min(preSamples, history.size())); + dataset.signal_values.insert(dataset.signal_values.end(), preSampleView.begin(), preSampleView.end()); - dataset.timing_events = { { { static_cast(pre_sample_view.size()), *tag_data0 } } }; - pending_trigger_windows.push_back({ .dataset = std::move(dataset), .pending_post_samples = post_samples }); + dataset.timing_events = { { { static_cast(preSampleView.size()), *tagData0 } } }; + pending_trigger_windows.push_back({ .dataset = std::move(dataset), .pending_post_samples = postSamples }); } auto window = pending_trigger_windows.begin(); while (window != pending_trigger_windows.end()) { - const auto post_sample_view = in_data.first(std::min(window->pending_post_samples, in_data.size())); - window->dataset.signal_values.insert(window->dataset.signal_values.end(), post_sample_view.begin(), post_sample_view.end()); - window->pending_post_samples -= post_sample_view.size(); + const auto postSampleView = inData.first(std::min(window->pending_post_samples, inData.size())); + window->dataset.signal_values.insert(window->dataset.signal_values.end(), postSampleView.begin(), postSampleView.end()); + window->pending_post_samples -= postSampleView.size(); if (window->pending_post_samples == 0) { - this->publish_dataset(std::move(window->dataset)); + this->publishDataSet(std::move(window->dataset)); window = pending_trigger_windows.erase(window); } else { ++window; @@ -830,7 +830,7 @@ class data_sink : public Block> { stop() override { for (auto &window : pending_trigger_windows) { if (!window.dataset.signal_values.empty()) { - this->publish_dataset(std::move(window.dataset)); + this->publishDataSet(std::move(window.dataset)); } } pending_trigger_windows.clear(); @@ -841,47 +841,47 @@ class data_sink : public Block> { }; template - struct multiplexed_listener : public abstract_listener { - bool block = false; - M matcher; - DataSet dataset_template; - std::optional> pending_dataset; - std::size_t maximum_window_size; - std::weak_ptr polling_handler = {}; - Callback callback; + struct MultiplexedListener : public AbstractListener { + bool block = false; + M matcher; + DataSet dataset_template; + std::optional> pending_dataset; + std::size_t maximumWindowSize; + std::weak_ptr polling_handler = {}; + Callback callback; template - explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, CallbackFW &&cb) - : matcher(std::forward(matcher_)), maximum_window_size(max_window_size), callback(std::forward(cb)) {} + explicit MultiplexedListener(Matcher &&matcher_, std::size_t maxWindowSize, CallbackFW &&cb) + : matcher(std::forward(matcher_)), maximumWindowSize(maxWindowSize), callback(std::forward(cb)) {} template - explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, std::shared_ptr handler, bool do_block) - : block(do_block), matcher(std::forward(matcher_)), maximum_window_size(max_window_size), polling_handler{ std::move(handler) } {} + explicit MultiplexedListener(Matcher &&matcher_, std::size_t maxWindowSize, std::shared_ptr handler, bool doBlock) + : block(doBlock), matcher(std::forward(matcher_)), maximumWindowSize(maxWindowSize), polling_handler{ std::move(handler) } {} void - set_dataset_template(DataSet dst) override { + setDataSetTemplate(DataSet dst) override { dataset_template = std::move(dst); } inline void - publish_dataset(DataSet &&data) { + publishDataSet(DataSet &&data) { if constexpr (!std::is_same_v) { callback(std::move(data)); } else { auto poller = polling_handler.lock(); if (!poller) { - this->set_expired(); + this->setExpired(); return; } - auto write_data = poller->writer.reserve_output_range(1); + auto writeData = poller->writer.reserve_output_range(1); if (block) { - write_data[0] = std::move(data); - write_data.publish(1); + writeData[0] = std::move(data); + writeData.publish(1); } else { if (poller->writer.available() > 0) { - write_data[0] = std::move(data); - write_data.publish(1); + writeData[0] = std::move(data); + writeData.publish(1); } else { poller->drop_count++; } @@ -890,31 +890,31 @@ class data_sink : public Block> { } void - process(std::span, std::span in_data, std::optional tag_data0) override { - if (tag_data0) { - const auto obsr = matcher(Tag{ 0, *tag_data0 }); - if (obsr == trigger_match_result::NotMatching || obsr == trigger_match_result::Matching) { + process(std::span, std::span inData, std::optional tagData0) override { + if (tagData0) { + const auto obsr = matcher(Tag{ 0, *tagData0 }); + if (obsr == TriggerMatchResult::NotMatching || obsr == TriggerMatchResult::Matching) { if (pending_dataset) { - if (obsr == trigger_match_result::NotMatching) { - pending_dataset->timing_events[0].push_back({ static_cast(pending_dataset->signal_values.size()), *tag_data0 }); + if (obsr == TriggerMatchResult::NotMatching) { + pending_dataset->timing_events[0].push_back({ static_cast(pending_dataset->signal_values.size()), *tagData0 }); } - this->publish_dataset(std::move(*pending_dataset)); + this->publishDataSet(std::move(*pending_dataset)); pending_dataset.reset(); } } - if (obsr == trigger_match_result::Matching) { + if (obsr == TriggerMatchResult::Matching) { pending_dataset = dataset_template; - pending_dataset->signal_values.reserve(maximum_window_size); // TODO might be too much? - pending_dataset->timing_events = { { { 0, *tag_data0 } } }; + pending_dataset->signal_values.reserve(maximumWindowSize); // TODO might be too much? + pending_dataset->timing_events = { { { 0, *tagData0 } } }; } } if (pending_dataset) { - const auto to_write = std::min(in_data.size(), maximum_window_size - pending_dataset->signal_values.size()); - const auto view = in_data.first(to_write); + const auto toWrite = std::min(inData.size(), maximumWindowSize - pending_dataset->signal_values.size()); + const auto view = inData.first(toWrite); pending_dataset->signal_values.insert(pending_dataset->signal_values.end(), view.begin(), view.end()); - if (pending_dataset->signal_values.size() == maximum_window_size) { - this->publish_dataset(std::move(*pending_dataset)); + if (pending_dataset->signal_values.size() == maximumWindowSize) { + this->publishDataSet(std::move(*pending_dataset)); pending_dataset.reset(); } } @@ -923,7 +923,7 @@ class data_sink : public Block> { void stop() override { if (pending_dataset) { - this->publish_dataset(std::move(*pending_dataset)); + this->publishDataSet(std::move(*pending_dataset)); pending_dataset.reset(); } if (auto p = polling_handler.lock()) { @@ -939,54 +939,54 @@ class data_sink : public Block> { }; template - struct snapshot_listener : public abstract_listener { - bool block = false; - std::chrono::nanoseconds time_delay; - std::size_t sample_delay = 0; - DataSet dataset_template; - M trigger_matcher = {}; - std::deque pending; - std::weak_ptr polling_handler = {}; - Callback callback; + struct SnapshotListener : public AbstractListener { + bool block = false; + std::chrono::nanoseconds time_delay; + std::size_t sample_delay = 0; + DataSet dataset_template; + M trigger_matcher = {}; + std::deque pending; + std::weak_ptr polling_handler = {}; + Callback callback; template - explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, std::shared_ptr poller, bool do_block) - : block(do_block), time_delay(delay), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(poller) } {} + explicit SnapshotListener(Matcher &&matcher, std::chrono::nanoseconds delay, std::shared_ptr poller, bool doBlock) + : block(doBlock), time_delay(delay), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(poller) } {} template - explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, CallbackFW &&cb) + explicit SnapshotListener(Matcher &&matcher, std::chrono::nanoseconds delay, CallbackFW &&cb) : time_delay(delay), trigger_matcher(std::forward(matcher)), callback(std::forward(cb)) {} void - set_dataset_template(DataSet dst) override { + setDataSetTemplate(DataSet dst) override { dataset_template = std::move(dst); } void - apply_sample_rate(float rateHz) override { + applySampleRate(float rateHz) override { sample_delay = static_cast(std::round(std::chrono::duration_cast>(time_delay).count() * rateHz)); // TODO do we need to update the requested_samples of pending here? (considering both old and new time_delay) } inline void - publish_dataset(DataSet &&data) { + publishDataSet(DataSet &&data) { if constexpr (!std::is_same_v) { callback(std::move(data)); } else { auto poller = polling_handler.lock(); if (!poller) { - this->set_expired(); + this->setExpired(); return; } - auto write_data = poller->writer.reserve_output_range(1); + auto writeData = poller->writer.reserve_output_range(1); if (block) { - write_data[0] = std::move(data); - write_data.publish(1); + writeData[0] = std::move(data); + writeData.publish(1); } else { if (poller->writer.available() > 0) { - write_data[0] = std::move(data); - write_data.publish(1); + writeData[0] = std::move(data); + writeData.publish(1); } else { poller->drop_count++; } @@ -995,25 +995,25 @@ class data_sink : public Block> { } void - process(std::span, std::span in_data, std::optional tag_data0) override { - if (tag_data0 && trigger_matcher({ 0, *tag_data0 }) == trigger_match_result::Matching) { - auto new_pending = pending_snapshot{ *tag_data0, sample_delay, sample_delay }; - // make sure pending is sorted by number of pending_samples (insertion might be not at end if sample rate decreased; TODO unless we adapt them in apply_sample_rate, see there) + process(std::span, std::span inData, std::optional tagData0) override { + if (tagData0 && trigger_matcher({ 0, *tagData0 }) == TriggerMatchResult::Matching) { + auto new_pending = pending_snapshot{ *tagData0, sample_delay, sample_delay }; + // make sure pending is sorted by number of pending_samples (insertion might be not at end if sample rate decreased; TODO unless we adapt them in applySampleRate, see there) auto rit = std::find_if(pending.rbegin(), pending.rend(), [delay = sample_delay](const auto &other) { return other.pending_samples < delay; }); pending.insert(rit.base(), std::move(new_pending)); } auto it = pending.begin(); while (it != pending.end()) { - if (it->pending_samples >= in_data.size()) { - it->pending_samples -= in_data.size(); + if (it->pending_samples >= inData.size()) { + it->pending_samples -= inData.size(); break; } DataSet dataset = dataset_template; dataset.timing_events = { { { -static_cast(it->delay), std::move(it->tag_data) } } }; - dataset.signal_values = { in_data[it->pending_samples] }; - this->publish_dataset(std::move(dataset)); + dataset.signal_values = { inData[it->pending_samples] }; + this->publishDataSet(std::move(dataset)); it = pending.erase(it); } @@ -1031,6 +1031,6 @@ class data_sink : public Block> { } // namespace gr::basic -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (gr::basic::data_sink), in, sample_rate, signal_name, signal_unit, signal_min, signal_max); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (gr::basic::DataSink), in, sample_rate, signal_name, signal_unit, signal_min, signal_max); #endif diff --git a/blocks/basic/test/CMakeLists.txt b/blocks/basic/test/CMakeLists.txt index f1a897074..e37edc758 100644 --- a/blocks/basic/test/CMakeLists.txt +++ b/blocks/basic/test/CMakeLists.txt @@ -1,3 +1,3 @@ add_ut_test(qa_selector) add_ut_test(qa_sources) -add_ut_test(qa_data_sink) +add_ut_test(qa_DataSink) diff --git a/blocks/basic/test/qa_data_sink.cpp b/blocks/basic/test/qa_DataSink.cpp similarity index 56% rename from blocks/basic/test/qa_data_sink.cpp rename to blocks/basic/test/qa_DataSink.cpp index 16e555e78..f48245e5a 100644 --- a/blocks/basic/test/qa_data_sink.cpp +++ b/blocks/basic/test/qa_DataSink.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include #if defined(__clang__) && __clang_major__ >= 16 // clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection @@ -119,36 +119,36 @@ struct Matcher { return !same(x, other); } - trigger_match_result + TriggerMatchResult operator()(const Tag &tag) { const auto ty = tag.get("YEAR"); const auto tm = tag.get("MONTH"); const auto td = tag.get("DAY"); if (!ty || !tm || !td) { - return trigger_match_result::Ignore; + return TriggerMatchResult::Ignore; } - const auto tup = std::make_tuple(std::get(ty->get()), std::get(tm->get()), std::get(td->get())); - const auto &[y, m, d] = tup; - const auto ly = last_seen ? std::optional(std::get<0>(*last_seen)) : std::nullopt; - const auto lm = last_seen ? std::optional(std::get<1>(*last_seen)) : std::nullopt; - const auto ld = last_seen ? std::optional(std::get<2>(*last_seen)) : std::nullopt; + const auto tup = std::make_tuple(std::get(ty->get()), std::get(tm->get()), std::get(td->get())); + const auto &[y, m, d] = tup; + const auto ly = last_seen ? std::optional(std::get<0>(*last_seen)) : std::nullopt; + const auto lm = last_seen ? std::optional(std::get<1>(*last_seen)) : std::nullopt; + const auto ld = last_seen ? std::optional(std::get<2>(*last_seen)) : std::nullopt; - const auto year_restart = year && *year == -1 && changed(y, ly); - const auto year_matches = !year || *year == -1 || same(y, year); - const auto month_restart = month && *month == -1 && changed(m, lm); - const auto month_matches = !month || *month == -1 || same(m, month); - const auto day_restart = day && *day == -1 && changed(d, ld); - const auto day_matches = !day || *day == -1 || same(d, day); - const auto matches = year_matches && month_matches && day_matches; - const auto restart = year_restart || month_restart || day_restart; + const auto yearRestart = year && *year == -1 && changed(y, ly); + const auto yearMatches = !year || *year == -1 || same(y, year); + const auto monthRestart = month && *month == -1 && changed(m, lm); + const auto monthMatches = !month || *month == -1 || same(m, month); + const auto dayRestart = day && *day == -1 && changed(d, ld); + const auto dayMatches = !day || *day == -1 || same(d, day); + const auto matches = yearMatches && monthMatches && dayMatches; + const auto restart = yearRestart || monthRestart || dayRestart; - trigger_match_result r = trigger_match_result::Ignore; + auto r = TriggerMatchResult::Ignore; if (!matches) { - r = trigger_match_result::NotMatching; + r = TriggerMatchResult::NotMatching; } else if (!last_matched || restart) { - r = trigger_match_result::Matching; + r = TriggerMatchResult::Matching; } last_seen = tup; @@ -158,18 +158,18 @@ struct Matcher { }; static Tag -make_tag(Tag::signed_index_type index, int year, int month, int day) { +makeTag(Tag::signed_index_type index, int year, int month, int day) { return Tag{ index, { { "YEAR", year }, { "MONTH", month }, { "DAY", day } } }; } static std::vector -make_test_tags(Tag::signed_index_type first_index, Tag::signed_index_type interval) { +makeTestTags(Tag::signed_index_type firstIndex, Tag::signed_index_type interval) { std::vector tags; for (int y = 1; y <= 3; ++y) { for (int m = 1; m <= 2; ++m) { for (int d = 1; d <= 3; ++d) { - tags.push_back(make_tag(first_index, y, m, d)); - first_index += interval; + tags.push_back(makeTag(firstIndex, y, m, d)); + firstIndex += interval; } } } @@ -177,20 +177,20 @@ make_test_tags(Tag::signed_index_type first_index, Tag::signed_index_type interv } static std::string -to_ascii_art(std::span states) { +toAsciiArt(std::span states) { bool started = false; std::string r; for (auto s : states) { switch (s) { - case trigger_match_result::Matching: + case TriggerMatchResult::Matching: r += started ? "||#" : "|#"; started = true; break; - case trigger_match_result::NotMatching: + case TriggerMatchResult::NotMatching: r += started ? "|_" : "_"; started = false; break; - case trigger_match_result::Ignore: r += started ? "#" : "_"; break; + case TriggerMatchResult::Ignore: r += started ? "#" : "_"; break; } }; return r; @@ -198,13 +198,13 @@ to_ascii_art(std::span states) { template std::string -run_matcher_test(std::span tags, M o) { - std::vector r; +runMatcherTest(std::span tags, M o) { + std::vector r; r.reserve(tags.size()); for (const auto &tag : tags) { r.push_back(o(tag)); } - return to_ascii_art(r); + return toAsciiArt(r); } } // namespace gr::basic::data_sink_test @@ -213,13 +213,13 @@ ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (gr::basic::data_sink_test::So template std::string -format_list(const T &l) { +formatList(const T &l) { return fmt::format("[{}]", fmt::join(l, ", ")); } template bool -indexes_match(const T &lhs, const U &rhs) { +indexesMatch(const T &lhs, const U &rhs) { auto index_match = [](const auto &l, const auto &r) { return l.index == r.index; }; return std::equal(std::begin(lhs), std::end(lhs), std::begin(rhs), std::end(rhs), index_match); @@ -234,41 +234,41 @@ const boost::ut::suite DataSinkTests = [] { using namespace std::string_literals; "callback continuous mode"_test = [] { - static constexpr std::int32_t n_samples = 200005; - static constexpr std::size_t chunk_size = 1000; + static constexpr std::int32_t kSamples = 200005; + static constexpr std::size_t kChunkSize = 1000; - const auto src_tags = make_test_tags(0, 1000); + const auto srcTags = makeTestTags(0, 1000); gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); - src.tags = src_tags; + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples } }); + auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + src.tags = srcTags; expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); - std::atomic samples_seen1 = 0; - std::atomic chunks_seen1 = 0; - auto callback = [&samples_seen1, &chunks_seen1](std::span buffer) { + std::atomic samplesSeen1 = 0; + std::atomic chunksSeen1 = 0; + auto callback = [&samplesSeen1, &chunksSeen1](std::span buffer) { for (std::size_t i = 0; i < buffer.size(); ++i) { - expect(eq(buffer[i], static_cast(samples_seen1 + i))); + expect(eq(buffer[i], static_cast(samplesSeen1 + i))); } - samples_seen1 += buffer.size(); - chunks_seen1++; - if (chunks_seen1 < 201) { - expect(eq(buffer.size(), chunk_size)); + samplesSeen1 += buffer.size(); + chunksSeen1++; + if (chunksSeen1 < 201) { + expect(eq(buffer.size(), kChunkSize)); } else { expect(eq(buffer.size(), 5_UZ)); } }; std::mutex m2; - std::size_t samples_seen2 = 0; - std::size_t chunks_seen2 = 0; - std::vector received_tags; - auto callback_with_tags = [&samples_seen2, &chunks_seen2, &m2, &received_tags](std::span buffer, std::span tags) { + std::size_t samplesSeen2 = 0; + std::size_t chunksSeen2 = 0; + std::vector receivedTags; + auto callbackWithTags = [&samplesSeen2, &chunksSeen2, &m2, &receivedTags](std::span buffer, std::span tags) { for (std::size_t i = 0; i < buffer.size(); ++i) { - expect(eq(buffer[i], static_cast(samples_seen2 + i))); + expect(eq(buffer[i], static_cast(samplesSeen2 + i))); } for (const auto &tag : tags) { @@ -278,27 +278,27 @@ const boost::ut::suite DataSinkTests = [] { auto lg = std::lock_guard{ m2 }; std::vector adjusted; - std::transform(tags.begin(), tags.end(), std::back_inserter(adjusted), [samples_seen2](const auto &tag) { - return Tag{ static_cast(samples_seen2) + tag.index, tag.map }; + std::transform(tags.begin(), tags.end(), std::back_inserter(adjusted), [samplesSeen2](const auto &tag) { + return Tag{ static_cast(samplesSeen2) + tag.index, tag.map }; }); - received_tags.insert(received_tags.end(), adjusted.begin(), adjusted.end()); - samples_seen2 += buffer.size(); - chunks_seen2++; - if (chunks_seen2 < 201) { - expect(eq(buffer.size(), chunk_size)); + receivedTags.insert(receivedTags.end(), adjusted.begin(), adjusted.end()); + samplesSeen2 += buffer.size(); + chunksSeen2++; + if (chunksSeen2 < 201) { + expect(eq(buffer.size(), kChunkSize)); } else { expect(eq(buffer.size(), 5_UZ)); } }; - auto callback_with_tags_and_sink = [&sink](std::span, std::span, const data_sink &passed_sink) { - expect(eq(passed_sink.name.value, "test_sink"s)); - expect(eq(sink.unique_name, passed_sink.unique_name)); + auto callbackWithTagsAndSink = [&sink](std::span, std::span, const DataSink &passedSink) { + expect(eq(passedSink.name.value, "test_sink"s)); + expect(eq(sink.unique_name, passedSink.unique_name)); }; - expect(data_sink_registry::instance().register_streaming_callback(data_sink_query::sink_name("test_sink"), chunk_size, callback)); - expect(data_sink_registry::instance().register_streaming_callback(data_sink_query::sink_name("test_sink"), chunk_size, callback_with_tags)); - expect(data_sink_registry::instance().register_streaming_callback(data_sink_query::sink_name("test_sink"), chunk_size, callback_with_tags_and_sink)); + expect(DataSinkRegistry::instance().registerStreamingCallback(DataSinkQuery::sinkName("test_sink"), kChunkSize, callback)); + expect(DataSinkRegistry::instance().registerStreamingCallback(DataSinkQuery::sinkName("test_sink"), kChunkSize, callbackWithTags)); + expect(DataSinkRegistry::instance().registerStreamingCallback(DataSinkQuery::sinkName("test_sink"), kChunkSize, callbackWithTagsAndSink)); gr::scheduler::Simple sched{ std::move(testGraph) }; sched.runAndWait(); @@ -306,35 +306,35 @@ const boost::ut::suite DataSinkTests = [] { sink.stop(); // TODO the scheduler should call this auto lg = std::lock_guard{ m2 }; - expect(eq(chunks_seen1.load(), 201_UZ)); - expect(eq(chunks_seen2, 201_UZ)); - expect(eq(samples_seen1.load(), static_cast(n_samples))); - expect(eq(samples_seen2, static_cast(n_samples))); - expect(eq(indexes_match(received_tags, src_tags), true)) << fmt::format("{} != {}", format_list(received_tags), format_list(src_tags)); + expect(eq(chunksSeen1.load(), 201_UZ)); + expect(eq(chunksSeen2, 201_UZ)); + expect(eq(samplesSeen1.load(), static_cast(kSamples))); + expect(eq(samplesSeen2, static_cast(kSamples))); + expect(eq(indexesMatch(receivedTags, srcTags), true)) << fmt::format("{} != {}", formatList(receivedTags), formatList(srcTags)); }; "blocking polling continuous mode"_test = [] { - constexpr std::int32_t n_samples = 200000; + constexpr std::int32_t kSamples = 200000; gr::Graph testGraph; - const auto tags = make_test_tags(0, 1000); - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); + const auto tags = makeTestTags(0, 1000); + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples } }); src.tags = tags; - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); - auto poller_data_only = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking); - expect(neq(poller_data_only, nullptr)); + auto pollerDataOnly = DataSinkRegistry::instance().getStreamingPoller(DataSinkQuery::sinkName("test_sink"), BlockingMode::Blocking); + expect(neq(pollerDataOnly, nullptr)); - auto poller_with_tags = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking); - expect(neq(poller_with_tags, nullptr)); + auto pollerWithTags = DataSinkRegistry::instance().getStreamingPoller(DataSinkQuery::sinkName("test_sink"), BlockingMode::Blocking); + expect(neq(pollerWithTags, nullptr)); - auto runner1 = std::async([poller = poller_data_only] { + auto runner1 = std::async([poller = pollerDataOnly] { std::vector received; - bool seen_finished = false; - while (!seen_finished) { - seen_finished = poller->finished; + bool seenFinished = false; + while (!seenFinished) { + seenFinished = poller->finished; while (poller->process([&received](const auto &data) { received.insert(received.end(), data.begin(), data.end()); })) { } } @@ -342,24 +342,24 @@ const boost::ut::suite DataSinkTests = [] { return received; }); - auto runner2 = std::async([poller = poller_with_tags] { + auto runner2 = std::async([poller = pollerWithTags] { std::vector received; - std::vector received_tags; - bool seen_finished = false; - while (!seen_finished) { - seen_finished = poller->finished; - while (poller->process([&received, &received_tags](const auto &data, const auto &tags_) { + std::vector receivedTags; + bool seenFinished = false; + while (!seenFinished) { + seenFinished = poller->finished; + while (poller->process([&received, &receivedTags](const auto &data, const auto &tags_) { auto rtags = std::vector(tags_.begin(), tags_.end()); for (auto &t : rtags) { t.index += static_cast(received.size()); } - received_tags.insert(received_tags.end(), rtags.begin(), rtags.end()); + receivedTags.insert(receivedTags.end(), rtags.begin(), rtags.end()); received.insert(received.end(), data.begin(), data.end()); })) { } } - return std::make_tuple(received, received_tags); + return std::make_tuple(received, receivedTags); }); gr::scheduler::Simple sched{ std::move(testGraph) }; @@ -367,51 +367,51 @@ const boost::ut::suite DataSinkTests = [] { sink.stop(); // TODO the scheduler should call this - std::vector expected(n_samples); + std::vector expected(kSamples); std::iota(expected.begin(), expected.end(), 0.0); - const auto received1 = runner1.get(); - const auto &[received2, received_tags] = runner2.get(); + const auto received1 = runner1.get(); + const auto &[received2, receivedTags] = runner2.get(); expect(eq(received1.size(), expected.size())); expect(eq(received1, expected)); - expect(eq(poller_data_only->drop_count.load(), 0_UZ)); + expect(eq(pollerDataOnly->drop_count.load(), 0_UZ)); expect(eq(received2.size(), expected.size())); expect(eq(received2, expected)); - expect(eq(received_tags.size(), tags.size())); - expect(eq(indexes_match(received_tags, tags), true)) << fmt::format("{} != {}", format_list(received_tags), format_list(tags)); - expect(eq(poller_with_tags->drop_count.load(), 0_UZ)); + expect(eq(receivedTags.size(), tags.size())); + expect(eq(indexesMatch(receivedTags, tags), true)) << fmt::format("{} != {}", formatList(receivedTags), formatList(tags)); + expect(eq(pollerWithTags->drop_count.load(), 0_UZ)); }; "blocking polling trigger mode non-overlapping"_test = [] { - constexpr std::int32_t n_samples = 200000; + constexpr std::int32_t kSamples = 200000; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples } }); const auto tags = std::vector{ { 3000, { { "TYPE", "TRIGGER" } } }, { 8000, { { "TYPE", "NO_TRIGGER" } } }, { 180000, { { "TYPE", "TRIGGER" } } } }; src.tags = tags; - auto &sink = testGraph.emplaceBlock>( - { { "name", "test_sink" }, { "signal_name", "test signal" }, { "signal_unit", "none" }, { "signal_min", int32_t{ 0 } }, { "signal_max", int32_t{ n_samples - 1 } } }); + auto &sink = testGraph.emplaceBlock>( + { { "name", "test_sink" }, { "signal_name", "test signal" }, { "signal_unit", "none" }, { "signal_min", int32_t{ 0 } }, { "signal_max", int32_t{ kSamples - 1 } } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); - auto is_trigger = [](const Tag &tag) { + auto isTrigger = [](const Tag &tag) { const auto v = tag.get("TYPE"); - return v && std::get(v->get()) == "TRIGGER" ? trigger_match_result::Matching : trigger_match_result::Ignore; + return v && std::get(v->get()) == "TRIGGER" ? TriggerMatchResult::Matching : TriggerMatchResult::Ignore; }; // lookup by signal name - auto poller = data_sink_registry::instance().get_trigger_poller(data_sink_query::signal_name("test signal"), is_trigger, 3, 2, blocking_mode::Blocking); + auto poller = DataSinkRegistry::instance().getTriggerPoller(DataSinkQuery::signalName("test signal"), isTrigger, 3, 2, BlockingMode::Blocking); expect(neq(poller, nullptr)); auto polling = std::async([poller] { - std::vector received_data; - std::vector received_tags; - bool seen_finished = false; - while (!seen_finished) { - seen_finished = poller->finished; - [[maybe_unused]] auto r = poller->process([&received_data, &received_tags](const auto &datasets) { + std::vector receivedData; + std::vector receivedTags; + bool seenFinished = false; + while (!seenFinished) { + seenFinished = poller->finished; + [[maybe_unused]] auto r = poller->process([&receivedData, &receivedTags](const auto &datasets) { for (const auto &dataset : datasets) { - received_data.insert(received_data.end(), dataset.signal_values.begin(), dataset.signal_values.end()); + receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end()); // signal info from sink settings expect(eq(dataset.signal_names.size(), 1u)); expect(eq(dataset.signal_units.size(), 1u)); @@ -419,14 +419,14 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(dataset.timing_events.size(), 1u)); expect(eq(dataset.signal_names[0], "test signal"s)); expect(eq(dataset.signal_units[0], "none"s)); - expect(eq(dataset.signal_ranges[0], std::vector{ 0, n_samples - 1 })); + expect(eq(dataset.signal_ranges[0], std::vector{ 0, kSamples - 1 })); expect(eq(dataset.timing_events[0].size(), 1u)); expect(eq(dataset.timing_events[0][0].index, 3)); - received_tags.insert(received_tags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); + receivedTags.insert(receivedTags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); } }); } - return std::make_tuple(received_data, received_tags); + return std::make_tuple(receivedData, receivedTags); }); gr::scheduler::Simple sched{ std::move(testGraph) }; @@ -434,55 +434,55 @@ const boost::ut::suite DataSinkTests = [] { sink.stop(); // TODO the scheduler should call this - const auto &[received_data, received_tags] = polling.get(); - const auto expected_tags = { tags[0], tags[2] }; // triggers-only + const auto &[receivedData, receivedTags] = polling.get(); + const auto expected_tags = { tags[0], tags[2] }; // triggers-only - expect(eq(received_data.size(), 10_UZ)); - expect(eq(received_data, std::vector{ 2997, 2998, 2999, 3000, 3001, 179997, 179998, 179999, 180000, 180001 })); - expect(eq(received_tags.size(), expected_tags.size())); + expect(eq(receivedData.size(), 10_UZ)); + expect(eq(receivedData, std::vector{ 2997, 2998, 2999, 3000, 3001, 179997, 179998, 179999, 180000, 180001 })); + expect(eq(receivedTags.size(), expected_tags.size())); expect(eq(poller->drop_count.load(), 0_UZ)); }; "blocking snapshot mode"_test = [] { - constexpr std::int32_t n_samples = 200000; + constexpr std::int32_t kSamples = 200000; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples } }); src.tags = { { 0, { { std::string(tag::SIGNAL_NAME.key()), "test signal" }, { std::string(tag::SIGNAL_UNIT.key()), "none" }, { std::string(tag::SIGNAL_MIN.key()), int32_t{ 0 } }, - { std::string(tag::SIGNAL_MAX.key()), n_samples - 1 } } }, + { std::string(tag::SIGNAL_MAX.key()), kSamples - 1 } } }, { 3000, { { "TYPE", "TRIGGER" } } }, { 8000, { { "TYPE", "NO_TRIGGER" } } }, { 180000, { { "TYPE", "TRIGGER" } } } }; - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" }, { "sample_rate", 10000.f } }); + auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" }, { "sample_rate", 10000.f } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); - auto is_trigger = [](const Tag &tag) { + auto isTrigger = [](const Tag &tag) { const auto v = tag.get("TYPE"); - return (v && std::get(v->get()) == "TRIGGER") ? trigger_match_result::Matching : trigger_match_result::Ignore; + return (v && std::get(v->get()) == "TRIGGER") ? TriggerMatchResult::Matching : TriggerMatchResult::Ignore; }; const auto delay = std::chrono::milliseconds{ 500 }; // sample rate 10000 -> 5000 samples - auto poller = data_sink_registry::instance().get_snapshot_poller(data_sink_query::sink_name("test_sink"), is_trigger, delay, blocking_mode::Blocking); + auto poller = DataSinkRegistry::instance().getSnapshotPoller(DataSinkQuery::sinkName("test_sink"), isTrigger, delay, BlockingMode::Blocking); expect(neq(poller, nullptr)); - std::vector received_data_cb; + std::vector receivedDataCb; - auto callback = [&received_data_cb](const auto &dataset) { received_data_cb.insert(received_data_cb.end(), dataset.signal_values.begin(), dataset.signal_values.end()); }; + auto callback = [&receivedDataCb](const auto &dataset) { receivedDataCb.insert(receivedDataCb.end(), dataset.signal_values.begin(), dataset.signal_values.end()); }; - expect(data_sink_registry::instance().register_snapshot_callback(data_sink_query::sink_name("test_sink"), is_trigger, delay, callback)); + expect(DataSinkRegistry::instance().registerSnapshotCallback(DataSinkQuery::sinkName("test_sink"), isTrigger, delay, callback)); auto poller_result = std::async([poller] { - std::vector received_data; + std::vector receivedData; - bool seen_finished = false; - while (!seen_finished) { - seen_finished = poller->finished; - [[maybe_unused]] auto r = poller->process([&received_data](const auto &datasets) { + bool seenFinished = false; + while (!seenFinished) { + seenFinished = poller->finished; + [[maybe_unused]] auto r = poller->process([&receivedData](const auto &datasets) { for (const auto &dataset : datasets) { // signal info from tags expect(eq(dataset.signal_names.size(), 1u)); @@ -491,15 +491,15 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(dataset.timing_events.size(), 1u)); expect(eq(dataset.signal_names[0], "test signal"s)); expect(eq(dataset.signal_units[0], "none"s)); - expect(eq(dataset.signal_ranges[0], std::vector{ 0, n_samples - 1 })); + expect(eq(dataset.signal_ranges[0], std::vector{ 0, kSamples - 1 })); expect(eq(dataset.timing_events[0].size(), 1u)); expect(eq(dataset.timing_events[0][0].index, -5000)); - received_data.insert(received_data.end(), dataset.signal_values.begin(), dataset.signal_values.end()); + receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end()); } }); } - return received_data; + return receivedData; }); gr::scheduler::Simple sched{ std::move(testGraph) }; @@ -507,20 +507,20 @@ const boost::ut::suite DataSinkTests = [] { sink.stop(); // TODO the scheduler should call this - const auto received_data = poller_result.get(); - expect(eq(received_data_cb, received_data)); - expect(eq(received_data, std::vector{ 8000, 185000 })); + const auto receivedData = poller_result.get(); + expect(eq(receivedDataCb, receivedData)); + expect(eq(receivedData, std::vector{ 8000, 185000 })); expect(eq(poller->drop_count.load(), 0_UZ)); }; "blocking multiplexed mode"_test = [] { - const auto tags = make_test_tags(0, 10000); + const auto tags = makeTestTags(0, 10000); const std::int32_t n_samples = static_cast(tags.size() * 10000 + 100000); gr::Graph testGraph; auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); src.tags = tags; - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); @@ -528,11 +528,11 @@ const boost::ut::suite DataSinkTests = [] { const auto t = std::span(tags); // Test the test matcher - expect(eq(run_matcher_test(t, Matcher({}, -1, {})), "|###||###||###||###||###||###"s)); - expect(eq(run_matcher_test(t, Matcher(-1, {}, {})), "|######||######||######"s)); - expect(eq(run_matcher_test(t, Matcher(1, {}, {})), "|######|____________"s)); - expect(eq(run_matcher_test(t, Matcher(1, {}, 2)), "_|#|__|#|_____________"s)); - expect(eq(run_matcher_test(t, Matcher({}, {}, 1)), "|#|__|#|__|#|__|#|__|#|__|#|__"s)); + expect(eq(runMatcherTest(t, Matcher({}, -1, {})), "|###||###||###||###||###||###"s)); + expect(eq(runMatcherTest(t, Matcher(-1, {}, {})), "|######||######||######"s)); + expect(eq(runMatcherTest(t, Matcher(1, {}, {})), "|######|____________"s)); + expect(eq(runMatcherTest(t, Matcher(1, {}, 2)), "_|#|__|#|_____________"s)); + expect(eq(runMatcherTest(t, Matcher({}, {}, 1)), "|#|__|#|__|#|__|#|__|#|__|#|__"s)); } const auto matchers = std::array{ Matcher({}, -1, {}), Matcher(-1, {}, {}), Matcher(1, {}, {}), Matcher(1, {}, 2), Matcher({}, {}, 1) }; @@ -543,28 +543,28 @@ const boost::ut::suite DataSinkTests = [] { { 0, 59999 }, { 10000, 19999, 40000, 49999 }, { 0, 9999, 30000, 39999, 60000, 69999, 90000, 99999, 120000, 129999, 150000, 159999 } } }; - std::array::dataset_poller>, matchers.size()> pollers; + std::array::DataSetPoller>, matchers.size()> pollers; - std::vector>> results; - std::array, matchers.size()> results_cb; + std::vector>> results; + std::array, matchers.size()> resultsCb; - for (std::size_t i = 0; i < results_cb.size(); ++i) { - auto callback = [&r = results_cb[i]](const auto &dataset) { + for (std::size_t i = 0; i < resultsCb.size(); ++i) { + auto callback = [&r = resultsCb[i]](const auto &dataset) { r.push_back(dataset.signal_values.front()); r.push_back(dataset.signal_values.back()); }; - expect(eq(data_sink_registry::instance().register_multiplexed_callback(data_sink_query::sink_name("test_sink"), Matcher(matchers[i]), 100000, std::move(callback)), true)); + expect(eq(DataSinkRegistry::instance().registerMultiplexedCallback(DataSinkQuery::sinkName("test_sink"), Matcher(matchers[i]), 100000, std::move(callback)), true)); - pollers[i] = data_sink_registry::instance().get_multiplexed_poller(data_sink_query::sink_name("test_sink"), Matcher(matchers[i]), 100000, blocking_mode::Blocking); + pollers[i] = DataSinkRegistry::instance().getMultiplexedPoller(DataSinkQuery::sinkName("test_sink"), Matcher(matchers[i]), 100000, BlockingMode::Blocking); expect(neq(pollers[i], nullptr)); } for (std::size_t i = 0; i < pollers.size(); ++i) { auto f = std::async([poller = pollers[i]] { std::vector ranges; - bool seen_finished = false; - while (!seen_finished) { - seen_finished = poller->finished.load(); + bool seenFinished = false; + while (!seenFinished) { + seenFinished = poller->finished.load(); while (poller->process([&ranges](const auto &datasets) { for (const auto &dataset : datasets) { // default signal info, we didn't set anything @@ -591,50 +591,50 @@ const boost::ut::suite DataSinkTests = [] { for (std::size_t i = 0; i < results.size(); ++i) { expect(eq(results[i].get(), expected[i])); - expect(eq(results_cb[i], expected[i])); + expect(eq(resultsCb[i], expected[i])); } }; "blocking polling trigger mode overlapping"_test = [] { - constexpr std::int32_t n_samples = 150000; - constexpr std::size_t n_triggers = 300; + constexpr std::int32_t kSamples = 150000; + constexpr std::size_t kTriggers = 300; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples } }); - for (std::size_t i = 0; i < n_triggers; ++i) { + for (std::size_t i = 0; i < kTriggers; ++i) { src.tags.push_back(Tag{ static_cast(60000 + i), { { "TYPE", "TRIGGER" } } }); } - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); - auto is_trigger = [](const Tag &) { return trigger_match_result::Matching; }; + auto isTrigger = [](const Tag &) { return TriggerMatchResult::Matching; }; - auto poller = data_sink_registry::instance().get_trigger_poller(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, blocking_mode::Blocking); + auto poller = DataSinkRegistry::instance().getTriggerPoller(DataSinkQuery::sinkName("test_sink"), isTrigger, 3000, 2000, BlockingMode::Blocking); expect(neq(poller, nullptr)); auto polling = std::async([poller] { - std::vector received_data; - std::vector received_tags; - bool seen_finished = false; - while (!seen_finished) { - seen_finished = poller->finished.load(); - while (poller->process([&received_data, &received_tags](const auto &datasets) { + std::vector receivedData; + std::vector receivedTags; + bool seenFinished = false; + while (!seenFinished) { + seenFinished = poller->finished.load(); + while (poller->process([&receivedData, &receivedTags](const auto &datasets) { for (const auto &dataset : datasets) { expect(eq(dataset.signal_values.size(), 5000u) >> fatal); - received_data.push_back(dataset.signal_values.front()); - received_data.push_back(dataset.signal_values.back()); + receivedData.push_back(dataset.signal_values.front()); + receivedData.push_back(dataset.signal_values.back()); expect(eq(dataset.timing_events.size(), 1u)); expect(eq(dataset.timing_events[0].size(), 1u)); expect(eq(dataset.timing_events[0][0].index, 3000)); - received_tags.insert(received_tags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); + receivedTags.insert(receivedTags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); } })) { } } - return std::make_tuple(received_data, received_tags); + return std::make_tuple(receivedData, receivedTags); }); gr::scheduler::Simple sched{ std::move(testGraph) }; @@ -642,42 +642,42 @@ const boost::ut::suite DataSinkTests = [] { sink.stop(); // TODO the scheduler should call this - const auto &[received_data, received_tags] = polling.get(); - auto expected_start = std::vector{ 57000, 61999, 57001, 62000, 57002 }; + const auto &[receivedData, receivedTags] = polling.get(); + auto expectedStart = std::vector{ 57000, 61999, 57001, 62000, 57002 }; expect(eq(poller->drop_count.load(), 0u)); - expect(eq(received_data.size(), 2 * n_triggers) >> fatal); - expect(eq(std::vector(received_data.begin(), received_data.begin() + 5), expected_start)); - expect(eq(received_tags.size(), n_triggers)); + expect(eq(receivedData.size(), 2 * kTriggers) >> fatal); + expect(eq(std::vector(receivedData.begin(), receivedData.begin() + 5), expectedStart)); + expect(eq(receivedTags.size(), kTriggers)); }; "callback trigger mode overlapping"_test = [] { - constexpr std::int32_t n_samples = 150000; - constexpr std::size_t n_triggers = 300; + constexpr std::int32_t kSamples = 150000; + constexpr std::size_t kTriggers = 300; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples } }); - for (std::size_t i = 0; i < n_triggers; ++i) { + for (std::size_t i = 0; i < kTriggers; ++i) { src.tags.push_back(Tag{ static_cast(60000 + i), { { "TYPE", "TRIGGER" } } }); } - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); - auto is_trigger = [](const Tag &) { return trigger_match_result::Matching; }; + auto isTrigger = [](const Tag &) { return TriggerMatchResult::Matching; }; std::mutex m; - std::vector received_data; + std::vector receivedData; - auto callback = [&received_data, &m](auto &&dataset) { + auto callback = [&receivedData, &m](auto &&dataset) { std::lock_guard lg{ m }; expect(eq(dataset.signal_values.size(), 5000u)); - received_data.push_back(dataset.signal_values.front()); - received_data.push_back(dataset.signal_values.back()); + receivedData.push_back(dataset.signal_values.front()); + receivedData.push_back(dataset.signal_values.back()); }; - data_sink_registry::instance().register_trigger_callback(data_sink_query::sink_name("test_sink"), is_trigger, 3000, 2000, callback); + DataSinkRegistry::instance().registerTriggerCallback(DataSinkQuery::sinkName("test_sink"), isTrigger, 3000, 2000, callback); gr::scheduler::Simple sched{ std::move(testGraph) }; sched.runAndWait(); @@ -685,40 +685,40 @@ const boost::ut::suite DataSinkTests = [] { sink.stop(); // TODO the scheduler should call this std::lock_guard lg{ m }; - auto expected_start = std::vector{ 57000, 61999, 57001, 62000, 57002 }; - expect(eq(received_data.size(), 2 * n_triggers)); - expect(eq(std::vector(received_data.begin(), received_data.begin() + 5), expected_start)); + auto expectedStart = std::vector{ 57000, 61999, 57001, 62000, 57002 }; + expect(eq(receivedData.size(), 2 * kTriggers)); + expect(eq(std::vector(receivedData.begin(), receivedData.begin() + 5), expectedStart)); }; "non-blocking polling continuous mode"_test = [] { - constexpr std::int32_t n_samples = 200000; + constexpr std::int32_t kSamples = 200000; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", n_samples } }); - auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples } }); + auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); - auto invalid_type_poller = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink")); + auto invalid_type_poller = DataSinkRegistry::instance().getStreamingPoller(DataSinkQuery::sinkName("test_sink")); expect(eq(invalid_type_poller, nullptr)); - auto poller = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink")); + auto poller = DataSinkRegistry::instance().getStreamingPoller(DataSinkQuery::sinkName("test_sink")); expect(neq(poller, nullptr)); auto polling = std::async([poller] { expect(neq(poller, nullptr)); - std::size_t samples_seen = 0; - bool seen_finished = false; - while (!seen_finished) { + std::size_t samplesSeen = 0; + bool seenFinished = false; + while (!seenFinished) { using namespace std::chrono_literals; std::this_thread::sleep_for(20ms); - seen_finished = poller->finished.load(); - while (poller->process([&samples_seen](const auto &data) { samples_seen += data.size(); })) { + seenFinished = poller->finished.load(); + while (poller->process([&samplesSeen](const auto &data) { samplesSeen += data.size(); })) { } } - return samples_seen; + return samplesSeen; }); gr::scheduler::Simple sched{ std::move(testGraph) }; @@ -726,8 +726,8 @@ const boost::ut::suite DataSinkTests = [] { sink.stop(); // TODO the scheduler should call this - const auto samples_seen = polling.get(); - expect(eq(samples_seen + poller->drop_count, static_cast(n_samples))); + const auto samplesSeen = polling.get(); + expect(eq(samplesSeen + poller->drop_count, static_cast(kSamples))); }; };