Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Conn.hpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2022 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#ifndef TOOLBOX_HTTP_CONN_HPP
18#define TOOLBOX_HTTP_CONN_HPP
19
27
28namespace toolbox {
29inline namespace http {
30class App;
31
32template <typename RequestT, typename AppT>
34: public Allocator
35, public BasicDisposer<BasicConn<RequestT, AppT>>
36, BasicParser<BasicConn<RequestT, AppT>> {
37
38 friend class BasicDisposer<BasicConn<RequestT, AppT>>;
39 friend class BasicParser<BasicConn<RequestT, AppT>>;
40
41 using Request = RequestT;
42 using App = AppT;
43 // Automatically unlink when object is destroyed.
44 using AutoUnlinkOption = boost::intrusive::link_mode<boost::intrusive::auto_unlink>;
45
46 static constexpr auto IdleTimeout = 5s;
47
51
52 public:
55
58 , reactor_{r}
59 , sock_{std::move(sock)}
60 , ep_{ep}
61 , app_{app}
62 {
63 sub_ = r.subscribe(*sock_, EpollIn, bind<&BasicConn::on_io_event>(this));
64 schedule_timeout(now);
65 app.on_http_connect(now, ep_);
66 }
67
68 // Copy.
69 BasicConn(const BasicConn&) = delete;
70 BasicConn& operator=(const BasicConn&) = delete;
71
72 // Move.
73 BasicConn(BasicConn&&) = delete;
75
76 const Endpoint& endpoint() const noexcept { return ep_; }
77 void clear() noexcept { req_.clear(); }
78 boost::intrusive::list_member_hook<AutoUnlinkOption> list_hook;
79
80 protected:
81 void dispose_now(CyclTime now) noexcept
82 {
83 app_.on_http_disconnect(now, ep_); // noexcept
84 // Best effort to drain any data still pending in the write buffer before the socket is
85 // closed.
86 if (!out_.empty()) {
87 std::error_code ec;
88 os::write(sock_.get(), out_.data(), ec); // noexcept
89 }
90 delete this;
91 }
92
93 private:
94 ~BasicConn() = default;
95 bool on_http_message_begin(CyclTime /*now*/) noexcept
96 {
97 in_progress_ = true;
98 req_.clear();
99 return true;
100 }
101 bool on_http_url(CyclTime now, std::string_view sv) noexcept
102 {
103 bool ret{false};
104 try {
105 req_.append_url(sv);
106 ret = true;
107 } catch (const std::exception& e) {
108 app_.on_http_error(now, ep_, e, os_);
109 this->dispose(now);
110 }
111 return ret;
112 }
113 bool on_http_status(CyclTime /*now*/, std::string_view /*sv*/) noexcept
114 {
115 // Only supported for HTTP responses.
116 return false;
117 }
118 bool on_http_header_field(CyclTime now, std::string_view sv, First first) noexcept
119 {
120 bool ret{false};
121 try {
122 req_.append_header_field(sv, first);
123 ret = true;
124 } catch (const std::exception& e) {
125 app_.on_http_error(now, ep_, e, os_);
126 this->dispose(now);
127 }
128 return ret;
129 }
130 bool on_http_header_value(CyclTime now, std::string_view sv, First first) noexcept
131 {
132 bool ret{false};
133 try {
134 req_.append_header_value(sv, first);
135 ret = true;
136 } catch (const std::exception& e) {
137 app_.on_http_error(now, ep_, e, os_);
138 this->dispose(now);
139 }
140 return ret;
141 }
142 bool on_http_headers_end(CyclTime /*now*/) noexcept
143 {
144 req_.set_method(method());
145 return true;
146 }
147 bool on_http_body(CyclTime now, std::string_view sv) noexcept
148 {
149 bool ret{false};
150 try {
151 req_.append_body(sv);
152 ret = true;
153 } catch (const std::exception& e) {
154 app_.on_http_error(now, ep_, e, os_);
155 this->dispose(now);
156 }
157 return ret;
158 }
159 bool on_http_message_end(CyclTime now) noexcept
160 {
161 bool ret{false};
162 try {
163 in_progress_ = false;
164 req_.flush(); // May throw.
165 app_.on_http_message(now, ep_, req_, os_);
166 ret = true;
167 } catch (const std::exception& e) {
168 app_.on_http_error(now, ep_, e, os_);
169 this->dispose(now);
170 }
171 return ret;
172 }
173 bool on_http_chunk_header(CyclTime /*now*/, std::size_t /*len*/) noexcept { return true; }
174 bool on_http_chunk_end(CyclTime /*now*/) noexcept { return true; }
175 void on_timeout_timer(CyclTime now, Timer& /*tmr*/)
176 {
177 auto lock = this->lock_this(now);
178 app_.on_http_timeout(now, ep_);
179 this->dispose(now);
180 }
181 void on_io_event(CyclTime now, int fd, unsigned events)
182 {
183 auto lock = this->lock_this(now);
184 try {
185 if (events & (EpollIn | EpollHup)) {
186 if (!drain_input(now, fd)) {
187 this->dispose(now);
188 return;
189 }
190 }
191 // Do not attempt to flush the output buffer if it is empty or if we are still waiting
192 // for the socket to become writable.
193 if (out_.empty() || (write_blocked_ && !(events & EpollOut))) {
194 return;
195 }
196 flush_output(now);
197 } catch (const Exception&) {
198 // Do not call on_http_error() here, because it will have already been called in one of
199 // the noexcept parser callback functions.
200 } catch (const std::exception& e) {
201 app_.on_http_error(now, ep_, e, os_);
202 this->dispose(now);
203 }
204 }
205 bool drain_input(CyclTime now, int fd)
206 {
207 // Limit the number of reads to avoid starvation.
208 for (int i{0}; i < 4; ++i) {
209 std::error_code ec;
210 const auto buf = in_.prepare(2944);
211 const auto size = os::read(fd, buf, ec);
212 if (ec) {
213 // No data available in socket buffer.
214 if (ec == std::errc::operation_would_block) {
215 break;
216 }
217 throw std::system_error{ec, "read"};
218 }
219 if (size == 0) {
220 // N.B. the socket may still be writable if the peer has performed a shutdown on the
221 // write side of the socket only.
222 flush_input(now);
223 return false;
224 }
225 // Commit actual bytes read.
226 in_.commit(size);
227 // Assume that the TCP stream has been drained if we read less than the requested
228 // amount.
229 if (static_cast<size_t>(size) < buffer_size(buf)) {
230 break;
231 }
232 }
233 flush_input(now);
234 // Reset timer.
235 schedule_timeout(now);
236 return true;
237 }
238 void flush_input(CyclTime now) { in_.consume(parse(now, in_.data())); }
239 void flush_output(CyclTime now)
240 {
241 // Attempt to flush buffered data.
242 out_.consume(sock_.write(out_.data()));
243 if (out_.empty()) {
244 if (!in_progress_ && !should_keep_alive()) {
245 this->dispose(now);
246 return;
247 }
248 if (write_blocked_) {
249 // Restore read-only state after the buffer has been drained.
250 sub_.set_events(EpollIn);
251 write_blocked_ = false;
252 }
253 } else if (!write_blocked_) {
254 // Set the state to read-write if the entire buffer could not be written.
255 sub_.set_events(EpollIn | EpollOut);
256 write_blocked_ = true;
257 }
258 }
259 void schedule_timeout(CyclTime now)
260 {
261 const auto timeout = std::chrono::ceil<Seconds>(now.mono_time() + IdleTimeout);
262 tmr_ = reactor_.timer(timeout, Priority::Low, bind<&BasicConn::on_timeout_timer>(this));
263 }
264
265 Reactor& reactor_;
266 IoSock sock_;
267 Endpoint ep_;
268 App& app_;
269 Reactor::Handle sub_;
270 Timer tmr_;
271 Buffer in_, out_;
272 Request req_;
273 OStream os_{out_};
274 bool in_progress_{false}, write_blocked_{false};
275};
276
278
279} // namespace http
280} // namespace toolbox
281
282#endif // TOOLBOX_HTTP_CONN_HPP
BasicConn & operator=(BasicConn &&)=delete
BasicConn(BasicConn &&)=delete
BasicConn & operator=(const BasicConn &)=delete
void dispose_now(CyclTime now) noexcept
Definition Conn.hpp:81
boost::intrusive::list_member_hook< AutoUnlinkOption > list_hook
Definition Conn.hpp:78
void clear() noexcept
Definition Conn.hpp:77
StreamEndpoint Endpoint
Definition Conn.hpp:54
BasicConn(const BasicConn &)=delete
const Endpoint & endpoint() const noexcept
Definition Conn.hpp:76
BasicConn(CyclTime now, Reactor &r, IoSock &&sock, const Endpoint &ep, App &app)
Definition Conn.hpp:56
std::size_t parse(CyclTime, ConstBuffer buf)
Definition Parser.hpp:66
MutableBuffer prepare(std::size_t size)
Returns write buffer of at least size bytes.
Definition Buffer.hpp:87
void consume(std::size_t count) noexcept
Remove characters from the read sequence.
Definition Buffer.cpp:22
bool empty() const noexcept
Returns true if read buffer is empty.
Definition Buffer.hpp:72
ConstBuffer data() const noexcept
Returns available data as a buffer.
Definition Buffer.hpp:49
void commit(std::size_t count) noexcept
Move characters from the write sequence to the read sequence.
Definition Buffer.hpp:81
Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:142
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
@ EpollOut
The associated file is available for write(2) operations.
Definition Epoll.hpp:118
BasicEndpoint< StreamProtocol > StreamEndpoint
Definition Endpoint.hpp:35
ssize_t read(int fd, void *buf, std::size_t len, std::error_code &ec) noexcept
Read from a file descriptor.
Definition File.hpp:146
ssize_t write(int fd, const void *buf, std::size_t len, std::error_code &ec) noexcept
Write to a file descriptor.
Definition File.hpp:178
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
std::string_view sv
Definition Tokeniser.hpp:26
constexpr auto bind() noexcept
Definition Slot.hpp:92
ssize_t write(const void *buf, std::size_t len, std::error_code &ec) noexcept
Definition IoSock.hpp:61