35 else if (!tq.empty()) {
37 if ((now.mono_time() - tq.front().expiry()) > 100
ms) {
54 epoll_.
del(notify_.
fd());
61 if (fd >=
static_cast<int>(data_.size())) {
64 auto&
ref = data_[fd];
65 epoll_.
add(fd, ++
ref.sid, events);
69 return {*
this, fd,
ref.sid};
75 using namespace chrono;
99 now = CyclTime::now();
100 last_time_priority_io_polled_ = now.
mono_time();
101 last_time_user_hook_polled_ = now.
mono_time();
105 throw system_error{
ec};
112 cycle_work_ = tqs_[
High].dispatch(now);
119 if (cycle_work_ > 0) {
136 using namespace chrono;
138 const auto& tq = tqs_[
High];
142 next =
min(next, tq.front().expiry() - 200u
s);
146 const auto& tq = tqs_[
Low];
149 next =
min(next, tq.front().expiry());
157 if (currently_handling_priority_events_) [[
unlikely]] {
162 cycle_work_ += do_io_priority_poll(now);
163 cycle_work_ += do_user_priority_poll(now);
166int Reactor::do_io_priority_poll(
MonoTime now)
noexcept
170 const bool enabled = priority_io_poll_threshold_ != Micros::max();
176 last_time_priority_io_polled_ = MonoClock::now();
185 TOOLBOX_ERROR <<
"epoll failure during high priority io poll: "
186 <<
ec <<
" [" <<
ec.message() <<
']';
192 }
catch (
const std::exception& e) {
193 TOOLBOX_ERROR <<
"exception during high priority io poll: " << e.what();
199int Reactor::do_user_priority_poll(
MonoTime now)
noexcept
203 const bool enabled = (user_hook_poll_threshold_ != Micros::max()) &&
204 priority_poll_user_hook_;
210 last_time_user_hook_polled_ = MonoClock::now();
213 currently_handling_priority_events_ =
true;
215 currently_handling_priority_events_ =
false;
218 ret = priority_poll_user_hook_(CyclTime::current());
220 }
catch (
const std::exception& e) {
221 TOOLBOX_ERROR <<
"exception during user high priority hook: " << e.what();
227int Reactor::dispatch(CyclTime now, Event*
buf,
int size,
Priority priority)
230 assert(!currently_handling_priority_events_);
231 currently_handling_priority_events_ =
true;
234 currently_handling_priority_events_ =
false;
238 for (
int i{0};
i <
size; ++
i) {
242 const auto&
ref = data_[fd];
244 if (
ref.priority != priority) {
248 if (fd == notify_.
fd()) {
273 ref.slot(now, fd, events);
274 }
catch (
const std::exception& e) {
275 TOOLBOX_ERROR <<
"exception in i/o event handler: " << e.what();
282void Reactor::set_events(
int fd,
int sid,
unsigned events,
IoSlot slot, error_code&
ec)
noexcept
284 auto&
ref = data_[fd];
285 if (
ref.sid == sid) {
286 if (
ref.events != events) {
287 epoll_.mod(fd, sid, events,
ec);
297void Reactor::set_events(
int fd,
int sid,
unsigned events,
IoSlot slot)
299 auto&
ref = data_[fd];
300 if (
ref.sid == sid) {
301 if (
ref.events != events) {
302 epoll_.
mod(fd, sid, events);
309void Reactor::set_events(
int fd,
int sid,
unsigned events, error_code&
ec)
noexcept
311 auto&
ref = data_[fd];
312 if (
ref.sid == sid &&
ref.events != events) {
313 epoll_.mod(fd, sid, events,
ec);
321void Reactor::set_events(
int fd,
int sid,
unsigned events)
323 auto&
ref = data_[fd];
324 if (
ref.sid == sid &&
ref.events != events) {
325 epoll_.
mod(fd, sid, events);
330void Reactor::unsubscribe(
int fd,
int sid)
noexcept
332 auto&
ref = data_[fd];
333 if (
ref.sid == sid) {
341void Reactor::set_io_priority(
int fd,
int sid,
Priority priority)
noexcept
343 auto&
ref = data_[fd];
344 if (
ref.sid == sid &&
ref.priority != priority) {
345 ref.priority = priority;
#define TOOLBOX_PROBE_SCOPED(provider, name,...)