Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Conn.hpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2022 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#ifndef TOOLBOX_HTTP_CONN_HPP
18#define TOOLBOX_HTTP_CONN_HPP
19
28
29namespace toolbox {
30inline namespace http {
31class App;
32
33template <typename RequestT, typename AppT>
35: public Allocator
36, public BasicDisposer<BasicConn<RequestT, AppT>>
37, BasicParser<BasicConn<RequestT, AppT>> {
38
39 friend class BasicDisposer<BasicConn<RequestT, AppT>>;
40 friend class BasicParser<BasicConn<RequestT, AppT>>;
41
42 using Request = RequestT;
43 using App = AppT;
44 // Automatically unlink when object is destroyed.
45 using AutoUnlinkOption = boost::intrusive::link_mode<boost::intrusive::auto_unlink>;
46
47 static constexpr auto IdleTimeout = 5s;
48
52
53 public:
56
59 , reactor_{r}
60 , sock_{std::move(sock)}
61 , ep_{ep}
62 , app_{app}
63 {
64 sub_ = r.subscribe(*sock_, EpollIn, bind<&BasicConn::on_io_event>(this));
65 schedule_timeout(now);
66 app.on_http_connect(now, ep_);
67 }
68
69 // Copy.
70 BasicConn(const BasicConn&) = delete;
71 BasicConn& operator=(const BasicConn&) = delete;
72
73 // Move.
74 BasicConn(BasicConn&&) = delete;
76
77 const Endpoint& endpoint() const noexcept { return ep_; }
78 void clear() noexcept { req_.clear(); }
79 boost::intrusive::list_member_hook<AutoUnlinkOption> list_hook;
80
81 protected:
82 void dispose_now(CyclTime now) noexcept
83 {
84 app_.on_http_disconnect(now, ep_); // noexcept
85 // Best effort to drain any data still pending in the write buffer before the socket is
86 // closed.
87 if (!out_.empty()) {
88 std::error_code ec;
89 os::write(sock_.get(), out_.data(), ec); // noexcept
90 }
91 delete this;
92 }
93
94 private:
95 ~BasicConn() = default;
96 bool on_http_message_begin(CyclTime /*now*/) noexcept
97 {
98 in_progress_ = true;
99 req_.clear();
100 return true;
101 }
102 bool on_http_url(CyclTime now, std::string_view sv) noexcept
103 {
104 bool ret{false};
105 try {
106 req_.append_url(sv);
107 ret = true;
108 } catch (const std::exception& e) {
109 app_.on_http_error(now, ep_, e, os_);
110 this->dispose(now);
111 }
112 return ret;
113 }
114 bool on_http_status(CyclTime /*now*/, std::string_view /*sv*/) noexcept
115 {
116 // Only supported for HTTP responses.
117 return false;
118 }
119 bool on_http_header_field(CyclTime now, std::string_view sv, First first) noexcept
120 {
121 bool ret{false};
122 try {
123 req_.append_header_field(sv, first);
124 ret = true;
125 } catch (const std::exception& e) {
126 app_.on_http_error(now, ep_, e, os_);
127 this->dispose(now);
128 }
129 return ret;
130 }
131 bool on_http_header_value(CyclTime now, std::string_view sv, First first) noexcept
132 {
133 bool ret{false};
134 try {
135 req_.append_header_value(sv, first);
136 ret = true;
137 } catch (const std::exception& e) {
138 app_.on_http_error(now, ep_, e, os_);
139 this->dispose(now);
140 }
141 return ret;
142 }
143 bool on_http_headers_end(CyclTime /*now*/) noexcept
144 {
145 req_.set_method(method());
146 return true;
147 }
148 bool on_http_body(CyclTime now, std::string_view sv) noexcept
149 {
150 bool ret{false};
151 try {
152 req_.append_body(sv);
153 ret = true;
154 } catch (const std::exception& e) {
155 app_.on_http_error(now, ep_, e, os_);
156 this->dispose(now);
157 }
158 return ret;
159 }
160 bool on_http_message_end(CyclTime now) noexcept
161 {
162 bool ret{false};
163 try {
164 in_progress_ = false;
165 req_.flush(); // May throw.
166 app_.on_http_message(now, ep_, req_, os_);
167 ret = true;
168 } catch (const std::exception& e) {
169 app_.on_http_error(now, ep_, e, os_);
170 this->dispose(now);
171 }
172 return ret;
173 }
174 bool on_http_chunk_header(CyclTime /*now*/, std::size_t /*len*/) noexcept { return true; }
175 bool on_http_chunk_end(CyclTime /*now*/) noexcept { return true; }
176 void on_timeout_timer(CyclTime now, Timer& /*tmr*/)
177 {
178 auto lock = this->lock_this(now);
179 app_.on_http_timeout(now, ep_);
180 this->dispose(now);
181 }
182 void on_io_event(CyclTime now, int fd, unsigned events)
183 {
184 auto lock = this->lock_this(now);
185 try {
186 if (events & (EpollIn | EpollHup)) {
187 if (!drain_input(now, fd)) {
188 this->dispose(now);
189 return;
190 }
191 }
192 // Do not attempt to flush the output buffer if it is empty or if we are still waiting
193 // for the socket to become writable.
194 if (out_.empty() || (write_blocked_ && !(events & EpollOut))) {
195 return;
196 }
197 flush_output(now);
198 } catch (const Exception&) {
199 // Do not call on_http_error() here, because it will have already been called in one of
200 // the noexcept parser callback functions.
201 } catch (const std::exception& e) {
202 app_.on_http_error(now, ep_, e, os_);
203 this->dispose(now);
204 }
205 }
206 bool drain_input(CyclTime now, int fd)
207 {
208 // Limit the number of reads to avoid starvation.
209 for (int i{0}; i < 4; ++i) {
210 std::error_code ec;
211 const auto buf = in_.prepare(2944);
212 const auto size = os::read(fd, buf, ec);
213 if (ec) {
214 // No data available in socket buffer.
215 if (ec == std::errc::operation_would_block) {
216 break;
217 }
218 throw std::system_error{ec, "read"};
219 }
220 if (size == 0) {
221 // N.B. the socket may still be writable if the peer has performed a shutdown on the
222 // write side of the socket only.
223 flush_input(now);
224 return false;
225 }
226 // Commit actual bytes read.
227 in_.commit(size);
228 // Assume that the TCP stream has been drained if we read less than the requested
229 // amount.
230 if (static_cast<size_t>(size) < buffer_size(buf)) {
231 break;
232 }
233 }
234 flush_input(now);
235 // Reset timer.
236 schedule_timeout(now);
237 return true;
238 }
239 void flush_input(CyclTime now) { in_.consume(parse(now, in_.data())); }
240 void flush_output(CyclTime now)
241 {
242 // Attempt to flush buffered data.
243 out_.consume(sock_.write(out_.data()));
244 if (out_.empty()) {
245 if (!in_progress_ && !should_keep_alive()) {
246 this->dispose(now);
247 return;
248 }
249 if (write_blocked_) {
250 // Restore read-only state after the buffer has been drained.
251 sub_.set_events(EpollIn);
252 write_blocked_ = false;
253 }
254 } else if (!write_blocked_) {
255 // Set the state to read-write if the entire buffer could not be written.
256 sub_.set_events(EpollIn | EpollOut);
257 write_blocked_ = true;
258 }
259 }
260 void schedule_timeout(CyclTime now)
261 {
262 const auto timeout = std::chrono::ceil<Seconds>(now.mono_time() + IdleTimeout);
263 tmr_ = reactor_.timer(timeout, Priority::Low, bind<&BasicConn::on_timeout_timer>(this));
264 }
265
266 Reactor& reactor_;
267 IoSock sock_;
268 Endpoint ep_;
269 App& app_;
270 Reactor::Handle sub_;
271 Timer tmr_;
272 Buffer in_, out_;
273 Request req_;
274 OStream os_{out_};
275 bool in_progress_{false}, write_blocked_{false};
276};
277
279
280} // namespace http
281} // namespace toolbox
282
283#endif // TOOLBOX_HTTP_CONN_HPP
BasicConn & operator=(BasicConn &&)=delete
BasicConn(BasicConn &&)=delete
BasicConn & operator=(const BasicConn &)=delete
void dispose_now(CyclTime now) noexcept
Definition Conn.hpp:82
boost::intrusive::list_member_hook< AutoUnlinkOption > list_hook
Definition Conn.hpp:79
void clear() noexcept
Definition Conn.hpp:78
StreamEndpoint Endpoint
Definition Conn.hpp:55
BasicConn(const BasicConn &)=delete
const Endpoint & endpoint() const noexcept
Definition Conn.hpp:77
BasicConn(CyclTime now, Reactor &r, IoSock &&sock, const Endpoint &ep, App &app)
Definition Conn.hpp:57
std::size_t parse(CyclTime, ConstBuffer buf)
Definition Parser.hpp:66
MutableBuffer prepare(std::size_t size)
Returns write buffer of at least size bytes.
Definition Buffer.hpp:86
void consume(std::size_t count) noexcept
Remove characters from the read sequence.
Definition Buffer.cpp:22
bool empty() const noexcept
Returns true if read buffer is empty.
Definition Buffer.hpp:71
ConstBuffer data() const noexcept
Returns available data as a buffer.
Definition Buffer.hpp:48
void commit(std::size_t count) noexcept
Move characters from the write sequence to the read sequence.
Definition Buffer.hpp:80
Timer timer(MonoTime expiry, Duration interval, Priority priority, TimerSlot slot)
Throws std::bad_alloc only.
Definition Reactor.hpp:148
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
@ EpollOut
The associated file is available for write(2) operations.
Definition Epoll.hpp:118
BasicEndpoint< StreamProtocol > StreamEndpoint
Definition Endpoint.hpp:36
ssize_t read(int fd, void *buf, std::size_t len, std::error_code &ec) noexcept
Read from a file descriptor.
Definition File.hpp:163
ssize_t write(int fd, const void *buf, std::size_t len, std::error_code &ec) noexcept
Write to a file descriptor.
Definition File.hpp:195
constexpr std::size_t size(const detail::Struct< detail::Member< TagsT, ValuesT >... > &s)
Definition Struct.hpp:98
std::string_view sv
Definition Tokeniser.hpp:26
constexpr auto bind() noexcept
Definition Slot.hpp:97
ssize_t write(const void *buf, std::size_t len, std::error_code &ec) noexcept
Definition IoSock.hpp:71