Commit
changed node::work() signature to {requested, performed, status} work(size requested) {..}

Signed-off-by: rstein <[email protected]>
RalphSteinhagen authored and wirew0rm committed Jul 11, 2023
1 parent 4ff3b78 commit 81c8e8e
Showing 16 changed files with 230 additions and 194 deletions.
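For orientation: throughout the diff below, the former enum return of work() becomes an aggregate carrying the requested and performed item counts next to the status, and the status enum itself is renamed to work_return_status_t. A minimal sketch of what that aggregate plausibly looks like, inferred only from the return statements in this commit (field names, enumerator order, and namespace layout are assumptions, not the authoritative library definition):

    // Sketch only - inferred from the return statements in this commit; field names,
    // enumerator order, and namespace layout are assumptions, not the library definition.
    #include <cstddef>

    namespace fair::graph {

    enum class work_return_status_t { // formerly the sole return value of work()
        ERROR,
        INSUFFICIENT_INPUT_ITEMS,
        INSUFFICIENT_OUTPUT_ITEMS,
        DONE,
        OK
    };

    struct work_return_t { // now an aggregate: { requested, performed, status }
        std::size_t          requested_work; // item budget passed into work(std::size_t)
        std::size_t          performed_work; // items actually processed during this call
        work_return_status_t status;         // classic status code
    };

    } // namespace fair::graph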
36 changes: 18 additions & 18 deletions bench/bm_case1.cpp
@@ -82,7 +82,7 @@ class math_bulk_op : public fg::node<math_bulk_op<T, op>, fg::IN<T, 0, N_MAX, "i
}
}

[[nodiscard]] constexpr fg::work_return_t
[[nodiscard]] constexpr fg::work_return_status_t
process_bulk(std::span<const T> input, std::span<T> output) const noexcept {
// classic for-loop
for (std::size_t i = 0; i < input.size(); i++) {
@@ -95,7 +95,7 @@ class math_bulk_op : public fg::node<math_bulk_op<T, op>, fg::IN<T, 0, N_MAX, "i
// C++20 ranges
// std::ranges::transform(input, output.begin(), [this](const T& elem) { return process_one(elem); });

return fg::work_return_t::OK;
return fg::work_return_status_t::OK;
}
};

@@ -187,7 +187,7 @@ class gen_operation_SIMD : public fg::node<gen_operation_SIMD<T, op>, fg::IN<T,
explicit gen_operation_SIMD(T value, std::string name_ = fair::graph::this_source_location()) : _value(value) { this->name = name_; }

fair::graph::work_return_t
work() noexcept {
work(std::size_t requested_work) noexcept {
auto &out_port = output_port<0>(this);
auto &in_port = input_port<0>(this);

@@ -196,9 +196,9 @@ class gen_operation_SIMD : public fg::node<gen_operation_SIMD<T, op>, fg::IN<T,
const auto n_readable = std::min(reader.available(), in_port.max_buffer_size());
const auto n_writable = std::min(writer.available(), out_port.max_buffer_size());
if (n_readable == 0) {
return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS;
return { requested_work, 0UL, fair::graph::work_return_status_t::INSUFFICIENT_INPUT_ITEMS };
} else if (n_writable == 0) {
return fair::graph::work_return_t::INSUFFICIENT_OUTPUT_ITEMS;
return { requested_work, 0UL, fair::graph::work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS };
}
const std::size_t n_to_publish = std::min(n_readable, n_writable);

@@ -247,9 +247,9 @@ class gen_operation_SIMD : public fg::node<gen_operation_SIMD<T, op>, fg::IN<T,
n_to_publish);

if (!reader.consume(n_to_publish)) {
return fair::graph::work_return_t::ERROR;
return { requested_work, n_to_publish, fair::graph::work_return_status_t::ERROR };
}
return fair::graph::work_return_t::OK;
return { requested_work, n_to_publish, fair::graph::work_return_status_t::OK };
}
};

@@ -278,7 +278,7 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
}

fair::graph::work_return_t
work() noexcept { // TODO - make this an alternate version to 'process_one'
work(std::size_t requested_work) noexcept { // TODO - make this an alternate version to 'process_one'
auto &out_port = out;
auto &in_port = in;

@@ -287,9 +287,9 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
const auto n_readable = std::min(reader.available(), in_port.max_buffer_size());
const auto n_writable = std::min(writer.available(), out_port.max_buffer_size());
if (n_readable == 0) {
return fair::graph::work_return_t::DONE;
return { requested_work, 0UL, fair::graph::work_return_status_t::DONE };
} else if (n_writable == 0) {
return fair::graph::work_return_t::INSUFFICIENT_OUTPUT_ITEMS;
return { requested_work, 0UL, fair::graph::work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS };
}
const std::size_t n_to_publish = std::min(n_readable, n_writable);

@@ -308,9 +308,9 @@ class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memco
n_to_publish);
}
if (!reader.consume(n_to_publish)) {
return fair::graph::work_return_t::ERROR;
return { requested_work, 0UL, fair::graph::work_return_status_t::ERROR };
}
return fair::graph::work_return_t::OK;
return { requested_work, 0UL, fair::graph::work_return_status_t::OK };
}
};

@@ -339,7 +339,7 @@ class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::IN<From, N_
constexpr static std::size_t simd_size = std::max(from_simd_size, to_simd_size);

public:
fair::graph::work_return_t
fair::graph::work_return_status_t
work() noexcept {
using namespace stdx;
auto &out_port = output_port<"out">(this);
@@ -350,9 +350,9 @@ class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::IN<From, N_
const auto n_readable = std::min(reader.available(), in_port.max_buffer_size());
const auto n_writable = std::min(writer.available(), out_port.max_buffer_size());
if (n_readable < to_simd_size) {
return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS;
return fair::graph::work_return_status_t::INSUFFICIENT_INPUT_ITEMS;
} else if (n_writable < from_simd_size) {
return fair::graph::work_return_t::INSUFFICIENT_OUTPUT_ITEMS;
return fair::graph::work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS;
}
const auto n_readable_scalars = n_readable * from_simd_size;
const auto n_writable_scalars = n_writable * to_simd_size;
@@ -361,7 +361,7 @@ class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::IN<From, N_
const auto objects_to_write = stdx::is_simd_v<To> ? n_simd_to_convert : scalars_to_convert;
const auto objects_to_read = stdx::is_simd_v<From> ? n_simd_to_convert : scalars_to_convert;

auto return_value = fair::graph::work_return_t::OK;
auto return_value = fair::graph::work_return_status_t::OK;
writer.publish( //
[&](std::span<To> output) {
const auto input = reader.get();
@@ -377,7 +377,7 @@ class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::IN<From, N_
}
}
if (!reader.consume(objects_to_read)) {
return_value = fair::graph::work_return_t::ERROR;
return_value = fair::graph::work_return_status_t::ERROR;
return;
}
},
@@ -413,7 +413,7 @@ loop_over_work(auto &node) {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
while (test::n_samples_consumed < N_SAMPLES) {
node.work();
std::ignore = node.work(std::numeric_limits<std::size_t>::max());
}
expect(eq(test::n_samples_produced, N_SAMPLES)) << "produced too many/few samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "consumed too many/few samples";
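The benchmark driver above now passes an explicit item budget and discards the result via std::ignore. A caller interested in the progress report could destructure the triple instead; a usage sketch under the same assumptions as above (field names assumed):

    // Usage sketch (assumed field names): consume the triple instead of discarding it.
    #include <limits>
    #include <stdexcept>

    template<typename Node>
    std::size_t
    drive_once(Node &node) {
        const auto result = node.work(std::numeric_limits<std::size_t>::max()); // unbounded budget
        if (result.status == fair::graph::work_return_status_t::ERROR) {
            throw std::runtime_error("work() reported an error");
        }
        return result.performed_work; // items actually processed in this invocation
    }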
10 changes: 5 additions & 5 deletions bench/bm_filter.cpp
@@ -24,7 +24,7 @@ loop_over_work(auto &node) {
test::n_samples_produced = 0LU;
test::n_samples_consumed = 0LU;
while (test::n_samples_consumed < N_SAMPLES) {
node.work();
std::ignore = node.work(std::numeric_limits<std::size_t>::max());
}
expect(eq(test::n_samples_produced, N_SAMPLES)) << "produced too many/few samples";
expect(eq(test::n_samples_consumed, N_SAMPLES)) << "consumed too many/few samples";
@@ -92,7 +92,7 @@ inline const boost::ut::suite _constexpr_bm = [] {

expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink)));

fair::graph::scheduler::simple sched{std::move(flow_graph)};
fair::graph::scheduler::simple sched{ std::move(flow_graph) };

"runtime src->sink overhead"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched]() { invoke_work(sched); };
}
@@ -106,7 +106,7 @@ inline const boost::ut::suite _constexpr_bm = [] {
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect(src, &::test::source<float>::out).to<"in">(filter)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(filter).to(sink, &::test::sink<float>::in)));

fair::graph::scheduler::simple sched{std::move(flow_graph)};
fair::graph::scheduler::simple sched{ std::move(flow_graph) };

"runtime src->fir_filter->sink"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched]() { invoke_work(sched); };
}
@@ -120,7 +120,7 @@ inline const boost::ut::suite _constexpr_bm = [] {
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect(src, &::test::source<float>::out).to<"in">(filter)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(filter).to(sink, &::test::sink<float>::in)));

fair::graph::scheduler::simple sched{std::move(flow_graph)};
fair::graph::scheduler::simple sched{ std::move(flow_graph) };

"runtime src->iir_filter->sink - direct-form I"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched]() { invoke_work(sched); };
}
@@ -134,7 +134,7 @@ inline const boost::ut::suite _constexpr_bm = [] {
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect(src, &::test::source<float>::out).to<"in">(filter)));
expect(eq(fg::connection_result_t::SUCCESS, flow_graph.connect<"out">(filter).to(sink, &::test::sink<float>::in)));

fair::graph::scheduler::simple sched{std::move(flow_graph)};
fair::graph::scheduler::simple sched{ std::move(flow_graph) };

"runtime src->iir_filter->sink - direct-form II"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&sched]() { invoke_work(sched); };
}
14 changes: 7 additions & 7 deletions bench/bm_test_helper.hpp
@@ -18,7 +18,7 @@ inline static std::size_t n_samples_produced = 0_UZ;
template<typename T, std::size_t min = 0_UZ, std::size_t count = N_MAX, bool use_bulk_operation = true>
class source : public fg::node<source<T, min, count>> {
public:
uint64_t _n_samples_max;
uint64_t _n_samples_max;
std::size_t _n_tag_offset;
fg::OUT<T> out;

@@ -34,7 +34,7 @@ class source : public fg::node<source<T, min, count>> {
[[nodiscard]] constexpr auto
process_one_simd(auto N) const noexcept -> fair::meta::simdize<T, decltype(N)::value> {
n_samples_produced += N;
fair::meta::simdize<T, N> x {};
fair::meta::simdize<T, N> x{};
benchmark::force_to_memory(x);
return x;
}
@@ -48,7 +48,7 @@ class source : public fg::node<source<T, min, count>> {
}

fair::graph::work_return_t
work() {
work(std::size_t requested_work) {
const std::size_t n_to_publish = _n_samples_max - n_samples_produced;
if (n_to_publish > 0) {
auto &port = out;
@@ -60,7 +60,7 @@ class source : public fg::node<source<T, min, count>> {
if constexpr (use_bulk_operation) {
std::size_t n_write = std::clamp(n_to_publish, 0UL, std::min(writer.available(), port.max_buffer_size()));
if (n_write == 0_UZ) {
return fair::graph::work_return_t::INSUFFICIENT_INPUT_ITEMS;
return { requested_work, 0_UZ, fair::graph::work_return_status_t::INSUFFICIENT_INPUT_ITEMS };
}

writer.publish( //
@@ -73,14 +73,14 @@ class source : public fg::node<source<T, min, count>> {
} else {
auto [data, token] = writer.get(1);
if (data.size() == 0_UZ) {
return fair::graph::work_return_t::ERROR;
return { requested_work, 0_UZ, fair::graph::work_return_status_t::ERROR };
}
data[0] = process_one();
writer.publish(token, 1);
}
return fair::graph::work_return_t::OK;
return { requested_work, 1_UZ, fair::graph::work_return_status_t::OK };
} else {
return fair::graph::work_return_t::DONE;
return { requested_work, 0_UZ, fair::graph::work_return_status_t::DONE };
}
}
};
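The source node above echoes requested_work back but does not yet limit its output to it. If a scheduler wanted to cap per-invocation work, a node could clamp against the budget along these lines — a hypothetical sketch, not code from this commit; the port accessor names (streamReader/streamWriter) are assumptions:

    // Hypothetical sketch: a copy-style node that honours the requested_work budget.
    fair::graph::work_return_t
    work(std::size_t requested_work) noexcept {
        auto &in_port  = input_port<0>(this);
        auto &out_port = output_port<0>(this);
        auto &reader   = in_port.streamReader();  // accessor name assumed
        auto &writer   = out_port.streamWriter(); // accessor name assumed

        const std::size_t n_available = std::min(reader.available(), writer.available());
        const std::size_t n_to_do     = std::min(n_available, requested_work); // respect the budget
        if (n_to_do == 0) {
            return { requested_work, 0UL, fair::graph::work_return_status_t::INSUFFICIENT_INPUT_ITEMS };
        }
        writer.publish([&](std::span<T> output) {
            const auto input = reader.get();
            std::copy_n(input.begin(), n_to_do, output.begin());
        }, n_to_do);
        if (!reader.consume(n_to_do)) {
            return { requested_work, 0UL, fair::graph::work_return_status_t::ERROR };
        }
        return { requested_work, n_to_do, fair::graph::work_return_status_t::OK };
    }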
6 changes: 3 additions & 3 deletions include/data_sink.hpp
@@ -473,7 +473,7 @@ class data_sink : public node<data_sink<T>> {
}
}

[[nodiscard]] work_return_t
[[nodiscard]] work_return_status_t
process_bulk(std::span<const T> in_data) noexcept {
std::optional<property_map> tagData;
if (this->input_tags_present()) {
@@ -499,7 +499,7 @@ class data_sink : public node<data_sink<T>> {
}
}

return work_return_t::OK;
return work_return_status_t::OK;
}

private:
@@ -561,7 +561,7 @@ class data_sink : public node<data_sink<T>> {

void
ensure_history_size(std::size_t new_size) {
const auto old_size = _history ? _history->capacity() : std::size_t{0};
const auto old_size = _history ? _history->capacity() : std::size_t{ 0 };
if (new_size <= old_size) {
return;
}
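Worth noting in this file: user-facing callbacks such as process_bulk keep returning the bare work_return_status_t; only the generated work() reports the {requested, performed, status} triple. A minimal user node under that contract (a sketch, assuming the fg::node CRTP base still synthesises work() from process_bulk as before; port bounds chosen arbitrarily):

    // Sketch: a user node that never touches the triple directly.
    #include <algorithm>
    #include <span>

    template<typename T>
    struct scale : fg::node<scale<T>, fg::IN<T, 0, 1024, "in">, fg::OUT<T, 0, 1024, "out">> {
        T factor = T{ 2 };

        [[nodiscard]] constexpr fg::work_return_status_t
        process_bulk(std::span<const T> input, std::span<T> output) const noexcept {
            std::transform(input.begin(), input.end(), output.begin(),
                           [this](const T &v) { return v * factor; });
            return fg::work_return_status_t::OK;
        }
    };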
30 changes: 18 additions & 12 deletions include/graph.hpp
@@ -440,7 +440,8 @@ class node_model {
= 0;

[[nodiscard]] virtual work_return_t
work() = 0;
work(std::size_t requested_work)
= 0;

[[nodiscard]] virtual void *
raw() = 0;
@@ -526,8 +527,8 @@ class node_wrapper : public node_model {
}

[[nodiscard]] constexpr work_return_t
work() override {
return node_ref().work();
work(std::size_t requested_work = std::numeric_limits<std::size_t>::max()) override {
return node_ref().work(requested_work);
}

[[nodiscard]] std::string_view
@@ -645,9 +646,9 @@ class edge {

class graph {
private:
std::vector<std::function<connection_result_t(graph&)>> _connection_definitions;
std::vector<std::unique_ptr<node_model>> _nodes;
std::vector<edge> _edges;
std::vector<std::function<connection_result_t(graph &)>> _connection_definitions;
std::vector<std::unique_ptr<node_model>> _nodes;
std::vector<edge> _edges;

template<typename Node>
std::unique_ptr<node_model> &
@@ -777,11 +778,16 @@ class graph {
connect(Source &source, Port Source::*member_ptr);

public:
graph(graph&) = delete;
graph(graph&&) = default;
graph() = default;
graph &operator=(graph&) = delete;
graph &operator=(graph&&) = default;
graph(graph &) = delete;
graph(graph &&) = default;
graph() = default;
graph &
operator=(graph &)
= delete;
graph &
operator=(graph &&)
= default;

/**
* @return a list of all blocks contained in this graph
* N.B. some 'blocks' may be (sub-)graphs themselves
@@ -869,7 +875,7 @@ class graph {
return result;
}

const std::vector<std::function<connection_result_t(graph&)>> &
const std::vector<std::function<connection_result_t(graph &)>> &
connection_definitions() {
return _connection_definitions;
}
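The type-erased node_model::work() now takes the same budget, with node_wrapper forwarding it and defaulting to an effectively unbounded std::numeric_limits<std::size_t>::max(). Purely as an illustration of what the richer return type enables on the scheduler side (not code from this commit; field names as assumed above):

    // Illustration only: accumulate progress across type-erased nodes in one sweep.
    #include <limits>
    #include <memory>
    #include <vector>

    std::size_t
    run_one_sweep(std::vector<std::unique_ptr<fair::graph::node_model>> &nodes) {
        std::size_t total_performed = 0UL;
        for (auto &node : nodes) {
            const auto result = node->work(std::numeric_limits<std::size_t>::max());
            if (result.status == fair::graph::work_return_status_t::ERROR) {
                break; // stop the sweep on the first hard error
            }
            total_performed += result.performed_work; // field name assumed
        }
        return total_performed; // 0 => no node made progress; a scheduler may park or finish
    }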