Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
EchoClnt.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2022 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include <toolbox/io.hpp>
18#include <toolbox/net.hpp>
19#include <toolbox/sys.hpp>
20#include <toolbox/util.hpp>
21
22using namespace std;
23using namespace toolbox;
24
25namespace {
26
27constexpr auto PingInterval = 100ms;
28
29class EchoConn {
30
31 // Automatically unlink when object is destroyed.
32 using AutoUnlinkOption = boost::intrusive::link_mode<boost::intrusive::auto_unlink>;
33
34 public:
35 EchoConn(CyclTime now, Reactor& r, IoSock&& sock, const StreamEndpoint& ep)
36 : sock_{std::move(sock)}
37 , ep_{ep}
38 {
39 sub_ = r.subscribe(sock_.get(), EpollIn, bind<&EchoConn::on_input>(this));
40 tmr_ = r.timer(now.mono_time(), PingInterval, Priority::Low,
42 }
43 void dispose(CyclTime /*now*/) noexcept
44 {
45 TOOLBOX_INFO << "connection closed";
46 delete this;
47 }
48 boost::intrusive::list_member_hook<AutoUnlinkOption> list_hook;
49
50 private:
51 ~EchoConn() = default;
52 void on_input(CyclTime now, int fd, unsigned events)
53 {
54 try {
55 if (events & (EpollIn | EpollHup)) {
56 const auto size = os::read(fd, buf_.prepare(2944));
57 if (size == 0) {
58 dispose(now);
59 return;
60 }
61 // Commit actual bytes read.
62 buf_.commit(size);
63
64 // Parse each buffered line.
65 auto fn = [this](std::string_view line) {
66 ++count_;
67 // Echo bytes back to client.
68 TOOLBOX_INFO << "received: " << line;
69 };
70 buf_.consume(parse_line(buf_.str(), fn));
71 if (count_ == 5) {
72 dispose(now);
73 return;
74 }
75 }
76 } catch (const std::exception& e) {
77 TOOLBOX_ERROR << "exception on input: " << e.what();
78 dispose(now);
79 }
80 }
81 void on_timer(CyclTime now, Timer& /*tmr*/)
82 {
83 try {
84 if (sock_.send("ping\n", 5, 0) < 5) {
85 throw runtime_error{"partial write"};
86 }
87 } catch (const std::exception& e) {
88 TOOLBOX_ERROR << "exception on timer: " << e.what();
89 dispose(now);
90 }
91 }
92 IoSock sock_;
93 const StreamEndpoint ep_;
94 Reactor::Handle sub_;
95 Buffer buf_;
96 Timer tmr_;
97 int count_{0};
98};
99
100class EchoClnt : public StreamConnector<EchoClnt> {
101
103 using ConstantTimeSizeOption = boost::intrusive::constant_time_size<false>;
104 using MemberHookOption = boost::intrusive::member_hook<EchoConn, decltype(EchoConn::list_hook),
105 &EchoConn::list_hook>;
106 using ConnList = boost::intrusive::list<EchoConn, ConstantTimeSizeOption, MemberHookOption>;
107
108 public:
109 EchoClnt(CyclTime now, Reactor& reactor, Resolver& resolver, const string& uri)
110 : reactor_{reactor}
111 , resolver_{resolver}
112 , uri_{uri}
114 {
115 // Immediate and then at 5s intervals.
116 tmr_ = reactor_.timer(now.mono_time(), 5s, Priority::Low, bind<&EchoClnt::on_timer>(this));
117 }
118 ~EchoClnt()
119 {
120 const auto now = CyclTime::current();
121 conn_list_.clear_and_dispose([now](auto* conn) { conn->dispose(now); });
122 }
123
124 private:
125 void on_sock_prepare(CyclTime /*now*/, IoSock& sock)
126 {
127 if (sock.is_ip_family()) {
128 // Set the number of SYN retransmits that TCP should send before aborting the attempt to
129 // connect.
130 set_tcp_syn_nt(sock.get(), 1);
131 }
132 }
133 void on_sock_connect(CyclTime now, IoSock&& sock, const Endpoint& ep)
134 {
135 TOOLBOX_INFO << "connection opened: " << ep;
136 inprogress_ = false;
137
138 // High performance TCP servers could use a custom allocator.
139 auto* const conn = new EchoConn{now, reactor_, std::move(sock), ep};
140 conn_list_.push_back(*conn);
141 }
142 void on_sock_connect_error(CyclTime /*now*/, const std::exception& e)
143 {
144 TOOLBOX_ERROR << "could not connect: " << e.what();
145 aifuture_ = resolver_.resolve(uri_, SOCK_STREAM);
146 inprogress_ = false;
147 }
148 void on_timer(CyclTime now, Timer& /*tmr*/)
149 {
150 if (!conn_list_.empty() || inprogress_) {
151 return;
152 }
153 if (aifuture_.valid()) {
154 if (!is_ready(aifuture_)) {
155 TOOLBOX_INFO << "address pending";
156 return;
157 }
158 try {
159 ep_ = get_endpoint<Endpoint>(aifuture_);
160 } catch (const std::exception& e) {
161 TOOLBOX_ERROR << "could not resolve address: " << e.what();
162 aifuture_ = resolver_.resolve(uri_, SOCK_STREAM);
163 return;
164 }
165 TOOLBOX_INFO << "address resolved: " << ep_;
166 }
167 TOOLBOX_INFO << "reconnecting";
168 if (!connect(now, reactor_, ep_)) {
169 inprogress_ = true;
170 }
171 }
172 Reactor& reactor_;
173 Resolver& resolver_;
174 const string uri_;
175 Timer tmr_;
176 AddrInfoFuture aifuture_;
177 Endpoint ep_;
178 bool inprogress_{false};
179 // List of active connections.
180 ConnList conn_list_;
181};
182} // namespace
183
184int main()
185{
186 int ret = 1;
187 try {
188
189 const auto start_time = CyclTime::now();
190
191 Reactor reactor{1024};
193 EchoClnt echo_clnt{start_time, reactor, resolver, "tcp4://127.0.0.1:7777"};
194
195 // Start service threads.
197 ReactorRunner reactor_runner{reactor, 100, "reactor"s};
198 Runner resolver_runner{resolver, "resolver"s};
199
200 // Wait for termination.
202 for (;;) {
203 switch (const auto sig = sig_wait()) {
204 case SIGHUP:
205 TOOLBOX_INFO << "received SIGHUP";
206 continue;
207 case SIGINT:
208 TOOLBOX_INFO << "received SIGINT";
209 break;
210 case SIGTERM:
211 TOOLBOX_INFO << "received SIGTERM";
212 break;
213 default:
214 TOOLBOX_INFO << "received signal: " << sig;
215 continue;
216 }
217 break;
218 }
219 ret = 0;
220
221 } catch (const std::exception& e) {
222 TOOLBOX_ERROR << "exception on main thread: " << e.what();
223 }
224 return ret;
225}
int main()
Definition EchoClnt.cpp:184
#define TOOLBOX_INFO
Definition Log.hpp:97
#define TOOLBOX_ERROR
Definition Log.hpp:93
Handle subscribe(int fd, unsigned events, IoSlot slot)
Definition Reactor.cpp:42
MonoTime mono_time() const noexcept
Definition Time.hpp:140
STL namespace.
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
BasicEndpoint< StreamProtocol > StreamEndpoint
Definition Endpoint.hpp:35
void set_tcp_syn_nt(int sockfd, int retrans, std::error_code &ec) noexcept
Definition Socket.hpp:764
std::future< AddrInfoPtr > AddrInfoFuture
Definition Resolver.hpp:30
StreamEndpoint parse_stream_endpoint(std::string_view uri)
Definition Endpoint.hpp:57
bool is_ready(std::future< ResultT > &future)
Returns true if future is ready.
Definition Resolver.hpp:78
ssize_t read(int fd, void *buf, std::size_t len, std::error_code &ec) noexcept
Read from a file descriptor.
Definition File.hpp:146
void connect(int sockfd, const sockaddr &addr, socklen_t addrlen, std::error_code &ec) noexcept
Initiate a connection on a socket.
Definition Socket.hpp:291
constexpr std::size_t parse_line(std::string_view buf, FnT fn)
Definition Tokeniser.hpp:86
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:92