Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
TaskQueue.hpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#ifndef TOOLBOX_UTIL_TASKQUEUE_HPP
18#define TOOLBOX_UTIL_TASKQUEUE_HPP
19
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>
22
23namespace toolbox {
24inline namespace util {
25
/// A vector-based task queue for use in multi-threaded, producer-consumer components.
///
/// Producers append tasks with push(); consumers take them in FIFO order with run(). All access
/// to the underlying vector is serialised by an internal mutex, and consumers block on a
/// condition variable while no task is pending.
///
/// Instead of erasing from the front of the vector, the queue keeps a read index that advances
/// on every pop. The vector is cleared (and the index reset) once the read index catches up with
/// the end, so the vector's capacity is reused across bursts.
///
/// \tparam TaskT Task type. Must be move-constructible.
template <typename TaskT>
class TaskQueue {
    using Lock = std::unique_lock<std::mutex>;

  public:
    TaskQueue() = default;
    ~TaskQueue() = default;

    // Copy.
    TaskQueue(const TaskQueue&) = delete;
    TaskQueue& operator=(const TaskQueue&) = delete;

    // Move.
    TaskQueue(TaskQueue&&) = delete;
    TaskQueue& operator=(TaskQueue&&) = delete;

    /// Pop the next task and pass it to \a fn.
    ///
    /// Blocks while no task is pending and the queue has not been stopped. Tasks that were
    /// pushed before stop() are still delivered; run() only starts returning false once the
    /// queue has been stopped and fully drained.
    ///
    /// \param fn Callable invoked as `fn(TaskT&&)` with the popped task. It is called after the
    /// internal lock has been released, so it may safely call back into the queue.
    /// \return true if a task was popped and \a fn was called, false if the queue was stopped
    /// and no task is pending.
    template <typename FnT>
    bool run(FnT fn)
    {
        Lock lock{mutex_};
        // The predicate returns false while waiting should continue.
        cond_.wait(lock, [this] { return i_ < queue_.size() || stop_; });
        if (queue_.empty() && stop_) {
            return false;
        }
        // Move-construct the task directly from its slot, so that TaskT does not need to be
        // default-constructible.
        TaskT task(std::move(queue_[i_++]));
        // Clear the queue when the read index reaches the end of the queue.
        if (i_ == queue_.size()) {
            i_ = 0;
            queue_.clear();
        }
        // Release the lock before running the task, so that producers and other consumers are
        // not blocked for the duration of the call.
        lock.unlock();

        fn(std::move(task));
        return true;
    }

    /// Interrupt and exit any in-progress call to run().
    ///
    /// After stop(), push() rejects new tasks, and run() returns false as soon as the tasks
    /// already queued have been consumed. The stop flag is never reset by this class.
    void stop()
    {
        Lock lock{mutex_};
        stop_ = true;
        // Unlock mutex before waking-up to avoid contention.
        lock.unlock();
        cond_.notify_all();
    }

    /// Clear task queue.
    ///
    /// Discards every pending task and resets the read index. The stop flag is left unchanged.
    /// Declared noexcept: a failure to acquire the mutex terminates the program.
    void clear() noexcept
    {
        Lock lock{mutex_};
        i_ = 0;
        queue_.clear();
    }

    /// Push task onto the task queue.
    ///
    /// Declared noexcept: if growing the vector (or moving the task) throws, the program
    /// terminates rather than propagating the exception.
    ///
    /// \param task The task to enqueue; it is moved into the queue.
    /// \return true if the task was enqueued, false if the queue has been stopped, in which case
    /// \a task is left untouched.
    bool push(TaskT&& task) noexcept
    {
        Lock lock{mutex_};
        if (stop_) {
            return false;
        }
        queue_.push_back(std::move(task));
        // Unlock before notifying, so that the woken consumer can take the mutex immediately.
        lock.unlock();
        cond_.notify_one();
        return true;
    }

  private:
    /// Guards i_, queue_ and stop_.
    mutable std::mutex mutex_;
    /// Signalled when a task is pushed or the queue is stopped.
    std::condition_variable cond_;
    /// Read index: position in queue_ of the next task to be popped.
    std::size_t i_{0};
    /// Pending tasks; elements before i_ have already been moved from.
    std::vector<TaskT> queue_;
    /// Set by stop(); once true, push() rejects new tasks.
    bool stop_{false};
};
106
107} // namespace util
108} // namespace toolbox
109
110#endif // TOOLBOX_UTIL_TASKQUEUE_HPP
A vector-based task queue for use in multi-threaded, producer-consumer components.
Definition TaskQueue.hpp:28
TaskQueue & operator=(const TaskQueue &)=delete
TaskQueue & operator=(TaskQueue &&)=delete
void stop()
Interrupt and exit any in-progress call to run().
Definition TaskQueue.hpp:70
TaskQueue(TaskQueue &&)=delete
void clear() noexcept
Clear task queue.
Definition TaskQueue.hpp:79
bool push(TaskT &&task) noexcept
Push task onto the task queue.
Definition TaskQueue.hpp:86
TaskQueue(const TaskQueue &)=delete
constexpr auto bind() noexcept
Definition Slot.hpp:92