From f416ad4249e813bff0c9cc56076bd2e8db7ef226 Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Sat, 27 Jun 2020 17:04:24 +0300 Subject: [PATCH] utils: Add async-queue Like GAsyncQueue, but for c++ --- lib/utils/mu-async-queue.hh | 191 ++++++++++++++++++++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 lib/utils/mu-async-queue.hh diff --git a/lib/utils/mu-async-queue.hh b/lib/utils/mu-async-queue.hh new file mode 100644 index 00000000..73087fad --- /dev/null +++ b/lib/utils/mu-async-queue.hh @@ -0,0 +1,191 @@ +/* +** Copyright (C) 2020 Dirk-Jan C. Binnema +** +** This program is free software; you can redistribute it and/or modify it +** under the terms of the GNU General Public License as published by the +** Free Software Foundation; either version 3, or (at your option) any +** later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software Foundation, +** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +** +*/ + +#ifndef __MU_ASYNC_QUEUE_HH__ +#define __MU_ASYNC_QUEUE_HH__ + +#include +#include +#include +#include + +namespace Mu { + +constexpr std::size_t UnlimitedAsyncQueueSize{0}; + +template > /**< allocator the items */ + +class AsyncQueue { +public: + using value_type = ItemType; + using allocator_type = Allocator; + using size_type = std::size_t; + using reference = value_type&; + using const_reference = const value_type&; + using pointer = typename std::allocator_traits::pointer; + using const_pointer = typename std::allocator_traits::const_pointer; + + using Timeout = std::chrono::steady_clock::duration; + + #define LOCKED std::unique_lock lock(m_); + + bool push (const value_type& item, Timeout timeout = {}) { + return push(std::move(value_type(item))); + } + + /** + * Push an item to the end of the queue by moving it + * + * @param item the item to move to the end of the queue + * @param timeout and optional timeout + * + * @return true if the item was pushed; false otherwise. + */ + bool push (value_type&& item, Timeout timeout = {}) { + + LOCKED; + + if (!unlimited()) { + const auto rv = cv_full_.wait_for(lock, timeout,[&](){ + return !full_unlocked();}) && !full_unlocked(); + if (!rv) + return false; + } + + q_.emplace_back(std::move(item)); + lock.unlock(); + + cv_empty_.notify_one(); + return true; + + } + + /** + * Pop an item from the queue + * + * @param receives the value if the funtion returns true + * @param timeout optional time to wait for an item to become available + * + * @return true if an item was popped (into val), false otherwise. + */ + bool pop (value_type& val, Timeout timeout = {}) { + + LOCKED; + + if (timeout != Timeout{}) { + const auto rv = cv_empty_.wait_for(lock, timeout,[&](){ + return !q_.empty(); }) && !q_.empty(); + if (!rv) + return false; + + } else if (q_.empty()) + return false; + + val = std::move(q_.front()); + q_.pop_front(); + lock.unlock(); + cv_full_.notify_one(); + + return true; + } + + /** + * Clear the queue + * + */ + void clear() { + LOCKED; + q_.clear(); + lock.unlock(); + cv_full_.notify_one(); + } + + /** + * Size of the queue + * + * + * @return the size + */ + size_type size() const { + LOCKED; + return q_.size(); + } + + /** + * Maximum size of the queue if specified through the template + * parameter; otherwise the (theoretical) max_size of the inner + * container. + * + * @return the maximum size + */ + size_type max_size() const { + if (unlimited()) + return q_.max_size(); + else + return MaxSize; + } + + /** + * Is the queue empty? + * + * @return true or false + */ + bool empty() const { + LOCKED; + return q_.empty(); + } + + /** + * Is the queue full? Returns false unless a maximum size was specified + * (as a template argument) + * + * @return true or false. + */ + bool full() const { + if (unlimited()) + return false; + + LOCKED; + return full_unlocked(); + } + + /** + * Is this queue (theoretically) unlimited in size? + * + * @return true or false + */ + constexpr static bool unlimited() { + return MaxSize == UnlimitedAsyncQueueSize; + } + +private: + bool full_unlocked() const { + return q_.size() >= max_size(); + } + + std::deque q_; + mutable std::mutex m_; + std::condition_variable cv_full_, cv_empty_; +}; + +} // namespace mu + +#endif /* __MU_ASYNC_QUEUE_HH__ */