Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.ut.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
22
23#include <boost/test/unit_test.hpp>
24#include <thread>
25#include <string_view>
26
27using namespace std;
28using namespace toolbox;
29
30namespace {
31
32struct TestHandler : RefCount<TestHandler, ThreadUnsafePolicy> {
33 void on_input(CyclTime /*now*/, int fd, unsigned /*events*/)
34 {
35 char buf[4];
36 os::recv(fd, buf, 4, 0);
37 if (strcmp(buf, "foo") == 0) {
38 ++matches;
39 }
40 }
41 int matches{};
42};
43
44} // namespace
45
47
49{
50 using namespace literals::chrono_literals;
51
52 Reactor r{1024};
54
56 const auto sub = r.subscribe(*socks.second, EpollIn, bind<&TestHandler::on_input>(h.get()));
57
58 const auto now = CyclTime::now();
59 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
60 BOOST_CHECK_EQUAL(h->matches, 0);
61
62 socks.first.send("foo", 4, 0);
63 socks.first.send("foo", 4, 0);
64 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
65 BOOST_CHECK_EQUAL(h->matches, 1);
66 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
67 BOOST_CHECK_EQUAL(h->matches, 2);
68
69 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
70 BOOST_CHECK_EQUAL(h->matches, 2);
71
72 socks.first.send("foo", 4, 0);
73 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
74 BOOST_CHECK_EQUAL(h->matches, 3);
75
76 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
77 BOOST_CHECK_EQUAL(h->matches, 3);
78}
79
81{
82 using namespace literals::chrono_literals;
83
84 Reactor r{1024};
85
87
88 std::vector<int> fd_process_order;
89 auto handler = [&](CyclTime, int fd, unsigned) {
90 char buf[5];
91 auto rcvd = os::recv(fd, buf, 5, 0);
93
94 std::string_view bsv(buf, buf+sizeof(buf));
95 BOOST_CHECK_EQUAL(bsv, "Hello");
96 fd_process_order.push_back(fd);
97 };
98
99 auto sub1 = r.subscribe(*first_sock, EpollIn, bind(&handler));
100 auto sub2 = r.subscribe(*second_sock, EpollIn, bind(&handler));
101
102 // test send data from first_sock --> second_sock
103 first_sock.send("Hello", 5, 0);
104 r.poll(CyclTime::now(), 0ms);
107 fd_process_order.clear();
108
109 // test send data from second_sock --> first_sock
110 second_sock.send("Hello", 5, 0);
111 r.poll(CyclTime::now(), 0ms);
114 fd_process_order.clear();
115
116 // send data to both sockets -- but from `first_sock` first
117 // Set priority of `first_sock` to be High.
118 // `second_sock` will receive data first, but because of High priority
119 // of `first_sock`, it will be processed first.
120 sub1.set_io_priority(Priority::High);
121 sub2.set_io_priority(Priority::Low);
122 first_sock.send("Hello", 5, 0);
123 std::this_thread::sleep_for(100ms);
124 second_sock.send("Hello", 5, 0);
125 r.poll(CyclTime::now(), 0ms);
129 fd_process_order.clear();
130
131 // Do same again, except switch priorities.
132 sub1.set_io_priority(Priority::Low);
133 sub2.set_io_priority(Priority::High);
134 first_sock.send("Hello", 5, 0);
135 std::this_thread::sleep_for(100ms);
136 second_sock.send("Hello", 5, 0);
137 r.poll(CyclTime::now(), 0ms);
141 fd_process_order.clear();
142}
143
145{
146 using namespace literals::chrono_literals;
147
148 Reactor r{1024};
150
151 // h0, h1 will be high priority
152 // l0, l1 will be low priority
153 auto [h0, h1] = socketpair(UnixStreamProtocol{});
154 auto [l0, l1] = socketpair(UnixStreamProtocol{});
155
156 auto send_data_to = [&](IoSock& sock) {
157 IoSock& corresponding = (sock == h0) ? h1
158 : (sock == h1) ? h0
159 : (sock == l0) ? l1
160 : l0;
161 corresponding.send("Hello", 5, 0);
162 };
163
164 // this will track entry/exit of handlers of each of the sockets
165 struct Audit {
166 enum { Entry, Exit } what;
167 int fd;
168 bool operator==(const Audit& o) const = default;
169 };
170 std::vector<Audit> audit_trail;
171
172 // each invocation will take ~100ms
173 auto spin_and_yield_periodically = [&r]() {
174 WallTime now = WallClock::now();
175 WallTime end = now + 100ms;
176 while (now < end) {
177 // wait for 10us, then yield
178 auto next_stop = now + 10us;
179 while (now < next_stop) {
180 now = WallClock::now();
181 }
182 r.yield();
183 };
184 };
185
186 // reads data from fd, then pretends to do work (by spinning),
187 // yielding periodically to the reactor.
188 auto high_handler = [&](CyclTime, int fd, unsigned) {
189 audit_trail.push_back({.what = Audit::Entry, .fd = fd});
190 char buf[5];
191 auto rcvd = os::recv(fd, buf, 5, 0);
194 audit_trail.push_back({.what = Audit::Exit, .fd = fd});
195 };
196
197 // reads data from fd, sends message to h0/h1, then pretends to do work (by spinning),
198 // yielding periodically to the reactor. In those yields, the messages sent to the
199 // high priority sockets should be processed.
200 auto low_handler = [&]<bool H0, bool H1>(CyclTime, int fd, unsigned) {
201 audit_trail.push_back({.what = Audit::Entry, .fd = fd});
202
203 char buf[5];
204 auto rcvd = os::recv(fd, buf, 5, 0);
206
207 // send data to both high priority sockets
208 if constexpr (H0) {
210 }
211 if constexpr (H1) {
213 }
214
216 audit_trail.push_back({.what = Audit::Exit, .fd = fd});
217 };
218
219 auto l0_handler = [&low_handler](CyclTime ct, int fd, unsigned int e) {
220 low_handler.template operator()<true, true>(ct, fd, e);
221 };
222
223 auto l1_handler = [&low_handler](CyclTime ct, int fd, unsigned int e) {
224 low_handler.template operator()<false, true>(ct, fd, e);
225 };
226
227 auto sub_l0 = r.subscribe(*l0, EpollIn, bind(&l0_handler));
228 auto sub_l1 = r.subscribe(*l1, EpollIn, bind(&l1_handler));
229 auto sub_h0 = r.subscribe(*h0, EpollIn, bind(&high_handler));
230 auto sub_h1 = r.subscribe(*h1, EpollIn, bind(&high_handler));
231
232 sub_h0.set_io_priority(Priority::High);
233 sub_h1.set_io_priority(Priority::High);
234
235 // start off with l0, l1, h0 having data to read
239
240 r.poll(CyclTime::now(), 0ms);
241
242 std::vector<Audit> valid_seq1 = {
243 // high priority always first
244 Audit{Audit::Entry, *h0},
245 Audit{Audit::Exit, *h0},
246
247 // AT THIS POINT l0/l1 handlers could be executed in any order
248 // Lets assume l0 chosen first [choices so far: (l0)]
249 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
250
251 // Reactor will poll on high priority sockets when l0 handler yields
252 // AT THIS POINT h0/h1 handlers could be executed in any order
253 // Lets assume h0 is chosen first [choices so far: (l0, h0)]
254 Audit{Audit::Entry, *h0},
255 Audit{Audit::Exit, *h0},
256 Audit{Audit::Entry, *h1},
257 Audit{Audit::Exit, *h1},
258
259 // control returned back to l0 handler
260 Audit{Audit::Exit, *l0},
261
262 // Now, l1 handler executes
263 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
264 Audit{Audit::Entry, *h1},
265 Audit{Audit::Exit, *h1},
266 Audit{Audit::Exit, *l1}, // l1 handler sends data to h1
267 };
268
269 std::vector<Audit> valid_seq2 = {
270 // high priority always first
271 Audit{Audit::Entry, *h0},
272 Audit{Audit::Exit, *h0},
273
274 // AT THIS POINT l0/l1 handlers could be executed in any order
275 // Lets assume l0 chosen first [choices so far: (l0)]
276 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
277
278 // Reactor will poll on high priority sockets when l0 handler yields
279 // AT THIS POINT h0/h1 handlers could be executed in any order
280 // Lets assume h1 chosen first [choices so far: (l0, h1)]
281 Audit{Audit::Entry, *h1},
282 Audit{Audit::Exit, *h1},
283 Audit{Audit::Entry, *h0},
284 Audit{Audit::Exit, *h0},
285
286 // control returned back to l0 handler
287 Audit{Audit::Exit, *l0},
288
289 // Now, l1 handler executes
290 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
291 Audit{Audit::Entry, *h1},
292 Audit{Audit::Exit, *h1},
293 Audit{Audit::Exit, *l1}, // l1 handler sends data to h1
294 };
295
296 std::vector<Audit> valid_seq3 = {
297 // high priority always first
298 Audit{Audit::Entry, *h0},
299 Audit{Audit::Exit, *h0},
300
301 // AT THIS POINT l0/l1 handlers could be executed in any order
302 // Lets assume l0 chosen first [choices so far: (l1)]
303 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
304 Audit{Audit::Entry, *h1},
305 Audit{Audit::Exit, *h1},
306 Audit{Audit::Exit, *l1},
307
308 // Now, l0 handler executes
309 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
310 // Reactor will poll on high priority sockets when l0 handler yields
311 // AT THIS POINT h0/h1 handlers could be executed in any order
312 // Lets assume h0 chosen first [choices so far: (l1, h0)]
313 Audit{Audit::Entry, *h0},
314 Audit{Audit::Exit, *h0},
315 Audit{Audit::Entry, *h1},
316 Audit{Audit::Exit, *h1},
317
318 // control returned back to l0 handler
319 Audit{Audit::Exit, *l0},
320 };
321
322 std::vector<Audit> valid_seq4 = {
323 // high priority always first
324 Audit{Audit::Entry, *h0},
325 Audit{Audit::Exit, *h0},
326
327 // AT THIS POINT l0/l1 handlers could be executed in any order
328 // Lets assume l0 chosen first [choices so far: (l1)]
329 Audit{Audit::Entry, *l1}, // l1 handler sends data to h1
330 Audit{Audit::Entry, *h1},
331 Audit{Audit::Exit, *h1},
332 Audit{Audit::Exit, *l1},
333
334 // Now, l0 handler executes
335 Audit{Audit::Entry, *l0}, // l0 handler sends data to h0 and h1
336 // Reactor will poll on high priority sockets when l0 handler yields
337 // AT THIS POINT h0/h1 handlers could be executed in any order
338 // Lets assume h1 chosen first [choices so far: (l1, h1)]
339 Audit{Audit::Entry, *h1},
340 Audit{Audit::Exit, *h1},
341 Audit{Audit::Entry, *h0},
342 Audit{Audit::Exit, *h0},
343
344 // control returned back to l0 handler
345 Audit{Audit::Exit, *l0},
346 };
347
349 || (audit_trail == valid_seq2)
350 || (audit_trail == valid_seq3)
351 || (audit_trail == valid_seq4);
353}
354
356{
357 using namespace literals::chrono_literals;
358
359 Reactor r{1024};
361
363 auto sub = r.subscribe(*socks.second, EpollIn | EpollEt, bind<&TestHandler::on_input>(h.get()));
364
365 const auto now = CyclTime::now();
366 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
367 BOOST_CHECK_EQUAL(h->matches, 0);
368
369 socks.first.send("foo", 4, 0);
370 socks.first.send("foo", 4, 0);
371 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
372 BOOST_CHECK_EQUAL(h->matches, 1);
373
374 // No notification for second message.
375 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
376 BOOST_CHECK_EQUAL(h->matches, 1);
377
378 // Revert to level-triggered.
379 sub.set_events(EpollIn);
380 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
381 BOOST_CHECK_EQUAL(h->matches, 2);
382
383 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
384 BOOST_CHECK_EQUAL(h->matches, 2);
385
386 socks.first.send("foo", 4, 0);
387 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
388 BOOST_CHECK_EQUAL(h->matches, 3);
389
390 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
391 BOOST_CHECK_EQUAL(h->matches, 3);
392}
393
395{
396 int i{0};
397 auto fn = [&i](CyclTime) { ++i; };
398
399 Reactor r{1024};
400
401 Hook h{bind(&fn)};
402 r.add_hook(h);
403
404 BOOST_CHECK_EQUAL(r.poll(CyclTime::now(), 0ms), 0);
406}
407
409{
410 Reactor r{1024};
411
412 struct counter {
413 int invocation_count{0};
414 void operator()(CyclTime, Timer&) { ++invocation_count; }
415 };
416 counter lpc;
417 counter hpc;
418
419 auto now = CyclTime::now();
420
421 // schedule 2 low priority timers for immediate execution
422 Timer lpts[] = {
423 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
424 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
425 };
426
427 Timer hpt;
428
429 int num_of_times_polled = 0;
430 const auto end = now.mono_time() + 95ms;
431
432 // using 95ms instead of 100ms, because Reactor::poll internally
433 // uses own CyclTime, not the one we pass to it.
434
435 while (now.mono_time() < end) {
436
437 // schedule a high priority timer for immediate execution
438 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
439
440 // low priority timers won't be executed because it will be a busy cycle
441 // due to high priority timer that is due execution.
442 r.poll(now, 0s);
443
444 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
445 BOOST_CHECK_EQUAL(lpc.invocation_count, 0);
446
447 now = CyclTime::now();
448 }
449
450 std::this_thread::sleep_for(10ms);
451
452 // at this point, both low priority timers are delayed by >100ms.
453 // they will now activately executed (one per cycle) even if cycle is busy.
454 for (int i = 0; i < 2; i++) {
455 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
456
457 r.poll(now, 0s);
458
459 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
460 BOOST_CHECK_EQUAL(lpc.invocation_count, i+1);
461
462 now = CyclTime::now();
463 }
464}
465
BOOST_CHECK_EQUAL(v.size(), 10U)
BOOST_AUTO_TEST_CASE(ReactorLevelCase)
void set_high_priority_poll_threshold(Micros thresh)
Definition Reactor.hpp:176
Base class for atomic referenced counted objects.
Definition RefCount.hpp:61
STL namespace.
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
constexpr bool operator==(const BasicHandle< PolicyT > &lhs, const BasicHandle< PolicyT > &rhs)
Definition Handle.hpp:91
std::pair< IoSock, IoSock > socketpair(ProtocolT protocol)
Definition IoSock.hpp:90
ssize_t recv(int sockfd, void *buf, std::size_t len, int flags, std::error_code &ec) noexcept
Receive a message from a socket.
Definition Socket.hpp:354
WallClock::time_point WallTime
Definition Time.hpp:112
constexpr auto bind() noexcept
Definition Slot.hpp:97