1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
27  
#include <boost/corosio/detail/except.hpp>
27  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/capy/buffers.hpp>
28  
#include <boost/capy/buffers.hpp>
29  

29  

30  
#include <coroutine>
30  
#include <coroutine>
31  
#include <mutex>
31  
#include <mutex>
32  
#include <unordered_map>
32  
#include <unordered_map>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <netinet/in.h>
36  
#include <netinet/in.h>
37  
#include <netinet/tcp.h>
37  
#include <netinet/tcp.h>
38  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
39  
#include <sys/socket.h>
39  
#include <sys/socket.h>
40  
#include <unistd.h>
40  
#include <unistd.h>
41  

41  

42  
/*
42  
/*
43  
    epoll Socket Implementation
43  
    epoll Socket Implementation
44  
    ===========================
44  
    ===========================
45  

45  

46  
    Each I/O operation follows the same pattern:
46  
    Each I/O operation follows the same pattern:
47  
      1. Try the syscall immediately (non-blocking socket)
47  
      1. Try the syscall immediately (non-blocking socket)
48  
      2. If it succeeds or fails with a real error, post to completion queue
48  
      2. If it succeeds or fails with a real error, post to completion queue
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50  

50  

51  
    This "try first" approach avoids unnecessary epoll round-trips for
51  
    This "try first" approach avoids unnecessary epoll round-trips for
52  
    operations that can complete immediately (common for small reads/writes
52  
    operations that can complete immediately (common for small reads/writes
53  
    on fast local connections).
53  
    on fast local connections).
54  

54  

55  
    One-Shot Registration
55  
    One-Shot Registration
56  
    ---------------------
56  
    ---------------------
57  
    We use one-shot epoll registration: each operation registers, waits for
57  
    We use one-shot epoll registration: each operation registers, waits for
58  
    one event, then unregisters. This simplifies the state machine since we
58  
    one event, then unregisters. This simplifies the state machine since we
59  
    don't need to track whether an fd is currently registered or handle
59  
    don't need to track whether an fd is currently registered or handle
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61  
    simplicity is worth it.
61  
    simplicity is worth it.
62  

62  

63  
    Cancellation
63  
    Cancellation
64  
    ------------
64  
    ------------
65  
    See op.hpp for the completion/cancellation race handling via the
65  
    See op.hpp for the completion/cancellation race handling via the
66  
    `registered` atomic. cancel() must complete pending operations (post
66  
    `registered` atomic. cancel() must complete pending operations (post
67  
    them with cancelled flag) so coroutines waiting on them can resume.
67  
    them with cancelled flag) so coroutines waiting on them can resume.
68  
    close_socket() calls cancel() first to ensure this.
68  
    close_socket() calls cancel() first to ensure this.
69  

69  

70  
    Impl Lifetime with shared_ptr
70  
    Impl Lifetime with shared_ptr
71  
    -----------------------------
71  
    -----------------------------
72  
    Socket impls use enable_shared_from_this. The service owns impls via
72  
    Socket impls use enable_shared_from_this. The service owns impls via
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74  
    removal. When a user calls close(), we call cancel() which posts pending
74  
    removal. When a user calls close(), we call cancel() which posts pending
75  
    ops to the scheduler.
75  
    ops to the scheduler.
76  

76  

77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
78  
    Otherwise the scheduler would process a freed op (use-after-free). The
78  
    Otherwise the scheduler would process a freed op (use-after-free). The
79  
    cancel() method captures shared_from_this() into op.impl_ptr before
79  
    cancel() method captures shared_from_this() into op.impl_ptr before
80  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
80  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
81  
    to be destroyed if no other references exist.
81  
    to be destroyed if no other references exist.
82  

82  

83  
    Service Ownership
83  
    Service Ownership
84  
    -----------------
84  
    -----------------
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
86  
    shared_ptr from the map, but the impl may survive if ops still hold
86  
    shared_ptr from the map, but the impl may survive if ops still hold
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
88  
    in-flight ops will complete and release their refs.
88  
    in-flight ops will complete and release their refs.
89  
*/
89  
*/
90  

90  

91  
namespace boost::corosio::detail {
91  
namespace boost::corosio::detail {
92  

92  

93  
/** State for epoll socket service. */
93  
/** State for epoll socket service. */
94  
class epoll_socket_state
94  
class epoll_socket_state
95  
{
95  
{
96  
public:
96  
public:
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98  
    {
98  
    {
99  
    }
99  
    }
100  

100  

101  
    epoll_scheduler& sched_;
101  
    epoll_scheduler& sched_;
102  
    std::mutex mutex_;
102  
    std::mutex mutex_;
103  
    intrusive_list<epoll_socket> socket_list_;
103  
    intrusive_list<epoll_socket> socket_list_;
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105  
        socket_ptrs_;
105  
        socket_ptrs_;
106  
};
106  
};
107  

107  

108  
/** epoll socket service implementation.
108  
/** epoll socket service implementation.
109  

109  

110  
    Inherits from socket_service to enable runtime polymorphism.
110  
    Inherits from socket_service to enable runtime polymorphism.
111  
    Uses key_type = socket_service for service lookup.
111  
    Uses key_type = socket_service for service lookup.
112  
*/
112  
*/
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114  
{
114  
{
115  
public:
115  
public:
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
117  
    ~epoll_socket_service() override;
117  
    ~epoll_socket_service() override;
118  

118  

119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121  

121  

122  
    void shutdown() override;
122  
    void shutdown() override;
123  

123  

124  
    io_object::implementation* construct() override;
124  
    io_object::implementation* construct() override;
125  
    void destroy(io_object::implementation*) override;
125  
    void destroy(io_object::implementation*) override;
126  
    void close(io_object::handle&) override;
126  
    void close(io_object::handle&) override;
127 -
    std::error_code
127 +
    std::error_code open_socket(tcp_socket::implementation& impl) override;
128 -
    open_socket(tcp_socket::implementation& impl,
 
129 -
                int family, int type, int protocol) override;
 
130  

128  

131  
    epoll_scheduler& scheduler() const noexcept
129  
    epoll_scheduler& scheduler() const noexcept
132  
    {
130  
    {
133  
        return state_->sched_;
131  
        return state_->sched_;
134  
    }
132  
    }
135  
    void post(epoll_op* op);
133  
    void post(epoll_op* op);
136  
    void work_started() noexcept;
134  
    void work_started() noexcept;
137  
    void work_finished() noexcept;
135  
    void work_finished() noexcept;
138  

136  

139  
private:
137  
private:
140  
    std::unique_ptr<epoll_socket_state> state_;
138  
    std::unique_ptr<epoll_socket_state> state_;
141  
};
139  
};
142  

140  

143  
//--------------------------------------------------------------------------
141  
//--------------------------------------------------------------------------
144  
//
142  
//
145  
// Implementation
143  
// Implementation
146  
//
144  
//
147  
//--------------------------------------------------------------------------
145  
//--------------------------------------------------------------------------
148  

146  

149  
// Register an op with the reactor, handling cached edge events.
147  
// Register an op with the reactor, handling cached edge events.
150  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
148  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
151  
inline void
149  
inline void
152  
epoll_socket::register_op(
150  
epoll_socket::register_op(
153  
    epoll_op& op,
151  
    epoll_op& op,
154  
    epoll_op*& desc_slot,
152  
    epoll_op*& desc_slot,
155  
    bool& ready_flag,
153  
    bool& ready_flag,
156  
    bool& cancel_flag) noexcept
154  
    bool& cancel_flag) noexcept
157  
{
155  
{
158  
    svc_.work_started();
156  
    svc_.work_started();
159  

157  

160  
    std::lock_guard lock(desc_state_.mutex);
158  
    std::lock_guard lock(desc_state_.mutex);
161  
    bool io_done = false;
159  
    bool io_done = false;
162  
    if (ready_flag)
160  
    if (ready_flag)
163  
    {
161  
    {
164  
        ready_flag = false;
162  
        ready_flag = false;
165  
        op.perform_io();
163  
        op.perform_io();
166  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
164  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
167  
        if (!io_done)
165  
        if (!io_done)
168  
            op.errn = 0;
166  
            op.errn = 0;
169  
    }
167  
    }
170  

168  

171  
    if (cancel_flag)
169  
    if (cancel_flag)
172  
    {
170  
    {
173  
        cancel_flag = false;
171  
        cancel_flag = false;
174  
        op.cancelled.store(true, std::memory_order_relaxed);
172  
        op.cancelled.store(true, std::memory_order_relaxed);
175  
    }
173  
    }
176  

174  

177  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
175  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
178  
    {
176  
    {
179  
        svc_.post(&op);
177  
        svc_.post(&op);
180  
        svc_.work_finished();
178  
        svc_.work_finished();
181  
    }
179  
    }
182  
    else
180  
    else
183  
    {
181  
    {
184  
        desc_slot = &op;
182  
        desc_slot = &op;
185  
    }
183  
    }
186  
}
184  
}
187  

185  

188  
inline void
186  
inline void
189  
epoll_op::canceller::operator()() const noexcept
187  
epoll_op::canceller::operator()() const noexcept
190  
{
188  
{
191  
    op->cancel();
189  
    op->cancel();
192  
}
190  
}
193  

191  

194  
inline void
192  
inline void
195  
epoll_connect_op::cancel() noexcept
193  
epoll_connect_op::cancel() noexcept
196  
{
194  
{
197  
    if (socket_impl_)
195  
    if (socket_impl_)
198  
        socket_impl_->cancel_single_op(*this);
196  
        socket_impl_->cancel_single_op(*this);
199  
    else
197  
    else
200  
        request_cancel();
198  
        request_cancel();
201  
}
199  
}
202  

200  

203  
inline void
201  
inline void
204  
epoll_read_op::cancel() noexcept
202  
epoll_read_op::cancel() noexcept
205  
{
203  
{
206  
    if (socket_impl_)
204  
    if (socket_impl_)
207  
        socket_impl_->cancel_single_op(*this);
205  
        socket_impl_->cancel_single_op(*this);
208  
    else
206  
    else
209  
        request_cancel();
207  
        request_cancel();
210  
}
208  
}
211  

209  

212  
inline void
210  
inline void
213  
epoll_write_op::cancel() noexcept
211  
epoll_write_op::cancel() noexcept
214  
{
212  
{
215  
    if (socket_impl_)
213  
    if (socket_impl_)
216  
        socket_impl_->cancel_single_op(*this);
214  
        socket_impl_->cancel_single_op(*this);
217  
    else
215  
    else
218  
        request_cancel();
216  
        request_cancel();
219  
}
217  
}
220  

218  

221  
inline void
219  
inline void
222  
epoll_op::operator()()
220  
epoll_op::operator()()
223  
{
221  
{
224  
    stop_cb.reset();
222  
    stop_cb.reset();
225  

223  

226  
    socket_impl_->svc_.scheduler().reset_inline_budget();
224  
    socket_impl_->svc_.scheduler().reset_inline_budget();
227  

225  

228  
    if (cancelled.load(std::memory_order_acquire))
226  
    if (cancelled.load(std::memory_order_acquire))
229  
        *ec_out = capy::error::canceled;
227  
        *ec_out = capy::error::canceled;
230  
    else if (errn != 0)
228  
    else if (errn != 0)
231  
        *ec_out = make_err(errn);
229  
        *ec_out = make_err(errn);
232  
    else if (is_read_operation() && bytes_transferred == 0)
230  
    else if (is_read_operation() && bytes_transferred == 0)
233  
        *ec_out = capy::error::eof;
231  
        *ec_out = capy::error::eof;
234  
    else
232  
    else
235  
        *ec_out = {};
233  
        *ec_out = {};
236  

234  

237  
    *bytes_out = bytes_transferred;
235  
    *bytes_out = bytes_transferred;
238  

236  

239  
    // Move to stack before resuming coroutine. The coroutine might close
237  
    // Move to stack before resuming coroutine. The coroutine might close
240  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
238  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
241  
    // last ref and we destroyed it while still in operator(), we'd have
239  
    // last ref and we destroyed it while still in operator(), we'd have
242  
    // use-after-free. Moving to local ensures destruction happens at
240  
    // use-after-free. Moving to local ensures destruction happens at
243  
    // function exit, after all member accesses are complete.
241  
    // function exit, after all member accesses are complete.
244  
    capy::executor_ref saved_ex(ex);
242  
    capy::executor_ref saved_ex(ex);
245  
    std::coroutine_handle<> saved_h(h);
243  
    std::coroutine_handle<> saved_h(h);
246  
    auto prevent_premature_destruction = std::move(impl_ptr);
244  
    auto prevent_premature_destruction = std::move(impl_ptr);
247  
    dispatch_coro(saved_ex, saved_h).resume();
245  
    dispatch_coro(saved_ex, saved_h).resume();
248  
}
246  
}
249  

247  

250  
inline void
248  
inline void
251  
epoll_connect_op::operator()()
249  
epoll_connect_op::operator()()
252  
{
250  
{
253  
    stop_cb.reset();
251  
    stop_cb.reset();
254  

252  

255  
    socket_impl_->svc_.scheduler().reset_inline_budget();
253  
    socket_impl_->svc_.scheduler().reset_inline_budget();
256  

254  

257  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
255  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
258  

256  

259  
    // Cache endpoints on successful connect
257  
    // Cache endpoints on successful connect
260  
    if (success && socket_impl_)
258  
    if (success && socket_impl_)
261  
    {
259  
    {
 
260 +
        // Query local endpoint via getsockname (may fail, but remote is always known)
262  
        endpoint local_ep;
261  
        endpoint local_ep;
263 -
        sockaddr_storage local_storage{};
262 +
        sockaddr_in local_addr{};
264 -
        socklen_t local_len = sizeof(local_storage);
263 +
        socklen_t local_len = sizeof(local_addr);
265  
        if (::getsockname(
264  
        if (::getsockname(
266 -
                fd, reinterpret_cast<sockaddr*>(&local_storage),
265 +
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
267 -
                &local_len) == 0)
266 +
            local_ep = from_sockaddr_in(local_addr);
268 -
            local_ep = from_sockaddr(local_storage);
267 +
        // Always cache remote endpoint; local may be default if getsockname failed
269  
        static_cast<epoll_socket*>(socket_impl_)
268  
        static_cast<epoll_socket*>(socket_impl_)
270  
            ->set_endpoints(local_ep, target_endpoint);
269  
            ->set_endpoints(local_ep, target_endpoint);
271  
    }
270  
    }
272  

271  

273  
    if (cancelled.load(std::memory_order_acquire))
272  
    if (cancelled.load(std::memory_order_acquire))
274  
        *ec_out = capy::error::canceled;
273  
        *ec_out = capy::error::canceled;
275  
    else if (errn != 0)
274  
    else if (errn != 0)
276  
        *ec_out = make_err(errn);
275  
        *ec_out = make_err(errn);
277  
    else
276  
    else
278  
        *ec_out = {};
277  
        *ec_out = {};
279  

278  

280  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
279  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
281  
    capy::executor_ref saved_ex(ex);
280  
    capy::executor_ref saved_ex(ex);
282  
    std::coroutine_handle<> saved_h(h);
281  
    std::coroutine_handle<> saved_h(h);
283  
    auto prevent_premature_destruction = std::move(impl_ptr);
282  
    auto prevent_premature_destruction = std::move(impl_ptr);
284  
    dispatch_coro(saved_ex, saved_h).resume();
283  
    dispatch_coro(saved_ex, saved_h).resume();
285  
}
284  
}
286  

285  

287  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
286  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
288  
    : svc_(svc)
287  
    : svc_(svc)
289  
{
288  
{
290  
}
289  
}
291  

290  

292  
inline epoll_socket::~epoll_socket() = default;
291  
inline epoll_socket::~epoll_socket() = default;
293  

292  

294  
inline std::coroutine_handle<>
293  
inline std::coroutine_handle<>
295  
epoll_socket::connect(
294  
epoll_socket::connect(
296  
    std::coroutine_handle<> h,
295  
    std::coroutine_handle<> h,
297  
    capy::executor_ref ex,
296  
    capy::executor_ref ex,
298  
    endpoint ep,
297  
    endpoint ep,
299  
    std::stop_token token,
298  
    std::stop_token token,
300  
    std::error_code* ec)
299  
    std::error_code* ec)
301  
{
300  
{
302  
    auto& op = conn_;
301  
    auto& op = conn_;
303  

302  

304 -
    sockaddr_storage storage{};
303 +
    sockaddr_in addr = detail::to_sockaddr_in(ep);
305 -
    socklen_t addrlen =
 
306 -
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
 
307  
    int result =
304  
    int result =
308 -
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
305 +
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
309  

306  

310  
    if (result == 0)
307  
    if (result == 0)
311  
    {
308  
    {
312 -
        sockaddr_storage local_storage{};
309 +
        sockaddr_in local_addr{};
313 -
        socklen_t local_len = sizeof(local_storage);
310 +
        socklen_t local_len = sizeof(local_addr);
314  
        if (::getsockname(
311  
        if (::getsockname(
315 -
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
312 +
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
316 -
                &local_len) == 0)
313 +
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
317 -
            local_endpoint_ = detail::from_sockaddr(local_storage);
 
318  
        remote_endpoint_ = ep;
314  
        remote_endpoint_ = ep;
319  
    }
315  
    }
320  

316  

321  
    if (result == 0 || errno != EINPROGRESS)
317  
    if (result == 0 || errno != EINPROGRESS)
322  
    {
318  
    {
323  
        int err = (result < 0) ? errno : 0;
319  
        int err = (result < 0) ? errno : 0;
324  
        if (svc_.scheduler().try_consume_inline_budget())
320  
        if (svc_.scheduler().try_consume_inline_budget())
325  
        {
321  
        {
326  
            *ec = err ? make_err(err) : std::error_code{};
322  
            *ec = err ? make_err(err) : std::error_code{};
327  
            return dispatch_coro(ex, h);
323  
            return dispatch_coro(ex, h);
328  
        }
324  
        }
329  
        op.reset();
325  
        op.reset();
330  
        op.h               = h;
326  
        op.h               = h;
331  
        op.ex              = ex;
327  
        op.ex              = ex;
332  
        op.ec_out          = ec;
328  
        op.ec_out          = ec;
333  
        op.fd              = fd_;
329  
        op.fd              = fd_;
334  
        op.target_endpoint = ep;
330  
        op.target_endpoint = ep;
335  
        op.start(token, this);
331  
        op.start(token, this);
336  
        op.impl_ptr = shared_from_this();
332  
        op.impl_ptr = shared_from_this();
337  
        op.complete(err, 0);
333  
        op.complete(err, 0);
338  
        svc_.post(&op);
334  
        svc_.post(&op);
339  
        return std::noop_coroutine();
335  
        return std::noop_coroutine();
340  
    }
336  
    }
341  

337  

342  
    // EINPROGRESS — register with reactor
338  
    // EINPROGRESS — register with reactor
343  
    op.reset();
339  
    op.reset();
344  
    op.h               = h;
340  
    op.h               = h;
345  
    op.ex              = ex;
341  
    op.ex              = ex;
346  
    op.ec_out          = ec;
342  
    op.ec_out          = ec;
347  
    op.fd              = fd_;
343  
    op.fd              = fd_;
348  
    op.target_endpoint = ep;
344  
    op.target_endpoint = ep;
349  
    op.start(token, this);
345  
    op.start(token, this);
350  
    op.impl_ptr = shared_from_this();
346  
    op.impl_ptr = shared_from_this();
351  

347  

352  
    register_op(
348  
    register_op(
353  
        op, desc_state_.connect_op, desc_state_.write_ready,
349  
        op, desc_state_.connect_op, desc_state_.write_ready,
354  
        desc_state_.connect_cancel_pending);
350  
        desc_state_.connect_cancel_pending);
355  
    return std::noop_coroutine();
351  
    return std::noop_coroutine();
356  
}
352  
}
357  

353  

358  
inline std::coroutine_handle<>
354  
inline std::coroutine_handle<>
359  
epoll_socket::read_some(
355  
epoll_socket::read_some(
360  
    std::coroutine_handle<> h,
356  
    std::coroutine_handle<> h,
361  
    capy::executor_ref ex,
357  
    capy::executor_ref ex,
362  
    io_buffer_param param,
358  
    io_buffer_param param,
363  
    std::stop_token token,
359  
    std::stop_token token,
364  
    std::error_code* ec,
360  
    std::error_code* ec,
365  
    std::size_t* bytes_out)
361  
    std::size_t* bytes_out)
366  
{
362  
{
367  
    auto& op = rd_;
363  
    auto& op = rd_;
368  
    op.reset();
364  
    op.reset();
369  

365  

370  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
366  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
371  
    op.iovec_count =
367  
    op.iovec_count =
372  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
368  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
373  

369  

374  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
370  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
375  
    {
371  
    {
376  
        op.empty_buffer_read = true;
372  
        op.empty_buffer_read = true;
377  
        op.h                 = h;
373  
        op.h                 = h;
378  
        op.ex                = ex;
374  
        op.ex                = ex;
379  
        op.ec_out            = ec;
375  
        op.ec_out            = ec;
380  
        op.bytes_out         = bytes_out;
376  
        op.bytes_out         = bytes_out;
381  
        op.start(token, this);
377  
        op.start(token, this);
382  
        op.impl_ptr = shared_from_this();
378  
        op.impl_ptr = shared_from_this();
383  
        op.complete(0, 0);
379  
        op.complete(0, 0);
384  
        svc_.post(&op);
380  
        svc_.post(&op);
385  
        return std::noop_coroutine();
381  
        return std::noop_coroutine();
386  
    }
382  
    }
387  

383  

388  
    for (int i = 0; i < op.iovec_count; ++i)
384  
    for (int i = 0; i < op.iovec_count; ++i)
389  
    {
385  
    {
390  
        op.iovecs[i].iov_base = bufs[i].data();
386  
        op.iovecs[i].iov_base = bufs[i].data();
391  
        op.iovecs[i].iov_len  = bufs[i].size();
387  
        op.iovecs[i].iov_len  = bufs[i].size();
392  
    }
388  
    }
393  

389  

394  
    // Speculative read
390  
    // Speculative read
395  
    ssize_t n;
391  
    ssize_t n;
396  
    do
392  
    do
397  
    {
393  
    {
398  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
394  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
399  
    }
395  
    }
400  
    while (n < 0 && errno == EINTR);
396  
    while (n < 0 && errno == EINTR);
401  

397  

402  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
398  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
403  
    {
399  
    {
404  
        int err    = (n < 0) ? errno : 0;
400  
        int err    = (n < 0) ? errno : 0;
405  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
401  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
406  

402  

407  
        if (svc_.scheduler().try_consume_inline_budget())
403  
        if (svc_.scheduler().try_consume_inline_budget())
408  
        {
404  
        {
409  
            if (err)
405  
            if (err)
410  
                *ec = make_err(err);
406  
                *ec = make_err(err);
411  
            else if (n == 0)
407  
            else if (n == 0)
412  
                *ec = capy::error::eof;
408  
                *ec = capy::error::eof;
413  
            else
409  
            else
414  
                *ec = {};
410  
                *ec = {};
415  
            *bytes_out = bytes;
411  
            *bytes_out = bytes;
416  
            return dispatch_coro(ex, h);
412  
            return dispatch_coro(ex, h);
417  
        }
413  
        }
418  
        op.h         = h;
414  
        op.h         = h;
419  
        op.ex        = ex;
415  
        op.ex        = ex;
420  
        op.ec_out    = ec;
416  
        op.ec_out    = ec;
421  
        op.bytes_out = bytes_out;
417  
        op.bytes_out = bytes_out;
422  
        op.start(token, this);
418  
        op.start(token, this);
423  
        op.impl_ptr = shared_from_this();
419  
        op.impl_ptr = shared_from_this();
424  
        op.complete(err, bytes);
420  
        op.complete(err, bytes);
425  
        svc_.post(&op);
421  
        svc_.post(&op);
426  
        return std::noop_coroutine();
422  
        return std::noop_coroutine();
427  
    }
423  
    }
428  

424  

429  
    // EAGAIN — register with reactor
425  
    // EAGAIN — register with reactor
430  
    op.h         = h;
426  
    op.h         = h;
431  
    op.ex        = ex;
427  
    op.ex        = ex;
432  
    op.ec_out    = ec;
428  
    op.ec_out    = ec;
433  
    op.bytes_out = bytes_out;
429  
    op.bytes_out = bytes_out;
434  
    op.fd        = fd_;
430  
    op.fd        = fd_;
435  
    op.start(token, this);
431  
    op.start(token, this);
436  
    op.impl_ptr = shared_from_this();
432  
    op.impl_ptr = shared_from_this();
437  

433  

438  
    register_op(
434  
    register_op(
439  
        op, desc_state_.read_op, desc_state_.read_ready,
435  
        op, desc_state_.read_op, desc_state_.read_ready,
440  
        desc_state_.read_cancel_pending);
436  
        desc_state_.read_cancel_pending);
441  
    return std::noop_coroutine();
437  
    return std::noop_coroutine();
442  
}
438  
}
443  

439  

444  
inline std::coroutine_handle<>
440  
inline std::coroutine_handle<>
445  
epoll_socket::write_some(
441  
epoll_socket::write_some(
446  
    std::coroutine_handle<> h,
442  
    std::coroutine_handle<> h,
447  
    capy::executor_ref ex,
443  
    capy::executor_ref ex,
448  
    io_buffer_param param,
444  
    io_buffer_param param,
449  
    std::stop_token token,
445  
    std::stop_token token,
450  
    std::error_code* ec,
446  
    std::error_code* ec,
451  
    std::size_t* bytes_out)
447  
    std::size_t* bytes_out)
452  
{
448  
{
453  
    auto& op = wr_;
449  
    auto& op = wr_;
454  
    op.reset();
450  
    op.reset();
455  

451  

456  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
452  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
457  
    op.iovec_count =
453  
    op.iovec_count =
458  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
454  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
459  

455  

460  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
456  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
461  
    {
457  
    {
462  
        op.h         = h;
458  
        op.h         = h;
463  
        op.ex        = ex;
459  
        op.ex        = ex;
464  
        op.ec_out    = ec;
460  
        op.ec_out    = ec;
465  
        op.bytes_out = bytes_out;
461  
        op.bytes_out = bytes_out;
466  
        op.start(token, this);
462  
        op.start(token, this);
467  
        op.impl_ptr = shared_from_this();
463  
        op.impl_ptr = shared_from_this();
468  
        op.complete(0, 0);
464  
        op.complete(0, 0);
469  
        svc_.post(&op);
465  
        svc_.post(&op);
470  
        return std::noop_coroutine();
466  
        return std::noop_coroutine();
471  
    }
467  
    }
472  

468  

473  
    for (int i = 0; i < op.iovec_count; ++i)
469  
    for (int i = 0; i < op.iovec_count; ++i)
474  
    {
470  
    {
475  
        op.iovecs[i].iov_base = bufs[i].data();
471  
        op.iovecs[i].iov_base = bufs[i].data();
476  
        op.iovecs[i].iov_len  = bufs[i].size();
472  
        op.iovecs[i].iov_len  = bufs[i].size();
477  
    }
473  
    }
478  

474  

479  
    // Speculative write
475  
    // Speculative write
480  
    msghdr msg{};
476  
    msghdr msg{};
481  
    msg.msg_iov    = op.iovecs;
477  
    msg.msg_iov    = op.iovecs;
482  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
478  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
483  

479  

484  
    ssize_t n;
480  
    ssize_t n;
485  
    do
481  
    do
486  
    {
482  
    {
487  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
483  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
488  
    }
484  
    }
489  
    while (n < 0 && errno == EINTR);
485  
    while (n < 0 && errno == EINTR);
490  

486  

491  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
487  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
492  
    {
488  
    {
493  
        int err    = (n < 0) ? errno : 0;
489  
        int err    = (n < 0) ? errno : 0;
494  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
490  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
495  

491  

496  
        if (svc_.scheduler().try_consume_inline_budget())
492  
        if (svc_.scheduler().try_consume_inline_budget())
497  
        {
493  
        {
498  
            *ec        = err ? make_err(err) : std::error_code{};
494  
            *ec        = err ? make_err(err) : std::error_code{};
499  
            *bytes_out = bytes;
495  
            *bytes_out = bytes;
500  
            return dispatch_coro(ex, h);
496  
            return dispatch_coro(ex, h);
501  
        }
497  
        }
502  
        op.h         = h;
498  
        op.h         = h;
503  
        op.ex        = ex;
499  
        op.ex        = ex;
504  
        op.ec_out    = ec;
500  
        op.ec_out    = ec;
505  
        op.bytes_out = bytes_out;
501  
        op.bytes_out = bytes_out;
506  
        op.start(token, this);
502  
        op.start(token, this);
507  
        op.impl_ptr = shared_from_this();
503  
        op.impl_ptr = shared_from_this();
508  
        op.complete(err, bytes);
504  
        op.complete(err, bytes);
509  
        svc_.post(&op);
505  
        svc_.post(&op);
510  
        return std::noop_coroutine();
506  
        return std::noop_coroutine();
511  
    }
507  
    }
512  

508  

513  
    // EAGAIN — register with reactor
509  
    // EAGAIN — register with reactor
514  
    op.h         = h;
510  
    op.h         = h;
515  
    op.ex        = ex;
511  
    op.ex        = ex;
516  
    op.ec_out    = ec;
512  
    op.ec_out    = ec;
517  
    op.bytes_out = bytes_out;
513  
    op.bytes_out = bytes_out;
518  
    op.fd        = fd_;
514  
    op.fd        = fd_;
519  
    op.start(token, this);
515  
    op.start(token, this);
520  
    op.impl_ptr = shared_from_this();
516  
    op.impl_ptr = shared_from_this();
521  

517  

522  
    register_op(
518  
    register_op(
523  
        op, desc_state_.write_op, desc_state_.write_ready,
519  
        op, desc_state_.write_op, desc_state_.write_ready,
524  
        desc_state_.write_cancel_pending);
520  
        desc_state_.write_cancel_pending);
525  
    return std::noop_coroutine();
521  
    return std::noop_coroutine();
526  
}
522  
}
527  

523  

528  
inline std::error_code
524  
inline std::error_code
529  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
525  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530  
{
526  
{
531  
    int how;
527  
    int how;
532  
    switch (what)
528  
    switch (what)
533  
    {
529  
    {
534  
    case tcp_socket::shutdown_receive:
530  
    case tcp_socket::shutdown_receive:
535  
        how = SHUT_RD;
531  
        how = SHUT_RD;
536  
        break;
532  
        break;
537  
    case tcp_socket::shutdown_send:
533  
    case tcp_socket::shutdown_send:
538  
        how = SHUT_WR;
534  
        how = SHUT_WR;
539  
        break;
535  
        break;
540  
    case tcp_socket::shutdown_both:
536  
    case tcp_socket::shutdown_both:
541  
        how = SHUT_RDWR;
537  
        how = SHUT_RDWR;
542  
        break;
538  
        break;
543  
    default:
539  
    default:
544  
        return make_err(EINVAL);
540  
        return make_err(EINVAL);
545  
    }
541  
    }
546  
    if (::shutdown(fd_, how) != 0)
542  
    if (::shutdown(fd_, how) != 0)
547  
        return make_err(errno);
543  
        return make_err(errno);
548  
    return {};
544  
    return {};
549  
}
545  
}
550  

546  

551  
inline std::error_code
547  
inline std::error_code
552 -
epoll_socket::set_option(
548 +
epoll_socket::set_no_delay(bool value) noexcept
553 -
    int level, int optname,
 
554 -
    void const* data, std::size_t size) noexcept
 
555  
{
549  
{
556 -
    if (::setsockopt(fd_, level, optname, data,
550 +
    int flag = value ? 1 : 0;
557 -
            static_cast<socklen_t>(size)) != 0)
551 +
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
558  
        return make_err(errno);
552  
        return make_err(errno);
559  
    return {};
553  
    return {};
560  
}
554  
}
561  

555  

 
556 +
inline bool
 
557 +
epoll_socket::no_delay(std::error_code& ec) const noexcept
 
558 +
{
 
559 +
    int flag      = 0;
 
560 +
    socklen_t len = sizeof(flag);
 
561 +
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
 
562 +
    {
 
563 +
        ec = make_err(errno);
 
564 +
        return false;
 
565 +
    }
 
566 +
    ec = {};
 
567 +
    return flag != 0;
 
568 +
}
 
569 +

562  
inline std::error_code
570  
inline std::error_code
563 -
epoll_socket::get_option(
571 +
epoll_socket::set_keep_alive(bool value) noexcept
564 -
    int level, int optname,
 
565 -
    void* data, std::size_t* size) const noexcept
 
566  
{
572  
{
567 -
    socklen_t len = static_cast<socklen_t>(*size);
573 +
    int flag = value ? 1 : 0;
568 -
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
574 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
 
575 +
        return make_err(errno);
 
576 +
    return {};
 
577 +
}
 
578 +

 
579 +
inline bool
 
580 +
epoll_socket::keep_alive(std::error_code& ec) const noexcept
 
581 +
{
 
582 +
    int flag      = 0;
 
583 +
    socklen_t len = sizeof(flag);
 
584 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
 
585 +
    {
 
586 +
        ec = make_err(errno);
 
587 +
        return false;
 
588 +
    }
 
589 +
    ec = {};
 
590 +
    return flag != 0;
 
591 +
}
 
592 +

 
593 +
inline std::error_code
 
594 +
epoll_socket::set_receive_buffer_size(int size) noexcept
 
595 +
{
 
596 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
 
597 +
        return make_err(errno);
 
598 +
    return {};
 
599 +
}
 
600 +

 
601 +
inline int
 
602 +
epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
 
603 +
{
 
604 +
    int size      = 0;
 
605 +
    socklen_t len = sizeof(size);
 
606 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
 
607 +
    {
 
608 +
        ec = make_err(errno);
 
609 +
        return 0;
 
610 +
    }
 
611 +
    ec = {};
 
612 +
    return size;
 
613 +
}
 
614 +

 
615 +
inline std::error_code
 
616 +
epoll_socket::set_send_buffer_size(int size) noexcept
 
617 +
{
 
618 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
569 -
    *size = static_cast<std::size_t>(len);
 
570  
        return make_err(errno);
619  
        return make_err(errno);
571  
    return {};
620  
    return {};
572  
}
621  
}
573  

622  

 
623 +
inline int
 
624 +
epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
 
625 +
{
 
626 +
    int size      = 0;
 
627 +
    socklen_t len = sizeof(size);
 
628 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
 
629 +
    {
 
630 +
        ec = make_err(errno);
 
631 +
        return 0;
 
632 +
    }
 
633 +
    ec = {};
 
634 +
    return size;
 
635 +
}
 
636 +

 
637 +
inline std::error_code
 
638 +
epoll_socket::set_linger(bool enabled, int timeout) noexcept
 
639 +
{
 
640 +
    if (timeout < 0)
 
641 +
        return make_err(EINVAL);
 
642 +
    struct ::linger lg;
 
643 +
    lg.l_onoff  = enabled ? 1 : 0;
 
644 +
    lg.l_linger = timeout;
 
645 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
 
646 +
        return make_err(errno);
 
647 +
    return {};
 
648 +
}
 
649 +

 
650 +
// Query the SO_LINGER option (level SOL_SOCKET).
//
// On success `ec` is cleared and the result carries
// enabled = (l_onoff != 0) and timeout = l_linger. On failure `ec`
// receives the errno-derived error and a value-initialized
// linger_options is returned.
inline tcp_socket::linger_options
epoll_socket::linger(std::error_code& ec) const noexcept
{
    struct ::linger opt{};
    socklen_t opt_len = sizeof(opt);
    int const rc =
        ::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &opt, &opt_len);
    if (rc == 0)
    {
        ec = std::error_code{};
        return {.enabled = opt.l_onoff != 0, .timeout = opt.l_linger};
    }
    ec = make_err(errno);
    return {};
}
 
663 +

574  
inline void
664  
inline void
575  
epoll_socket::cancel() noexcept
665  
epoll_socket::cancel() noexcept
576  
{
666  
{
577  
    auto self = weak_from_this().lock();
667  
    auto self = weak_from_this().lock();
578  
    if (!self)
668  
    if (!self)
579  
        return;
669  
        return;
580  

670  

581  
    conn_.request_cancel();
671  
    conn_.request_cancel();
582  
    rd_.request_cancel();
672  
    rd_.request_cancel();
583  
    wr_.request_cancel();
673  
    wr_.request_cancel();
584  

674  

585  
    epoll_op* conn_claimed = nullptr;
675  
    epoll_op* conn_claimed = nullptr;
586  
    epoll_op* rd_claimed   = nullptr;
676  
    epoll_op* rd_claimed   = nullptr;
587  
    epoll_op* wr_claimed   = nullptr;
677  
    epoll_op* wr_claimed   = nullptr;
588  
    {
678  
    {
589  
        std::lock_guard lock(desc_state_.mutex);
679  
        std::lock_guard lock(desc_state_.mutex);
590  
        if (desc_state_.connect_op == &conn_)
680  
        if (desc_state_.connect_op == &conn_)
591  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
681  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
592  
        else
682  
        else
593  
            desc_state_.connect_cancel_pending = true;
683  
            desc_state_.connect_cancel_pending = true;
594  
        if (desc_state_.read_op == &rd_)
684  
        if (desc_state_.read_op == &rd_)
595  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
685  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
596  
        else
686  
        else
597  
            desc_state_.read_cancel_pending = true;
687  
            desc_state_.read_cancel_pending = true;
598  
        if (desc_state_.write_op == &wr_)
688  
        if (desc_state_.write_op == &wr_)
599  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
689  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
600  
        else
690  
        else
601  
            desc_state_.write_cancel_pending = true;
691  
            desc_state_.write_cancel_pending = true;
602  
    }
692  
    }
603  

693  

604  
    if (conn_claimed)
694  
    if (conn_claimed)
605  
    {
695  
    {
606  
        conn_.impl_ptr = self;
696  
        conn_.impl_ptr = self;
607  
        svc_.post(&conn_);
697  
        svc_.post(&conn_);
608  
        svc_.work_finished();
698  
        svc_.work_finished();
609  
    }
699  
    }
610  
    if (rd_claimed)
700  
    if (rd_claimed)
611  
    {
701  
    {
612  
        rd_.impl_ptr = self;
702  
        rd_.impl_ptr = self;
613  
        svc_.post(&rd_);
703  
        svc_.post(&rd_);
614  
        svc_.work_finished();
704  
        svc_.work_finished();
615  
    }
705  
    }
616  
    if (wr_claimed)
706  
    if (wr_claimed)
617  
    {
707  
    {
618  
        wr_.impl_ptr = self;
708  
        wr_.impl_ptr = self;
619  
        svc_.post(&wr_);
709  
        svc_.post(&wr_);
620  
        svc_.work_finished();
710  
        svc_.work_finished();
621  
    }
711  
    }
622  
}
712  
}
623  

713  

624  
inline void
714  
inline void
625  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
715  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
626  
{
716  
{
627  
    auto self = weak_from_this().lock();
717  
    auto self = weak_from_this().lock();
628  
    if (!self)
718  
    if (!self)
629  
        return;
719  
        return;
630  

720  

631  
    op.request_cancel();
721  
    op.request_cancel();
632  

722  

633  
    epoll_op** desc_op_ptr = nullptr;
723  
    epoll_op** desc_op_ptr = nullptr;
634  
    if (&op == &conn_)
724  
    if (&op == &conn_)
635  
        desc_op_ptr = &desc_state_.connect_op;
725  
        desc_op_ptr = &desc_state_.connect_op;
636  
    else if (&op == &rd_)
726  
    else if (&op == &rd_)
637  
        desc_op_ptr = &desc_state_.read_op;
727  
        desc_op_ptr = &desc_state_.read_op;
638  
    else if (&op == &wr_)
728  
    else if (&op == &wr_)
639  
        desc_op_ptr = &desc_state_.write_op;
729  
        desc_op_ptr = &desc_state_.write_op;
640  

730  

641  
    if (desc_op_ptr)
731  
    if (desc_op_ptr)
642  
    {
732  
    {
643  
        epoll_op* claimed = nullptr;
733  
        epoll_op* claimed = nullptr;
644  
        {
734  
        {
645  
            std::lock_guard lock(desc_state_.mutex);
735  
            std::lock_guard lock(desc_state_.mutex);
646  
            if (*desc_op_ptr == &op)
736  
            if (*desc_op_ptr == &op)
647  
                claimed = std::exchange(*desc_op_ptr, nullptr);
737  
                claimed = std::exchange(*desc_op_ptr, nullptr);
648  
            else if (&op == &conn_)
738  
            else if (&op == &conn_)
649  
                desc_state_.connect_cancel_pending = true;
739  
                desc_state_.connect_cancel_pending = true;
650  
            else if (&op == &rd_)
740  
            else if (&op == &rd_)
651  
                desc_state_.read_cancel_pending = true;
741  
                desc_state_.read_cancel_pending = true;
652  
            else if (&op == &wr_)
742  
            else if (&op == &wr_)
653  
                desc_state_.write_cancel_pending = true;
743  
                desc_state_.write_cancel_pending = true;
654  
        }
744  
        }
655  
        if (claimed)
745  
        if (claimed)
656  
        {
746  
        {
657  
            op.impl_ptr = self;
747  
            op.impl_ptr = self;
658  
            svc_.post(&op);
748  
            svc_.post(&op);
659  
            svc_.work_finished();
749  
            svc_.work_finished();
660  
        }
750  
        }
661  
    }
751  
    }
662  
}
752  
}
663  

753  

664  
inline void
754  
inline void
665  
epoll_socket::close_socket() noexcept
755  
epoll_socket::close_socket() noexcept
666  
{
756  
{
667  
    auto self = weak_from_this().lock();
757  
    auto self = weak_from_this().lock();
668  
    if (self)
758  
    if (self)
669  
    {
759  
    {
670  
        conn_.request_cancel();
760  
        conn_.request_cancel();
671  
        rd_.request_cancel();
761  
        rd_.request_cancel();
672  
        wr_.request_cancel();
762  
        wr_.request_cancel();
673  

763  

674  
        epoll_op* conn_claimed = nullptr;
764  
        epoll_op* conn_claimed = nullptr;
675  
        epoll_op* rd_claimed   = nullptr;
765  
        epoll_op* rd_claimed   = nullptr;
676  
        epoll_op* wr_claimed   = nullptr;
766  
        epoll_op* wr_claimed   = nullptr;
677  
        {
767  
        {
678  
            std::lock_guard lock(desc_state_.mutex);
768  
            std::lock_guard lock(desc_state_.mutex);
679  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
769  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
680  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
770  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
681  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
771  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
682  
            desc_state_.read_ready             = false;
772  
            desc_state_.read_ready             = false;
683  
            desc_state_.write_ready            = false;
773  
            desc_state_.write_ready            = false;
684  
            desc_state_.read_cancel_pending    = false;
774  
            desc_state_.read_cancel_pending    = false;
685  
            desc_state_.write_cancel_pending   = false;
775  
            desc_state_.write_cancel_pending   = false;
686  
            desc_state_.connect_cancel_pending = false;
776  
            desc_state_.connect_cancel_pending = false;
687  
        }
777  
        }
688  

778  

689  
        if (conn_claimed)
779  
        if (conn_claimed)
690  
        {
780  
        {
691  
            conn_.impl_ptr = self;
781  
            conn_.impl_ptr = self;
692  
            svc_.post(&conn_);
782  
            svc_.post(&conn_);
693  
            svc_.work_finished();
783  
            svc_.work_finished();
694  
        }
784  
        }
695  
        if (rd_claimed)
785  
        if (rd_claimed)
696  
        {
786  
        {
697  
            rd_.impl_ptr = self;
787  
            rd_.impl_ptr = self;
698  
            svc_.post(&rd_);
788  
            svc_.post(&rd_);
699  
            svc_.work_finished();
789  
            svc_.work_finished();
700  
        }
790  
        }
701  
        if (wr_claimed)
791  
        if (wr_claimed)
702  
        {
792  
        {
703  
            wr_.impl_ptr = self;
793  
            wr_.impl_ptr = self;
704  
            svc_.post(&wr_);
794  
            svc_.post(&wr_);
705  
            svc_.work_finished();
795  
            svc_.work_finished();
706  
        }
796  
        }
707  

797  

708  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
798  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
709  
            desc_state_.impl_ref_ = self;
799  
            desc_state_.impl_ref_ = self;
710  
    }
800  
    }
711  

801  

712  
    if (fd_ >= 0)
802  
    if (fd_ >= 0)
713  
    {
803  
    {
714  
        if (desc_state_.registered_events != 0)
804  
        if (desc_state_.registered_events != 0)
715  
            svc_.scheduler().deregister_descriptor(fd_);
805  
            svc_.scheduler().deregister_descriptor(fd_);
716  
        ::close(fd_);
806  
        ::close(fd_);
717  
        fd_ = -1;
807  
        fd_ = -1;
718  
    }
808  
    }
719  

809  

720  
    desc_state_.fd                = -1;
810  
    desc_state_.fd                = -1;
721  
    desc_state_.registered_events = 0;
811  
    desc_state_.registered_events = 0;
722  

812  

723  
    local_endpoint_  = endpoint{};
813  
    local_endpoint_  = endpoint{};
724  
    remote_endpoint_ = endpoint{};
814  
    remote_endpoint_ = endpoint{};
725  
}
815  
}
726  

816  

727  
// Construct the service. Its state object is created from the
// epoll_scheduler service obtained from `ctx`.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<epoll_socket_state>(
          ctx.use_service<epoll_scheduler>()))
{
}
733  

823  

734  
// Nothing to do beyond destroying the members.
inline epoll_socket_service::~epoll_socket_service() = default;
735  

825  

736  
inline void
826  
inline void
737  
epoll_socket_service::shutdown()
827  
epoll_socket_service::shutdown()
738  
{
828  
{
739  
    std::lock_guard lock(state_->mutex_);
829  
    std::lock_guard lock(state_->mutex_);
740  

830  

741  
    while (auto* impl = state_->socket_list_.pop_front())
831  
    while (auto* impl = state_->socket_list_.pop_front())
742  
        impl->close_socket();
832  
        impl->close_socket();
743  

833  

744  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
834  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
745  
    // drains completed_ops_, calling destroy() on each queued op. If we
835  
    // drains completed_ops_, calling destroy() on each queued op. If we
746  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
836  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
747  
    // last ref to an impl whose embedded descriptor_state is still linked
837  
    // last ref to an impl whose embedded descriptor_state is still linked
748  
    // in the queue — use-after-free on the next pop(). Letting ~state_
838  
    // in the queue — use-after-free on the next pop(). Letting ~state_
749  
    // release the ptrs (during service destruction, after scheduler
839  
    // release the ptrs (during service destruction, after scheduler
750  
    // shutdown) keeps every impl alive until all ops have been drained.
840  
    // shutdown) keeps every impl alive until all ops have been drained.
751  
}
841  
}
752  

842  

753  
// Create a new epoll_socket bound to this service.
//
// The socket is allocated with make_shared; under the service mutex its
// raw pointer is appended to socket_list_ and the owning shared_ptr is
// stored in socket_ptrs_ keyed by that pointer. Returns the raw pointer.
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto owned               = std::make_shared<epoll_socket>(*this);
    epoll_socket* const sock = owned.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(sock);
        state_->socket_ptrs_.emplace(sock, std::move(owned));
    }

    return sock;
}
767  

857  

768  
inline void
858  
inline void
769  
epoll_socket_service::destroy(io_object::implementation* impl)
859  
epoll_socket_service::destroy(io_object::implementation* impl)
770  
{
860  
{
771  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
861  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
772  
    epoll_impl->close_socket();
862  
    epoll_impl->close_socket();
773  
    std::lock_guard lock(state_->mutex_);
863  
    std::lock_guard lock(state_->mutex_);
774  
    state_->socket_list_.remove(epoll_impl);
864  
    state_->socket_list_.remove(epoll_impl);
775  
    state_->socket_ptrs_.erase(epoll_impl);
865  
    state_->socket_ptrs_.erase(epoll_impl);
776  
}
866  
}
777  

867  

778  
inline std::error_code
868  
inline std::error_code
779 -
epoll_socket_service::open_socket(
869 +
epoll_socket_service::open_socket(tcp_socket::implementation& impl)
780 -
    tcp_socket::implementation& impl,
 
781 -
    int family, int type, int protocol)
 
782  
{
870  
{
783  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
871  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
784  
    epoll_impl->close_socket();
872  
    epoll_impl->close_socket();
785  

873  

786 -
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
874 +
    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
787  
    if (fd < 0)
875  
    if (fd < 0)
788 -

 
789 -
    if (family == AF_INET6)
 
790 -
    {
 
791 -
        int one = 1;
 
792 -
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
 
793 -
    }
 
794  
        return make_err(errno);
876  
        return make_err(errno);
795  

877  

796  
    epoll_impl->fd_ = fd;
878  
    epoll_impl->fd_ = fd;
797  

879  

798  
    // Register fd with epoll (edge-triggered mode)
880  
    // Register fd with epoll (edge-triggered mode)
799  
    epoll_impl->desc_state_.fd = fd;
881  
    epoll_impl->desc_state_.fd = fd;
800  
    {
882  
    {
801  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
883  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
802  
        epoll_impl->desc_state_.read_op    = nullptr;
884  
        epoll_impl->desc_state_.read_op    = nullptr;
803  
        epoll_impl->desc_state_.write_op   = nullptr;
885  
        epoll_impl->desc_state_.write_op   = nullptr;
804  
        epoll_impl->desc_state_.connect_op = nullptr;
886  
        epoll_impl->desc_state_.connect_op = nullptr;
805  
    }
887  
    }
806  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
888  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
807  

889  

808  
    return {};
890  
    return {};
809  
}
891  
}
810  

892  

811  
inline void
893  
inline void
812  
epoll_socket_service::close(io_object::handle& h)
894  
epoll_socket_service::close(io_object::handle& h)
813  
{
895  
{
814  
    static_cast<epoll_socket*>(h.get())->close_socket();
896  
    static_cast<epoll_socket*>(h.get())->close_socket();
815  
}
897  
}
816  

898  

817  
inline void
899  
inline void
818  
epoll_socket_service::post(epoll_op* op)
900  
epoll_socket_service::post(epoll_op* op)
819  
{
901  
{
820  
    state_->sched_.post(op);
902  
    state_->sched_.post(op);
821  
}
903  
}
822  

904  

823  
inline void
905  
inline void
824  
epoll_socket_service::work_started() noexcept
906  
epoll_socket_service::work_started() noexcept
825  
{
907  
{
826  
    state_->sched_.work_started();
908  
    state_->sched_.work_started();
827  
}
909  
}
828  

910  

829  
inline void
911  
inline void
830  
epoll_socket_service::work_finished() noexcept
912  
epoll_socket_service::work_finished() noexcept
831  
{
913  
{
832  
    state_->sched_.work_finished();
914  
    state_->sched_.work_finished();
833  
}
915  
}
834  

916  

835  
} // namespace boost::corosio::detail
917  
} // namespace boost::corosio::detail
836  

918  

837  
#endif // BOOST_COROSIO_HAS_EPOLL
919  
#endif // BOOST_COROSIO_HAS_EPOLL
838  

920  

839  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
921  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP