From 1b71a166a9159146cc4d4f52dd7af8330fab5bbd Mon Sep 17 00:00:00 2001 From: fpagliughi Date: Wed, 10 Jul 2024 13:26:33 -0400 Subject: [PATCH] Made thread queue closable --- include/mqtt/thread_queue.h | 114 ++++++++++++++++++++++++++++---- src/async_client.cpp | 42 ++++++------ test/unit/test_ssl_options.cpp | 4 +- test/unit/test_thread_queue.cpp | 79 +++++++++++++++------- 4 files changed, 181 insertions(+), 58 deletions(-) diff --git a/include/mqtt/thread_queue.h b/include/mqtt/thread_queue.h index 4b7828f1..1f3ac0c8 100644 --- a/include/mqtt/thread_queue.h +++ b/include/mqtt/thread_queue.h @@ -35,6 +35,16 @@ namespace mqtt { +/** + * Exception that is thrown when operations are performed on a closed + * queue. + */ +class queue_closed : public std::runtime_error +{ +public: + queue_closed() : std::runtime_error("queue is closed") {} +}; + ///////////////////////////////////////////////////////////////////////////// /** @@ -53,6 +63,12 @@ namespace mqtt { * queue will block until the number of items are removed from the queue to * bring the size below the new capacity. * @par + * The queue can be closed. After that, no new items can be placed into it; + * a `put()` calls will fail. Receivers can still continue to get any items + * out of the queue that were added before it was closed. Once there are no + * more items left in the queue after it is closed, it is considered "done". + * Nothing useful can be done with the queue. + * @par * Note that the queue uses move semantics to place items into the queue and * remove items from the queue. This means that the type, T, of the data * held by the queue only needs to follow move semantics; not copy @@ -87,7 +103,10 @@ class thread_queue /** Condition gets signaled then item removed from full queue */ std::condition_variable notFullCond_; /** The capacity of the queue */ - size_type cap_; + size_type cap_{MAX_CAPACITY}; + /** Whether the queue is closed */ + bool closed_{false}; + /** The actual STL container to hold data */ std::queue que_; @@ -96,13 +115,25 @@ class thread_queue /** General purpose guard */ using unique_guard = std::unique_lock; + /** Throw an excpetion if the queue is closed. */ + void check_closed() { + if (closed_) throw queue_closed{}; + } + + /** Throw an excpetion if the queue is done. */ + void check_done() { + if (closed_ && que_.empty()) throw queue_closed{}; + } + public: /** * Constructs a queue with the maximum capacity. + * This is effectively an unbounded queue. */ - thread_queue() : cap_(MAX_CAPACITY) {} + thread_queue() {} /** * Constructs a queue with the specified capacity. + * This is a bounded queue. * @param cap The maximum number of items that can be placed in the * queue. The minimum capacity is 1. */ @@ -113,7 +144,7 @@ class thread_queue * there are any items in the queue. */ bool empty() const { - guard g(lock_); + guard g{lock_}; return que_.empty(); } /** @@ -121,7 +152,7 @@ class thread_queue * @return The maximum number of elements before the queue is full. */ size_type capacity() const { - guard g(lock_); + guard g{lock_}; return cap_; } /** @@ -142,6 +173,46 @@ class thread_queue guard g(lock_); return que_.size(); } + /** + * Close the queue. + * Once closed, the queue will not accept any new items, but receievers + * will still be able to get any remaining items out of the queue until + * it is empty. + */ + void close() { + guard g{lock_}; + closed_ = true; + } + /* + void close(value_type finalVal) { + unique_guard g(lock_); + if (closed_) return; + que_.emplace(std::move(finalVal)); + g.unlock(); + notEmptyCond_.notify_one(); + } + */ + /** + * Determines if the queue is closed. + * Once closed, the queue will not accept any new items, but receievers + * will still be able to get any remaining items out of the queue until + * it is empty. + * @return @em true if the queue is closed, @false otherwise. + */ + bool closed() const { + guard g{lock_}; + return closed_; + } + /** + * Determines if all possible operations are done on the queue. If the + * queue is closed and empty, then no further useful operations can be + * done on it. + * @return @true if the queue is closed and empty, @em false otherwise. + */ + bool done() const { + guard g{lock_}; + return closed_ && que_.empty(); + } /** * Put an item into the queue. * If the queue is full, this will block the caller until items are @@ -150,8 +221,9 @@ class thread_queue */ void put(value_type val) { unique_guard g(lock_); - notFullCond_.wait(g, [this] { return que_.size() < cap_; }); + notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; }); + check_closed(); que_.emplace(std::move(val)); g.unlock(); notEmptyCond_.notify_one(); @@ -164,6 +236,7 @@ class thread_queue */ bool try_put(value_type val) { unique_guard g(lock_); + check_closed(); if (que_.size() >= cap_) return false; @@ -184,7 +257,10 @@ class thread_queue template bool try_put_for(value_type val, const std::chrono::duration& relTime) { unique_guard g(lock_); - if (!notFullCond_.wait_for(g, relTime, [this] { return que_.size() < cap_; })) + bool to = !notFullCond_.wait_for(g, relTime, + [this] { return que_.size() < cap_ || closed_; }); + check_closed(); + if (to) return false; que_.emplace(std::move(val)); @@ -207,7 +283,11 @@ class thread_queue value_type val, const std::chrono::time_point& absTime ) { unique_guard g(lock_); - if (!notFullCond_.wait_until(g, absTime, [this] { return que_.size() < cap_; })) + bool to = !notFullCond_.wait_until(g, absTime, + [this] { return que_.size() < cap_ || closed_; }); + + check_closed(); + if (to) return false; que_.emplace(std::move(val)); @@ -226,7 +306,8 @@ class thread_queue return; unique_guard g(lock_); - notEmptyCond_.wait(g, [this] { return !que_.empty(); }); + notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; }); + check_done(); *val = std::move(que_.front()); que_.pop(); @@ -241,7 +322,8 @@ class thread_queue */ value_type get() { unique_guard g(lock_); - notEmptyCond_.wait(g, [this] { return !que_.empty(); }); + notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; }); + check_done(); value_type val = std::move(que_.front()); que_.pop(); @@ -262,8 +344,11 @@ class thread_queue return false; unique_guard g(lock_); - if (que_.empty()) + if (que_.empty()) { + if (closed_) + throw queue_closed{}; return false; + } *val = std::move(que_.front()); que_.pop(); @@ -287,7 +372,10 @@ class thread_queue return false; unique_guard g(lock_); - if (!notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty(); })) + bool to = !notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; }); + + check_done(); + if (to) return false; *val = std::move(que_.front()); @@ -314,7 +402,9 @@ class thread_queue return false; unique_guard g(lock_); - if (!notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty(); })) + bool to = !notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty(); }); + check_done(); + if (to) return false; *val = std::move(que_.front()); diff --git a/src/async_client.cpp b/src/async_client.cpp index 8c47be84..bc4fc260 100644 --- a/src/async_client.cpp +++ b/src/async_client.cpp @@ -876,37 +876,37 @@ void async_client::stop_consuming() const_message_ptr async_client::consume_message() { - // For backward compatibility we ignore the 'connected' events, - // whereas disconnected/lost return an empty pointer. - while (true) { - auto evt = que_->get(); + // For backward compatibility we ignore the 'connected' events, + // whereas disconnected/lost return an empty pointer. + while (true) { + auto evt = que_->get(); - if (const auto* pval = std::get_if(&evt)) - return *pval; + if (const auto* pval = std::get_if(&evt)) + return *pval; - if (!std::holds_alternative(evt)) - return const_message_ptr{}; - } + if (!std::holds_alternative(evt)) + return const_message_ptr{}; + } } bool async_client::try_consume_message(const_message_ptr* msg) { event_type evt; - while (true) { - if (!que_->try_get(&evt)) - return false; + while (true) { + if (!que_->try_get(&evt)) + return false; - if (const auto* pval = std::get_if(&evt)) { - *msg = std::move(*pval); - break; - } + if (const auto* pval = std::get_if(&evt)) { + *msg = std::move(*pval); + break; + } - if (!std::holds_alternative(evt)) { - *msg = const_message_ptr{}; - break; - } - } + if (!std::holds_alternative(evt)) { + *msg = const_message_ptr{}; + break; + } + } return true; } diff --git a/test/unit/test_ssl_options.cpp b/test/unit/test_ssl_options.cpp index fad17fc1..ff5f740a 100644 --- a/test/unit/test_ssl_options.cpp +++ b/test/unit/test_ssl_options.cpp @@ -402,7 +402,7 @@ TEST_CASE("ssl_options test error handler", "[options]") { mqtt::ssl_options opts{orgOpts}; - orgOpts.set_error_handler([](const std::string& msg) { - std::cerr << "SSL Error: " << msg << std::endl; + orgOpts.set_error_handler([](const std::string& msg) { + std::cerr << "SSL Error: " << msg << std::endl; }); } diff --git a/test/unit/test_thread_queue.cpp b/test/unit/test_thread_queue.cpp index a0c5dbae..efb2d1fe 100644 --- a/test/unit/test_thread_queue.cpp +++ b/test/unit/test_thread_queue.cpp @@ -4,7 +4,7 @@ // /******************************************************************************* - * Copyright (c) 2022 Frank Pagliughi + * Copyright (c) 2022-2024 Frank Pagliughi * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 @@ -33,7 +33,7 @@ using namespace mqtt; using namespace std::chrono; -TEST_CASE("que put/get", "[thread_queue]") +TEST_CASE("thread_queue put/get", "[thread_queue]") { thread_queue que; @@ -46,21 +46,21 @@ TEST_CASE("que put/get", "[thread_queue]") REQUIRE(que.get() == 3); } -TEST_CASE("que tryget", "[thread_queue]") +TEST_CASE("thread_queue tryget", "[thread_queue]") { thread_queue que; - int n; + int n; - // try_get's should fail on empty queue + // try_get's should fail on empty queue REQUIRE(!que.try_get(&n)); REQUIRE(!que.try_get_for(&n, 5ms)); - auto timeout = steady_clock::now() + 15ms; - REQUIRE(!que.try_get_until(&n, timeout)); + auto timeout = steady_clock::now() + 15ms; + REQUIRE(!que.try_get_until(&n, timeout)); que.put(1); que.put(2); - REQUIRE(que.try_get(&n)); + REQUIRE(que.try_get(&n)); REQUIRE(n == 1); que.put(3); @@ -69,33 +69,33 @@ TEST_CASE("que tryget", "[thread_queue]") REQUIRE(que.try_get(&n)); REQUIRE(n == 3); - // Empty now. Try should fail and leave 'n' unchanged - REQUIRE(!que.try_get(&n)); + // Empty now. Try should fail and leave 'n' unchanged + REQUIRE(!que.try_get(&n)); REQUIRE(n == 3); } -TEST_CASE("que mt put/get", "[thread_queue]") +TEST_CASE("thread_queue mt put/get", "[thread_queue]") { thread_queue que; - const size_t N = 1000000; + const size_t N = 100000; const size_t N_THR = 2; auto producer = [&que, &N]() { string s; for (size_t i = 0; i < 512; ++i) { - s.push_back('a' + i % 26); - } + s.push_back('a' + i % 26); + } for (size_t i = 0; i < N; ++i) { - que.put(s); - } + que.put(s); + } }; auto consumer = [&que, &N]() { string s; bool ok = true; for (size_t i = 0; i < N && ok; ++i) { - ok = que.try_get_for(&s, seconds{1}); + ok = que.try_get_for(&s, 250ms); } return ok; }; @@ -104,18 +104,51 @@ TEST_CASE("que mt put/get", "[thread_queue]") std::vector> consumers; for (size_t i = 0; i < N_THR; ++i) { - producers.push_back(std::thread(producer)); - } + producers.push_back(std::thread(producer)); + } for (size_t i = 0; i < N_THR; ++i) { - consumers.push_back(std::async(consumer)); - } + consumers.push_back(std::async(consumer)); + } for (size_t i = 0; i < N_THR; ++i) { - producers[i].join(); - } + producers[i].join(); + } for (size_t i = 0; i < N_THR; ++i) { REQUIRE(consumers[i].get()); } } + +TEST_CASE("thread_queue close", "[thread_queue]") +{ + thread_queue que; + REQUIRE(!que.closed()); + + que.put(1); + que.put(2); + que.close(); + + // Queue is closed. Shouldn't accept any new items. + REQUIRE(que.closed()); + REQUIRE(que.size() == 2); + + REQUIRE_THROWS_AS(que.put(3), queue_closed); + REQUIRE_THROWS_AS(que.try_put(3), queue_closed); + REQUIRE_THROWS_AS(que.try_put_for(3, 10ms), queue_closed); + REQUIRE_THROWS_AS(que.try_put_until(3, steady_clock::now() + 10ms), queue_closed); + + // But can get any items already in there. + REQUIRE(que.get() == 1); + REQUIRE(que.get() == 2); + + // When done (closed and empty), should throw on a get() + REQUIRE(que.empty()); + REQUIRE(que.done()); + + int n; + REQUIRE_THROWS_AS(que.get(), queue_closed); + REQUIRE_THROWS_AS(que.try_get(&n), queue_closed); + REQUIRE_THROWS_AS(que.try_get_for(&n, 10ms), queue_closed); + REQUIRE_THROWS_AS(que.try_get_until(&n, steady_clock::now() + 10ms), queue_closed); +}