Toolbox snapshot
The Reactive C++ Toolbox
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages Concepts
Reactor.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
20#include <toolbox/sys/Log.hpp>
21#include <toolbox/sys/Trace.hpp>
22
23namespace toolbox {
24inline namespace io {
25using namespace std;
26namespace {
27constexpr size_t MaxEvents{128};
28
29int dispatch_low_priority_timers(CyclTime now, TimerQueue& tq, bool idle_cycle)
30{
31 int work_done = 0;
32 if (idle_cycle) {
33 work_done = tq.dispatch(now, 1);
34 }
35 else if (!tq.empty()) {
36 // actively execute low priority timers if they've been delayed by 100ms or more.
37 if ((now.mono_time() - tq.front().expiry()) > 100ms) {
38 work_done = tq.dispatch(now, 1);
39 }
40 }
41 return work_done;
42}
43} // namespace
44
46{
47 const auto notify = notify_.fd();
48 data_.resize(max<size_t>(notify + 1, size_hint));
49 epoll_.add(notify, 0, EpollIn);
50}
51
53{
54 epoll_.del(notify_.fd());
55}
56
57Reactor::Handle Reactor::subscribe(int fd, unsigned events, IoSlot slot)
58{
59 assert(fd >= 0);
60 assert(slot);
61 if (fd >= static_cast<int>(data_.size())) {
62 data_.resize(fd + 1);
63 }
64 auto& ref = data_[fd];
65 epoll_.add(fd, ++ref.sid, events);
66 ref.events = events;
67 ref.slot = slot;
68 ref.priority = Priority::Low;
69 return {*this, fd, ref.sid};
70}
71
73{
74 enum { High = 0, Low = 1 };
75 using namespace chrono;
76
77 // If timeout is zero then the wait_until time should also be zero to signify no wait.
79 if (!is_zero(timeout) && end_of_cycle_no_wait_hooks.empty()) {
80 const MonoTime next
81 = next_expiry(timeout == NoTimeout ? MonoClock::max() : now.mono_time() + timeout);
82 if (next > now.mono_time()) {
83 wait_until = next;
84 }
85 }
86 // TODO: consider using a dynamic buffer that scales with increased demand.
88
89 int n;
90 error_code ec;
91 if (wait_until < MonoClock::max()) {
92 // The wait function will not block if time is zero.
93 n = epoll_.wait(buf, MaxEvents, wait_until, ec);
94 } else {
95 // Block indefinitely.
96 n = epoll_.wait(buf, MaxEvents, ec);
97 }
98 // Update cycle time after epoll() returns.
99 now = CyclTime::now();
100 last_time_priority_io_polled_ = now.wall_time();
101
102 if (ec) {
103 if (ec.value() != EINTR) {
104 throw system_error{ec};
105 }
106 return 0;
107 }
108 cycle_work_ = 0;
109 TOOLBOX_PROBE_SCOPED(reactor, dispatch, cycle_work_);
110 // High priority timers.
111 cycle_work_ = tqs_[High].dispatch(now);
112 // I/O events.
113 cycle_work_ += dispatch(now, buf, n, Priority::High);
114 cycle_work_ += dispatch(now, buf, n, Priority::Low);
115 // Low priority timers (typically only dispatched during empty cycles).
116 cycle_work_ += dispatch_low_priority_timers(now, tqs_[Low], cycle_work_ == 0);
117 // End of cycle hooks.
118 if (cycle_work_ > 0) {
119 io::dispatch(now, end_of_event_dispatch_hooks_);
120 }
121 io::dispatch(now, end_of_cycle_no_wait_hooks);
122 return cycle_work_;
123}
124
126{
127 // Best effort.
128 std::error_code ec;
129 notify_.write(1, ec);
130}
131
132MonoTime Reactor::next_expiry(MonoTime next) const
133{
134 enum { High = 0, Low = 1 };
135 using namespace chrono;
136 {
137 const auto& tq = tqs_[High];
138 if (!tq.empty()) {
139 // Duration until next expiry. Mitigate scheduler latency by preempting the
140 // high-priority timer and busy-waiting for 200us ahead of timer expiry.
141 next = min(next, tq.front().expiry() - 200us);
142 }
143 }
144 {
145 const auto& tq = tqs_[Low];
146 if (!tq.empty()) {
147 // Duration until next expiry.
148 next = min(next, tq.front().expiry());
149 }
150 }
151 return next;
152}
153
155{
156 if (currently_handling_priority_events_) [[unlikely]] {
157 return;
158 }
159
160 if (priority_io_poll_threshold == Micros::max()) {
161 return;
162 }
163
164 WallTime now = WallClock::now();
165 if (now - last_time_priority_io_polled_ > priority_io_poll_threshold) {
166 last_time_priority_io_polled_ = now;
167
168 error_code ec;
170 int n = high_prio_epoll_.wait(buf, MaxEvents, MonoTime{}, ec);
171
172 if (ec) {
173 if (ec.value() != EINTR) {
174 throw system_error{ec};
175 }
176 return;
177 }
178
179 cycle_work_ += dispatch(CyclTime::current(), buf, n, Priority::High);
180 }
181}
182
183int Reactor::dispatch(CyclTime now, Event* buf, int size, Priority priority)
184{
185 if (priority == Priority::High) {
186 assert(!currently_handling_priority_events_);
187 currently_handling_priority_events_ = true;
188 }
189 const auto reset_flag = make_finally([this]() noexcept {
190 currently_handling_priority_events_ = false;
191 });
192
193 int work{0};
194 for (int i{0}; i < size; ++i) {
195
196 auto& ev = buf[i];
197 const auto fd = epoll_.fd(ev);
198 const auto& ref = data_[fd];
199
200 if (ref.priority != priority) {
201 continue;
202 }
203
204 if (fd == notify_.fd()) {
205 notify_.read();
206 continue;
207 }
208
209 if (!ref.slot) {
210 // Ignore timerfd.
211 continue;
212 }
213
214 const auto sid = epoll_.sid(ev);
215 // Skip this socket if it was modified after the call to wait().
216 if (ref.sid > sid) {
217 continue;
218 }
219 // Apply the interest events to filter-out any events that the user may have removed from
220 // the events since the call to wait() was made. This would typically happen via a reentrant
221 // call into the reactor from an event-handler. N.B. EpollErr and EpollHup are always
222 // reported if they occur, regardless of whether they are specified in events.
223 const auto events = ev.events & (ref.events | EpollErr | EpollHup);
224 if (!events) {
225 continue;
226 }
227
228 try {
229 ref.slot(now, fd, events);
230 } catch (const std::exception& e) {
231 TOOLBOX_ERROR << "exception in i/o event handler: " << e.what();
232 }
233 ++work;
234 }
235 return work;
236}
237
238void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot, error_code& ec) noexcept
239{
240 auto& ref = data_[fd];
241 if (ref.sid == sid) {
242 if (ref.events != events) {
243 epoll_.mod(fd, sid, events, ec);
244 if (ec) {
245 return;
246 }
247 if (ref.priority == Priority::High) {
248 high_prio_epoll_.mod(fd, sid, events);
249 }
250 ref.events = events;
251 }
252 ref.slot = slot;
253 }
254}
255
256void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot)
257{
258 auto& ref = data_[fd];
259 if (ref.sid == sid) {
260 if (ref.events != events) {
261 epoll_.mod(fd, sid, events);
262 if (ref.priority == Priority::High) {
263 high_prio_epoll_.mod(fd, sid, events);
264 }
265 ref.events = events;
266 }
267 ref.slot = slot;
268 }
269}
270
271void Reactor::set_events(int fd, int sid, unsigned events, error_code& ec) noexcept
272{
273 auto& ref = data_[fd];
274 if (ref.sid == sid && ref.events != events) {
275 epoll_.mod(fd, sid, events, ec);
276 if (ec) {
277 return;
278 }
279 if (ref.priority == Priority::High) {
280 high_prio_epoll_.mod(fd, sid, events);
281 }
282 ref.events = events;
283 }
284}
285
286void Reactor::set_events(int fd, int sid, unsigned events)
287{
288 auto& ref = data_[fd];
289 if (ref.sid == sid && ref.events != events) {
290 epoll_.mod(fd, sid, events);
291 ref.events = events;
292 if (ref.priority == Priority::High) {
293 high_prio_epoll_.mod(fd, sid, events);
294 }
295 }
296}
297
298void Reactor::unsubscribe(int fd, int sid) noexcept
299{
300 auto& ref = data_[fd];
301 if (ref.sid == sid) {
302 if (ref.priority == Priority::High) {
303 high_prio_epoll_.del(fd);
304 }
305 epoll_.del(fd);
306 ref.events = 0;
307 ref.slot.reset();
308 ref.priority = Priority::Low;
309 }
310}
311
312void Reactor::set_io_priority(int fd, int sid, Priority priority) noexcept
313{
314 auto& ref = data_[fd];
315 if (ref.sid == sid && ref.priority != priority) {
316 if (priority == Priority::High) {
317 high_prio_epoll_.add(fd, sid, ref.events);
318 } else {
319 high_prio_epoll_.del(fd);
320 }
321 ref.priority = priority;
322 }
323}
324
325} // namespace io
326} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:89
#define TOOLBOX_PROBE_SCOPED(provider, name,...)
Definition Trace.hpp:46
static constexpr int sid(const Event &ev) noexcept
Definition Epoll.hpp:165
void del(int fd) noexcept
Definition Epoll.hpp:231
void mod(int fd, int sid, unsigned events, std::error_code &ec) noexcept
Definition Epoll.hpp:239
static constexpr int fd(const Event &ev) noexcept
Definition Epoll.hpp:161
void add(int fd, int sid, unsigned events)
Definition Epoll.hpp:225
int wait(Event buf[], std::size_t size, std::error_code &ec) noexcept
Returns the number of file descriptors that are ready.
Definition Epoll.hpp:193
void write(std::int64_t val, std::error_code &ec) noexcept
Definition EventFd.hpp:76
std::int64_t read()
Definition EventFd.hpp:67
int fd() const noexcept
Definition EventFd.hpp:66
Reactor(std::size_t size_hint=0)
Definition Reactor.cpp:45
~Reactor() override
Definition Reactor.cpp:52
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:57
int poll(CyclTime now, Duration timeout=NoTimeout)
Definition Reactor.cpp:72
void do_wakeup() noexcept final
Thread-safe.
Definition Reactor.cpp:125
MonoTime mono_time() const noexcept
Definition Time.hpp:138
WallTime wall_time() const noexcept
Definition Time.hpp:139
STL namespace.
int64_t min(const Histogram &h) noexcept
Definition Utility.cpp:37
constexpr Duration NoTimeout
Definition Reactor.hpp:29
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
BasicSlot< void(CyclTime, int, unsigned)> IoSlot
Definition Reactor.hpp:31
constexpr bool is_zero(std::chrono::duration< RepT, PeriodT > d) noexcept
Definition Time.hpp:154
WallClock::time_point WallTime
Definition Time.hpp:112
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
auto make_finally(FnT fn) noexcept
Definition Finally.hpp:48
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:97