Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.ut.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
22
23#include <boost/test/unit_test.hpp>
24#include <boost/test/data/test_case.hpp>
25#include <thread>
26#include <string_view>
27
28using namespace std;
29using namespace toolbox;
30
31namespace {
32
// Reference-counted test fixture that counts how many received messages
// compare equal to "foo". Its on_input member is what the tests in this file
// bind as the callback when subscribing a socket to the Reactor.
33struct TestHandler : RefCount<TestHandler, ThreadUnsafePolicy> {
// Readiness callback: reads up to 4 bytes from `fd` and increments `matches`
// when the buffer compares equal to the C string "foo". The time and event
// mask arguments are unused.
34 void on_input(CyclTime /*now*/, int fd, unsigned /*events*/)
35 {
36 char buf[4];
// NOTE(review): the result of os::recv is discarded and `buf` starts out
// uninitialised, so the strcmp below relies on the peer having sent the
// trailing NUL (the visible senders transmit "foo" with length 4). A short
// read would leave `buf` unterminated -- consider checking the byte count.
37 os::recv(fd, buf, 4, 0);
38 if (strcmp(buf, "foo") == 0) {
39 ++matches;
40 }
41 }
// Number of messages seen so far that compared equal to "foo".
42 int matches{};
43};
44
45} // namespace
46
// Parameters shared by the test cases in this file: the table of reactor
// modes to run against, plus a stream inserter for the wrapper type.
// NOTE(review): the definition of ReactorModeWrapper is not visible in this
// listing; from its use here it exposes the wrapped ReactorMode as `.v` --
// confirm against the original source.
47namespace param {
51
// One entry per reactor mode under test (Blocking and Immediate).
52 static constexpr ReactorModeWrapper reactor_modes[] = {
53 {ReactorMode::Blocking},
54 {ReactorMode::Immediate}
55 };
56
// Writes a readable name for `mode` to `os` and returns `os`.
// Any value other than ReactorMode::Blocking is printed as
// "ReactorMode::Immediate".
// Presumably needed so the test framework can print the parameter -- TODO confirm.
57 static std::ostream& operator<<(std::ostream& os, ReactorModeWrapper mode) {
58 if (mode.v == ReactorMode::Blocking) {
59 return os << "ReactorMode::Blocking";
60 } else {
61 return os << "ReactorMode::Immediate";
62 }
63 }
64}
65
67
69{
70 using namespace literals::chrono_literals;
71
72 Reactor r{mode.v, 1024};
74
76 const auto sub = r.subscribe(*socks.second, EpollIn, bind<&TestHandler::on_input>(h.get()));
77
78 const auto now = CyclTime::now();
79 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
80 BOOST_CHECK_EQUAL(h->matches, 0);
81
82 socks.first.send("foo", 4, 0);
83 socks.first.send("foo", 4, 0);
84 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
85 BOOST_CHECK_EQUAL(h->matches, 1);
86 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
87 BOOST_CHECK_EQUAL(h->matches, 2);
88
89 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
90 BOOST_CHECK_EQUAL(h->matches, 2);
91
92 socks.first.send("foo", 4, 0);
93 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
94 BOOST_CHECK_EQUAL(h->matches, 3);
95
96 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
97 BOOST_CHECK_EQUAL(h->matches, 3);
98}
99
101{
102 using namespace literals::chrono_literals;
103
104 Reactor r{mode.v, 1024};
105
107
108 std::vector<int> fd_process_order;
109 auto handler = [&](CyclTime, int fd, unsigned) {
110 char buf[5];
111 auto rcvd = os::recv(fd, buf, 5, 0);
113
114 std::string_view bsv(buf, buf+sizeof(buf));
115 BOOST_CHECK_EQUAL(bsv, "Hello");
116 fd_process_order.push_back(fd);
117 };
118
119 auto sub1 = r.subscribe(*first_sock, EpollIn, bind(&handler));
120 auto sub2 = r.subscribe(*second_sock, EpollIn, bind(&handler));
121
122 // test send data from first_sock --> second_sock
123 first_sock.send("Hello", 5, 0);
124 r.poll(CyclTime::now(), 0ms);
127 fd_process_order.clear();
128
129 // test send data from second_sock --> first_sock
130 second_sock.send("Hello", 5, 0);
131 r.poll(CyclTime::now(), 0ms);
134 fd_process_order.clear();
135
136 // send data to both sockets -- but from `first_sock` first
137 // Set priority of `first_sock` to be High.
138 // `second_sock` will receive data first, but because of High priority
139 // of `first_sock`, it will be processed first.
140 sub1.set_io_priority(Priority::High);
141 sub2.set_io_priority(Priority::Low);
142 first_sock.send("Hello", 5, 0);
143 std::this_thread::sleep_for(100ms);
144 second_sock.send("Hello", 5, 0);
145 r.poll(CyclTime::now(), 0ms);
149 fd_process_order.clear();
150
151 // Do same again, except switch priorities.
152 sub1.set_io_priority(Priority::Low);
153 sub2.set_io_priority(Priority::High);
154 first_sock.send("Hello", 5, 0);
155 std::this_thread::sleep_for(100ms);
156 second_sock.send("Hello", 5, 0);
157 r.poll(CyclTime::now(), 0ms);
161 fd_process_order.clear();
162}
163
165{
166 using namespace literals::chrono_literals;
167
168 Reactor r{mode.v, 1024};
170
171 // h0, h1 will be high priority
172 // l0, l1 will be low priority
173 auto [h0, h1] = socketpair(UnixStreamProtocol{});
174 auto [l0, l1] = socketpair(UnixStreamProtocol{});
175
176 auto send_data_to = [&](IoSock& sock) {
177 IoSock& corresponding = (sock == h0) ? h1
178 : (sock == h1) ? h0
179 : (sock == l0) ? l1
180 : l0;
181 corresponding.send("Hello", 5, 0);
182 };
183
184 // this will track entry/exit of handlers of each of the sockets
185 struct Audit {
186 enum { Entry, Exit } what;
187 int fd;
188 bool operator==(const Audit& o) const = default;
189 };
190 std::vector<Audit> audit_trail;
191
192 // each invocation will take ~100ms
193 auto spin_and_yield_periodically = [&r]() {
194 WallTime now = WallClock::now();
195 WallTime end = now + 100ms;
196 while (now < end) {
197 // wait for 10us, then yield
198 auto next_stop = now + 10us;
199 while (now < next_stop) {
200 now = WallClock::now();
201 }
202 r.yield();
203 };
204 };
205
206 // reads data from fd, then pretends to do work (by spinning),
207 // yielding periodically to the reactor.
208 auto high_handler = [&](CyclTime, int fd, unsigned) {
209 audit_trail.push_back({.what = Audit::Entry, .fd = fd});
210 char buf[5];
211 auto rcvd = os::recv(fd, buf, 5, 0);
214 audit_trail.push_back({.what = Audit::Exit, .fd = fd});
215 };
216
217 // reads data from fd, sends message to h0/h1, then pretends to do work (by spinning),
218 // yielding periodically to the reactor. In those yields, the messages sent to the
219 // high priority sockets should be processed.
220 auto low_handler = [&]<bool H0, bool H1>(CyclTime, int fd, unsigned) {
221 audit_trail.push_back({.what = Audit::Entry, .fd = fd});
222
223 char buf[5];
224 auto rcvd = os::recv(fd, buf, 5, 0);
226
227 // send data to both high priority sockets
228 if constexpr (H0) {
230 }
231 if constexpr (H1) {
233 }
234
236 audit_trail.push_back({.what = Audit::Exit, .fd = fd});
237 };
238
239 auto l0_handler = [&low_handler](CyclTime ct, int fd, unsigned int e) {
240 low_handler.template operator()<true, true>(ct, fd, e);
241 };
242
243 auto l1_handler = [&low_handler](CyclTime ct, int fd, unsigned int e) {
244 low_handler.template operator()<false, true>(ct, fd, e);
245 };
246
247 auto sub_l0 = r.subscribe(*l0, EpollIn, bind(&l0_handler));
248 auto sub_l1 = r.subscribe(*l1, EpollIn, bind(&l1_handler));
249 auto sub_h0 = r.subscribe(*h0, EpollIn, bind(&high_handler));
250 auto sub_h1 = r.subscribe(*h1, EpollIn, bind(&high_handler));
251
252 sub_h0.set_io_priority(Priority::High);
253 sub_h1.set_io_priority(Priority::High);
254
255 // start off with l0, l1, h0 having data to read
259
260 r.poll(CyclTime::now(), 0ms);
261
262 std::vector<Audit> valid_seq1 = {
263 // high priority always first
264 Audit{Audit::Entry, *h0},
265 Audit{Audit::Exit, *h0},
266
267 // AT THIS POINT l0/l1 handlers could be executed in any order
268 // Lets assume l0 chosen first [choices so far: (l0)]
269 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
270
271 // Reactor will poll on high priority sockets when l0 handler yields
272 // AT THIS POINT h0/h1 handlers could be executed in any order
273 // Lets assume h0 is chosen first [choices so far: (l0, h0)]
274 Audit{Audit::Entry, *h0},
275 Audit{Audit::Exit, *h0},
276 Audit{Audit::Entry, *h1},
277 Audit{Audit::Exit, *h1},
278
279 // control returned back to l0 handler
280 Audit{Audit::Exit, *l0},
281
282 // Now, l1 handler executes
283 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
284 Audit{Audit::Entry, *h1},
285 Audit{Audit::Exit, *h1},
286 Audit{Audit::Exit, *l1}, // control returned back to l1 handler
287 };
288
289 std::vector<Audit> valid_seq2 = {
290 // high priority always first
291 Audit{Audit::Entry, *h0},
292 Audit{Audit::Exit, *h0},
293
294 // AT THIS POINT l0/l1 handlers could be executed in any order
295 // Lets assume l0 chosen first [choices so far: (l0)]
296 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
297
298 // Reactor will poll on high priority sockets when l0 handler yields
299 // AT THIS POINT h0/h1 handlers could be executed in any order
300 // Lets assume h1 chosen first [choices so far: (l0, h1)]
301 Audit{Audit::Entry, *h1},
302 Audit{Audit::Exit, *h1},
303 Audit{Audit::Entry, *h0},
304 Audit{Audit::Exit, *h0},
305
306 // control returned back to l0 handler
307 Audit{Audit::Exit, *l0},
308
309 // Now, l1 handler executes
310 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
311 Audit{Audit::Entry, *h1},
312 Audit{Audit::Exit, *h1},
313 Audit{Audit::Exit, *l1}, // control returned back to l1 handler
314 };
315
316 std::vector<Audit> valid_seq3 = {
317 // high priority always first
318 Audit{Audit::Entry, *h0},
319 Audit{Audit::Exit, *h0},
320
321 // AT THIS POINT l0/l1 handlers could be executed in any order
322 // Let's assume l1 is chosen first [choices so far: (l1)]
323 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
324 Audit{Audit::Entry, *h1},
325 Audit{Audit::Exit, *h1},
326 Audit{Audit::Exit, *l1},
327
328 // Now, l0 handler executes
329 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
330 // Reactor will poll on high priority sockets when l0 handler yields
331 // AT THIS POINT h0/h1 handlers could be executed in any order
332 // Lets assume h0 chosen first [choices so far: (l1, h0)]
333 Audit{Audit::Entry, *h0},
334 Audit{Audit::Exit, *h0},
335 Audit{Audit::Entry, *h1},
336 Audit{Audit::Exit, *h1},
337
338 // control returned back to l0 handler
339 Audit{Audit::Exit, *l0},
340 };
341
342 std::vector<Audit> valid_seq4 = {
343 // high priority always first
344 Audit{Audit::Entry, *h0},
345 Audit{Audit::Exit, *h0},
346
347 // AT THIS POINT l0/l1 handlers could be executed in any order
348 // Let's assume l1 is chosen first [choices so far: (l1)]
349 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
350 Audit{Audit::Entry, *h1},
351 Audit{Audit::Exit, *h1},
352 Audit{Audit::Exit, *l1},
353
354 // Now, l0 handler executes
355 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
356 // Reactor will poll on high priority sockets when l0 handler yields
357 // AT THIS POINT h0/h1 handlers could be executed in any order
358 // Lets assume h1 chosen first [choices so far: (l1, h1)]
359 Audit{Audit::Entry, *h1},
360 Audit{Audit::Exit, *h1},
361 Audit{Audit::Entry, *h0},
362 Audit{Audit::Exit, *h0},
363
364 // control returned back to l0 handler
365 Audit{Audit::Exit, *l0},
366 };
367
369 || (audit_trail == valid_seq2)
370 || (audit_trail == valid_seq3)
371 || (audit_trail == valid_seq4);
373}
374
376{
377 using namespace literals::chrono_literals;
378
379 Reactor r{mode.v, 1024};
381
383 auto sub = r.subscribe(*socks.second, EpollIn | EpollEt, bind<&TestHandler::on_input>(h.get()));
384
385 const auto now = CyclTime::now();
386 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
387 BOOST_CHECK_EQUAL(h->matches, 0);
388
389 socks.first.send("foo", 4, 0);
390 socks.first.send("foo", 4, 0);
391 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
392 BOOST_CHECK_EQUAL(h->matches, 1);
393
394 // No notification for second message.
395 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
396 BOOST_CHECK_EQUAL(h->matches, 1);
397
398 // Revert to level-triggered.
399 sub.set_events(EpollIn);
400 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
401 BOOST_CHECK_EQUAL(h->matches, 2);
402
403 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
404 BOOST_CHECK_EQUAL(h->matches, 2);
405
406 socks.first.send("foo", 4, 0);
407 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
408 BOOST_CHECK_EQUAL(h->matches, 3);
409
410 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
411 BOOST_CHECK_EQUAL(h->matches, 3);
412}
413
415{
416 int i{0};
417 auto fn = [&i](CyclTime) { ++i; };
418
419 Reactor r{mode.v, 1024};
420
421 Hook h{bind(&fn)};
422 r.add_hook(h);
423
424 BOOST_CHECK_EQUAL(r.poll(CyclTime::now(), 0ms), 0);
426}
427
429{
430 Reactor r{mode.v, 1024};
431
432 struct counter {
433 int invocation_count{0};
434 void operator()(CyclTime, Timer&) { ++invocation_count; }
435 };
436 counter lpc;
437 counter hpc;
438
439 auto now = CyclTime::now();
440
441 // schedule 2 low priority timers for immediate execution
442 Timer lpts[] = {
443 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
444 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
445 };
446
447 Timer hpt;
448
449 int num_of_times_polled = 0;
450 const auto end = now.mono_time() + 95ms;
451
452 // using 95ms instead of 100ms, because Reactor::poll internally
453 // uses its own CyclTime, not the one we pass to it.
454
455 while (now.mono_time() < end) {
456 // schedule a high priority timer for immediate execution
457 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
458
459 // low priority timers won't be executed because it will be a busy cycle
460 // due to high priority timer that is due execution.
461 now = CyclTime::now();
462 r.poll(now, 0s);
463
464 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
465 BOOST_CHECK_EQUAL(lpc.invocation_count, 0);
466 }
467
468 std::this_thread::sleep_for(10ms);
469
470 // at this point, both low priority timers are delayed by >100ms.
471 // they will now be actively executed (one per cycle) even if the cycle is busy.
472 for (int i = 0; i < 2; i++) {
473 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
474
475 now = CyclTime::now();
476 r.poll(now, 0s);
477
478 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
479 BOOST_CHECK_EQUAL(lpc.invocation_count, i+1);
480 }
481}
482
BOOST_CHECK_EQUAL(v.size(), 10U)
BOOST_DATA_TEST_CASE(ReactorLevelCase, param::reactor_modes, mode)
void set_high_priority_poll_threshold(Micros thresh)
Definition Reactor.hpp:189
Base class for atomic referenced counted objects.
Definition RefCount.hpp:61
STL namespace.
StreamT & operator<<(StreamT &os, PutPercentiles pp)
Definition Utility.hpp:68
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
constexpr bool operator==(const BasicHandle< PolicyT > &lhs, const BasicHandle< PolicyT > &rhs)
Definition Handle.hpp:91
std::pair< IoSock, IoSock > socketpair(ProtocolT protocol)
Definition IoSock.hpp:100
ssize_t recv(int sockfd, void *buf, std::size_t len, int flags, std::error_code &ec) noexcept
Receive a message from a socket.
Definition Socket.hpp:355
WallClock::time_point WallTime
Definition Time.hpp:112
constexpr auto bind() noexcept
Definition Slot.hpp:97