Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.hpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#ifndef TOOLBOX_IO_REACTOR_HPP
18#define TOOLBOX_IO_REACTOR_HPP
19
20#include <toolbox/io/Epoll.hpp>
22#include <toolbox/io/Hook.hpp>
23#include <toolbox/io/Timer.hpp>
24#include <toolbox/io/Waker.hpp>
25
26namespace toolbox {
27inline namespace io {
28
29constexpr Duration NoTimeout{-1};
30enum class Priority { High = 0, Low = 1 };
31using IoSlot = BasicSlot<void(CyclTime, int, unsigned)>;
33
34class TOOLBOX_API Reactor : public Waker {
35 public:
37 // HookType describes the kind of hook.
38 enum class HookType : int {
39 // EndOfCycleNoWait hooks are called at the end of the Reactor cycle.
40 // The Reactor cycle will not wait for i/o and/or timer events
41 // while any of these hooks are installed.
42 EndOfCycleNoWait = 1,
43 // EndOfEventDispatch hooks are called after all i/o and timer events have been dispatched.
44 // These hooks are called, and only if, work done in the cycle is greater than zero.
45 // And they are always called before EndOfCycleNoWait hooks.
46 EndOfEventDispatch = 2,
47 };
48 class Handle {
49 public:
50 Handle(Reactor& reactor, int fd, int sid)
51 : reactor_{&reactor}
52 , fd_{fd}
53 , sid_{sid}
54 {
55 }
56 constexpr Handle(std::nullptr_t = nullptr) noexcept {} // NOLINT(hicpp-explicit-conversions)
57 ~Handle() { reset(); }
58
59 // Copy.
60 Handle(const Handle&) = delete;
61 Handle& operator=(const Handle&) = delete;
62
63 // Move.
64 Handle(Handle&& rhs) noexcept
65 : reactor_{rhs.reactor_}
66 , fd_{rhs.fd_}
67 , sid_{rhs.sid_}
68 {
69 rhs.reactor_ = nullptr;
70 rhs.fd_ = -1;
71 rhs.sid_ = 0;
72 }
73 Handle& operator=(Handle&& rhs) noexcept
74 {
75 reset();
76 swap(rhs);
77 return *this;
78 }
79 bool empty() const noexcept { return reactor_ == nullptr; }
80 explicit operator bool() const noexcept { return reactor_ != nullptr; }
81 auto fd() const noexcept { return fd_; }
82 auto sid() const noexcept { return sid_; }
83
84 void reset(std::nullptr_t = nullptr) noexcept
85 {
86 if (reactor_) {
87 reactor_->unsubscribe(fd_, sid_);
88 reactor_ = nullptr;
89 fd_ = -1;
90 sid_ = 0;
91 }
92 }
93 void swap(Handle& rhs) noexcept
94 {
95 std::swap(reactor_, rhs.reactor_);
96 std::swap(fd_, rhs.fd_);
97 std::swap(sid_, rhs.sid_);
98 }
99
101 void set_events(unsigned events, IoSlot slot, std::error_code& ec) noexcept
102 {
103 assert(reactor_);
104 reactor_->set_events(fd_, sid_, events, slot, ec);
105 }
106 void set_events(unsigned events, IoSlot slot)
107 {
108 assert(reactor_);
109 reactor_->set_events(fd_, sid_, events, slot);
110 }
111 void set_events(unsigned events, std::error_code& ec) noexcept
112 {
113 assert(reactor_);
114 reactor_->set_events(fd_, sid_, events, ec);
115 }
116 void set_events(unsigned events)
117 {
118 assert(reactor_);
119 reactor_->set_events(fd_, sid_, events);
120 }
121
123 {
124 assert(reactor_);
125 reactor_->set_io_priority(fd_, sid_, priority);
126 }
127
128 private:
129 Reactor* reactor_{nullptr};
130 int fd_{-1}, sid_{0};
131 };
132
133 explicit Reactor(std::size_t size_hint = 0);
134 ~Reactor() override;
135
136 // Copy.
137 Reactor(const Reactor&) = delete;
138 Reactor& operator=(const Reactor&) = delete;
139
140 // Move.
141 Reactor(Reactor&&) = delete;
143
144 // clang-format off
145 [[nodiscard]] Handle subscribe(int fd, unsigned events, IoSlot slot);
146
148 [[nodiscard]] Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
149 {
150 return tqs_[static_cast<size_t>(priority)].insert(expiry, interval, slot);
151 }
153 [[nodiscard]] Timer timer(MonoTime expiry, Priority priority, TimerSlot slot)
154 {
155 return tqs_[static_cast<size_t>(priority)].insert(expiry, slot);
156 }
157 // clang-format on
158
159 void add_hook(Hook& hook, HookType ht = HookType::EndOfCycleNoWait) noexcept
160 {
161 switch (ht) {
162 case HookType::EndOfCycleNoWait:
163 end_of_cycle_no_wait_hooks.push_back(hook);
164 break;
165 case HookType::EndOfEventDispatch:
166 end_of_event_dispatch_hooks_.push_back(hook);
167 break;
168 }
169 }
174 int poll(CyclTime now, Duration timeout = NoTimeout);
175
176 void yield() noexcept;
177
178 void set_high_priority_poll_threshold(Micros thresh) { priority_io_poll_threshold_ = thresh; }
179
180 void set_user_high_priority_hook(PollSlot slot) { priority_poll_user_hook_ = slot; }
181 void set_user_hook_poll_threshold(Micros thresh) { user_hook_poll_threshold_ = thresh; }
182
183 protected:
185 void do_wakeup() noexcept final;
186
187 private:
188 MonoTime next_expiry(MonoTime next) const;
189
190 // dispatch events only for file descriptors with specified priority
191 int dispatch(CyclTime now, Event* buf, int size, Priority priority);
192 void set_events(int fd, int sid, unsigned events, IoSlot slot, std::error_code& ec) noexcept;
193 void set_events(int fd, int sid, unsigned events, IoSlot slot);
194 void set_events(int fd, int sid, unsigned events, std::error_code& ec) noexcept;
195 void set_events(int fd, int sid, unsigned events);
196 void unsubscribe(int fd, int sid) noexcept;
197 void set_io_priority(int fd, int sid, Priority priority) noexcept;
198 int do_io_priority_poll(WallTime now) noexcept;
199 int do_user_priority_poll(WallTime now) noexcept;
200
201 struct Data {
202 int sid{};
203 unsigned events{};
204 IoSlot slot;
205 Priority priority = Priority::Low;
206 };
207
208 Epoll epoll_;
209 std::vector<Data> data_;
210 EventFd notify_{0, EFD_NONBLOCK};
211 static_assert(static_cast<int>(Priority::High) == 0);
212 static_assert(static_cast<int>(Priority::Low) == 1);
213 TimerPool tp_;
214 std::array<TimerQueue, 2> tqs_{tp_, tp_};
215 HookList end_of_cycle_no_wait_hooks, end_of_event_dispatch_hooks_;
216 Micros priority_io_poll_threshold_ = Micros::max();
217 Micros user_hook_poll_threshold_ = Micros::max();
218 WallTime last_time_priority_io_polled_{};
219 WallTime last_time_user_hook_polled_{};
220 PollSlot priority_poll_user_hook_;
221 int cycle_work_{0};
222 bool currently_handling_priority_events_{false};
223};
224
225} // namespace io
226} // namespace toolbox
227
228#endif // TOOLBOX_IO_REACTOR_HPP
#define TOOLBOX_API
Definition Config.h:39
auto fd() const noexcept
Definition Reactor.hpp:81
Handle(const Handle &)=delete
constexpr Handle(std::nullptr_t=nullptr) noexcept
Definition Reactor.hpp:56
void swap(Handle &rhs) noexcept
Definition Reactor.hpp:93
Handle(Reactor &reactor, int fd, int sid)
Definition Reactor.hpp:50
bool empty() const noexcept
Definition Reactor.hpp:79
Handle & operator=(const Handle &)=delete
void set_events(unsigned events, IoSlot slot)
Definition Reactor.hpp:106
void set_events(unsigned events, IoSlot slot, std::error_code &ec) noexcept
Modify I/O event subscription.
Definition Reactor.hpp:101
void set_events(unsigned events)
Definition Reactor.hpp:116
Handle & operator=(Handle &&rhs) noexcept
Definition Reactor.hpp:73
auto sid() const noexcept
Definition Reactor.hpp:82
void reset(std::nullptr_t=nullptr) noexcept
Definition Reactor.hpp:84
void set_io_priority(Priority priority)
Definition Reactor.hpp:122
Handle(Handle &&rhs) noexcept
Definition Reactor.hpp:64
void set_events(unsigned events, std::error_code &ec) noexcept
Definition Reactor.hpp:111
Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:148
Reactor(Reactor &&)=delete
Reactor & operator=(Reactor &&)=delete
void set_user_hook_poll_threshold(Micros thresh)
Definition Reactor.hpp:181
void set_user_high_priority_hook(PollSlot slot)
Definition Reactor.hpp:180
Reactor & operator=(const Reactor &)=delete
void add_hook(Hook &hook, HookType ht=HookType::EndOfCycleNoWait) noexcept
Definition Reactor.hpp:159
Timer timer(MonoTime expiry, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:153
Reactor(const Reactor &)=delete
STL namespace.
constexpr Duration NoTimeout
Definition Reactor.hpp:29
boost::intrusive::list< Hook, boost::intrusive::constant_time_size< false > > HookList
Definition Hook.hpp:39
BasicSlot< int(CyclTime)> PollSlot
Definition Reactor.hpp:32
epoll_event EpollEvent
Definition Epoll.hpp:155
std::chrono::microseconds Micros
Definition Time.hpp:37
WallClock::time_point WallTime
Definition Time.hpp:112
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
constexpr auto bind() noexcept
Definition Slot.hpp:97