Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.hpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#ifndef TOOLBOX_IO_REACTOR_HPP
18#define TOOLBOX_IO_REACTOR_HPP
19
20#include <toolbox/io/Epoll.hpp>
22#include <toolbox/io/Hook.hpp>
23#include <toolbox/io/Timer.hpp>
24#include <toolbox/io/Waker.hpp>
25#include <variant>
26
27namespace toolbox {
28inline namespace io {
29
30constexpr Duration NoTimeout{-1};
31enum class Priority { High = 0, Low = 1 };
32using IoSlot = BasicSlot<void(CyclTime, int, unsigned)>;
33
34enum class ReactorMode {
35 // The Reactor may block when polling if a non-zero timeout is specified.
36 // Recommended when the Reactor shares a CPU resource with other processes.
38
39 // The Reactor returns immediately if no events are pending -- i.e. no blocking occurs.
40 // Ideal when the Reactor runs on a dedicated (pinned) CPU resource and is polled
41 // continuously in a loop.
43};
44
45class TOOLBOX_API Reactor : public Waker {
46 public:
48 // HookType describes the kind of hook.
49 enum class HookType : int {
50 // EndOfCycleNoWait hooks are called at the end of the Reactor cycle.
51 // The Reactor cycle will not wait for i/o and/or timer events
52 // while any of these hooks are installed.
53 EndOfCycleNoWait = 1,
54 // EndOfEventDispatch hooks are called after all i/o and timer events have been dispatched.
55 // These hooks are called, and only if, work done in the cycle is greater than zero.
56 // And they are always called before EndOfCycleNoWait hooks.
57 EndOfEventDispatch = 2,
58 };
59 class Handle {
60 public:
61 Handle(Reactor& reactor, int fd, int sid)
62 : reactor_{&reactor}
63 , fd_{fd}
64 , sid_{sid}
65 {
66 }
67 constexpr Handle(std::nullptr_t = nullptr) noexcept {} // NOLINT(hicpp-explicit-conversions)
68 ~Handle() { reset(); }
69
70 // Copy.
71 Handle(const Handle&) = delete;
72 Handle& operator=(const Handle&) = delete;
73
74 // Move.
75 Handle(Handle&& rhs) noexcept
76 : reactor_{rhs.reactor_}
77 , fd_{rhs.fd_}
78 , sid_{rhs.sid_}
79 {
80 rhs.reactor_ = nullptr;
81 rhs.fd_ = -1;
82 rhs.sid_ = 0;
83 }
84 Handle& operator=(Handle&& rhs) noexcept
85 {
86 reset();
87 swap(rhs);
88 return *this;
89 }
90 bool empty() const noexcept { return reactor_ == nullptr; }
91 explicit operator bool() const noexcept { return reactor_ != nullptr; }
92 auto fd() const noexcept { return fd_; }
93 auto sid() const noexcept { return sid_; }
94
95 void reset(std::nullptr_t = nullptr) noexcept
96 {
97 if (reactor_) {
98 reactor_->unsubscribe(fd_, sid_);
99 reactor_ = nullptr;
100 fd_ = -1;
101 sid_ = 0;
102 }
103 }
104 void swap(Handle& rhs) noexcept
105 {
106 std::swap(reactor_, rhs.reactor_);
107 std::swap(fd_, rhs.fd_);
108 std::swap(sid_, rhs.sid_);
109 }
110
112 void set_events(unsigned events, IoSlot slot, std::error_code& ec) noexcept
113 {
114 assert(reactor_);
115 reactor_->set_events(fd_, sid_, events, slot, ec);
116 }
117 void set_events(unsigned events, IoSlot slot)
118 {
119 assert(reactor_);
120 reactor_->set_events(fd_, sid_, events, slot);
121 }
122 void set_events(unsigned events, std::error_code& ec) noexcept
123 {
124 assert(reactor_);
125 reactor_->set_events(fd_, sid_, events, ec);
126 }
127 void set_events(unsigned events)
128 {
129 assert(reactor_);
130 reactor_->set_events(fd_, sid_, events);
131 }
132
134 {
135 assert(reactor_);
136 reactor_->set_io_priority(fd_, sid_, priority);
137 }
138
139 private:
140 Reactor* reactor_{nullptr};
141 int fd_{-1}, sid_{0};
142 };
143
144 explicit Reactor(ReactorMode mode = ReactorMode::Blocking, std::size_t size_hint = 0);
145 ~Reactor() override;
146
147 // Copy.
148 Reactor(const Reactor&) = delete;
149 Reactor& operator=(const Reactor&) = delete;
150
151 // Move.
152 Reactor(Reactor&&) = delete;
154
155 // clang-format off
156 [[nodiscard]] Handle subscribe(int fd, unsigned events, IoSlot slot);
157
159 [[nodiscard]] Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
160 {
161 return tqs_[static_cast<size_t>(priority)].insert(expiry, interval, slot);
162 }
164 [[nodiscard]] Timer timer(MonoTime expiry, Priority priority, TimerSlot slot)
165 {
166 return tqs_[static_cast<size_t>(priority)].insert(expiry, slot);
167 }
168 // clang-format on
169
170 void add_hook(Hook& hook, HookType ht = HookType::EndOfCycleNoWait) noexcept
171 {
172 switch (ht) {
173 case HookType::EndOfCycleNoWait:
174 end_of_cycle_no_wait_hooks.push_back(hook);
175 break;
176 case HookType::EndOfEventDispatch:
177 end_of_event_dispatch_hooks_.push_back(hook);
178 break;
179 }
180 }
185 int poll(CyclTime now, Duration timeout = NoTimeout);
186
187 void yield();
188
189 void set_high_priority_poll_threshold(Micros thresh) { priority_io_poll_threshold = thresh; }
190
191 protected:
193 void do_wakeup() noexcept final;
194
195 private:
196 MonoTime next_expiry(MonoTime next) const;
197
198 struct BlockingDevice {
199 Epoll epoll;
200 };
201
202 struct ImmediateDevice {
203 Epoll low_prio_epoll;
204 Epoll high_prio_epoll;
205 };
206
207 struct Data {
208 int sid{};
209 unsigned events{};
210 IoSlot slot;
211 Priority priority = Priority::Low;
212 };
213
214 Epoll& get_resident_epoll(Data& data);
215 int poll_immediate(ImmediateDevice& dev,CyclTime now);
216 int poll_blocking(BlockingDevice& dev,CyclTime now, Duration timeout);
217 // dispatch events only for file descriptors with specified priority
218 int dispatch(CyclTime now, Event* buf, int size, Priority priority);
219 void set_events(int fd, int sid, unsigned events, IoSlot slot, std::error_code& ec) noexcept;
220 void set_events(int fd, int sid, unsigned events, IoSlot slot);
221 void set_events(int fd, int sid, unsigned events, std::error_code& ec) noexcept;
222 void set_events(int fd, int sid, unsigned events);
223 void unsubscribe(int fd, int sid) noexcept;
224 void set_io_priority(int fd, int sid, Priority priority) noexcept;
225
226 std::variant<BlockingDevice, ImmediateDevice> device_;
227 std::vector<Data> data_;
228 EventFd notify_{0, EFD_NONBLOCK};
229 static_assert(static_cast<int>(Priority::High) == 0);
230 static_assert(static_cast<int>(Priority::Low) == 1);
231 TimerPool tp_;
232 std::array<TimerQueue, 2> tqs_{tp_, tp_};
233 HookList end_of_cycle_no_wait_hooks, end_of_event_dispatch_hooks_;
234 Micros priority_io_poll_threshold = Micros::max();
235 WallTime last_time_priority_io_polled_{};
236 int cycle_work_{0};
237 bool currently_handling_priority_events_{false};
238};
239
240} // namespace io
241} // namespace toolbox
242
243#endif // TOOLBOX_IO_REACTOR_HPP
#define TOOLBOX_API
Definition Config.h:39
auto fd() const noexcept
Definition Reactor.hpp:92
Handle(const Handle &)=delete
constexpr Handle(std::nullptr_t=nullptr) noexcept
Definition Reactor.hpp:67
void swap(Handle &rhs) noexcept
Definition Reactor.hpp:104
Handle(Reactor &reactor, int fd, int sid)
Definition Reactor.hpp:61
bool empty() const noexcept
Definition Reactor.hpp:90
Handle & operator=(const Handle &)=delete
void set_events(unsigned events, IoSlot slot)
Definition Reactor.hpp:117
void set_events(unsigned events, IoSlot slot, std::error_code &ec) noexcept
Modify I/O event subscription.
Definition Reactor.hpp:112
void set_events(unsigned events)
Definition Reactor.hpp:127
Handle & operator=(Handle &&rhs) noexcept
Definition Reactor.hpp:84
auto sid() const noexcept
Definition Reactor.hpp:93
void reset(std::nullptr_t=nullptr) noexcept
Definition Reactor.hpp:95
void set_io_priority(Priority priority)
Definition Reactor.hpp:133
Handle(Handle &&rhs) noexcept
Definition Reactor.hpp:75
void set_events(unsigned events, std::error_code &ec) noexcept
Definition Reactor.hpp:122
Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:159
Reactor(Reactor &&)=delete
Reactor & operator=(Reactor &&)=delete
Reactor & operator=(const Reactor &)=delete
void add_hook(Hook &hook, HookType ht=HookType::EndOfCycleNoWait) noexcept
Definition Reactor.hpp:170
Timer timer(MonoTime expiry, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:164
Reactor(const Reactor &)=delete
void set_high_priority_poll_threshold(Micros thresh)
Definition Reactor.hpp:189
constexpr Duration NoTimeout
Definition Reactor.hpp:30
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
boost::intrusive::list< Hook, boost::intrusive::constant_time_size< false > > HookList
Definition Hook.hpp:39
epoll_event EpollEvent
Definition Epoll.hpp:155
BasicSlot< void(CyclTime, int, unsigned)> IoSlot
Definition Reactor.hpp:32
std::chrono::microseconds Micros
Definition Time.hpp:37
WallClock::time_point WallTime
Definition Time.hpp:112
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
constexpr auto bind() noexcept
Definition Slot.hpp:97