Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
20#include <toolbox/sys/Log.hpp>
21#include <toolbox/sys/Trace.hpp>
22
23namespace toolbox {
24inline namespace io {
25using namespace std;
26namespace {
27constexpr size_t MaxEvents{128};
28
29int dispatch_low_priority_timers(CyclTime now, TimerQueue& tq, bool idle_cycle)
30{
31 int work_done = 0;
32 if (idle_cycle) {
33 work_done = tq.dispatch(now, 1);
34 }
35 else if (!tq.empty()) {
36 // actively execute low priority timers if they've been delayed by 100ms or more.
37 if ((now.mono_time() - tq.front().expiry()) > 100ms) {
38 work_done = tq.dispatch(now, 1);
39 }
40 }
41 return work_done;
42}
43} // namespace
44
46{
47 const auto notify = notify_.fd();
48 data_.resize(max<size_t>(notify + 1, size_hint));
49 epoll_.add(notify, 0, EpollIn);
50}
51
53{
54 epoll_.del(notify_.fd());
55}
56
57Reactor::Handle Reactor::subscribe(int fd, unsigned events, IoSlot slot)
58{
59 assert(fd >= 0);
60 assert(slot);
61 if (fd >= static_cast<int>(data_.size())) {
62 data_.resize(fd + 1);
63 }
64 auto& ref = data_[fd];
65 epoll_.add(fd, ++ref.sid, events);
66 ref.events = events;
67 ref.slot = slot;
68 ref.priority = Priority::Low;
69 return {*this, fd, ref.sid};
70}
71
73{
74 enum { High = 0, Low = 1 };
75 using namespace chrono;
76
77 // If timeout is zero then the wait_until time should also be zero to signify no wait.
79 if (!is_zero(timeout) && end_of_cycle_no_wait_hooks.empty()) {
80 const MonoTime next
81 = next_expiry(timeout == NoTimeout ? MonoClock::max() : now.mono_time() + timeout);
82 if (next > now.mono_time()) {
83 wait_until = next;
84 }
85 }
86 // TODO: consider using a dynamic buffer that scales with increased demand.
88
89 int n;
90 error_code ec;
91 if (wait_until < MonoClock::max()) {
92 // The wait function will not block if time is zero.
93 n = epoll_.wait(buf, MaxEvents, wait_until, ec);
94 } else {
95 // Block indefinitely.
96 n = epoll_.wait(buf, MaxEvents, ec);
97 }
98 // Update cycle time after epoll() returns.
99 now = CyclTime::now();
100 last_time_priority_io_polled_ = now.wall_time();
101
102 if (ec) {
103 if (ec.value() != EINTR) {
104 throw system_error{ec};
105 }
106 return 0;
107 }
108 cycle_work_ = 0;
109 TOOLBOX_PROBE_SCOPED(reactor, dispatch, cycle_work_);
110 // High priority timers.
111 cycle_work_ = tqs_[High].dispatch(now);
112 // I/O events.
113 cycle_work_ += dispatch(now, buf, n, Priority::High);
114 cycle_work_ += dispatch(now, buf, n, Priority::Low);
115 // Low priority timers (typically only dispatched during empty cycles).
116 cycle_work_ += dispatch_low_priority_timers(now, tqs_[Low], cycle_work_ == 0);
117 // End of cycle hooks.
118 if (cycle_work_ > 0) {
119 io::dispatch(now, end_of_event_dispatch_hooks_);
120 }
121 io::dispatch(now, end_of_cycle_no_wait_hooks);
122 return cycle_work_;
123}
124
126{
127 // Best effort.
128 std::error_code ec;
129 notify_.write(1, ec);
130}
131
132MonoTime Reactor::next_expiry(MonoTime next) const
133{
134 enum { High = 0, Low = 1 };
135 using namespace chrono;
136 {
137 const auto& tq = tqs_[High];
138 if (!tq.empty()) {
139 // Duration until next expiry. Mitigate scheduler latency by preempting the
140 // high-priority timer and busy-waiting for 200us ahead of timer expiry.
141 next = min(next, tq.front().expiry() - 200us);
142 }
143 }
144 {
145 const auto& tq = tqs_[Low];
146 if (!tq.empty()) {
147 // Duration until next expiry.
148 next = min(next, tq.front().expiry());
149 }
150 }
151 return next;
152}
153
155{
156 if (currently_handling_priority_events_) [[unlikely]] {
157 return;
158 }
159
160 WallTime now = WallClock::now();
161 cycle_work_ += do_io_priority_poll(now);
162 cycle_work_ += do_user_priority_poll(now);
163}
164
165int Reactor::do_io_priority_poll(WallTime now) noexcept
166{
167 int ret = 0;
168 try {
169 const bool enabled = priority_io_poll_threshold_ != Micros::max();
170 const auto delta = duration_cast<Micros>(now - last_time_priority_io_polled_);
171 const bool breached = delta > priority_io_poll_threshold_;
172
173 if (enabled && breached) {
174 const auto update_poll_time = make_finally([this]() noexcept {
175 last_time_priority_io_polled_ = WallClock::now();
176 });
177
178 error_code ec;
179 Event buf[MaxEvents];
180
181 int n = epoll_.wait(buf, MaxEvents, MonoTime{}, ec);
182 if (ec) {
183 if (ec.value() != EINTR) {
184 TOOLBOX_ERROR << "epoll failure during high priority io poll: "
185 << ec << " [" << ec.message() << ']';
186 }
187 } else {
188 ret = dispatch(CyclTime::current(), buf, n, Priority::High);
189 }
190 }
191 } catch (const std::exception& e) {
192 TOOLBOX_ERROR << "exception during high priority io poll: " << e.what();
193 }
194
195 return ret;
196}
197
198int Reactor::do_user_priority_poll(WallTime now) noexcept
199{
200 int ret = 0;
201 try {
202 const bool enabled = (user_hook_poll_threshold_ != Micros::max()) &&
203 priority_poll_user_hook_;
204 const auto delta = duration_cast<Micros>(now - last_time_user_hook_polled_);
205 const bool breached = delta > user_hook_poll_threshold_;
206
207 if (enabled && breached) {
208 const auto update_poll_time = make_finally([this]() noexcept {
209 last_time_user_hook_polled_ = WallClock::now();
210 });
211
212 currently_handling_priority_events_ = true;
213 const auto reset_flag = make_finally([this]() noexcept {
214 currently_handling_priority_events_ = false;
215 });
216
217 ret = priority_poll_user_hook_(CyclTime::current());
218 }
219 } catch (const std::exception& e) {
220 TOOLBOX_ERROR << "exception during user high priority hook: " << e.what();
221 }
222
223 return ret;
224}
225
226int Reactor::dispatch(CyclTime now, Event* buf, int size, Priority priority)
227{
228 if (priority == Priority::High) {
229 assert(!currently_handling_priority_events_);
230 currently_handling_priority_events_ = true;
231 }
232 const auto reset_flag = make_finally([this]() noexcept {
233 currently_handling_priority_events_ = false;
234 });
235
236 int work{0};
237 for (int i{0}; i < size; ++i) {
238
239 auto& ev = buf[i];
240 const auto fd = Epoll::fd(ev);
241 const auto& ref = data_[fd];
242
243 if (ref.priority != priority) {
244 continue;
245 }
246
247 if (fd == notify_.fd()) {
248 notify_.read();
249 continue;
250 }
251
252 if (!ref.slot) {
253 // Ignore timerfd.
254 continue;
255 }
256
257 const auto sid = Epoll::sid(ev);
258 // Skip this socket if it was modified after the call to wait().
259 if (ref.sid > sid) {
260 continue;
261 }
262 // Apply the interest events to filter-out any events that the user may have removed from
263 // the events since the call to wait() was made. This would typically happen via a reentrant
264 // call into the reactor from an event-handler. N.B. EpollErr and EpollHup are always
265 // reported if they occur, regardless of whether they are specified in events.
266 const auto events = ev.events & (ref.events | EpollErr | EpollHup);
267 if (!events) {
268 continue;
269 }
270
271 try {
272 ref.slot(now, fd, events);
273 } catch (const std::exception& e) {
274 TOOLBOX_ERROR << "exception in i/o event handler: " << e.what();
275 }
276 ++work;
277 }
278 return work;
279}
280
281void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot, error_code& ec) noexcept
282{
283 auto& ref = data_[fd];
284 if (ref.sid == sid) {
285 if (ref.events != events) {
286 epoll_.mod(fd, sid, events, ec);
287 if (ec) {
288 return;
289 }
290 ref.events = events;
291 }
292 ref.slot = slot;
293 }
294}
295
296void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot)
297{
298 auto& ref = data_[fd];
299 if (ref.sid == sid) {
300 if (ref.events != events) {
301 epoll_.mod(fd, sid, events);
302 ref.events = events;
303 }
304 ref.slot = slot;
305 }
306}
307
308void Reactor::set_events(int fd, int sid, unsigned events, error_code& ec) noexcept
309{
310 auto& ref = data_[fd];
311 if (ref.sid == sid && ref.events != events) {
312 epoll_.mod(fd, sid, events, ec);
313 if (ec) {
314 return;
315 }
316 ref.events = events;
317 }
318}
319
320void Reactor::set_events(int fd, int sid, unsigned events)
321{
322 auto& ref = data_[fd];
323 if (ref.sid == sid && ref.events != events) {
324 epoll_.mod(fd, sid, events);
325 ref.events = events;
326 }
327}
328
329void Reactor::unsubscribe(int fd, int sid) noexcept
330{
331 auto& ref = data_[fd];
332 if (ref.sid == sid) {
333 epoll_.del(fd);
334 ref.events = 0;
335 ref.slot.reset();
336 ref.priority = Priority::Low;
337 }
338}
339
340void Reactor::set_io_priority(int fd, int sid, Priority priority) noexcept
341{
342 auto& ref = data_[fd];
343 if (ref.sid == sid && ref.priority != priority) {
344 ref.priority = priority;
345 }
346}
347
348} // namespace io
349} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:89
#define TOOLBOX_PROBE_SCOPED(provider, name,...)
Definition Trace.hpp:46
static constexpr int sid(const Event &ev) noexcept
Definition Epoll.hpp:165
void del(int fd) noexcept
Definition Epoll.hpp:231
void mod(int fd, int sid, unsigned events, std::error_code &ec) noexcept
Definition Epoll.hpp:239
static constexpr int fd(const Event &ev) noexcept
Definition Epoll.hpp:161
void add(int fd, int sid, unsigned events)
Definition Epoll.hpp:225
int wait(Event buf[], std::size_t size, std::error_code &ec) noexcept
Returns the number of file descriptors that are ready.
Definition Epoll.hpp:193
void write(std::int64_t val, std::error_code &ec) noexcept
Definition EventFd.hpp:76
std::int64_t read()
Definition EventFd.hpp:67
int fd() const noexcept
Definition EventFd.hpp:66
Reactor(std::size_t size_hint=0)
Definition Reactor.cpp:45
~Reactor() override
Definition Reactor.cpp:52
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:57
void yield() noexcept
Definition Reactor.cpp:154
int poll(CyclTime now, Duration timeout=NoTimeout)
Definition Reactor.cpp:72
void do_wakeup() noexcept final
Thread-safe.
Definition Reactor.cpp:125
MonoTime mono_time() const noexcept
Definition Time.hpp:138
WallTime wall_time() const noexcept
Definition Time.hpp:139
STL namespace.
int64_t min(const Histogram &h) noexcept
Definition Utility.cpp:37
constexpr Duration NoTimeout
Definition Reactor.hpp:29
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
BasicSlot< void(CyclTime, int, unsigned)> IoSlot
Definition Reactor.hpp:31
constexpr bool is_zero(std::chrono::duration< RepT, PeriodT > d) noexcept
Definition Time.hpp:154
WallClock::time_point WallTime
Definition Time.hpp:112
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
auto make_finally(FnT fn) noexcept
Definition Finally.hpp:48
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:97