Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
StreamConnector.hpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2022 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#ifndef TOOLBOX_NET_STREAMCONNECTOR_HPP
18#define TOOLBOX_NET_STREAMCONNECTOR_HPP
19
22
23namespace toolbox {
24inline namespace net {
25
26template <typename DerivedT>
28 public:
31
// Compiler-generated default constructor; the members are default-constructed.
32 StreamConnector() = default;
33
34 // Copy.
37
38 // Move.
41
42 /*
43 * Returns true if the connection was established synchronously, or false if the connection is
44 * still pending asynchronous completion.
45 */
// Creates a client stream socket for ep's protocol and tries to connect it to ep.
//
// now: passed through unchanged to the DerivedT callbacks.
// r:   reactor; not referenced by any statement that survives in this listing
//      (see the note on the truncated `sub_` statement below).
// ep:  endpoint to connect to.
//
// Throws std::runtime_error if TCP_NODELAY cannot be confirmed on an IP-family socket, and
// std::system_error if connect reports any error other than operation_in_progress.
46 bool connect(CyclTime now, Reactor& r, const Endpoint& ep)
47 {
48 StreamSockClnt sock{ep.protocol()};
// Give the derived class a chance to configure the socket before it is connected.
49 static_cast<DerivedT*>(this)->on_sock_prepare(now, sock);
// NOTE(review): listing line 50 is missing from this rendering. The page's cross-reference
// list mentions set_non_block, so presumably the socket is made non-blocking here - confirm
// against the original header.
51 if (sock.is_ip_family()) {
// Enable TCP_NODELAY, then read the option back and fail if it did not take effect.
52 set_tcp_no_delay(sock.get(), true);
53 if (!is_tcp_no_delay(sock.get())) {
54 throw std::runtime_error{"TCP_NODELAY option not set"};
55 }
56 }
57
// Use the error_code overload so that an in-progress connect can be told apart from failure.
58 std::error_code ec;
59 sock.connect(ep, ec);
60 if (ec) {
// Anything other than "operation in progress" is a hard failure reported to the caller.
61 if (ec != std::errc::operation_in_progress) {
62 throw std::system_error{ec, "connect"};
63 }
// NOTE(review): this statement is truncated in the rendering (listing line 65 is missing).
// Presumably it assigns sub_ a reactor subscription that delivers on_io_event when the
// socket becomes writable - confirm against the original header.
64 sub_
// Keep the endpoint and the socket until on_io_event reports the outcome.
66 ep_ = ep;
67 sock_ = std::move(sock);
68 return false;
69 }
// Connected immediately: hand the socket to the derived class straight away.
70 static_cast<DerivedT*>(this)->on_sock_connect(now, std::move(sock), ep);
71 return true;
72 }
73
74 protected:
// Protected, defaulted destructor: objects can only be destroyed from within this class or
// a class derived from it.
75 ~StreamConnector() = default;
76
77 private:
// I/O event handler that resolves a connection left pending by connect().
//
// now: passed through unchanged to the DerivedT callbacks.
// The fd and events arguments are unused.
//
// The outcome is reported to DerivedT through on_sock_connect (success) or
// on_sock_connect_error (failure). Exceptions derived from std::exception are not
// propagated to the caller.
78 void on_io_event(CyclTime now, int /*fd*/, unsigned /*events*/)
79 {
// Take the pending socket out of the member and drop the subscription before any callback
// runs, so neither is left behind once this handler has fired.
// NOTE(review): sock_ is a StreamSockClnt while the local is an IoSock; presumably an
// intentional conversion to the base socket type - confirm.
80 IoSock sock{std::move(sock_)};
81 sub_.reset();
82 try {
// A non-zero socket error means the connect attempt failed; raise it so that it is routed
// to the error callback below.
83 const auto ec = sock.get_error();
84 if (ec) {
85 throw std::system_error{ec, "connect"};
86 }
// Success: hand the socket and the endpoint stored by connect() to the derived class.
87 static_cast<DerivedT*>(this)->on_sock_connect(now, std::move(sock), ep_);
// on_sock_connect is called inside the try block, so exceptions it throws are also
// forwarded to on_sock_connect_error.
88 } catch (const std::system_error& e) {
89 static_cast<DerivedT*>(this)->on_sock_connect_error(now, e);
90 } catch (const std::exception& e) {
91 static_cast<DerivedT*>(this)->on_sock_connect_error(now, e);
92 }
93 }
94
// Endpoint of the pending connection: stored by connect() when the attempt is still in
// progress and passed to on_sock_connect by on_io_event.
95 Endpoint ep_;
// Socket of the pending connection: stored by connect() and moved out again by on_io_event.
96 StreamSockClnt sock_;
// Reactor handle for the pending connection; reset by on_io_event.
// NOTE(review): the assignment in connect() is truncated in this listing - confirm what it
// subscribes to.
97 Reactor::Handle sub_;
98};
99
100} // namespace net
101} // namespace toolbox
102
103#endif // TOOLBOX_NET_STREAMCONNECTOR_HPP
StreamConnector(const StreamConnector &)=delete
StreamConnector & operator=(const StreamConnector &)=delete
StreamConnector(StreamConnector &&)=delete
StreamConnector & operator=(StreamConnector &&)=delete
bool connect(CyclTime now, Reactor &r, const Endpoint &ep)
@ EpollIn
The associated file is available for read(2) operations.
Definition Epoll.hpp:115
@ EpollOut
The associated file is available for write(2) operations.
Definition Epoll.hpp:118
void set_tcp_no_delay(int sockfd, bool enabled, std::error_code &ec) noexcept
Definition Socket.hpp:736
BasicEndpoint< StreamProtocol > StreamEndpoint
Definition Endpoint.hpp:35
bool is_tcp_no_delay(int sockfd, std::error_code &ec) noexcept
Definition Socket.hpp:678
constexpr auto bind() noexcept
Definition Slot.hpp:92
void set_non_block(std::error_code &ec) noexcept
Definition Socket.hpp:817
Active Client Stream Socket. All state is in base class, so object can be sliced.