Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
EchoServ.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2022 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include <toolbox/io.hpp>
18#include <toolbox/net.hpp>
19#include <toolbox/sys.hpp>
20#include <toolbox/util.hpp>
21
22using namespace std;
23using namespace toolbox;
24
25namespace {
26
27constexpr auto IdleTimeout = 5s;
28
29class EchoConn {
30
31 // Automatically unlink when object is destroyed.
32 using AutoUnlinkOption = boost::intrusive::link_mode<boost::intrusive::auto_unlink>;
33
34 public:
35 template <typename EndpointT>
36 EchoConn(CyclTime now, Reactor& r, IoSock&& sock, const EndpointT& ep)
37 : reactor_{r}
38 , sock_{std::move(sock)}
39 , ep_{ep}
40 {
41 sub_ = r.subscribe(sock_.get(), EpollIn, bind<&EchoConn::on_input>(this));
42 tmr_ = r.timer(now.mono_time() + IdleTimeout, Priority::Low,
44 }
45 void dispose(CyclTime /*now*/) noexcept
46 {
47 TOOLBOX_INFO << "connection closed";
48 delete this;
49 }
50 boost::intrusive::list_member_hook<AutoUnlinkOption> list_hook;
51
52 private:
53 ~EchoConn() = default;
54 void on_input(CyclTime now, int fd, unsigned events)
55 {
56 try {
57 if (events & (EpollIn | EpollHup)) {
58 const auto size = os::read(fd, buf_.prepare(2944));
59 if (size == 0) {
60 dispose(now);
61 return;
62 }
63 // Commit actual bytes read.
64 buf_.commit(size);
65
66 // Parse each buffered line.
67 auto fn = [fd](std::string_view line) {
68 // Echo bytes back to client.
69 std::string buf{line};
70 buf += '\n';
71 if (os::write(fd, {buf.data(), buf.size()}) < buf.size()) {
72 throw runtime_error{"partial write"};
73 }
74 };
75 buf_.consume(parse_line(buf_.str(), fn));
76
77 // Reset timer.
78 tmr_.cancel();
79 tmr_ = reactor_.timer(now.mono_time() + IdleTimeout, Priority::Low,
81 }
82 } catch (const std::exception& e) {
83 TOOLBOX_ERROR << "exception on input: " << e.what();
84 dispose(now);
85 }
86 }
87 void on_timer(CyclTime now, Timer& /*tmr*/)
88 {
89 TOOLBOX_INFO << "timeout";
90 dispose(now);
91 }
92 Reactor& reactor_;
93 IoSock sock_;
94 const StreamEndpoint ep_;
95 Reactor::Handle sub_;
96 Buffer buf_;
97 Timer tmr_;
98};
99
100class EchoServ : public StreamAcceptor<EchoServ> {
101
103 using ConstantTimeSizeOption = boost::intrusive::constant_time_size<false>;
104 using MemberHookOption = boost::intrusive::member_hook<EchoConn, decltype(EchoConn::list_hook),
105 &EchoConn::list_hook>;
106 using ConnList = boost::intrusive::list<EchoConn, ConstantTimeSizeOption, MemberHookOption>;
107
108 public:
109 EchoServ(CyclTime /*now*/, Reactor& r, const Endpoint& ep)
111 , reactor_{r}
112 {
113 }
114 ~EchoServ()
115 {
116 const auto now = CyclTime::current();
117 conn_list_.clear_and_dispose([now](auto* conn) { conn->dispose(now); });
118 }
119
120 private:
121 void on_sock_prepare(CyclTime /*now*/, IoSock& /*sock*/) {}
122 void on_sock_accept(CyclTime now, IoSock&& sock, const Endpoint& ep)
123 {
124 TOOLBOX_INFO << "connection opened: " << ep;
125
126 // High performance TCP servers could use a custom allocator.
127 auto* const conn = new EchoConn{now, reactor_, std::move(sock), ep};
128 conn_list_.push_back(*conn);
129 }
130 Reactor& reactor_;
131 // List of active connections.
132 ConnList conn_list_;
133};
134} // namespace
135
136int main()
137{
138 int ret = 1;
139 try {
140
141 const auto start_time = CyclTime::now();
142
143 Reactor reactor{1024};
144 const TcpEndpoint ep{TcpProtocol::v4(), 7777};
145 EchoServ echo_serv{start_time, reactor, ep};
146
147 // Start service threads.
149 ReactorRunner reactor_runner{reactor, 100, "reactor"s};
150
151 // Wait for termination.
153 for (;;) {
154 switch (const auto sig = sig_wait()) {
155 case SIGHUP:
156 TOOLBOX_INFO << "received SIGHUP";
157 continue;
158 case SIGINT:
159 TOOLBOX_INFO << "received SIGINT";
160 break;
161 case SIGTERM:
162 TOOLBOX_INFO << "received SIGTERM";
163 break;
164 default:
165 TOOLBOX_INFO << "received signal: " << sig;
166 continue;
167 }
168 break;
169 }
170 ret = 0;
171
172 } catch (const std::exception& e) {
173 TOOLBOX_ERROR << "exception on main thread: " << e.what();
174 }
175 return ret;
176}
int main()
Definition EchoServ.cpp:136
#define TOOLBOX_INFO
Definition Log.hpp:97
#define TOOLBOX_ERROR
Definition Log.hpp:93
MonoTime mono_time() const noexcept
Definition Time.hpp:140
STL namespace.
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
BasicEndpoint< StreamProtocol > StreamEndpoint
Definition Endpoint.hpp:35
IpEndpoint< TcpProtocol > TcpEndpoint
Definition Endpoint.hpp:41
ssize_t read(int fd, void *buf, std::size_t len, std::error_code &ec) noexcept
Read from a file descriptor.
Definition File.hpp:146
ssize_t write(int fd, const void *buf, std::size_t len, std::error_code &ec) noexcept
Write to a file descriptor.
Definition File.hpp:178
constexpr std::size_t parse_line(std::string_view buf, FnT fn)
Definition Tokeniser.hpp:86
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
constexpr auto bind() noexcept
Definition Slot.hpp:92