Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
20#include <toolbox/sys/Log.hpp>
21#include <toolbox/sys/Trace.hpp>
22
23namespace toolbox {
24inline namespace io {
25using namespace std;
26namespace {
27constexpr size_t MaxEvents{128};
28
29int dispatch_low_priority_timers(CyclTime now, TimerQueue& tq, bool idle_cycle)
30{
31 int work_done = 0;
32 if (idle_cycle) {
33 work_done = tq.dispatch(now, 1);
34 }
35 else if (!tq.empty()) {
36 // actively execute low priority timers if they've been delayed by 100ms or more.
37 if ((now.mono_time() - tq.front().expiry()) > 100ms) {
38 work_done = tq.dispatch(now, 1);
39 }
40 }
41 return work_done;
42}
43} // namespace
44
46{
47 const auto notify = notify_.fd();
48 data_.resize(max<size_t>(notify + 1, size_hint));
49 epoll_.add(notify, 0, EpollIn);
50}
51
53{
54 epoll_.del(notify_.fd());
55}
56
57Reactor::Handle Reactor::subscribe(int fd, unsigned events, IoSlot slot)
58{
59 assert(fd >= 0);
60 assert(slot);
61 if (fd >= static_cast<int>(data_.size())) {
62 data_.resize(fd + 1);
63 }
64 auto& ref = data_[fd];
65 epoll_.add(fd, ++ref.sid, events);
66 ref.events = events;
67 ref.slot = slot;
68 return {*this, fd, ref.sid};
69}
70
72{
73 enum { High = 0, Low = 1 };
74 using namespace chrono;
75
76 // If timeout is zero then the wait_until time should also be zero to signify no wait.
78 if (!is_zero(timeout) && end_of_cycle_no_wait_hooks.empty()) {
79 const MonoTime next
80 = next_expiry(timeout == NoTimeout ? MonoClock::max() : now.mono_time() + timeout);
81 if (next > now.mono_time()) {
82 wait_until = next;
83 }
84 }
85 // TODO: consider using a dynamic buffer that scales with increased demand.
87
88 int n;
89 error_code ec;
90 if (wait_until < MonoClock::max()) {
91 // The wait function will not block if time is zero.
92 n = epoll_.wait(buf, MaxEvents, wait_until, ec);
93 } else {
94 // Block indefinitely.
95 n = epoll_.wait(buf, MaxEvents, ec);
96 }
97 // Update cycle time after epoll() returns.
98 now = CyclTime::now();
99 if (ec) {
100 if (ec.value() != EINTR) {
101 throw system_error{ec};
102 }
103 return 0;
104 }
105 int work{0};
107 // High priority timers.
108 work = tqs_[High].dispatch(now);
109 // I/O events.
110 work += dispatch(now, buf, n);
111 // Low priority timers (typically only dispatched during empty cycles).
112 work += dispatch_low_priority_timers(now, tqs_[Low], work == 0);
113 // End of cycle hooks.
114 if (work > 0) {
115 io::dispatch(now, end_of_event_dispatch_hooks_);
116 }
117 io::dispatch(now, end_of_cycle_no_wait_hooks);
118 return work;
119}
120
122{
123 // Best effort.
124 std::error_code ec;
125 notify_.write(1, ec);
126}
127
128MonoTime Reactor::next_expiry(MonoTime next) const
129{
130 enum { High = 0, Low = 1 };
131 using namespace chrono;
132 {
133 const auto& tq = tqs_[High];
134 if (!tq.empty()) {
135 // Duration until next expiry. Mitigate scheduler latency by preempting the
136 // high-priority timer and busy-waiting for 200us ahead of timer expiry.
137 next = min(next, tq.front().expiry() - 200us);
138 }
139 }
140 {
141 const auto& tq = tqs_[Low];
142 if (!tq.empty()) {
143 // Duration until next expiry.
144 next = min(next, tq.front().expiry());
145 }
146 }
147 return next;
148}
149
150int Reactor::dispatch(CyclTime now, Event* buf, int size)
151{
152 int work{0};
153 for (int i{0}; i < size; ++i) {
154
155 auto& ev = buf[i];
156 const auto fd = epoll_.fd(ev);
157 if (fd == notify_.fd()) {
158 notify_.read();
159 continue;
160 }
161 const auto& ref = data_[fd];
162 if (!ref.slot) {
163 // Ignore timerfd.
164 continue;
165 }
166
167 const auto sid = epoll_.sid(ev);
168 // Skip this socket if it was modified after the call to wait().
169 if (ref.sid > sid) {
170 continue;
171 }
172 // Apply the interest events to filter-out any events that the user may have removed from
173 // the events since the call to wait() was made. This would typically happen via a reentrant
174 // call into the reactor from an event-handler. N.B. EpollErr and EpollHup are always
175 // reported if they occur, regardless of whether they are specified in events.
176 const auto events = ev.events & (ref.events | EpollErr | EpollHup);
177 if (!events) {
178 continue;
179 }
180
181 try {
182 ref.slot(now, fd, events);
183 } catch (const std::exception& e) {
184 TOOLBOX_ERROR << "exception in i/o event handler: " << e.what();
185 }
186 ++work;
187 }
188 return work;
189}
190
191void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot, error_code& ec) noexcept
192{
193 auto& ref = data_[fd];
194 if (ref.sid == sid) {
195 if (ref.events != events) {
196 epoll_.mod(fd, sid, events, ec);
197 if (ec) {
198 return;
199 }
200 ref.events = events;
201 }
202 ref.slot = slot;
203 }
204}
205
206void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot)
207{
208 auto& ref = data_[fd];
209 if (ref.sid == sid) {
210 if (ref.events != events) {
211 epoll_.mod(fd, sid, events);
212 ref.events = events;
213 }
214 ref.slot = slot;
215 }
216}
217
218void Reactor::set_events(int fd, int sid, unsigned events, error_code& ec) noexcept
219{
220 auto& ref = data_[fd];
221 if (ref.sid == sid && ref.events != events) {
222 epoll_.mod(fd, sid, events, ec);
223 if (ec) {
224 return;
225 }
226 ref.events = events;
227 }
228}
229
230void Reactor::set_events(int fd, int sid, unsigned events)
231{
232 auto& ref = data_[fd];
233 if (ref.sid == sid && ref.events != events) {
234 epoll_.mod(fd, sid, events);
235 ref.events = events;
236 }
237}
238
239void Reactor::unsubscribe(int fd, int sid) noexcept
240{
241 auto& ref = data_[fd];
242 if (ref.sid == sid) {
243 epoll_.del(fd);
244 ref.events = 0;
245 ref.slot.reset();
246 }
247}
248
249} // namespace io
250} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:93
#define TOOLBOX_PROBE_SCOPED(provider, name,...)
Definition Trace.hpp:46
static constexpr int sid(const Event &ev) noexcept
Definition Epoll.hpp:165
void del(int fd) noexcept
Definition Epoll.hpp:231
void mod(int fd, int sid, unsigned events, std::error_code &ec) noexcept
Definition Epoll.hpp:239
static constexpr int fd(const Event &ev) noexcept
Definition Epoll.hpp:161
void add(int fd, int sid, unsigned events)
Definition Epoll.hpp:225
int wait(Event buf[], std::size_t size, std::error_code &ec) noexcept
Returns the number of file descriptors that are ready.
Definition Epoll.hpp:193
void write(std::int64_t val, std::error_code &ec) noexcept
Definition EventFd.hpp:76
std::int64_t read()
Definition EventFd.hpp:67
int fd() const noexcept
Definition EventFd.hpp:66
Reactor(std::size_t size_hint=0)
Definition Reactor.cpp:45
~Reactor() override
Definition Reactor.cpp:52
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:57
int poll(CyclTime now, Duration timeout=NoTimeout)
Definition Reactor.cpp:71
void do_wakeup() noexcept final
Thread-safe.
Definition Reactor.cpp:121
MonoTime mono_time() const noexcept
Definition Time.hpp:137
STL namespace.
int64_t min(const Histogram &h) noexcept
Definition Utility.cpp:37
constexpr Duration NoTimeout
Definition Reactor.hpp:29
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
BasicSlot< CyclTime, int, unsigned > IoSlot
Definition Reactor.hpp:32
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
constexpr bool is_zero(std::chrono::duration< RepT, PeriodT > d) noexcept
Definition Time.hpp:153
MonoClock::time_point MonoTime
Definition Time.hpp:110
Nanos Duration
Definition Time.hpp:40
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:92