Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
20#include <toolbox/sys/Log.hpp>
21#include <toolbox/sys/Trace.hpp>
22
23namespace toolbox {
24inline namespace io {
25using namespace std;
26namespace {
27constexpr size_t MaxEvents{128};
28
29int dispatch_low_priority_timers(CyclTime now, TimerQueue& tq, bool idle_cycle)
30{
31 int work_done = 0;
32 if (idle_cycle) {
33 work_done = tq.dispatch(now, 1);
34 }
35 else if (!tq.empty()) {
36 // actively execute low priority timers if they've been delayed by 100ms or more.
37 if ((now.mono_time() - tq.front().expiry()) > 100ms) {
38 work_done = tq.dispatch(now, 1);
39 }
40 }
41 return work_done;
42}
43} // namespace
44
46{
47 const auto notify = notify_.fd();
48 data_.resize(max<size_t>(notify + 1, size_hint));
49 epoll_.add(notify, 0, EpollIn);
50}
51
53{
54 epoll_.del(notify_.fd());
55}
56
57Reactor::Handle Reactor::subscribe(int fd, unsigned events, IoSlot slot)
58{
59 assert(fd >= 0);
60 assert(slot);
61 if (fd >= static_cast<int>(data_.size())) {
62 data_.resize(fd + 1);
63 }
64 auto& ref = data_[fd];
65 epoll_.add(fd, ++ref.sid, events);
66 ref.events = events;
67 ref.slot = slot;
68 ref.priority = Priority::Low;
69 return {*this, fd, ref.sid};
70}
71
73{
74 enum { High = 0, Low = 1 };
75 using namespace chrono;
76
77 // If timeout is zero then the wait_until time should also be zero to signify no wait.
79 if (!is_zero(timeout) && end_of_cycle_no_wait_hooks.empty()) {
80 const MonoTime next
81 = next_expiry(timeout == NoTimeout ? MonoClock::max() : now.mono_time() + timeout);
82 if (next > now.mono_time()) {
83 wait_until = next;
84 }
85 }
86 // TODO: consider using a dynamic buffer that scales with increased demand.
88
89 int n;
90 error_code ec;
91 if (wait_until < MonoClock::max()) {
92 // The wait function will not block if time is zero.
93 n = epoll_.wait(buf, MaxEvents, wait_until, ec);
94 } else {
95 // Block indefinitely.
96 n = epoll_.wait(buf, MaxEvents, ec);
97 }
98 // Update cycle time after epoll() returns.
99 now = CyclTime::now();
100 last_time_priority_io_polled_ = now.mono_time();
101 last_time_user_hook_polled_ = now.mono_time();
102
103 if (ec) {
104 if (ec.value() != EINTR) {
105 throw system_error{ec};
106 }
107 return 0;
108 }
109 cycle_work_ = 0;
110 TOOLBOX_PROBE_SCOPED(reactor, dispatch, cycle_work_);
111 // High priority timers.
112 cycle_work_ = tqs_[High].dispatch(now);
113 // I/O events.
114 cycle_work_ += dispatch(now, buf, n, Priority::High);
115 cycle_work_ += dispatch(now, buf, n, Priority::Low);
116 // Low priority timers (typically only dispatched during empty cycles).
117 cycle_work_ += dispatch_low_priority_timers(now, tqs_[Low], cycle_work_ == 0);
118 // End of cycle hooks.
119 if (cycle_work_ > 0) {
120 io::dispatch(now, end_of_event_dispatch_hooks_);
121 }
122 io::dispatch(now, end_of_cycle_no_wait_hooks);
123 return cycle_work_;
124}
125
127{
128 // Best effort.
129 std::error_code ec;
130 notify_.write(1, ec);
131}
132
133MonoTime Reactor::next_expiry(MonoTime next) const
134{
135 enum { High = 0, Low = 1 };
136 using namespace chrono;
137 {
138 const auto& tq = tqs_[High];
139 if (!tq.empty()) {
140 // Duration until next expiry. Mitigate scheduler latency by preempting the
141 // high-priority timer and busy-waiting for 200us ahead of timer expiry.
142 next = min(next, tq.front().expiry() - 200us);
143 }
144 }
145 {
146 const auto& tq = tqs_[Low];
147 if (!tq.empty()) {
148 // Duration until next expiry.
149 next = min(next, tq.front().expiry());
150 }
151 }
152 return next;
153}
154
156{
157 if (currently_handling_priority_events_) [[unlikely]] {
158 return;
159 }
160
161 MonoTime now = MonoClock::now();
162 cycle_work_ += do_io_priority_poll(now);
163 cycle_work_ += do_user_priority_poll(now);
164}
165
166int Reactor::do_io_priority_poll(MonoTime now) noexcept
167{
168 int ret = 0;
169 try {
170 const bool enabled = priority_io_poll_threshold_ != Micros::max();
171 const auto delta = duration_cast<Micros>(now - last_time_priority_io_polled_);
172 const bool breached = delta > priority_io_poll_threshold_;
173
174 if (enabled && breached) {
175 const auto update_poll_time = make_finally([this]() noexcept {
176 last_time_priority_io_polled_ = MonoClock::now();
177 });
178
179 error_code ec;
180 Event buf[MaxEvents];
181
182 int n = epoll_.wait(buf, MaxEvents, MonoTime{}, ec);
183 if (ec) {
184 if (ec.value() != EINTR) {
185 TOOLBOX_ERROR << "epoll failure during high priority io poll: "
186 << ec << " [" << ec.message() << ']';
187 }
188 } else {
189 ret = dispatch(CyclTime::current(), buf, n, Priority::High);
190 }
191 }
192 } catch (const std::exception& e) {
193 TOOLBOX_ERROR << "exception during high priority io poll: " << e.what();
194 }
195
196 return ret;
197}
198
199int Reactor::do_user_priority_poll(MonoTime now) noexcept
200{
201 int ret = 0;
202 try {
203 const bool enabled = (user_hook_poll_threshold_ != Micros::max()) &&
204 priority_poll_user_hook_;
205 const auto delta = duration_cast<Micros>(now - last_time_user_hook_polled_);
206 const bool breached = delta > user_hook_poll_threshold_;
207
208 if (enabled && breached) {
209 const auto update_poll_time = make_finally([this]() noexcept {
210 last_time_user_hook_polled_ = MonoClock::now();
211 });
212
213 currently_handling_priority_events_ = true;
214 const auto reset_flag = make_finally([this]() noexcept {
215 currently_handling_priority_events_ = false;
216 });
217
218 ret = priority_poll_user_hook_(CyclTime::current());
219 }
220 } catch (const std::exception& e) {
221 TOOLBOX_ERROR << "exception during user high priority hook: " << e.what();
222 }
223
224 return ret;
225}
226
227int Reactor::dispatch(CyclTime now, Event* buf, int size, Priority priority)
228{
229 if (priority == Priority::High) {
230 assert(!currently_handling_priority_events_);
231 currently_handling_priority_events_ = true;
232 }
233 const auto reset_flag = make_finally([this]() noexcept {
234 currently_handling_priority_events_ = false;
235 });
236
237 int work{0};
238 for (int i{0}; i < size; ++i) {
239
240 auto& ev = buf[i];
241 const auto fd = Epoll::fd(ev);
242 const auto& ref = data_[fd];
243
244 if (ref.priority != priority) {
245 continue;
246 }
247
248 if (fd == notify_.fd()) {
249 notify_.read();
250 continue;
251 }
252
253 if (!ref.slot) {
254 // Ignore timerfd.
255 continue;
256 }
257
258 const auto sid = Epoll::sid(ev);
259 // Skip this socket if it was modified after the call to wait().
260 if (ref.sid > sid) {
261 continue;
262 }
263 // Apply the interest events to filter-out any events that the user may have removed from
264 // the events since the call to wait() was made. This would typically happen via a reentrant
265 // call into the reactor from an event-handler. N.B. EpollErr and EpollHup are always
266 // reported if they occur, regardless of whether they are specified in events.
267 const auto events = ev.events & (ref.events | EpollErr | EpollHup);
268 if (!events) {
269 continue;
270 }
271
272 try {
273 ref.slot(now, fd, events);
274 } catch (const std::exception& e) {
275 TOOLBOX_ERROR << "exception in i/o event handler: " << e.what();
276 }
277 ++work;
278 }
279 return work;
280}
281
282void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot, error_code& ec) noexcept
283{
284 auto& ref = data_[fd];
285 if (ref.sid == sid) {
286 if (ref.events != events) {
287 epoll_.mod(fd, sid, events, ec);
288 if (ec) {
289 return;
290 }
291 ref.events = events;
292 }
293 ref.slot = slot;
294 }
295}
296
297void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot)
298{
299 auto& ref = data_[fd];
300 if (ref.sid == sid) {
301 if (ref.events != events) {
302 epoll_.mod(fd, sid, events);
303 ref.events = events;
304 }
305 ref.slot = slot;
306 }
307}
308
309void Reactor::set_events(int fd, int sid, unsigned events, error_code& ec) noexcept
310{
311 auto& ref = data_[fd];
312 if (ref.sid == sid && ref.events != events) {
313 epoll_.mod(fd, sid, events, ec);
314 if (ec) {
315 return;
316 }
317 ref.events = events;
318 }
319}
320
321void Reactor::set_events(int fd, int sid, unsigned events)
322{
323 auto& ref = data_[fd];
324 if (ref.sid == sid && ref.events != events) {
325 epoll_.mod(fd, sid, events);
326 ref.events = events;
327 }
328}
329
330void Reactor::unsubscribe(int fd, int sid) noexcept
331{
332 auto& ref = data_[fd];
333 if (ref.sid == sid) {
334 epoll_.del(fd);
335 ref.events = 0;
336 ref.slot.reset();
337 ref.priority = Priority::Low;
338 }
339}
340
341void Reactor::set_io_priority(int fd, int sid, Priority priority) noexcept
342{
343 auto& ref = data_[fd];
344 if (ref.sid == sid && ref.priority != priority) {
345 ref.priority = priority;
346 }
347}
348
349} // namespace io
350} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:93
#define TOOLBOX_PROBE_SCOPED(provider, name,...)
Definition Trace.hpp:46
static constexpr int sid(const Event &ev) noexcept
Definition Epoll.hpp:165
void del(int fd) noexcept
Definition Epoll.hpp:231
void mod(int fd, int sid, unsigned events, std::error_code &ec) noexcept
Definition Epoll.hpp:239
static constexpr int fd(const Event &ev) noexcept
Definition Epoll.hpp:161
void add(int fd, int sid, unsigned events)
Definition Epoll.hpp:225
int wait(Event buf[], std::size_t size, std::error_code &ec) noexcept
Returns the number of file descriptors that are ready.
Definition Epoll.hpp:193
void write(std::int64_t val, std::error_code &ec) noexcept
Definition EventFd.hpp:76
std::int64_t read()
Definition EventFd.hpp:67
int fd() const noexcept
Definition EventFd.hpp:66
Reactor(std::size_t size_hint=0)
Definition Reactor.cpp:45
~Reactor() override
Definition Reactor.cpp:52
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:57
void yield() noexcept
Definition Reactor.cpp:155
int poll(CyclTime now, Duration timeout=NoTimeout)
Definition Reactor.cpp:72
void do_wakeup() noexcept final
Thread-safe.
Definition Reactor.cpp:126
MonoTime mono_time() const noexcept
Definition Time.hpp:138
STL namespace.
int64_t min(const Histogram &h) noexcept
Definition Utility.cpp:37
constexpr Duration NoTimeout
Definition Reactor.hpp:29
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
BasicSlot< void(CyclTime, int, unsigned)> IoSlot
Definition Reactor.hpp:31
constexpr bool is_zero(std::chrono::duration< RepT, PeriodT > d) noexcept
Definition Time.hpp:154
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
auto make_finally(FnT fn) noexcept
Definition Finally.hpp:48
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:97