Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
20#include <toolbox/sys/Log.hpp>
21#include <toolbox/sys/Trace.hpp>
23
24#include <variant>
25
26namespace toolbox {
27inline namespace io {
28using namespace std;
29namespace {
30constexpr size_t MaxEvents{128};
31
32int dispatch_low_priority_timers(CyclTime now, TimerQueue& tq, bool idle_cycle)
33{
34 int work_done = 0;
35 if (idle_cycle) {
36 work_done = tq.dispatch(now, 1);
37 }
38 else if (!tq.empty()) {
39 // actively execute low priority timers if they've been delayed by 100ms or more.
40 if ((now.mono_time() - tq.front().expiry()) > 100ms) {
41 work_done = tq.dispatch(now, 1);
42 }
43 }
44 return work_done;
45}
46} // namespace
47
49{
50 const auto notify = notify_.fd();
51 data_.resize(max<size_t>(notify + 1, size_hint));
52
54 device_.emplace<ImmediateDevice>();
55 // notify fd does not need to be added to ImmediateDevice's epoll set.
56 // This is because the notify fd only exists to allow the Reactor to be woken
57 // up -- however, in immediate mode the reactor never blocks.
58 } else {
59 auto& dev = device_.emplace<BlockingDevice>();
60 dev.epoll.add(notify, 0, EpollIn);
61 }
62}
63
65{
66 std::visit(overloaded{
67 [this](ImmediateDevice& dev) { dev.low_prio_epoll.del(notify_.fd()); },
68 [this](BlockingDevice& dev) { dev.epoll.del(notify_.fd()); }
69 }, device_);
70}
71
72Reactor::Handle Reactor::subscribe(int fd, unsigned events, IoSlot slot)
73{
74 assert(fd >= 0);
75 assert(slot);
76 if (fd >= static_cast<int>(data_.size())) {
77 data_.resize(fd + 1);
78 }
79 auto& ref = data_[fd];
80 std::visit(overloaded{
81 [&](ImmediateDevice& dev) { dev.low_prio_epoll.add(fd, ++ref.sid, events); },
82 [&](BlockingDevice& dev) { dev.epoll.add(fd, ++ref.sid, events); }
83 }, device_);
84 ref.events = events;
85 ref.slot = slot;
86 ref.priority = Priority::Low;
87 return {*this, fd, ref.sid};
88}
89
// Runs one reactor cycle in blocking mode.
//
// Waits on dev.epoll, then dispatches in this order: high-priority timers,
// high-priority I/O, low-priority I/O, low-priority timers, end-of-cycle hooks.
//
// Returns the work counted for the cycle (cycle_work_), or 0 if the wait was
// interrupted (EINTR). Throws system_error for any other wait failure.
//
// NOTE(review): the declarations of `wait_until` and `buf` are not visible in
// this listing; the notes below rely only on how they are used here.
90int Reactor::poll_blocking(BlockingDevice& dev, CyclTime now, Duration timeout)
91{
92 enum { High = 0, Low = 1 };
93 using namespace chrono;
94
95 // If timeout is zero then the wait_until time should also be zero to signify no wait.
// wait_until is only assigned when the timeout is non-zero and no
// end_of_cycle_no_wait_hooks are registered; otherwise it keeps its initial value.
97 if (!is_zero(timeout) && end_of_cycle_no_wait_hooks.empty()) {
// NoTimeout maps to MonoClock::max(); next_expiry() then caps it by pending timers.
98 const MonoTime next
99 = next_expiry(timeout == NoTimeout ? MonoClock::max() : now.mono_time() + timeout);
100 if (next > now.mono_time()) {
101 wait_until = next;
102 }
103 }
104 // TODO: consider using a dynamic buffer that scales with increased demand.
106
107 int n;
108 error_code ec;
109 if (wait_until < MonoClock::max()) {
110 // The wait function will not block if time is zero.
111 n = dev.epoll.wait(buf, MaxEvents, wait_until, ec);
112 } else {
113 // Block indefinitely.
114 n = dev.epoll.wait(buf, MaxEvents, ec);
115 }
116 // Update cycle time after epoll() returns.
117 now = CyclTime::now();
// Record when I/O was last polled.
118 last_time_priority_io_polled_ = now.wall_time();
119
120 if (ec) {
// EINTR is treated as an empty cycle; every other error is raised to the caller.
121 if (ec.value() != EINTR) {
122 throw system_error{ec};
123 }
124 return 0;
125 }
126 cycle_work_ = 0;
128 // High priority timers.
129 cycle_work_ = tqs_[High].dispatch(now);
130 // I/O events.
// The same event buffer is scanned twice: once per priority level.
131 cycle_work_ += dispatch(now, buf, n, Priority::High);
132 cycle_work_ += dispatch(now, buf, n, Priority::Low);
133 // Low priority timers (typically only dispatched during empty cycles).
134 cycle_work_ += dispatch_low_priority_timers(now, tqs_[Low], cycle_work_ == 0);
135 // End of cycle hooks.
// Event-dispatch hooks run only when some work was done; the no-wait hooks run every cycle.
136 if (cycle_work_ > 0) {
137 io::dispatch(now, end_of_event_dispatch_hooks_);
138 }
139 io::dispatch(now, end_of_cycle_no_wait_hooks);
140 return cycle_work_;
141}
142
// Runs one reactor cycle in immediate mode.
//
// Both epoll sets are waited on with MonoTime{} as the deadline. Dispatch order:
// high-priority timers, high-priority I/O, low-priority I/O, low-priority timers,
// end-of-cycle hooks.
//
// Returns the work counted for the cycle (cycle_work_). Throws system_error if
// either wait fails with an error other than EINTR.
//
// NOTE(review): the declaration of `buf` is not visible in this listing.
143int Reactor::poll_immediate(ImmediateDevice& dev, CyclTime now)
144{
145 enum { High = 0, Low = 1 };
146
// Polls and dispatches both epoll sets, high priority first.
// NOTE(review): the value returned is the number of ready events reported by
// wait() (hn + ln); the counts returned by dispatch() are discarded here,
// whereas poll_blocking accumulates them -- confirm this is intended.
147 auto dispatch_io = [&]() -> int {
149 error_code ec;
150
151 // high priority IO
152 last_time_priority_io_polled_ = now.wall_time();
153 int hn = dev.high_prio_epoll.wait(buf, MaxEvents, MonoTime{}, ec);
154 if (ec) {
155 if (ec.value() != EINTR) {
156 throw system_error{ec};
157 }
// Interrupted before anything was dispatched: low-priority I/O is skipped too.
158 return 0;
159 }
160 dispatch(now, buf, hn, Priority::High);
161
162 // low priority IO
163 int ln = dev.low_prio_epoll.wait(buf, MaxEvents, MonoTime{}, ec);
164 if (ec) {
165 if (ec.value() != EINTR) {
166 throw system_error{ec};
167 }
// Interrupted after the high-priority pass: report only the high-priority count.
168 return hn;
169 }
170 dispatch(now, buf, ln, Priority::Low);
171
172 return hn + ln;
173 };
174
175 cycle_work_ = 0;
177 // High priority timers.
178 cycle_work_ = tqs_[High].dispatch(now);
179 // I/O events.
180 cycle_work_ += dispatch_io();
181 // Low priority timers (typically only dispatched during empty cycles).
182 cycle_work_ += dispatch_low_priority_timers(now, tqs_[Low], cycle_work_ == 0);
183 // End of cycle hooks.
// Event-dispatch hooks run only when some work was done; the no-wait hooks run every cycle.
184 if (cycle_work_ > 0) {
185 io::dispatch(now, end_of_event_dispatch_hooks_);
186 }
187 io::dispatch(now, end_of_cycle_no_wait_hooks);
188 return cycle_work_;
189}
190
192{
193 return std::visit(overloaded{
194 [&](ImmediateDevice& dev) { return poll_immediate(dev, now); },
195 [&](BlockingDevice& dev) { return poll_blocking(dev, now, timeout); }
196 }, device_);
197}
198
200{
201 if (std::holds_alternative<BlockingDevice>(device_)) {
202 // Best effort.
203 std::error_code ec;
204 notify_.write(1, ec);
205 }
206}
207
208MonoTime Reactor::next_expiry(MonoTime next) const
209{
210 enum { High = 0, Low = 1 };
211 using namespace chrono;
212 {
213 const auto& tq = tqs_[High];
214 if (!tq.empty()) {
215 // Duration until next expiry. Mitigate scheduler latency by preempting the
216 // high-priority timer and busy-waiting for 200us ahead of timer expiry.
217 next = min(next, tq.front().expiry() - 200us);
218 }
219 }
220 {
221 const auto& tq = tqs_[Low];
222 if (!tq.empty()) {
223 // Duration until next expiry.
224 next = min(next, tq.front().expiry());
225 }
226 }
227 return next;
228}
229
231{
232 if (currently_handling_priority_events_) [[unlikely]] {
233 return;
234 }
235
236 if (priority_io_poll_threshold == Micros::max()) {
237 return;
238 }
239
240 WallTime now = WallClock::now();
241 if (now - last_time_priority_io_polled_ > priority_io_poll_threshold) {
242 last_time_priority_io_polled_ = now;
243
244 error_code ec;
246
247 Epoll& epoll = std::visit(overloaded{
248 [&](ImmediateDevice& dev) -> Epoll& { return dev.high_prio_epoll; },
249 [&](BlockingDevice& dev) -> Epoll& { return dev.epoll; }
250 }, device_);
251
252 int n = epoll.wait(buf, MaxEvents, MonoTime{}, ec);
253
254 if (ec) {
255 if (ec.value() != EINTR) {
256 throw system_error{ec};
257 }
258 return;
259 }
260
261 cycle_work_ += dispatch(CyclTime::current(), buf, n, Priority::High);
262 }
263}
264
265int Reactor::dispatch(CyclTime now, Event* buf, int size, Priority priority)
266{
267 if (priority == Priority::High) {
268 assert(!currently_handling_priority_events_);
269 currently_handling_priority_events_ = true;
270 }
271 const auto reset_flag = make_finally([this]() noexcept {
272 currently_handling_priority_events_ = false;
273 });
274
275 int work{0};
276 for (int i{0}; i < size; ++i) {
277
278 auto& ev = buf[i];
279 const auto fd = Epoll::fd(ev);
280 const auto& ref = data_[fd];
281
282 if (ref.priority != priority) {
283 continue;
284 }
285
286 if (fd == notify_.fd()) {
287 notify_.read();
288 continue;
289 }
290
291 if (!ref.slot) {
292 // Ignore timerfd.
293 continue;
294 }
295
296 const auto sid = Epoll::sid(ev);
297 // Skip this socket if it was modified after the call to wait().
298 if (ref.sid > sid) {
299 continue;
300 }
301 // Apply the interest events to filter-out any events that the user may have removed from
302 // the events since the call to wait() was made. This would typically happen via a reentrant
303 // call into the reactor from an event-handler. N.B. EpollErr and EpollHup are always
304 // reported if they occur, regardless of whether they are specified in events.
305 const auto events = ev.events & (ref.events | EpollErr | EpollHup);
306 if (!events) {
307 continue;
308 }
309
310 try {
311 ref.slot(now, fd, events);
312 } catch (const std::exception& e) {
313 TOOLBOX_ERROR << "exception in i/o event handler: " << e.what();
314 }
315 ++work;
316 }
317 return work;
318}
319
320Epoll& Reactor::get_resident_epoll(Data& data)
321{
322 return std::visit(overloaded{
323 [&](ImmediateDevice& dev) -> Epoll& {
324 return (data.priority == Priority::High) ? dev.high_prio_epoll
325 : dev.low_prio_epoll;
326 },
327 [&](BlockingDevice& dev) -> Epoll& { return dev.epoll; }
328 }, device_);
329}
330
331void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot, error_code& ec) noexcept
332{
333 auto& ref = data_[fd];
334 if (ref.sid == sid) {
335 if (ref.events != events) {
336 Epoll& epoll = get_resident_epoll(ref);
337 epoll.mod(fd, sid, events, ec);
338 if (ec) {
339 return;
340 }
341 ref.events = events;
342 }
343 ref.slot = slot;
344 }
345}
346
347void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot)
348{
349 auto& ref = data_[fd];
350 if (ref.sid == sid) {
351 if (ref.events != events) {
352 Epoll& epoll = get_resident_epoll(ref);
353 epoll.mod(fd, sid, events);
354 ref.events = events;
355 }
356 ref.slot = slot;
357 }
358}
359
360void Reactor::set_events(int fd, int sid, unsigned events, error_code& ec) noexcept
361{
362 auto& ref = data_[fd];
363 if (ref.sid == sid && ref.events != events) {
364 Epoll& epoll = get_resident_epoll(ref);
365 epoll.mod(fd, sid, events, ec);
366 if (ec) {
367 return;
368 }
369 ref.events = events;
370 }
371}
372
373void Reactor::set_events(int fd, int sid, unsigned events)
374{
375 auto& ref = data_[fd];
376 if (ref.sid == sid && ref.events != events) {
377 Epoll& epoll = get_resident_epoll(ref);
378 epoll.mod(fd, sid, events);
379 ref.events = events;
380 }
381}
382
383void Reactor::unsubscribe(int fd, int sid) noexcept
384{
385 auto& ref = data_[fd];
386 if (ref.sid == sid) {
387 Epoll& epoll = get_resident_epoll(ref);
388 epoll.del(fd);
389 ref.events = 0;
390 ref.slot.reset();
391 ref.priority = Priority::Low;
392 }
393}
394
395void Reactor::set_io_priority(int fd, int sid, Priority priority) noexcept
396{
397 auto& ref = data_[fd];
398 if (ref.sid == sid && ref.priority != priority) {
399 std::visit(overloaded{
400 [&](ImmediateDevice& dev) {
401 if (ref.priority == Priority::Low) {
402 dev.low_prio_epoll.del(fd);
403 dev.high_prio_epoll.add(fd, sid, ref.events);
404 } else {
405 dev.high_prio_epoll.del(fd);
406 dev.low_prio_epoll.add(fd, sid, ref.events);
407 }
408 },
409 [&](BlockingDevice& /*dev*/) {}
410 }, device_);
411 ref.priority = priority;
412 }
413}
414
415} // namespace io
416} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:89
#define TOOLBOX_PROBE_SCOPED(provider, name,...)
Definition Trace.hpp:46
static constexpr int sid(const Event &ev) noexcept
Definition Epoll.hpp:165
static constexpr int fd(const Event &ev) noexcept
Definition Epoll.hpp:161
int wait(Event buf[], std::size_t size, std::error_code &ec) noexcept
Returns the number of file descriptors that are ready.
Definition Epoll.hpp:193
void write(std::int64_t val, std::error_code &ec) noexcept
Definition EventFd.hpp:76
std::int64_t read()
Definition EventFd.hpp:67
int fd() const noexcept
Definition EventFd.hpp:66
~Reactor() override
Definition Reactor.cpp:64
Reactor(ReactorMode mode=ReactorMode::Blocking, std::size_t size_hint=0)
Definition Reactor.cpp:48
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:72
int poll(CyclTime now, Duration timeout=NoTimeout)
Definition Reactor.cpp:191
void do_wakeup() noexcept final
Thread-safe.
Definition Reactor.cpp:199
MonoTime mono_time() const noexcept
Definition Time.hpp:138
WallTime wall_time() const noexcept
Definition Time.hpp:139
STL namespace.
int64_t min(const Histogram &h) noexcept
Definition Utility.cpp:37
constexpr Duration NoTimeout
Definition Reactor.hpp:30
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
const DataT & data(const MsgEvent &ev) noexcept
Definition Event.hpp:54
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
BasicSlot< void(CyclTime, int, unsigned)> IoSlot
Definition Reactor.hpp:32
constexpr bool is_zero(std::chrono::duration< RepT, PeriodT > d) noexcept
Definition Time.hpp:154
WallClock::time_point WallTime
Definition Time.hpp:112
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
auto make_finally(FnT fn) noexcept
Definition Finally.hpp:48
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:97