Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.hpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#ifndef TOOLBOX_IO_REACTOR_HPP
18#define TOOLBOX_IO_REACTOR_HPP
19
20#include <toolbox/io/Epoll.hpp>
22#include <toolbox/io/Hook.hpp>
23#include <toolbox/io/Timer.hpp>
24#include <toolbox/io/Waker.hpp>
25
26namespace toolbox {
27inline namespace io {
28
29constexpr Duration NoTimeout{-1};
30enum class Priority { High = 0, Low = 1 };
31using IoSlot = BasicSlot<void(CyclTime, int, unsigned)>;
32
33class TOOLBOX_API Reactor : public Waker {
34 public:
36 // HookType describes the kind of hook.
37 enum class HookType : int {
38 // EndOfCycleNoWait hooks are called at the end of the Reactor cycle.
39 // The Reactor cycle will not wait for i/o and/or timer events
40 // while any of these hooks are installed.
41 EndOfCycleNoWait = 1,
42 // EndOfEventDispatch hooks are called after all i/o and timer events have been dispatched.
43 // These hooks are called, and only if, work done in the cycle is greater than zero.
44 // And they are always called before EndOfCycleNoWait hooks.
45 EndOfEventDispatch = 2,
46 };
47 class Handle {
48 public:
49 Handle(Reactor& reactor, int fd, int sid)
50 : reactor_{&reactor}
51 , fd_{fd}
52 , sid_{sid}
53 {
54 }
55 constexpr Handle(std::nullptr_t = nullptr) noexcept {} // NOLINT(hicpp-explicit-conversions)
56 ~Handle() { reset(); }
57
58 // Copy.
59 Handle(const Handle&) = delete;
60 Handle& operator=(const Handle&) = delete;
61
62 // Move.
63 Handle(Handle&& rhs) noexcept
64 : reactor_{rhs.reactor_}
65 , fd_{rhs.fd_}
66 , sid_{rhs.sid_}
67 {
68 rhs.reactor_ = nullptr;
69 rhs.fd_ = -1;
70 rhs.sid_ = 0;
71 }
72 Handle& operator=(Handle&& rhs) noexcept
73 {
74 reset();
75 swap(rhs);
76 return *this;
77 }
78 bool empty() const noexcept { return reactor_ == nullptr; }
79 explicit operator bool() const noexcept { return reactor_ != nullptr; }
80 auto fd() const noexcept { return fd_; }
81 auto sid() const noexcept { return sid_; }
82
83 void reset(std::nullptr_t = nullptr) noexcept
84 {
85 if (reactor_) {
86 reactor_->unsubscribe(fd_, sid_);
87 reactor_ = nullptr;
88 fd_ = -1;
89 sid_ = 0;
90 }
91 }
92 void swap(Handle& rhs) noexcept
93 {
94 std::swap(reactor_, rhs.reactor_);
95 std::swap(fd_, rhs.fd_);
96 std::swap(sid_, rhs.sid_);
97 }
98
100 void set_events(unsigned events, IoSlot slot, std::error_code& ec) noexcept
101 {
102 assert(reactor_);
103 reactor_->set_events(fd_, sid_, events, slot, ec);
104 }
105 void set_events(unsigned events, IoSlot slot)
106 {
107 assert(reactor_);
108 reactor_->set_events(fd_, sid_, events, slot);
109 }
110 void set_events(unsigned events, std::error_code& ec) noexcept
111 {
112 assert(reactor_);
113 reactor_->set_events(fd_, sid_, events, ec);
114 }
115 void set_events(unsigned events)
116 {
117 assert(reactor_);
118 reactor_->set_events(fd_, sid_, events);
119 }
120
122 {
123 assert(reactor_);
124 reactor_->set_io_priority(fd_, sid_, priority);
125 }
126
127 private:
128 Reactor* reactor_{nullptr};
129 int fd_{-1}, sid_{0};
130 };
131
132 explicit Reactor(std::size_t size_hint = 0);
133 ~Reactor() override;
134
135 // Copy.
136 Reactor(const Reactor&) = delete;
137 Reactor& operator=(const Reactor&) = delete;
138
139 // Move.
140 Reactor(Reactor&&) = delete;
142
143 // clang-format off
144 [[nodiscard]] Handle subscribe(int fd, unsigned events, IoSlot slot);
145
147 [[nodiscard]] Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
148 {
149 return tqs_[static_cast<size_t>(priority)].insert(expiry, interval, slot);
150 }
152 [[nodiscard]] Timer timer(MonoTime expiry, Priority priority, TimerSlot slot)
153 {
154 return tqs_[static_cast<size_t>(priority)].insert(expiry, slot);
155 }
156 // clang-format on
157
158 void add_hook(Hook& hook, HookType ht = HookType::EndOfCycleNoWait) noexcept
159 {
160 switch (ht) {
161 case HookType::EndOfCycleNoWait:
162 end_of_cycle_no_wait_hooks.push_back(hook);
163 break;
164 case HookType::EndOfEventDispatch:
165 end_of_event_dispatch_hooks_.push_back(hook);
166 break;
167 }
168 }
172 int poll(CyclTime now, Duration timeout = NoTimeout);
173
174 void yield();
175
176 void set_high_priority_poll_threshold(Micros thresh) { priority_io_poll_threshold = thresh; }
177
178 protected:
180 void do_wakeup() noexcept final;
181
182 private:
183 MonoTime next_expiry(MonoTime next) const;
184
185 // dispatch events only for file descriptors with specified priority
186 int dispatch(CyclTime now, Event* buf, int size, Priority priority);
187 void set_events(int fd, int sid, unsigned events, IoSlot slot, std::error_code& ec) noexcept;
188 void set_events(int fd, int sid, unsigned events, IoSlot slot);
189 void set_events(int fd, int sid, unsigned events, std::error_code& ec) noexcept;
190 void set_events(int fd, int sid, unsigned events);
191 void unsubscribe(int fd, int sid) noexcept;
192 void set_io_priority(int fd, int sid, Priority priority) noexcept;
193
194 struct Data {
195 int sid{};
196 unsigned events{};
197 IoSlot slot;
198 Priority priority = Priority::Low;
199 };
200 Epoll epoll_;
201 Epoll high_prio_epoll_;
202 std::vector<Data> data_;
203 EventFd notify_{0, EFD_NONBLOCK};
204 static_assert(static_cast<int>(Priority::High) == 0);
205 static_assert(static_cast<int>(Priority::Low) == 1);
206 TimerPool tp_;
207 std::array<TimerQueue, 2> tqs_{tp_, tp_};
208 HookList end_of_cycle_no_wait_hooks, end_of_event_dispatch_hooks_;
209 Micros priority_io_poll_threshold = Micros::max();
210 WallTime last_time_priority_io_polled_{};
211 int cycle_work_{0};
212 bool currently_handling_priority_events_{false};
213};
214
215} // namespace io
216} // namespace toolbox
217
218#endif // TOOLBOX_IO_REACTOR_HPP
#define TOOLBOX_API
Definition Config.h:39
auto fd() const noexcept
Definition Reactor.hpp:80
Handle(const Handle &)=delete
constexpr Handle(std::nullptr_t=nullptr) noexcept
Definition Reactor.hpp:55
void swap(Handle &rhs) noexcept
Definition Reactor.hpp:92
Handle(Reactor &reactor, int fd, int sid)
Definition Reactor.hpp:49
bool empty() const noexcept
Definition Reactor.hpp:78
Handle & operator=(const Handle &)=delete
void set_events(unsigned events, IoSlot slot)
Definition Reactor.hpp:105
void set_events(unsigned events, IoSlot slot, std::error_code &ec) noexcept
Modify I/O event subscription.
Definition Reactor.hpp:100
void set_events(unsigned events)
Definition Reactor.hpp:115
Handle & operator=(Handle &&rhs) noexcept
Definition Reactor.hpp:72
auto sid() const noexcept
Definition Reactor.hpp:81
void reset(std::nullptr_t=nullptr) noexcept
Definition Reactor.hpp:83
void set_io_priority(Priority priority)
Definition Reactor.hpp:121
Handle(Handle &&rhs) noexcept
Definition Reactor.hpp:63
void set_events(unsigned events, std::error_code &ec) noexcept
Definition Reactor.hpp:110
Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:147
Reactor(Reactor &&)=delete
Reactor & operator=(Reactor &&)=delete
Reactor & operator=(const Reactor &)=delete
void add_hook(Hook &hook, HookType ht=HookType::EndOfCycleNoWait) noexcept
Definition Reactor.hpp:158
Timer timer(MonoTime expiry, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:152
Reactor(const Reactor &)=delete
void set_high_priority_poll_threshold(Micros thresh)
Definition Reactor.hpp:176
STL namespace.
constexpr Duration NoTimeout
Definition Reactor.hpp:29
boost::intrusive::list< Hook, boost::intrusive::constant_time_size< false > > HookList
Definition Hook.hpp:39
epoll_event EpollEvent
Definition Epoll.hpp:155
std::chrono::microseconds Micros
Definition Time.hpp:37
WallClock::time_point WallTime
Definition Time.hpp:112
MonoClock::time_point MonoTime
Definition Time.hpp:111
Nanos Duration
Definition Time.hpp:41
constexpr auto bind() noexcept
Definition Slot.hpp:97