Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Runner.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Runner.hpp"
18
19#include "Reactor.hpp"
20
21#include <toolbox/sys/Log.hpp>
23
24namespace toolbox {
25inline namespace io {
26namespace {
27void run_reactor(Reactor& r, long busy_cycles, ThreadConfig config, const std::atomic<bool>& stop)
28{
29 // ==== KEEP IN SYNC WITH CHANGES IN run_metrics_reactor ====
31 try {
33 TOOLBOX_NOTICE << "started " << config.name << " thread";
34 long i{0};
35 while (!stop.load(std::memory_order_acquire)) {
36 // Busy-wait for "busy cycles" after work was done.
37 if (r.poll(CyclTime::now(), i++ < busy_cycles ? 0s : NoTimeout) > 0) {
38 // Reset counter when work has been done.
39 i = 0;
40 }
41 }
42 } catch (const std::exception& e) {
43 TOOLBOX_CRIT << "exception on " << config.name << " thread: " << e.what();
45 }
46 TOOLBOX_NOTICE << "stopping " << config.name << " thread";
47}
48
50{
51 // Record microseconds to three significant figures, with a maximum expected value of one second.
52 return HistogramPtr{new Histogram{1, 1'000'000, 3}};
53}
54
56{
57 // Histogram is 100% accurate to 256, covering Reactor::MaxEvents of 128 work items.
58 return HistogramPtr{new Histogram{1, 1000, 2}};
59}
60
61void run_metrics_reactor(Reactor& r, long busy_cycles, ThreadConfig config,
62 const std::atomic<bool>& stop, MetricCallbackFunction metric_cb,
64{
65 constexpr std::chrono::seconds MetricInterval = 60s;
66
67 // ==== KEEP IN SYNC WITH CHANGES IN run_reactor ====
69 try {
71 TOOLBOX_NOTICE << "started " << config.name << " thread";
72
74 // 128 possible buffer slots in poll + high and low priority timers
76
77 long i{0};
78 auto metric_time = MonoClock::now() + MetricInterval;
79 while (!stop.load(std::memory_order_acquire)) {
80 // Busy-wait for "busy cycles" after work was done.
81 auto work = r.poll(CyclTime::now(), i++ < busy_cycles ? 0s : NoTimeout);
82 const auto now = CyclTime::current();
83 if (work > 0) {
84 // Don't skew distribution with a lot of zero work.
85 const auto elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(
86 MonoClock::now() - now.mono_time());
87 time_hist->record_value(elapsed_us.count());
88 work_hist->record_value(work);
89 loop_cb(now);
90 // Reset counter when work has been done.
91 i = 0;
92 }
93 if (now.mono_time() >= metric_time) {
94 // Metric reporting.
95 metric_time = now.mono_time() + MetricInterval;
96 metric_cb(now, std::move(time_hist), std::move(work_hist));
99 }
100 }
101 } catch (const std::exception& e) {
102 TOOLBOX_CRIT << "exception on " << config.name << " thread: " << e.what();
103 kill(getpid(), SIGTERM);
104 }
105 TOOLBOX_NOTICE << "stopping " << config.name << " thread";
106}
107
108} // namespace
109
111: reactor_{r}
112, thread_{run_reactor, std::ref(r), busy_cycles, config, std::cref(stop_)}
113{
114}
115
121
129
131{
132 stop_.store(true, std::memory_order_release);
133 reactor_.wakeup();
134 thread_.join();
135}
136
137} // namespace io
138} // namespace toolbox
#define TOOLBOX_CRIT
Definition Log.hpp:92
#define TOOLBOX_NOTICE
Definition Log.hpp:96
ReactorRunner(Reactor &r, long busy_cycles, ThreadConfig config)
Definition Runner.cpp:110
void wakeup() noexcept
Definition Waker.hpp:40
STL namespace.
std::function< void(CyclTime now)> LoopCallbackFunction
LoopCallbackFunction is called at the end of each Reactor loop that did work, and is passed the current cycle time.
Definition Runner.hpp:37
constexpr Duration NoTimeout
Definition Reactor.hpp:29
std::function< void(CyclTime now, HistogramPtr &&time_hist, HistogramPtr &&work_hist)> MetricCallbackFunction
MetricCallbackFunction implementer is responsible for deleting the Histogram.
Definition Runner.hpp:35
std::unique_ptr< Histogram > HistogramPtr
Definition Runner.hpp:31
void set_thread_attrs(const ThreadConfig &config)
Definition Thread.cpp:70
void sig_block_all()
Block all signals.
Definition Signal.cpp:82
constexpr auto bind() noexcept
Definition Slot.hpp:92
ThreadConfig holds the thread attributes.
Definition Thread.hpp:29