Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
20#include <toolbox/sys/Log.hpp>
21#include <toolbox/sys/Trace.hpp>
22
23namespace toolbox {
24inline namespace io {
25using namespace std;
26namespace {
27constexpr size_t MaxEvents{128};
28
29int dispatch_low_priority_timers(CyclTime now, TimerQueue& tq, bool idle_cycle)
30{
31 int work_done = 0;
32 if (idle_cycle) {
33 work_done = tq.dispatch(now, 1);
34 }
35 else if (!tq.empty()) {
36 // actively execute low priority timers if they've been delayed by 100ms or more.
37 if ((now.mono_time() - tq.front().expiry()) > 100ms) {
38 work_done = tq.dispatch(now, 1);
39 }
40 }
41 return work_done;
42}
43} // namespace
44
46{
47 const auto notify = notify_.fd();
48 data_.resize(max<size_t>(notify + 1, size_hint));
49 epoll_.add(notify, 0, EpollIn);
50}
51
53{
54 epoll_.del(notify_.fd());
55}
56
57Reactor::Handle Reactor::subscribe(int fd, unsigned events, IoSlot slot)
58{
59 assert(fd >= 0);
60 assert(slot);
61 if (fd >= static_cast<int>(data_.size())) {
62 data_.resize(fd + 1);
63 }
64 auto& ref = data_[fd];
65 epoll_.add(fd, ++ref.sid, events);
66 ref.events = events;
67 ref.slot = slot;
68 ref.priority = Priority::Low;
69 return {*this, fd, ref.sid};
70}
71
73{
74 enum { High = 0, Low = 1 };
75 using namespace chrono;
76
77 // If timeout is zero then the wait_until time should also be zero to signify no wait.
79 if (!is_zero(timeout) && end_of_cycle_no_wait_hooks.empty()) {
80 const MonoTime next
81 = next_expiry(timeout == NoTimeout ? MonoClock::max() : now.mono_time() + timeout);
82 if (next > now.mono_time()) {
83 wait_until = next;
84 }
85 }
86 // TODO: consider using a dynamic buffer that scales with increased demand.
88
89 int n;
90 error_code ec;
91 if (wait_until < MonoClock::max()) {
92 // The wait function will not block if time is zero.
93 n = epoll_.wait(buf, MaxEvents, wait_until, ec);
94 } else {
95 // Block indefinitely.
96 n = epoll_.wait(buf, MaxEvents, ec);
97 }
98 // Update cycle time after epoll() returns.
99 now = CyclTime::now();
100 last_time_priority_io_polled_ = now.wall_time();
101
102 if (ec) {
103 if (ec.value() != EINTR) {
104 throw system_error{ec};
105 }
106 return 0;
107 }
108 cycle_work_ = 0;
109 TOOLBOX_PROBE_SCOPED(reactor, dispatch, cycle_work_);
110 // High priority timers.
111 cycle_work_ = tqs_[High].dispatch(now);
112 // I/O events.
113 cycle_work_ += dispatch(now, buf, n, Priority::High);
114 cycle_work_ += dispatch(now, buf, n, Priority::Low);
115 // Low priority timers (typically only dispatched during empty cycles).
116 cycle_work_ += dispatch_low_priority_timers(now, tqs_[Low], cycle_work_ == 0);
117 // End of cycle hooks.
118 if (cycle_work_ > 0) {
119 io::dispatch(now, end_of_event_dispatch_hooks_);
120 }
121 io::dispatch(now, end_of_cycle_no_wait_hooks);
122 return cycle_work_;
123}
124
126{
127 // Best effort.
128 std::error_code ec;
129 notify_.write(1, ec);
130}
131
132MonoTime Reactor::next_expiry(MonoTime next) const
133{
134 enum { High = 0, Low = 1 };
135 using namespace chrono;
136 {
137 const auto& tq = tqs_[High];
138 if (!tq.empty()) {
139 // Duration until next expiry. Mitigate scheduler latency by preempting the
140 // high-priority timer and busy-waiting for 200us ahead of timer expiry.
141 next = min(next, tq.front().expiry() - 200us);
142 }
143 }
144 {
145 const auto& tq = tqs_[Low];
146 if (!tq.empty()) {
147 // Duration until next expiry.
148 next = min(next, tq.front().expiry());
149 }
150 }
151 return next;
152}
153
155{
156 if (currently_handling_priority_events_) [[unlikely]] {
157 return;
158 }
159
160 if (priority_io_poll_threshold == Micros::max()) {
161 return;
162 }
163
164 WallTime now = WallClock::now();
165 if (now - last_time_priority_io_polled_ > priority_io_poll_threshold) {
166 last_time_priority_io_polled_ = now;
167
168 error_code ec;
170
171 int n = epoll_.wait(buf, MaxEvents, MonoTime{}, ec);
172 if (ec) {
173 if (ec.value() != EINTR) {
174 TOOLBOX_ERROR << "epoll failure during high priority poll: "
175 << ec << " [" << ec.message() << ']';
176 }
177 } else {
178 cycle_work_ += dispatch(CyclTime::current(), buf, n, Priority::High);
179 }
180
181 cycle_work_ += dispatch_user_hp_hook();
182 }
183}
184
185int Reactor::dispatch_user_hp_hook()
186{
187 currently_handling_priority_events_ = true;
188 const auto reset_flag = make_finally([this]() noexcept {
189 currently_handling_priority_events_ = false;
190 });
191
192 int ret = 0;
193
194 try {
195 if (priority_poll_user_hook_) {
196 ret = priority_poll_user_hook_(CyclTime::current());
197 }
198 } catch (const std::exception& e) {
199 TOOLBOX_ERROR << "exception during user high priority hook: " << e.what();
200 }
201
202 return ret;
203}
204
205int Reactor::dispatch(CyclTime now, Event* buf, int size, Priority priority)
206{
207 if (priority == Priority::High) {
208 assert(!currently_handling_priority_events_);
209 currently_handling_priority_events_ = true;
210 }
211 const auto reset_flag = make_finally([this]() noexcept {
212 currently_handling_priority_events_ = false;
213 });
214
215 int work{0};
216 for (int i{0}; i < size; ++i) {
217
218 auto& ev = buf[i];
219 const auto fd = Epoll::fd(ev);
220 const auto& ref = data_[fd];
221
222 if (ref.priority != priority) {
223 continue;
224 }
225
226 if (fd == notify_.fd()) {
227 notify_.read();
228 continue;
229 }
230
231 if (!ref.slot) {
232 // Ignore timerfd.
233 continue;
234 }
235
236 const auto sid = Epoll::sid(ev);
237 // Skip this socket if it was modified after the call to wait().
238 if (ref.sid > sid) {
239 continue;
240 }
241 // Apply the interest events to filter-out any events that the user may have removed from
242 // the events since the call to wait() was made. This would typically happen via a reentrant
243 // call into the reactor from an event-handler. N.B. EpollErr and EpollHup are always
244 // reported if they occur, regardless of whether they are specified in events.
245 const auto events = ev.events & (ref.events | EpollErr | EpollHup);
246 if (!events) {
247 continue;
248 }
249
250 try {
251 ref.slot(now, fd, events);
252 } catch (const std::exception& e) {
253 TOOLBOX_ERROR << "exception in i/o event handler: " << e.what();
254 }
255 ++work;
256 }
257 return work;
258}
259
260void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot, error_code& ec) noexcept
261{
262 auto& ref = data_[fd];
263 if (ref.sid == sid) {
264 if (ref.events != events) {
265 epoll_.mod(fd, sid, events, ec);
266 if (ec) {
267 return;
268 }
269 ref.events = events;
270 }
271 ref.slot = slot;
272 }
273}
274
275void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot)
276{
277 auto& ref = data_[fd];
278 if (ref.sid == sid) {
279 if (ref.events != events) {
280 epoll_.mod(fd, sid, events);
281 ref.events = events;
282 }
283 ref.slot = slot;
284 }
285}
286
287void Reactor::set_events(int fd, int sid, unsigned events, error_code& ec) noexcept
288{
289 auto& ref = data_[fd];
290 if (ref.sid == sid && ref.events != events) {
291 epoll_.mod(fd, sid, events, ec);
292 if (ec) {
293 return;
294 }
295 ref.events = events;
296 }
297}
298
299void Reactor::set_events(int fd, int sid, unsigned events)
300{
301 auto& ref = data_[fd];
302 if (ref.sid == sid && ref.events != events) {
303 epoll_.mod(fd, sid, events);
304 ref.events = events;
305 }
306}
307
308void Reactor::unsubscribe(int fd, int sid) noexcept
309{
310 auto& ref = data_[fd];
311 if (ref.sid == sid) {
312 epoll_.del(fd);
313 ref.events = 0;
314 ref.slot.reset();
315 ref.priority = Priority::Low;
316 }
317}
318
319void Reactor::set_io_priority(int fd, int sid, Priority priority) noexcept
320{
321 auto& ref = data_[fd];
322 if (ref.sid == sid && ref.priority != priority) {
323 ref.priority = priority;
324 }
325}
326
327} // namespace io
328} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:89
#define TOOLBOX_PROBE_SCOPED(provider, name,...)
Definition Trace.hpp:46
static constexpr int sid(const Event &ev) noexcept
Definition Epoll.hpp:165
void del(int fd) noexcept
Definition Epoll.hpp:231
void mod(int fd, int sid, unsigned events, std::error_code &ec) noexcept
Definition Epoll.hpp:239
static constexpr int fd(const Event &ev) noexcept
Definition Epoll.hpp:161
void add(int fd, int sid, unsigned events)
Definition Epoll.hpp:225
int wait(Event buf[], std::size_t size, std::error_code &ec) noexcept
Returns the number of file descriptors that are ready.
Definition Epoll.hpp:193
void write(std::int64_t val, std::error_code &ec) noexcept
Definition EventFd.hpp:76
std::int64_t read()
Definition EventFd.hpp:67
int fd() const noexcept
Definition EventFd.hpp:66
Reactor(std::size_t size_hint=0)
Definition Reactor.cpp:45
~Reactor() override
Definition Reactor.cpp:52
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:57
int poll(CyclTime now, Duration timeout=NoTimeout)
Definition Reactor.cpp:72
void do_wakeup() noexcept final
Thread-safe.
Definition Reactor.cpp:125
MonoTime mono_time() const noexcept
Definition Time.hpp:138
WallTime wall_time() const noexcept
Definition Time.hpp:139
STL namespace.
int64_t min(const Histogram &h) noexcept
Definition Utility.cpp:37
constexpr Duration NoTimeout
Definition Reactor.hpp:29
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
BasicSlot< void(CyclTime, int, unsigned)> IoSlot
Definition Reactor.hpp:31
constexpr bool is_zero(std::chrono::duration< RepT, PeriodT > d) noexcept
Definition Time.hpp:154
WallClock::time_point WallTime
Definition Time.hpp:112
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
auto make_finally(FnT fn) noexcept
Definition Finally.hpp:48
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:97