//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
27  

27  

28  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
29  

29  

30  
#include <boost/capy/buffers.hpp>
30  
#include <boost/capy/buffers.hpp>
31  

31  

32  
#include <errno.h>
32  
#include <errno.h>
33  
#include <fcntl.h>
33  
#include <fcntl.h>
34  
#include <netinet/in.h>
34  
#include <netinet/in.h>
35  
#include <netinet/tcp.h>
35  
#include <netinet/tcp.h>
36  
#include <sys/socket.h>
36  
#include <sys/socket.h>
37  
#include <unistd.h>
37  
#include <unistd.h>
38  

38  

39  
#include <memory>
39  
#include <memory>
40  
#include <mutex>
40  
#include <mutex>
41  
#include <unordered_map>
41  
#include <unordered_map>
42  

42  

/*
    select Socket Implementation
    ============================

    This mirrors the epoll_sockets design for behavioral consistency.
    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with the cancelled flag set) so coroutines waiting on them can
    resume. close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    select_socket_service owns all socket impls. destroy() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/
80  

80  

namespace boost::corosio::detail {

/** State for select socket service.

    Bundles everything the service shares with its socket impls:
    the scheduler reference plus the registries of live sockets.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Scheduler that drives readiness notification and completion posting.
    select_scheduler& sched_;
    // Presumably guards socket_list_ / socket_ptrs_ — confirm at use sites.
    std::mutex mutex_;
    // Intrusive list of all live socket impls.
    intrusive_list<select_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup and removal
    // (see "Impl Lifetime with shared_ptr" in the file header comment).
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&)            = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    // Closes all sockets and clears the impl map (see file header comment).
    void shutdown() override;

    // Lifecycle of socket impls owned by this service.
    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    std::error_code open_socket(tcp_socket::implementation& impl) override;

    /// Scheduler shared by all sockets of this service.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    // Queue an op for completion on the scheduler.
    void post(select_op* op);
    // Outstanding-work accounting forwarded to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
// Backward compatibility alias for code written against the old type name.
using select_sockets = select_socket_service;
/// Stop-token callback: forwards cancellation to the owning op.
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
143  
inline void
141  
inline void
144  
select_connect_op::cancel() noexcept
142  
select_connect_op::cancel() noexcept
145  
{
143  
{
146  
    if (socket_impl_)
144  
    if (socket_impl_)
147  
        socket_impl_->cancel_single_op(*this);
145  
        socket_impl_->cancel_single_op(*this);
148  
    else
146  
    else
149  
        request_cancel();
147  
        request_cancel();
150  
}
148  
}
151  

149  

152  
inline void
150  
inline void
153  
select_read_op::cancel() noexcept
151  
select_read_op::cancel() noexcept
154  
{
152  
{
155  
    if (socket_impl_)
153  
    if (socket_impl_)
156  
        socket_impl_->cancel_single_op(*this);
154  
        socket_impl_->cancel_single_op(*this);
157  
    else
155  
    else
158  
        request_cancel();
156  
        request_cancel();
159  
}
157  
}
160  

158  

161  
inline void
159  
inline void
162  
select_write_op::cancel() noexcept
160  
select_write_op::cancel() noexcept
163  
{
161  
{
164  
    if (socket_impl_)
162  
    if (socket_impl_)
165  
        socket_impl_->cancel_single_op(*this);
163  
        socket_impl_->cancel_single_op(*this);
166  
    else
164  
    else
167  
        request_cancel();
165  
        request_cancel();
168  
}
166  
}
169  

167  

/// Completion handler for a connect op.
///
/// Runs when the scheduler dequeues the posted op: publishes the error
/// code and byte count to the awaiting coroutine's out-params, caches the
/// socket's endpoints on success, releases the impl reference, and resumes
/// the coroutine via its executor.
inline void
select_connect_op::operator()()
{
    // Detach the stop-token callback first so no further cancel can race in.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        // NOTE(review): sockaddr_in is IPv4-only — confirm IPv6 is out of scope.
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Publish the result; cancellation takes precedence over syscall errors.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset(); // may release the last ref to the impl that owns this op
    dispatch_coro(saved_ex, saved_h).resume();
}
/// Binds this socket impl to its owning service.
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
/// Begin an async connect to `ep`.
///
/// Tries ::connect() immediately on the non-blocking fd. On synchronous
/// success or hard failure the op is completed and posted to the scheduler
/// queue. On EINPROGRESS the op is registered with the select scheduler for
/// write-readiness and the coroutine suspends. Completion is ALWAYS posted
/// to the scheduler queue, never resumed inline, so this returns
/// std::noop_coroutine() on every path.
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h               = h;
    op.ex              = ex;
    op.ec_out          = ec;
    op.fd              = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    // NOTE(review): sockaddr_in path is IPv4-only — confirm IPv6 out of scope.
    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this(); // keep impl alive until op runs
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Hard failure: complete with the saved errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
/// Begin an async scatter read.
///
/// Copies up to max_buffers buffer descriptors out of `param`, tries
/// ::readv() immediately, and on EAGAIN/EWOULDBLOCK registers the op with
/// the select scheduler for read-readiness using the same
/// registering -> registered handshake as connect(). Completion is always
/// posted to the scheduler queue, so this returns std::noop_coroutine()
/// on every path.
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Zero-length read: complete immediately with 0 bytes, flagged so the
    // completion can distinguish it from EOF.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    // Optimistic attempt on the non-blocking fd.
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // n == 0 from readv on a non-empty buffer means EOF.
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard failure: complete with the saved errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
412  
// Initiate a non-blocking gather-write on the socket.
//
// Attempts an immediate ::sendmsg. On success (or immediate failure) the
// completion is posted straight to the scheduler and no coroutine is
// resumed inline. If the socket would block, the op is registered with the
// select reactor for write readiness; ownership hand-off between this
// thread, the reactor, and cancellation is mediated by the atomic
// `registered` state machine (registering -> registered -> unregistered).
//
// Parameters mirror the socket_service write contract: completion handler
// coroutine `h`, its executor `ex`, the buffer sequence `param`, a stop
// token for cancellation, and out-params for the error code and byte count.
// Always returns std::noop_coroutine() — completion is delivered via post.
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    // Stash completion state in the per-socket write op (one in flight).
    auto& op = wr_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    // Gather the caller's buffers into the op's iovec array.
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length write: complete immediately with 0 bytes, no syscall.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // MSG_NOSIGNAL: get EPIPE instead of SIGPIPE on a closed peer.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    // Fast path: some bytes were written synchronously.
    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Slow path: park the op with the reactor until writable.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error (or n == 0). Guard against a zero errno with EIO so the
    // completion always carries a real error.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
509  

505  

510  
inline std::error_code
506  
inline std::error_code
511  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
507  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
512  
{
508  
{
513  
    int how;
509  
    int how;
514  
    switch (what)
510  
    switch (what)
515  
    {
511  
    {
516  
    case tcp_socket::shutdown_receive:
512  
    case tcp_socket::shutdown_receive:
517  
        how = SHUT_RD;
513  
        how = SHUT_RD;
518  
        break;
514  
        break;
519  
    case tcp_socket::shutdown_send:
515  
    case tcp_socket::shutdown_send:
520  
        how = SHUT_WR;
516  
        how = SHUT_WR;
521  
        break;
517  
        break;
522  
    case tcp_socket::shutdown_both:
518  
    case tcp_socket::shutdown_both:
523  
        how = SHUT_RDWR;
519  
        how = SHUT_RDWR;
524  
        break;
520  
        break;
525  
    default:
521  
    default:
526  
        return make_err(EINVAL);
522  
        return make_err(EINVAL);
527  
    }
523  
    }
528  
    if (::shutdown(fd_, how) != 0)
524  
    if (::shutdown(fd_, how) != 0)
529  
        return make_err(errno);
525  
        return make_err(errno);
530  
    return {};
526  
    return {};
531  
}
527  
}
532  

528  

533  
inline std::error_code
529  
inline std::error_code
534 -
select_socket::set_option(
530 +
select_socket::set_no_delay(bool value) noexcept
535 -
    int level, int optname,
 
536 -
    void const* data, std::size_t size) noexcept
 
537  
{
531  
{
538 -
    if (::setsockopt(fd_, level, optname, data,
532 +
    int flag = value ? 1 : 0;
539 -
            static_cast<socklen_t>(size)) != 0)
533 +
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
540  
        return make_err(errno);
534  
        return make_err(errno);
541  
    return {};
535  
    return {};
542  
}
536  
}
543  

537  

 
538 +
inline bool
 
539 +
select_socket::no_delay(std::error_code& ec) const noexcept
 
540 +
{
 
541 +
    int flag      = 0;
 
542 +
    socklen_t len = sizeof(flag);
 
543 +
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
 
544 +
    {
 
545 +
        ec = make_err(errno);
 
546 +
        return false;
 
547 +
    }
 
548 +
    ec = {};
 
549 +
    return flag != 0;
 
550 +
}
 
551 +

544  
inline std::error_code
552  
inline std::error_code
545 -
select_socket::get_option(
553 +
select_socket::set_keep_alive(bool value) noexcept
546 -
    int level, int optname,
 
547 -
    void* data, std::size_t* size) const noexcept
 
548  
{
554  
{
549 -
    socklen_t len = static_cast<socklen_t>(*size);
555 +
    int flag = value ? 1 : 0;
550 -
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
556 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
 
557 +
        return make_err(errno);
 
558 +
    return {};
 
559 +
}
 
560 +

 
561 +
inline bool
 
562 +
select_socket::keep_alive(std::error_code& ec) const noexcept
 
563 +
{
 
564 +
    int flag      = 0;
 
565 +
    socklen_t len = sizeof(flag);
 
566 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
 
567 +
    {
 
568 +
        ec = make_err(errno);
 
569 +
        return false;
 
570 +
    }
 
571 +
    ec = {};
 
572 +
    return flag != 0;
 
573 +
}
 
574 +

 
575 +
inline std::error_code
 
576 +
select_socket::set_receive_buffer_size(int size) noexcept
 
577 +
{
 
578 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
 
579 +
        return make_err(errno);
 
580 +
    return {};
 
581 +
}
 
582 +

 
583 +
inline int
 
584 +
select_socket::receive_buffer_size(std::error_code& ec) const noexcept
 
585 +
{
 
586 +
    int size      = 0;
 
587 +
    socklen_t len = sizeof(size);
 
588 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
 
589 +
    {
 
590 +
        ec = make_err(errno);
 
591 +
        return 0;
 
592 +
    }
 
593 +
    ec = {};
 
594 +
    return size;
 
595 +
}
 
596 +

 
597 +
inline std::error_code
 
598 +
select_socket::set_send_buffer_size(int size) noexcept
 
599 +
{
 
600 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
 
601 +
        return make_err(errno);
 
602 +
    return {};
 
603 +
}
 
604 +

 
605 +
inline int
 
606 +
select_socket::send_buffer_size(std::error_code& ec) const noexcept
 
607 +
{
 
608 +
    int size      = 0;
 
609 +
    socklen_t len = sizeof(size);
 
610 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
 
611 +
    {
 
612 +
        ec = make_err(errno);
 
613 +
        return 0;
 
614 +
    }
 
615 +
    ec = {};
 
616 +
    return size;
 
617 +
}
 
618 +

 
619 +
inline std::error_code
 
620 +
select_socket::set_linger(bool enabled, int timeout) noexcept
 
621 +
{
 
622 +
    if (timeout < 0)
 
623 +
        return make_err(EINVAL);
 
624 +
    struct ::linger lg;
 
625 +
    lg.l_onoff  = enabled ? 1 : 0;
 
626 +
    lg.l_linger = timeout;
 
627 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
551 -
    *size = static_cast<std::size_t>(len);
 
552  
        return make_err(errno);
628  
        return make_err(errno);
553  
    return {};
629  
    return {};
554  
}
630  
}
555  

631  

 
632 +
// Query SO_LINGER; returns a default-constructed result and sets ec on error.
inline tcp_socket::linger_options
select_socket::linger(std::error_code& ec) const noexcept
{
    struct ::linger lg{};
    socklen_t len = sizeof(lg);
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) == 0)
    {
        ec = {};
        return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
    }
    ec = make_err(errno);
    return {};
}
 
645 +

556  
// Cancel every outstanding operation on this socket (connect, read, write).
// Each pending op is claimed atomically via the `registered` state exchange,
// so this is safe against concurrent completion by the reactor: exactly one
// party observes the non-unregistered previous state and posts the op.
inline void
select_socket::cancel() noexcept
{
    // Pin the implementation; if the last owner already released it, any
    // pending ops are being torn down elsewhere and there is nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Claim the op: whoever flips registered -> unregistered owns
        // posting its completion.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            // Keep the impl alive until the posted op is drained.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    // Connect and write wait on writability; read waits on readability.
    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
580  

670  

581  
// Cancel one specific pending operation on this socket. Same atomic claim
// protocol as cancel(), but scoped to a single op; which reactor event set
// to deregister is inferred from the op's identity.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // Pin the implementation; bail if it is already being destroyed.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    // Only the claimer (prev != unregistered) posts the completion.
    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted op is drained.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
609  

699  

610  
// Close the socket: cancel any pending ops (using the same atomic claim
// protocol as cancel()), deregister the descriptor from the reactor, close
// the fd, and reset the cached endpoints. Safe to call repeatedly.
inline void
select_socket::close_socket() noexcept
{
    // If the impl can still be pinned, cancel its pending ops first so
    // their completions are posted before the fd disappears.
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            // Claim the op; only the claimer posts its completion.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                // Keep the impl alive until the posted op is drained.
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    // Tear down the descriptor itself (idempotent: fd_ becomes -1).
    if (fd_ >= 0)
    {
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
646  

736  

647  
// Construct the service, allocating shared state bound to the context's
// select_scheduler (created on first use via use_service).
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
654  

744  

655  
// Defaulted out-of-line: ~state_ runs here, after scheduler shutdown.
inline select_socket_service::~select_socket_service() = default;
656  

746  

657  
// Service shutdown: force-close every live socket implementation under the
// registry lock. Ownership of the impls is intentionally retained (see
// comment below) until the service itself is destroyed.
inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    // Drain the intrusive list, closing each socket as it is removed.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
670  

760  

671  
// Create a new socket implementation and record it in both registries:
// the intrusive list (walked by shutdown) and the ptr map (owns lifetime).
// Returns a non-owning pointer; ownership stays with the service state.
inline io_object::implementation*
select_socket_service::construct()
{
    auto owner    = std::make_shared<select_socket>(*this);
    auto* created = owner.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(created);
        state_->socket_ptrs_.emplace(created, std::move(owner));
    }

    return created;
}
685  

775  

686  
inline void
776  
inline void
687  
select_socket_service::destroy(io_object::implementation* impl)
777  
select_socket_service::destroy(io_object::implementation* impl)
688  
{
778  
{
689  
    auto* select_impl = static_cast<select_socket*>(impl);
779  
    auto* select_impl = static_cast<select_socket*>(impl);
690  
    select_impl->close_socket();
780  
    select_impl->close_socket();
691  
    std::lock_guard lock(state_->mutex_);
781  
    std::lock_guard lock(state_->mutex_);
692  
    state_->socket_list_.remove(select_impl);
782  
    state_->socket_list_.remove(select_impl);
693  
    state_->socket_ptrs_.erase(select_impl);
783  
    state_->socket_ptrs_.erase(select_impl);
694  
}
784  
}
695  

785  

696  
inline std::error_code
786  
inline std::error_code
697 -
select_socket_service::open_socket(
787 +
select_socket_service::open_socket(tcp_socket::implementation& impl)
698 -
    tcp_socket::implementation& impl,
 
699 -
    int family, int type, int protocol)
 
700  
{
788  
{
701  
    auto* select_impl = static_cast<select_socket*>(&impl);
789  
    auto* select_impl = static_cast<select_socket*>(&impl);
702  
    select_impl->close_socket();
790  
    select_impl->close_socket();
703  

791  

704 -
    int fd = ::socket(family, type, protocol);
792 +
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
705  
    if (fd < 0)
793  
    if (fd < 0)
706 -

 
707 -
    if (family == AF_INET6)
 
708 -
    {
 
709 -
        int one = 1;
 
710 -
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
 
711 -
    }
 
712  
        return make_err(errno);
794  
        return make_err(errno);
713  

795  

714  
    // Set non-blocking and close-on-exec
796  
    // Set non-blocking and close-on-exec
715  
    int flags = ::fcntl(fd, F_GETFL, 0);
797  
    int flags = ::fcntl(fd, F_GETFL, 0);
716  
    if (flags == -1)
798  
    if (flags == -1)
717  
    {
799  
    {
718  
        int errn = errno;
800  
        int errn = errno;
719  
        ::close(fd);
801  
        ::close(fd);
720  
        return make_err(errn);
802  
        return make_err(errn);
721  
    }
803  
    }
722  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
804  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
723  
    {
805  
    {
724  
        int errn = errno;
806  
        int errn = errno;
725  
        ::close(fd);
807  
        ::close(fd);
726  
        return make_err(errn);
808  
        return make_err(errn);
727  
    }
809  
    }
728  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
810  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
729  
    {
811  
    {
730  
        int errn = errno;
812  
        int errn = errno;
731  
        ::close(fd);
813  
        ::close(fd);
732  
        return make_err(errn);
814  
        return make_err(errn);
733  
    }
815  
    }
734  

816  

735  
    // Check fd is within select() limits
817  
    // Check fd is within select() limits
736  
    if (fd >= FD_SETSIZE)
818  
    if (fd >= FD_SETSIZE)
737  
    {
819  
    {
738  
        ::close(fd);
820  
        ::close(fd);
739  
        return make_err(EMFILE); // Too many open files
821  
        return make_err(EMFILE); // Too many open files
740  
    }
822  
    }
741  

823  

742  
    select_impl->fd_ = fd;
824  
    select_impl->fd_ = fd;
743  
    return {};
825  
    return {};
744  
}
826  
}
745  

827  

746  
inline void
828  
inline void
747  
select_socket_service::close(io_object::handle& h)
829  
select_socket_service::close(io_object::handle& h)
748  
{
830  
{
749  
    static_cast<select_socket*>(h.get())->close_socket();
831  
    static_cast<select_socket*>(h.get())->close_socket();
750  
}
832  
}
751  

833  

752  
// Queue a completed op on the scheduler for later invocation.
inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}
757  

839  

758  
// Note one unit of outstanding work on the scheduler (keeps it running).
inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
763  

845  

764  
// Release one unit of outstanding work previously added by work_started().
inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
769  

851  

770  
} // namespace boost::corosio::detail
852  
} // namespace boost::corosio::detail
771  

853  

772  
#endif // BOOST_COROSIO_HAS_SELECT
854  
#endif // BOOST_COROSIO_HAS_SELECT
773  

855  

774  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
856  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP