38 else if (!tq.empty()) {
40 if ((now.mono_time() - tq.front().expiry()) > 100
ms) {
54 device_.emplace<ImmediateDevice>();
59 auto&
dev = device_.emplace<BlockingDevice>();
67 [
this](ImmediateDevice&
dev) {
dev.low_prio_epoll.del(notify_.
fd()); },
68 [
this](BlockingDevice&
dev) {
dev.epoll.del(notify_.
fd()); }
76 if (fd >=
static_cast<int>(data_.size())) {
79 auto&
ref = data_[fd];
81 [&](ImmediateDevice&
dev) {
dev.low_prio_epoll.add(fd, ++
ref.sid, events); },
82 [&](BlockingDevice&
dev) {
dev.epoll.add(fd, ++
ref.sid, events); }
87 return {*
this, fd,
ref.sid};
93 using namespace chrono;
117 now = CyclTime::now();
118 last_time_priority_io_polled_ = now.
wall_time();
122 throw system_error{
ec};
129 cycle_work_ = tqs_[
High].dispatch(now);
136 if (cycle_work_ > 0) {
143int Reactor::poll_immediate(ImmediateDevice&
dev, CyclTime now)
152 last_time_priority_io_polled_ = now.wall_time();
156 throw system_error{
ec};
166 throw system_error{
ec};
178 cycle_work_ = tqs_[
High].dispatch(now);
184 if (cycle_work_ > 0) {
194 [&](ImmediateDevice&
dev) {
return poll_immediate(
dev, now); },
195 [&](BlockingDevice&
dev) {
return poll_blocking(
dev, now,
timeout); }
201 if (std::holds_alternative<BlockingDevice>(device_)) {
211 using namespace chrono;
213 const auto& tq = tqs_[
High];
217 next =
min(next, tq.front().expiry() - 200u
s);
221 const auto& tq = tqs_[
Low];
224 next =
min(next, tq.front().expiry());
232 if (currently_handling_priority_events_) [[
unlikely]] {
236 if (priority_io_poll_threshold == Micros::max()) {
241 if (now - last_time_priority_io_polled_ > priority_io_poll_threshold) {
242 last_time_priority_io_polled_ = now;
248 [&](ImmediateDevice&
dev) ->
Epoll& {
return dev.high_prio_epoll; },
249 [&](BlockingDevice&
dev) ->
Epoll& {
return dev.epoll; }
256 throw system_error{
ec};
268 assert(!currently_handling_priority_events_);
269 currently_handling_priority_events_ =
true;
272 currently_handling_priority_events_ =
false;
276 for (
int i{0};
i <
size; ++
i) {
280 const auto&
ref = data_[fd];
282 if (
ref.priority != priority) {
286 if (fd == notify_.
fd()) {
311 ref.slot(now, fd, events);
312 }
catch (
const std::exception& e) {
313 TOOLBOX_ERROR <<
"exception in i/o event handler: " << e.what();
320Epoll& Reactor::get_resident_epoll(Data&
data)
322 return std::visit(overloaded{
323 [&](ImmediateDevice&
dev) -> Epoll& {
325 :
dev.low_prio_epoll;
327 [&](BlockingDevice&
dev) -> Epoll& {
return dev.epoll; }
331void Reactor::set_events(
int fd,
int sid,
unsigned events,
IoSlot slot, error_code&
ec)
noexcept
333 auto&
ref = data_[fd];
334 if (
ref.sid == sid) {
335 if (
ref.events != events) {
336 Epoll& epoll = get_resident_epoll(
ref);
337 epoll.mod(fd, sid, events,
ec);
347void Reactor::set_events(
int fd,
int sid,
unsigned events,
IoSlot slot)
349 auto&
ref = data_[fd];
350 if (
ref.sid == sid) {
351 if (
ref.events != events) {
352 Epoll& epoll = get_resident_epoll(
ref);
353 epoll.mod(fd, sid, events);
360void Reactor::set_events(
int fd,
int sid,
unsigned events, error_code&
ec)
noexcept
362 auto&
ref = data_[fd];
363 if (
ref.sid == sid &&
ref.events != events) {
364 Epoll& epoll = get_resident_epoll(
ref);
365 epoll.mod(fd, sid, events,
ec);
373void Reactor::set_events(
int fd,
int sid,
unsigned events)
375 auto&
ref = data_[fd];
376 if (
ref.sid == sid &&
ref.events != events) {
377 Epoll& epoll = get_resident_epoll(
ref);
378 epoll.mod(fd, sid, events);
383void Reactor::unsubscribe(
int fd,
int sid)
noexcept
385 auto&
ref = data_[fd];
386 if (
ref.sid == sid) {
387 Epoll& epoll = get_resident_epoll(
ref);
395void Reactor::set_io_priority(
int fd,
int sid,
Priority priority)
noexcept
397 auto&
ref = data_[fd];
398 if (
ref.sid == sid &&
ref.priority != priority) {
399 std::visit(overloaded{
400 [&](ImmediateDevice&
dev) {
402 dev.low_prio_epoll.del(fd);
403 dev.high_prio_epoll.add(fd, sid,
ref.events);
405 dev.high_prio_epoll.del(fd);
406 dev.low_prio_epoll.add(fd, sid,
ref.events);
409 [&](BlockingDevice& ) {}
411 ref.priority = priority;
#define TOOLBOX_PROBE_SCOPED(provider, name,...)