Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.ut.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
22
23#include <boost/test/unit_test.hpp>
24#include <thread>
25#include <string_view>
26
27using namespace std;
28using namespace toolbox;
29
30namespace {
31
32struct TestHandler : RefCount<TestHandler, ThreadUnsafePolicy> {
33 void on_input(CyclTime /*now*/, int fd, unsigned /*events*/)
34 {
35 char buf[4];
36 os::recv(fd, buf, 4, 0);
37 if (strcmp(buf, "foo") == 0) {
38 ++matches;
39 }
40 }
41 int matches{};
42};
43
44} // namespace
45
47
49{
50 using namespace literals::chrono_literals;
51
52 Reactor r{1024};
54
56 const auto sub = r.subscribe(*socks.second, EpollIn, bind<&TestHandler::on_input>(h.get()));
57
58 const auto now = CyclTime::now();
59 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
60 BOOST_CHECK_EQUAL(h->matches, 0);
61
62 socks.first.send("foo", 4, 0);
63 socks.first.send("foo", 4, 0);
64 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
65 BOOST_CHECK_EQUAL(h->matches, 1);
66 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
67 BOOST_CHECK_EQUAL(h->matches, 2);
68
69 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
70 BOOST_CHECK_EQUAL(h->matches, 2);
71
72 socks.first.send("foo", 4, 0);
73 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
74 BOOST_CHECK_EQUAL(h->matches, 3);
75
76 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
77 BOOST_CHECK_EQUAL(h->matches, 3);
78}
79
81{
82 using namespace literals::chrono_literals;
83
84 Reactor r{1024};
85
87
88 std::vector<int> fd_process_order;
89 auto handler = [&](CyclTime, int fd, unsigned) {
90 char buf[5];
91 auto rcvd = os::recv(fd, buf, 5, 0);
93
94 std::string_view bsv(buf, buf+sizeof(buf));
95 BOOST_CHECK_EQUAL(bsv, "Hello");
96 fd_process_order.push_back(fd);
97 };
98
99 auto sub1 = r.subscribe(*first_sock, EpollIn, bind(&handler));
100 auto sub2 = r.subscribe(*second_sock, EpollIn, bind(&handler));
101
102 // test send data from first_sock --> second_sock
103 first_sock.send("Hello", 5, 0);
104 r.poll(CyclTime::now(), 0ms);
107 fd_process_order.clear();
108
109 // test send data from second_sock --> first_sock
110 second_sock.send("Hello", 5, 0);
111 r.poll(CyclTime::now(), 0ms);
114 fd_process_order.clear();
115
116 // send data to both sockets -- but from `first_sock` first
117 // Set priority of `first_sock` to be High.
118 // `second_sock` will receive data first, but because of High priority
119 // of `first_sock`, it will be processed first.
120 sub1.set_io_priority(Priority::High);
121 sub2.set_io_priority(Priority::Low);
122 first_sock.send("Hello", 5, 0);
123 std::this_thread::sleep_for(100ms);
124 second_sock.send("Hello", 5, 0);
125 r.poll(CyclTime::now(), 0ms);
129 fd_process_order.clear();
130
131 // Do same again, except switch priorities.
132 sub1.set_io_priority(Priority::Low);
133 sub2.set_io_priority(Priority::High);
134 first_sock.send("Hello", 5, 0);
135 std::this_thread::sleep_for(100ms);
136 second_sock.send("Hello", 5, 0);
137 r.poll(CyclTime::now(), 0ms);
141 fd_process_order.clear();
142}
143
145{
146 Reactor r{1024};
148
149 // install high priority user hook
150 std::size_t hook_invocation_count = 0;
151 auto high_prio_hook = [&](CyclTime) { hook_invocation_count++; return 0; };
152 r.set_user_high_priority_hook(bind(&high_prio_hook));
153
154 auto [s1, s2] = socketpair(UnixStreamProtocol{});
155
156 // This is handler for data on s1. It is an expensive function taking ~100ms
157 std::size_t yield_count = 0;
158 auto on_data_received = [&](CyclTime, int, unsigned) {
159 WallTime now = WallClock::now();
160 WallTime end = now + 500ms;
161 while (now < end) {
162 // wait for 100us, then yield
163 auto next_stop = now + 1ms;
164 while (now < next_stop) {
165 now = WallClock::now();
166 }
167 r.yield();
168 yield_count++;
169 };
170 };
171
172 // data to s1 will be sent => on_data_received will be invoked by the reactor.
173 // every 100us on_data_received will yield to the reactor, at which point the reactor
174 // should invoke our custom high priority hook.
175 auto handle = r.subscribe(*s1, EpollIn, bind(&on_data_received));
176
177 s2.send("Hello", 5, 0);
178 r.poll(CyclTime::now(), 0ms);
179
182}
183
185{
186 using namespace literals::chrono_literals;
187
188 Reactor r{1024};
190
191 // h0, h1 will be high priority
192 // l0, l1 will be low priority
193 auto [h0, h1] = socketpair(UnixStreamProtocol{});
194 auto [l0, l1] = socketpair(UnixStreamProtocol{});
195
196 auto send_data_to = [&](IoSock& sock) {
197 IoSock& corresponding = (sock == h0) ? h1
198 : (sock == h1) ? h0
199 : (sock == l0) ? l1
200 : l0;
201 corresponding.send("Hello", 5, 0);
202 };
203
204 // this will track entry/exit of handlers of each of the sockets
205 struct Audit {
206 enum { Entry, Exit } what;
207 int fd;
208 bool operator==(const Audit& o) const = default;
209 };
210 std::vector<Audit> audit_trail;
211
212 // each invocation will take ~100ms
213 auto spin_and_yield_periodically = [&r]() {
214 WallTime now = WallClock::now();
215 WallTime end = now + 100ms;
216 while (now < end) {
217 // wait for 10us, then yield
218 auto next_stop = now + 10us;
219 while (now < next_stop) {
220 now = WallClock::now();
221 }
222 r.yield();
223 };
224 };
225
226 // reads data from fd, then pretends to do work (by spinning),
227 // yielding periodically to the reactor.
228 auto high_handler = [&](CyclTime, int fd, unsigned) {
229 audit_trail.push_back({.what = Audit::Entry, .fd = fd});
230 char buf[5];
231 auto rcvd = os::recv(fd, buf, 5, 0);
234 audit_trail.push_back({.what = Audit::Exit, .fd = fd});
235 };
236
237 // reads data from fd, sends message to h0/h1, then pretends to do work (by spinning),
238 // yielding periodically to the reactor. In those yields, the messages sent to the
239 // high priority sockets should be processed.
240 auto low_handler = [&]<bool H0, bool H1>(CyclTime, int fd, unsigned) {
241 audit_trail.push_back({.what = Audit::Entry, .fd = fd});
242
243 char buf[5];
244 auto rcvd = os::recv(fd, buf, 5, 0);
246
247 // send data to both high priority sockets
248 if constexpr (H0) {
250 }
251 if constexpr (H1) {
253 }
254
256 audit_trail.push_back({.what = Audit::Exit, .fd = fd});
257 };
258
259 auto l0_handler = [&low_handler](CyclTime ct, int fd, unsigned int e) {
260 low_handler.template operator()<true, true>(ct, fd, e);
261 };
262
263 auto l1_handler = [&low_handler](CyclTime ct, int fd, unsigned int e) {
264 low_handler.template operator()<false, true>(ct, fd, e);
265 };
266
267 auto sub_l0 = r.subscribe(*l0, EpollIn, bind(&l0_handler));
268 auto sub_l1 = r.subscribe(*l1, EpollIn, bind(&l1_handler));
269 auto sub_h0 = r.subscribe(*h0, EpollIn, bind(&high_handler));
270 auto sub_h1 = r.subscribe(*h1, EpollIn, bind(&high_handler));
271
272 sub_h0.set_io_priority(Priority::High);
273 sub_h1.set_io_priority(Priority::High);
274
275 // start off with l0, l1, h0 having data to read
279
280 r.poll(CyclTime::now(), 0ms);
281
282 std::vector<Audit> valid_seq1 = {
283 // high priority always first
284 Audit{Audit::Entry, *h0},
285 Audit{Audit::Exit, *h0},
286
287 // AT THIS POINT l0/l1 handlers could be executed in any order
288 // Lets assume l0 chosen first [choices so far: (l0)]
289 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
290
291 // Reactor will poll on high priority sockets when l0 handler yields
292 // AT THIS POINT h0/h1 handlers could be executed in any order
293 // Lets assume h0 is chosen first [choices so far: (l0, h0)]
294 Audit{Audit::Entry, *h0},
295 Audit{Audit::Exit, *h0},
296 Audit{Audit::Entry, *h1},
297 Audit{Audit::Exit, *h1},
298
299 // control returned back to l0 handler
300 Audit{Audit::Exit, *l0},
301
302 // Now, l1 handler executes
303 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
304 Audit{Audit::Entry, *h1},
305 Audit{Audit::Exit, *h1},
306 Audit{Audit::Exit, *l1}, // l1 handler sends data to h1
307 };
308
309 std::vector<Audit> valid_seq2 = {
310 // high priority always first
311 Audit{Audit::Entry, *h0},
312 Audit{Audit::Exit, *h0},
313
314 // AT THIS POINT l0/l1 handlers could be executed in any order
315 // Lets assume l0 chosen first [choices so far: (l0)]
316 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
317
318 // Reactor will poll on high priority sockets when l0 handler yields
319 // AT THIS POINT h0/h1 handlers could be executed in any order
320 // Lets assume h1 chosen first [choices so far: (l0, h1)]
321 Audit{Audit::Entry, *h1},
322 Audit{Audit::Exit, *h1},
323 Audit{Audit::Entry, *h0},
324 Audit{Audit::Exit, *h0},
325
326 // control returned back to l0 handler
327 Audit{Audit::Exit, *l0},
328
329 // Now, l1 handler executes
330 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
331 Audit{Audit::Entry, *h1},
332 Audit{Audit::Exit, *h1},
333 Audit{Audit::Exit, *l1}, // l1 handler sends data to h1
334 };
335
336 std::vector<Audit> valid_seq3 = {
337 // high priority always first
338 Audit{Audit::Entry, *h0},
339 Audit{Audit::Exit, *h0},
340
341 // AT THIS POINT l0/l1 handlers could be executed in any order
342 // Lets assume l0 chosen first [choices so far: (l1)]
343 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
344 Audit{Audit::Entry, *h1},
345 Audit{Audit::Exit, *h1},
346 Audit{Audit::Exit, *l1},
347
348 // Now, l0 handler executes
349 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
350 // Reactor will poll on high priority sockets when l0 handler yields
351 // AT THIS POINT h0/h1 handlers could be executed in any order
352 // Lets assume h0 chosen first [choices so far: (l1, h0)]
353 Audit{Audit::Entry, *h0},
354 Audit{Audit::Exit, *h0},
355 Audit{Audit::Entry, *h1},
356 Audit{Audit::Exit, *h1},
357
358 // control returned back to l0 handler
359 Audit{Audit::Exit, *l0},
360 };
361
362 std::vector<Audit> valid_seq4 = {
363 // high priority always first
364 Audit{Audit::Entry, *h0},
365 Audit{Audit::Exit, *h0},
366
367 // AT THIS POINT l0/l1 handlers could be executed in any order
368 // Lets assume l0 chosen first [choices so far: (l1)]
369 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
370 Audit{Audit::Entry, *h1},
371 Audit{Audit::Exit, *h1},
372 Audit{Audit::Exit, *l1},
373
374 // Now, l0 handler executes
375 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
376 // Reactor will poll on high priority sockets when l0 handler yields
377 // AT THIS POINT h0/h1 handlers could be executed in any order
378 // Lets assume h1 chosen first [choices so far: (l1, h1)]
379 Audit{Audit::Entry, *h1},
380 Audit{Audit::Exit, *h1},
381 Audit{Audit::Entry, *h0},
382 Audit{Audit::Exit, *h0},
383
384 // control returned back to l0 handler
385 Audit{Audit::Exit, *l0},
386 };
387
389 || (audit_trail == valid_seq2)
390 || (audit_trail == valid_seq3)
391 || (audit_trail == valid_seq4);
393}
394
396{
397 using namespace literals::chrono_literals;
398
399 Reactor r{1024};
401
403 auto sub = r.subscribe(*socks.second, EpollIn | EpollEt, bind<&TestHandler::on_input>(h.get()));
404
405 const auto now = CyclTime::now();
406 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
407 BOOST_CHECK_EQUAL(h->matches, 0);
408
409 socks.first.send("foo", 4, 0);
410 socks.first.send("foo", 4, 0);
411 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
412 BOOST_CHECK_EQUAL(h->matches, 1);
413
414 // No notification for second message.
415 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
416 BOOST_CHECK_EQUAL(h->matches, 1);
417
418 // Revert to level-triggered.
419 sub.set_events(EpollIn);
420 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
421 BOOST_CHECK_EQUAL(h->matches, 2);
422
423 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
424 BOOST_CHECK_EQUAL(h->matches, 2);
425
426 socks.first.send("foo", 4, 0);
427 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
428 BOOST_CHECK_EQUAL(h->matches, 3);
429
430 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
431 BOOST_CHECK_EQUAL(h->matches, 3);
432}
433
435{
436 int i{0};
437 auto fn = [&i](CyclTime) { ++i; };
438
439 Reactor r{1024};
440
441 Hook h{bind(&fn)};
442 r.add_hook(h);
443
444 BOOST_CHECK_EQUAL(r.poll(CyclTime::now(), 0ms), 0);
446}
447
449{
450 Reactor r{1024};
451
452 struct counter {
453 int invocation_count{0};
454 void operator()(CyclTime, Timer&) { ++invocation_count; }
455 };
456 counter lpc;
457 counter hpc;
458
459 auto now = CyclTime::now();
460
461 // schedule 2 low priority timers for immediate execution
462 Timer lpts[] = {
463 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
464 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
465 };
466
467 Timer hpt;
468
469 int num_of_times_polled = 0;
470 const auto end = now.mono_time() + 95ms;
471
472 // using 95ms instead of 100ms, because Reactor::poll internally
473 // uses own CyclTime, not the one we pass to it.
474
475 while (now.mono_time() < end) {
476
477 // schedule a high priority timer for immediate execution
478 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
479
480 // low priority timers won't be executed because it will be a busy cycle
481 // due to high priority timer that is due execution.
482 r.poll(now, 0s);
483
484 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
485 BOOST_CHECK_EQUAL(lpc.invocation_count, 0);
486
487 now = CyclTime::now();
488 }
489
490 std::this_thread::sleep_for(10ms);
491
492 // at this point, both low priority timers are delayed by >100ms.
493 // they will now activately executed (one per cycle) even if cycle is busy.
494 for (int i = 0; i < 2; i++) {
495 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
496
497 r.poll(now, 0s);
498
499 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
500 BOOST_CHECK_EQUAL(lpc.invocation_count, i+1);
501
502 now = CyclTime::now();
503 }
504}
505
BOOST_CHECK_EQUAL(v.size(), 10U)
BOOST_AUTO_TEST_CASE(ReactorLevelCase)
void set_user_hook_poll_threshold(Micros thresh)
Definition Reactor.hpp:181
void set_high_priority_poll_threshold(Micros thresh)
Definition Reactor.hpp:178
Base class for atomic referenced counted objects.
Definition RefCount.hpp:61
STL namespace.
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
constexpr bool operator==(const BasicHandle< PolicyT > &lhs, const BasicHandle< PolicyT > &rhs)
Definition Handle.hpp:91
std::pair< IoSock, IoSock > socketpair(ProtocolT protocol)
Definition IoSock.hpp:100
ssize_t recv(int sockfd, void *buf, std::size_t len, int flags, std::error_code &ec) noexcept
Receive a message from a socket.
Definition Socket.hpp:355
WallClock::time_point WallTime
Definition Time.hpp:112
constexpr auto bind() noexcept
Definition Slot.hpp:97