35 else if (!tq.empty()) {
37 if ((now.mono_time() - tq.front().expiry()) > 100
ms) {
54 epoll_.
del(notify_.
fd());
61 if (fd >=
static_cast<int>(data_.size())) {
64 auto&
ref = data_[fd];
65 epoll_.
add(fd, ++
ref.sid, events);
69 return {*
this, fd,
ref.sid};
75 using namespace chrono;
99 now = CyclTime::now();
100 last_time_priority_io_polled_ = now.
wall_time();
104 throw system_error{
ec};
111 cycle_work_ = tqs_[
High].dispatch(now);
118 if (cycle_work_ > 0) {
135 using namespace chrono;
137 const auto& tq = tqs_[
High];
141 next =
min(next, tq.front().expiry() - 200u
s);
145 const auto& tq = tqs_[
Low];
148 next =
min(next, tq.front().expiry());
156 if (currently_handling_priority_events_) [[
unlikely]] {
161 cycle_work_ += do_io_priority_poll(now);
162 cycle_work_ += do_user_priority_poll(now);
165int Reactor::do_io_priority_poll(
WallTime now)
noexcept
169 const bool enabled = priority_io_poll_threshold_ != Micros::max();
175 last_time_priority_io_polled_ = WallClock::now();
184 TOOLBOX_ERROR <<
"epoll failure during high priority io poll: "
185 <<
ec <<
" [" <<
ec.message() <<
']';
191 }
catch (
const std::exception& e) {
192 TOOLBOX_ERROR <<
"exception during high priority io poll: " << e.what();
198int Reactor::do_user_priority_poll(
WallTime now)
noexcept
202 const bool enabled = (user_hook_poll_threshold_ != Micros::max()) &&
203 priority_poll_user_hook_;
209 last_time_user_hook_polled_ = WallClock::now();
212 currently_handling_priority_events_ =
true;
214 currently_handling_priority_events_ =
false;
217 ret = priority_poll_user_hook_(CyclTime::current());
219 }
catch (
const std::exception& e) {
220 TOOLBOX_ERROR <<
"exception during user high priority hook: " << e.what();
226int Reactor::dispatch(CyclTime now, Event*
buf,
int size,
Priority priority)
229 assert(!currently_handling_priority_events_);
230 currently_handling_priority_events_ =
true;
233 currently_handling_priority_events_ =
false;
237 for (
int i{0};
i <
size; ++
i) {
241 const auto&
ref = data_[fd];
243 if (
ref.priority != priority) {
247 if (fd == notify_.
fd()) {
272 ref.slot(now, fd, events);
273 }
catch (
const std::exception& e) {
274 TOOLBOX_ERROR <<
"exception in i/o event handler: " << e.what();
281void Reactor::set_events(
int fd,
int sid,
unsigned events,
IoSlot slot, error_code&
ec)
noexcept
283 auto&
ref = data_[fd];
284 if (
ref.sid == sid) {
285 if (
ref.events != events) {
286 epoll_.mod(fd, sid, events,
ec);
296void Reactor::set_events(
int fd,
int sid,
unsigned events,
IoSlot slot)
298 auto&
ref = data_[fd];
299 if (
ref.sid == sid) {
300 if (
ref.events != events) {
301 epoll_.
mod(fd, sid, events);
308void Reactor::set_events(
int fd,
int sid,
unsigned events, error_code&
ec)
noexcept
310 auto&
ref = data_[fd];
311 if (
ref.sid == sid &&
ref.events != events) {
312 epoll_.mod(fd, sid, events,
ec);
320void Reactor::set_events(
int fd,
int sid,
unsigned events)
322 auto&
ref = data_[fd];
323 if (
ref.sid == sid &&
ref.events != events) {
324 epoll_.
mod(fd, sid, events);
329void Reactor::unsubscribe(
int fd,
int sid)
noexcept
331 auto&
ref = data_[fd];
332 if (
ref.sid == sid) {
340void Reactor::set_io_priority(
int fd,
int sid,
Priority priority)
noexcept
342 auto&
ref = data_[fd];
343 if (
ref.sid == sid &&
ref.priority != priority) {
344 ref.priority = priority;
// TOOLBOX_PROBE_SCOPED(provider, name, ...): function-like variadic macro whose
// replacement list is empty, so every use of it expands to nothing and its
// arguments are discarded by the preprocessor (they are never evaluated).
// NOTE(review): presumably this is the disabled/no-op variant of a scoped
// tracing probe, with an active definition selected elsewhere - confirm.
#define TOOLBOX_PROBE_SCOPED(provider, name,...)