17#ifndef TOOLBOX_IO_REACTOR_HPP
18#define TOOLBOX_IO_REACTOR_HPP
57 EndOfEventDispatch = 2,
67 constexpr Handle(std::nullptr_t =
nullptr) noexcept {}
76 : reactor_{rhs.reactor_}
80 rhs.reactor_ =
nullptr;
90 bool empty() const noexcept {
return reactor_ ==
nullptr; }
91 explicit operator bool() const noexcept {
return reactor_ !=
nullptr; }
92 auto fd() const noexcept {
return fd_; }
93 auto sid() const noexcept {
return sid_; }
95 void reset(std::nullptr_t =
nullptr) noexcept
98 reactor_->unsubscribe(fd_, sid_);
106 std::swap(reactor_, rhs.reactor_);
107 std::swap(fd_, rhs.fd_);
108 std::swap(sid_, rhs.sid_);
115 reactor_->set_events(fd_, sid_, events, slot, ec);
120 reactor_->set_events(fd_, sid_, events, slot);
122 void set_events(
unsigned events, std::error_code& ec)
noexcept
125 reactor_->set_events(fd_, sid_, events, ec);
130 reactor_->set_events(fd_, sid_, events);
136 reactor_->set_io_priority(fd_, sid_, priority);
141 int fd_{-1}, sid_{0};
144 explicit Reactor(ReactorMode mode = ReactorMode::Blocking, std::size_t size_hint = 0);
156 [[nodiscard]]
Handle subscribe(
int fd,
unsigned events,
IoSlot slot);
161 return tqs_[
static_cast<size_t>(priority)].insert(expiry, interval, slot);
166 return tqs_[
static_cast<size_t>(priority)].insert(expiry, slot);
173 case HookType::EndOfCycleNoWait:
174 end_of_cycle_no_wait_hooks.push_back(hook);
176 case HookType::EndOfEventDispatch:
177 end_of_event_dispatch_hooks_.push_back(hook);
193 void do_wakeup() noexcept final;
198 struct BlockingDevice {
202 struct ImmediateDevice {
203 Epoll low_prio_epoll;
204 Epoll high_prio_epoll;
214 Epoll& get_resident_epoll(Data& data);
215 int poll_immediate(ImmediateDevice& dev,CyclTime now);
216 int poll_blocking(BlockingDevice& dev,CyclTime now, Duration timeout);
218 int dispatch(CyclTime now, Event* buf,
int size, Priority priority);
219 void set_events(
int fd,
int sid,
unsigned events, IoSlot slot, std::error_code& ec)
noexcept;
220 void set_events(
int fd,
int sid,
unsigned events, IoSlot slot);
221 void set_events(
int fd,
int sid,
unsigned events, std::error_code& ec)
noexcept;
222 void set_events(
int fd,
int sid,
unsigned events);
223 void unsubscribe(
int fd,
int sid)
noexcept;
224 void set_io_priority(
int fd,
int sid, Priority priority)
noexcept;
226 std::variant<BlockingDevice, ImmediateDevice> device_;
227 std::vector<Data> data_;
228 EventFd notify_{0, EFD_NONBLOCK};
229 static_assert(
static_cast<int>(Priority::High) == 0);
230 static_assert(
static_cast<int>(Priority::Low) == 1);
232 std::array<TimerQueue, 2> tqs_{tp_, tp_};
233 HookList end_of_cycle_no_wait_hooks, end_of_event_dispatch_hooks_;
234 Micros priority_io_poll_threshold = Micros::max();
235 WallTime last_time_priority_io_polled_{};
237 bool currently_handling_priority_events_{
false};