Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
20#include <toolbox/sys/Log.hpp>
21#include <toolbox/sys/Trace.hpp>
22
23namespace toolbox {
24inline namespace io {
25using namespace std;
namespace {
// Capacity of the event buffer handed to each epoll wait call.
constexpr size_t MaxEvents = 128;
} // namespace
29
31{
32 const auto notify = notify_.fd();
33 data_.resize(max<size_t>(notify + 1, size_hint));
34 epoll_.add(notify, 0, EpollIn);
35}
36
38{
39 epoll_.del(notify_.fd());
40}
41
42Reactor::Handle Reactor::subscribe(int fd, unsigned events, IoSlot slot)
43{
44 assert(fd >= 0);
45 assert(slot);
46 if (fd >= static_cast<int>(data_.size())) {
47 data_.resize(fd + 1);
48 }
49 auto& ref = data_[fd];
50 epoll_.add(fd, ++ref.sid, events);
51 ref.events = events;
52 ref.slot = slot;
53 return {*this, fd, ref.sid};
54}
55
57{
58 enum { High = 0, Low = 1 };
59 using namespace chrono;
60
61 // If timeout is zero then the wait_until time should also be zero to signify no wait.
63 if (!is_zero(timeout) && end_of_cycle_no_wait_hooks.empty()) {
64 const MonoTime next
65 = next_expiry(timeout == NoTimeout ? MonoClock::max() : now.mono_time() + timeout);
66 if (next > now.mono_time()) {
67 wait_until = next;
68 }
69 }
70 // TODO: consider using a dynamic buffer that scales with increased demand.
72
73 int n;
74 error_code ec;
75 if (wait_until < MonoClock::max()) {
76 // The wait function will not block if time is zero.
77 n = epoll_.wait(buf, MaxEvents, wait_until, ec);
78 } else {
79 // Block indefinitely.
80 n = epoll_.wait(buf, MaxEvents, ec);
81 }
82 // Update cycle time after epoll() returns.
83 now = CyclTime::now();
84 if (ec) {
85 if (ec.value() != EINTR) {
86 throw system_error{ec};
87 }
88 return 0;
89 }
90 int work{0};
92 // High priority timers.
93 work = tqs_[High].dispatch(now);
94 // I/O events.
95 work += dispatch(now, buf, n);
96 // Low priority timers are only dispatched during empty cycles.
97 if (work == 0) {
98 work += tqs_[Low].dispatch(now);
99 }
100 // End of cycle hooks.
101 if (work > 0) {
102 io::dispatch(now, end_of_event_dispatch_hooks_);
103 }
104 io::dispatch(now, end_of_cycle_no_wait_hooks);
105 return work;
106}
107
109{
110 // Best effort.
111 std::error_code ec;
112 notify_.write(1, ec);
113}
114
115MonoTime Reactor::next_expiry(MonoTime next) const
116{
117 enum { High = 0, Low = 1 };
118 using namespace chrono;
119 {
120 const auto& tq = tqs_[High];
121 if (!tq.empty()) {
122 // Duration until next expiry. Mitigate scheduler latency by preempting the
123 // high-priority timer and busy-waiting for 200us ahead of timer expiry.
124 next = min(next, tq.front().expiry() - 200us);
125 }
126 }
127 {
128 const auto& tq = tqs_[Low];
129 if (!tq.empty()) {
130 // Duration until next expiry.
131 next = min(next, tq.front().expiry());
132 }
133 }
134 return next;
135}
136
137int Reactor::dispatch(CyclTime now, Event* buf, int size)
138{
139 int work{0};
140 for (int i{0}; i < size; ++i) {
141
142 auto& ev = buf[i];
143 const auto fd = epoll_.fd(ev);
144 if (fd == notify_.fd()) {
145 notify_.read();
146 continue;
147 }
148 const auto& ref = data_[fd];
149 if (!ref.slot) {
150 // Ignore timerfd.
151 continue;
152 }
153
154 const auto sid = epoll_.sid(ev);
155 // Skip this socket if it was modified after the call to wait().
156 if (ref.sid > sid) {
157 continue;
158 }
159 // Apply the interest events to filter-out any events that the user may have removed from
160 // the events since the call to wait() was made. This would typically happen via a reentrant
161 // call into the reactor from an event-handler. N.B. EpollErr and EpollHup are always
162 // reported if they occur, regardless of whether they are specified in events.
163 const auto events = ev.events & (ref.events | EpollErr | EpollHup);
164 if (!events) {
165 continue;
166 }
167
168 try {
169 ref.slot(now, fd, events);
170 } catch (const std::exception& e) {
171 TOOLBOX_ERROR << "exception in i/o event handler: " << e.what();
172 }
173 ++work;
174 }
175 return work;
176}
177
178void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot, error_code& ec) noexcept
179{
180 auto& ref = data_[fd];
181 if (ref.sid == sid) {
182 if (ref.events != events) {
183 epoll_.mod(fd, sid, events, ec);
184 if (ec) {
185 return;
186 }
187 ref.events = events;
188 }
189 ref.slot = slot;
190 }
191}
192
193void Reactor::set_events(int fd, int sid, unsigned events, IoSlot slot)
194{
195 auto& ref = data_[fd];
196 if (ref.sid == sid) {
197 if (ref.events != events) {
198 epoll_.mod(fd, sid, events);
199 ref.events = events;
200 }
201 ref.slot = slot;
202 }
203}
204
205void Reactor::set_events(int fd, int sid, unsigned events, error_code& ec) noexcept
206{
207 auto& ref = data_[fd];
208 if (ref.sid == sid && ref.events != events) {
209 epoll_.mod(fd, sid, events, ec);
210 if (ec) {
211 return;
212 }
213 ref.events = events;
214 }
215}
216
217void Reactor::set_events(int fd, int sid, unsigned events)
218{
219 auto& ref = data_[fd];
220 if (ref.sid == sid && ref.events != events) {
221 epoll_.mod(fd, sid, events);
222 ref.events = events;
223 }
224}
225
226void Reactor::unsubscribe(int fd, int sid) noexcept
227{
228 auto& ref = data_[fd];
229 if (ref.sid == sid) {
230 epoll_.del(fd);
231 ref.events = 0;
232 ref.slot.reset();
233 }
234}
235
236} // namespace io
237} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:93
#define TOOLBOX_PROBE_SCOPED(provider, name,...)
Definition Trace.hpp:46
static constexpr int sid(const Event &ev) noexcept
Definition Epoll.hpp:165
void del(int fd) noexcept
Definition Epoll.hpp:231
void mod(int fd, int sid, unsigned events, std::error_code &ec) noexcept
Definition Epoll.hpp:239
static constexpr int fd(const Event &ev) noexcept
Definition Epoll.hpp:161
void add(int fd, int sid, unsigned events)
Definition Epoll.hpp:225
int wait(Event buf[], std::size_t size, std::error_code &ec) noexcept
Returns the number of file descriptors that are ready.
Definition Epoll.hpp:193
void write(std::int64_t val, std::error_code &ec) noexcept
Definition EventFd.hpp:76
std::int64_t read()
Definition EventFd.hpp:67
int fd() const noexcept
Definition EventFd.hpp:66
Reactor(std::size_t size_hint=0)
Definition Reactor.cpp:30
~Reactor() override
Definition Reactor.cpp:37
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:42
int poll(CyclTime now, Duration timeout=NoTimeout)
Definition Reactor.cpp:56
void do_wakeup() noexcept final
Thread-safe.
Definition Reactor.cpp:108
MonoTime mono_time() const noexcept
Definition Time.hpp:140
STL namespace.
int64_t min(const Histogram &h) noexcept
Definition Utility.cpp:41
constexpr Duration NoTimeout
Definition Reactor.hpp:29
void dispatch(CyclTime now, const HookList &l) noexcept
Definition Hook.cpp:24
BasicSlot< CyclTime, int, unsigned > IoSlot
Definition Reactor.hpp:32
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
constexpr bool is_zero(std::chrono::duration< RepT, PeriodT > d) noexcept
Definition Time.hpp:156
MonoClock::time_point MonoTime
Definition Time.hpp:110
Nanos Duration
Definition Time.hpp:40
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:92