Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Reactor.ut.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Reactor.hpp"
18
22
23#include <boost/test/unit_test.hpp>
24#include <thread>
25
26using namespace std;
27using namespace toolbox;
28
29namespace {
30
31struct TestHandler : RefCount<TestHandler, ThreadUnsafePolicy> {
32 void on_input(CyclTime /*now*/, int fd, unsigned /*events*/)
33 {
34 char buf[4];
35 os::recv(fd, buf, 4, 0);
36 if (strcmp(buf, "foo") == 0) {
37 ++matches;
38 }
39 }
40 int matches{};
41};
42
43} // namespace
44
46
// Body of a reactor test that subscribes with EpollIn only (no EpollEt).
// NOTE(review): the test-case header and the declarations of `socks` and `h` (source lines 47,
// 52 and 54) are missing from this listing; confirm against the original file.
48{
49 using namespace literals::chrono_literals;
50
51 Reactor r{1024};
53
 // Route input events on the second socket of the pair to TestHandler::on_input.
55 const auto sub = r.subscribe(*socks.second, EpollIn, bind<&TestHandler::on_input>(h.get()));
56
57 const auto now = CyclTime::now();
 // Nothing has been sent yet, so the poll reports no work and the handler has not run.
58 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
59 BOOST_CHECK_EQUAL(h->matches, 0);
60
 // Queue two 4-byte messages ("foo" plus its NUL terminator).
61 socks.first.send("foo", 4, 0);
62 socks.first.send("foo", 4, 0);
 // The handler reads 4 bytes per call, so each poll is expected to account for one message.
63 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
64 BOOST_CHECK_EQUAL(h->matches, 1);
65 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
66 BOOST_CHECK_EQUAL(h->matches, 2);
67
 // Both messages have been read: no further notifications are expected.
68 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
69 BOOST_CHECK_EQUAL(h->matches, 2);
70
 // A fresh message is expected to produce exactly one more notification.
71 socks.first.send("foo", 4, 0);
72 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
73 BOOST_CHECK_EQUAL(h->matches, 3);
74
75 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
76 BOOST_CHECK_EQUAL(h->matches, 3);
77}
78
// Body of a reactor test that subscribes with EpollIn | EpollEt and later switches the
// subscription back to EpollIn alone.
// NOTE(review): the test-case header and the declarations of `socks` and `h` (source lines 79,
// 84 and 86) are missing from this listing; confirm against the original file.
80{
81 using namespace literals::chrono_literals;
82
83 Reactor r{1024};
85
 // `sub` is non-const here because set_events() is called on it further down.
87 auto sub = r.subscribe(*socks.second, EpollIn | EpollEt, bind<&TestHandler::on_input>(h.get()));
88
89 const auto now = CyclTime::now();
 // Nothing has been sent yet, so the poll reports no work and the handler has not run.
90 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
91 BOOST_CHECK_EQUAL(h->matches, 0);
92
 // Queue two 4-byte messages; the handler reads only 4 bytes per call, so after the first
 // notification the second message is still unread.
93 socks.first.send("foo", 4, 0);
94 socks.first.send("foo", 4, 0);
95 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
96 BOOST_CHECK_EQUAL(h->matches, 1);
97
98 // No notification for second message.
99 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
100 BOOST_CHECK_EQUAL(h->matches, 1);
101
102 // Revert to level-triggered.
103 sub.set_events(EpollIn);
 // With EpollIn alone, the still-unread second message is expected to be reported.
104 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
105 BOOST_CHECK_EQUAL(h->matches, 2);
106
107 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
108 BOOST_CHECK_EQUAL(h->matches, 2);
109
 // A fresh message is expected to produce exactly one more notification.
110 socks.first.send("foo", 4, 0);
111 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 1);
112 BOOST_CHECK_EQUAL(h->matches, 3);
113
114 BOOST_CHECK_EQUAL(r.poll(now, 0ms), 0);
115 BOOST_CHECK_EQUAL(h->matches, 3);
116}
117
// Body of a reactor test that registers a hook with the reactor and polls once.
// NOTE(review): the test-case header (source line 118) is missing from this listing.
119{
 // Counter incremented each time the callable below is invoked.
120 int i{0};
121 auto fn = [&i](CyclTime) { ++i; };
122
123 Reactor r{1024};
124
 // Wrap the lambda in a Hook and register it with the reactor.
125 Hook h{bind(&fn)};
126 r.add_hook(h);
127
 // No subscriptions or timers exist, so the poll is expected to report zero.
128 BOOST_CHECK_EQUAL(r.poll(CyclTime::now(), 0ms), 0);
 // NOTE(review): source line 129 is missing from this listing; presumably it checked `i`
 // after the poll. Confirm against the original file.
130}
131
// Body of a reactor test that checks how low-priority timers are scheduled while a
// high-priority timer is due on every cycle.
// NOTE(review): the test-case header (source line 132) is missing from this listing.
133{
134 Reactor r{1024};
135
 // Callable that counts how many times it has been invoked as a timer callback.
136 struct counter {
137 int invocation_count{0};
138 void operator()(CyclTime, Timer&) { ++invocation_count; }
139 };
 // lpc counts low-priority timer callbacks; hpc counts high-priority timer callbacks.
140 counter lpc;
141 counter hpc;
142
143 auto now = CyclTime::now();
144
145 // schedule 2 low priority timers for immediate execution
146 Timer lpts[] = {
147 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
148 r.timer(now.mono_time(), Priority::Low, bind<&counter::operator()>(&lpc)),
149 };
150
 // Handle for the high-priority timer; reassigned on every loop iteration below.
151 Timer hpt;
152
153 int num_of_times_polled = 0;
154 const auto end = now.mono_time() + 95ms;
155
156 // using 95ms instead of 100ms, because Reactor::poll internally
157 // uses its own CyclTime, not the one we pass to it.
158
159 while (now.mono_time() < end) {
160
161 // schedule a high priority timer for immediate execution
162 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
163
164 // low priority timers won't be executed because it will be a busy cycle
165 // due to the high priority timer that is due for execution.
166 r.poll(now, 0s);
167
 // Exactly one high-priority callback per poll; low-priority callbacks have not run yet.
168 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
169 BOOST_CHECK_EQUAL(lpc.invocation_count, 0);
170
171 now = CyclTime::now();
172 }
173
 // Together with the ~95ms loop above, this sleep pushes the low-priority timers past 100ms.
174 std::this_thread::sleep_for(10ms);
175
176 // at this point, both low priority timers are delayed by >100ms.
177 // they will now be actively executed (one per cycle) even if the cycle is busy.
178 for (int i = 0; i < 2; i++) {
179 hpt = r.timer(now.mono_time(), Priority::High, bind<&counter::operator()>(&hpc));
180
181 r.poll(now, 0s);
182
 // Each poll still runs the high-priority timer, plus one more low-priority timer.
183 BOOST_CHECK_EQUAL(hpc.invocation_count, ++num_of_times_polled);
184 BOOST_CHECK_EQUAL(lpc.invocation_count, i+1);
185
186 now = CyclTime::now();
187 }
188}
189
BOOST_CHECK_EQUAL(v.size(), 10U)
BOOST_AUTO_TEST_CASE(ReactorLevelCase)
Base class for atomic referenced counted objects.
Definition RefCount.hpp:61
STL namespace.
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
std::pair< IoSock, IoSock > socketpair(ProtocolT protocol)
Definition IoSock.hpp:90
ssize_t recv(int sockfd, void *buf, std::size_t len, int flags, std::error_code &ec) noexcept
Receive a message from a socket.
Definition Socket.hpp:354
constexpr auto bind() noexcept
Definition Slot.hpp:92