//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/native_scheduler.hpp>
20  
#include <boost/corosio/native/native_scheduler.hpp>
21  
#include <boost/corosio/detail/scheduler_op.hpp>
21  
#include <boost/corosio/detail/scheduler_op.hpp>
22  

22  

23  
#include <boost/corosio/native/detail/select/select_op.hpp>
23  
#include <boost/corosio/native/detail/select/select_op.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28  

28  

29  
#include <boost/corosio/detail/except.hpp>
29  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/thread_local_ptr.hpp>
30  
#include <boost/corosio/detail/thread_local_ptr.hpp>
31  

31  

32  
#include <sys/select.h>
32  
#include <sys/select.h>
33  
#include <sys/socket.h>
33  
#include <sys/socket.h>
34  
#include <unistd.h>
34  
#include <unistd.h>
35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <fcntl.h>
36  
#include <fcntl.h>
37  

37  

38  
#include <algorithm>
38  
#include <algorithm>
39  
#include <atomic>
39  
#include <atomic>
40  
#include <chrono>
40  
#include <chrono>
41  
#include <condition_variable>
41  
#include <condition_variable>
42  
#include <cstddef>
42  
#include <cstddef>
43  
#include <limits>
43  
#include <limits>
44  
#include <mutex>
44  
#include <mutex>
45  
#include <unordered_map>
45  
#include <unordered_map>
46  

46  

47  
namespace boost::corosio::detail {
47  
namespace boost::corosio::detail {
48  

48  

49  
struct select_op;
49  
struct select_op;
50  

50  

51  
/** POSIX scheduler using select() for I/O multiplexing.
51  
/** POSIX scheduler using select() for I/O multiplexing.
52  

52  

53  
    This scheduler implements the scheduler interface using the POSIX select()
53  
    This scheduler implements the scheduler interface using the POSIX select()
54  
    call for I/O event notification. It uses a single reactor model
54  
    call for I/O event notification. It uses a single reactor model
55  
    where one thread runs select() while other threads wait on a condition
55  
    where one thread runs select() while other threads wait on a condition
56  
    variable for handler work. This design provides:
56  
    variable for handler work. This design provides:
57  

57  

58  
    - Handler parallelism: N posted handlers can execute on N threads
58  
    - Handler parallelism: N posted handlers can execute on N threads
59  
    - No thundering herd: condition_variable wakes exactly one thread
59  
    - No thundering herd: condition_variable wakes exactly one thread
60  
    - Portability: Works on all POSIX systems
60  
    - Portability: Works on all POSIX systems
61  

61  

62  
    The design mirrors epoll_scheduler for behavioral consistency:
62  
    The design mirrors epoll_scheduler for behavioral consistency:
63  
    - Same single-reactor thread coordination model
63  
    - Same single-reactor thread coordination model
64  
    - Same work counting semantics
64  
    - Same work counting semantics
65  
    - Same timer integration pattern
65  
    - Same timer integration pattern
66  

66  

67  
    Known Limitations:
67  
    Known Limitations:
68  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
68  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
69  
    - O(n) scanning: rebuilds fd_sets each iteration
69  
    - O(n) scanning: rebuilds fd_sets each iteration
70  
    - Level-triggered only (no edge-triggered mode)
70  
    - Level-triggered only (no edge-triggered mode)
71  

71  

72  
    @par Thread Safety
72  
    @par Thread Safety
73  
    All public member functions are thread-safe.
73  
    All public member functions are thread-safe.
74  
*/
74  
*/
75  
class BOOST_COROSIO_DECL select_scheduler final
75  
class BOOST_COROSIO_DECL select_scheduler final
76  
    : public native_scheduler
76  
    : public native_scheduler
77  
    , public capy::execution_context::service
77  
    , public capy::execution_context::service
78  
{
78  
{
79  
public:
79  
public:
80  
    using key_type = scheduler;
80  
    using key_type = scheduler;
81  

81  

82  
    /** Construct the scheduler.
82  
    /** Construct the scheduler.
83  

83  

84  
        Creates a self-pipe for reactor interruption.
84  
        Creates a self-pipe for reactor interruption.
85  

85  

86  
        @param ctx Reference to the owning execution_context.
86  
        @param ctx Reference to the owning execution_context.
87  
        @param concurrency_hint Hint for expected thread count (unused).
87  
        @param concurrency_hint Hint for expected thread count (unused).
88  
    */
88  
    */
89  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
89  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
90  

90  

91  
    ~select_scheduler() override;
91  
    ~select_scheduler() override;
92  

92  

93  
    select_scheduler(select_scheduler const&)            = delete;
93  
    select_scheduler(select_scheduler const&)            = delete;
94  
    select_scheduler& operator=(select_scheduler const&) = delete;
94  
    select_scheduler& operator=(select_scheduler const&) = delete;
95  

95  

96  
    void shutdown() override;
96  
    void shutdown() override;
97  
    void post(std::coroutine_handle<> h) const override;
97  
    void post(std::coroutine_handle<> h) const override;
98  
    void post(scheduler_op* h) const override;
98  
    void post(scheduler_op* h) const override;
99  
    bool running_in_this_thread() const noexcept override;
99  
    bool running_in_this_thread() const noexcept override;
100  
    void stop() override;
100  
    void stop() override;
101  
    bool stopped() const noexcept override;
101  
    bool stopped() const noexcept override;
102  
    void restart() override;
102  
    void restart() override;
103  
    std::size_t run() override;
103  
    std::size_t run() override;
104  
    std::size_t run_one() override;
104  
    std::size_t run_one() override;
105  
    std::size_t wait_one(long usec) override;
105  
    std::size_t wait_one(long usec) override;
106  
    std::size_t poll() override;
106  
    std::size_t poll() override;
107  
    std::size_t poll_one() override;
107  
    std::size_t poll_one() override;
108  

108  

109  
    /** Return the maximum file descriptor value supported.
109  
    /** Return the maximum file descriptor value supported.
110  

110  

111  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
111  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
112  
        monitored by select(). Operations with fd >= FD_SETSIZE
112  
        monitored by select(). Operations with fd >= FD_SETSIZE
113  
        will fail with EINVAL.
113  
        will fail with EINVAL.
114  

114  

115  
        @return The maximum supported file descriptor value.
115  
        @return The maximum supported file descriptor value.
116  
    */
116  
    */
117  
    static constexpr int max_fd() noexcept
117  
    static constexpr int max_fd() noexcept
118  
    {
118  
    {
119  
        return FD_SETSIZE - 1;
119  
        return FD_SETSIZE - 1;
120  
    }
120  
    }
121  

121  

122  
    /** Register a file descriptor for monitoring.
122  
    /** Register a file descriptor for monitoring.
123  

123  

124  
        @param fd The file descriptor to register.
124  
        @param fd The file descriptor to register.
125  
        @param op The operation associated with this fd.
125  
        @param op The operation associated with this fd.
126  
        @param events Event mask: 1 = read, 2 = write, 3 = both.
126  
        @param events Event mask: 1 = read, 2 = write, 3 = both.
127  
    */
127  
    */
128  
    void register_fd(int fd, select_op* op, int events) const;
128  
    void register_fd(int fd, select_op* op, int events) const;
129  

129  

130  
    /** Unregister a file descriptor from monitoring.
130  
    /** Unregister a file descriptor from monitoring.
131  

131  

132  
        @param fd The file descriptor to unregister.
132  
        @param fd The file descriptor to unregister.
133  
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
133  
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
134  
    */
134  
    */
135  
    void deregister_fd(int fd, int events) const;
135  
    void deregister_fd(int fd, int events) const;
136  

136  

137  
    void work_started() noexcept override;
137  
    void work_started() noexcept override;
138  
    void work_finished() noexcept override;
138  
    void work_finished() noexcept override;
139  

139  

140  
    // Event flags for register_fd/deregister_fd
140  
    // Event flags for register_fd/deregister_fd
141  
    static constexpr int event_read  = 1;
141  
    static constexpr int event_read  = 1;
142  
    static constexpr int event_write = 2;
142  
    static constexpr int event_write = 2;
143  

143  

144  
private:
144  
private:
145  
    std::size_t do_one(long timeout_us);
145  
    std::size_t do_one(long timeout_us);
146  
    void run_reactor(std::unique_lock<std::mutex>& lock);
146  
    void run_reactor(std::unique_lock<std::mutex>& lock);
147  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
147  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
148  
    void interrupt_reactor() const;
148  
    void interrupt_reactor() const;
149  
    long calculate_timeout(long requested_timeout_us) const;
149  
    long calculate_timeout(long requested_timeout_us) const;
150  

150  

151  
    // Self-pipe for interrupting select()
151  
    // Self-pipe for interrupting select()
152  
    int pipe_fds_[2]; // [0]=read, [1]=write
152  
    int pipe_fds_[2]; // [0]=read, [1]=write
153  

153  

154  
    mutable std::mutex mutex_;
154  
    mutable std::mutex mutex_;
155  
    mutable std::condition_variable wakeup_event_;
155  
    mutable std::condition_variable wakeup_event_;
156  
    mutable op_queue completed_ops_;
156  
    mutable op_queue completed_ops_;
157  
    mutable std::atomic<long> outstanding_work_;
157  
    mutable std::atomic<long> outstanding_work_;
158 -
    bool shutdown_;
 
159  
    std::atomic<bool> stopped_;
158  
    std::atomic<bool> stopped_;
160  

159  

161  
    // Per-fd state for tracking registered operations
160  
    // Per-fd state for tracking registered operations
162  
    struct fd_state
161  
    struct fd_state
163  
    {
162  
    {
164  
        select_op* read_op  = nullptr;
163  
        select_op* read_op  = nullptr;
165  
        select_op* write_op = nullptr;
164  
        select_op* write_op = nullptr;
166  
    };
165  
    };
167  
    mutable std::unordered_map<int, fd_state> registered_fds_;
166  
    mutable std::unordered_map<int, fd_state> registered_fds_;
168  
    mutable int max_fd_ = -1;
167  
    mutable int max_fd_ = -1;
169  

168  

170  
    // Single reactor thread coordination
169  
    // Single reactor thread coordination
171  
    mutable bool reactor_running_     = false;
170  
    mutable bool reactor_running_     = false;
172  
    mutable bool reactor_interrupted_ = false;
171  
    mutable bool reactor_interrupted_ = false;
173  
    mutable int idle_thread_count_    = 0;
172  
    mutable int idle_thread_count_    = 0;
174  

173  

175  
    // Sentinel operation for interleaving reactor runs with handler execution.
174  
    // Sentinel operation for interleaving reactor runs with handler execution.
176  
    // Ensures the reactor runs periodically even when handlers are continuously
175  
    // Ensures the reactor runs periodically even when handlers are continuously
177  
    // posted, preventing timer starvation.
176  
    // posted, preventing timer starvation.
178  
    struct task_op final : scheduler_op
177  
    struct task_op final : scheduler_op
179  
    {
178  
    {
180  
        void operator()() override {}
179  
        void operator()() override {}
181  
        void destroy() override {}
180  
        void destroy() override {}
182  
    };
181  
    };
183  
    task_op task_op_;
182  
    task_op task_op_;
184  
};
183  
};

/*
    select Scheduler - Single Reactor Model
    =======================================

    This scheduler mirrors the epoll_scheduler design but uses select() instead
    of epoll for I/O multiplexing. The thread coordination strategy is identical:
    one thread becomes the "reactor" while others wait on a condition variable.

    Thread Model
    ------------
    - ONE thread runs select() at a time (the reactor thread)
    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()

    Key Differences from epoll
    --------------------------
    - Uses self-pipe instead of eventfd for interruption (more portable)
    - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
    - FD_SETSIZE limit (~1024 fds on most systems)
    - Level-triggered only (no edge-triggered mode)

    Self-Pipe Pattern
    -----------------
    To interrupt a blocking select() call (e.g., when work is posted or a timer
    expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
    always in the read_fds set, so select() returns immediately. We drain the
    pipe to clear the readable state.

    fd-to-op Mapping
    ----------------
    We use an unordered_map<int, fd_state> to track which operations are
    registered for each fd. This allows O(1) lookup when select() returns
    ready fds. Each fd can have at most one read op and one write op registered.
*/
220  

219  

221  
namespace select {
220  
namespace select {
222  

221  

223  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
222  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
224  
{
223  
{
225  
    select_scheduler const* key;
224  
    select_scheduler const* key;
226  
    scheduler_context* next;
225  
    scheduler_context* next;
227  
};
226  
};
228  

227  

229  
inline thread_local_ptr<scheduler_context> context_stack;
228  
inline thread_local_ptr<scheduler_context> context_stack;
230  

229  

231  
struct thread_context_guard
230  
struct thread_context_guard
232  
{
231  
{
233  
    scheduler_context frame_;
232  
    scheduler_context frame_;
234  

233  

235  
    explicit thread_context_guard(select_scheduler const* ctx) noexcept
234  
    explicit thread_context_guard(select_scheduler const* ctx) noexcept
236  
        : frame_{ctx, context_stack.get()}
235  
        : frame_{ctx, context_stack.get()}
237  
    {
236  
    {
238  
        context_stack.set(&frame_);
237  
        context_stack.set(&frame_);
239  
    }
238  
    }
240  

239  

241  
    ~thread_context_guard() noexcept
240  
    ~thread_context_guard() noexcept
242  
    {
241  
    {
243  
        context_stack.set(frame_.next);
242  
        context_stack.set(frame_.next);
244  
    }
243  
    }
245  
};
244  
};
246  

245  

247  
struct work_guard
246  
struct work_guard
248  
{
247  
{
249  
    select_scheduler* self;
248  
    select_scheduler* self;
250  
    ~work_guard()
249  
    ~work_guard()
251  
    {
250  
    {
252  
        self->work_finished();
251  
        self->work_finished();
253  
    }
252  
    }
254  
};
253  
};
255  

254  

256  
} // namespace select
255  
} // namespace select
257  

256  

258  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
257  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
259  
    : pipe_fds_{-1, -1}
258  
    : pipe_fds_{-1, -1}
260  
    , outstanding_work_(0)
259  
    , outstanding_work_(0)
261 -
    , shutdown_(false)
 
262  
    , stopped_(false)
260  
    , stopped_(false)
263  
    , max_fd_(-1)
261  
    , max_fd_(-1)
264  
    , reactor_running_(false)
262  
    , reactor_running_(false)
265  
    , reactor_interrupted_(false)
263  
    , reactor_interrupted_(false)
266  
    , idle_thread_count_(0)
264  
    , idle_thread_count_(0)
267  
{
265  
{
268  
    // Create self-pipe for interrupting select()
266  
    // Create self-pipe for interrupting select()
269  
    if (::pipe(pipe_fds_) < 0)
267  
    if (::pipe(pipe_fds_) < 0)
270  
        detail::throw_system_error(make_err(errno), "pipe");
268  
        detail::throw_system_error(make_err(errno), "pipe");
271  

269  

272  
    // Set both ends to non-blocking and close-on-exec
270  
    // Set both ends to non-blocking and close-on-exec
273  
    for (int i = 0; i < 2; ++i)
271  
    for (int i = 0; i < 2; ++i)
274  
    {
272  
    {
275  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
273  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
276  
        if (flags == -1)
274  
        if (flags == -1)
277  
        {
275  
        {
278  
            int errn = errno;
276  
            int errn = errno;
279  
            ::close(pipe_fds_[0]);
277  
            ::close(pipe_fds_[0]);
280  
            ::close(pipe_fds_[1]);
278  
            ::close(pipe_fds_[1]);
281  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
279  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
282  
        }
280  
        }
283  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
281  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
284  
        {
282  
        {
285  
            int errn = errno;
283  
            int errn = errno;
286  
            ::close(pipe_fds_[0]);
284  
            ::close(pipe_fds_[0]);
287  
            ::close(pipe_fds_[1]);
285  
            ::close(pipe_fds_[1]);
288  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
286  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
289  
        }
287  
        }
290  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
288  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
291  
        {
289  
        {
292  
            int errn = errno;
290  
            int errn = errno;
293  
            ::close(pipe_fds_[0]);
291  
            ::close(pipe_fds_[0]);
294  
            ::close(pipe_fds_[1]);
292  
            ::close(pipe_fds_[1]);
295  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
293  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
296  
        }
294  
        }
297  
    }
295  
    }
298  

296  

299  
    timer_svc_ = &get_timer_service(ctx, *this);
297  
    timer_svc_ = &get_timer_service(ctx, *this);
300  
    timer_svc_->set_on_earliest_changed(
298  
    timer_svc_->set_on_earliest_changed(
301  
        timer_service::callback(this, [](void* p) {
299  
        timer_service::callback(this, [](void* p) {
302  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
300  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
303  
        }));
301  
        }));
304  

302  

305  
    // Initialize resolver service
303  
    // Initialize resolver service
306  
    get_resolver_service(ctx, *this);
304  
    get_resolver_service(ctx, *this);
307  

305  

308  
    // Initialize signal service
306  
    // Initialize signal service
309  
    get_signal_service(ctx, *this);
307  
    get_signal_service(ctx, *this);
310  

308  

311  
    // Push task sentinel to interleave reactor runs with handler execution
309  
    // Push task sentinel to interleave reactor runs with handler execution
312  
    completed_ops_.push(&task_op_);
310  
    completed_ops_.push(&task_op_);
313  
}
311  
}
314  

312  

315  
inline select_scheduler::~select_scheduler()
313  
inline select_scheduler::~select_scheduler()
316  
{
314  
{
317  
    if (pipe_fds_[0] >= 0)
315  
    if (pipe_fds_[0] >= 0)
318  
        ::close(pipe_fds_[0]);
316  
        ::close(pipe_fds_[0]);
319  
    if (pipe_fds_[1] >= 0)
317  
    if (pipe_fds_[1] >= 0)
320  
        ::close(pipe_fds_[1]);
318  
        ::close(pipe_fds_[1]);
321  
}
319  
}
322  

320  

323  
inline void
321  
inline void
324  
select_scheduler::shutdown()
322  
select_scheduler::shutdown()
325  
{
323  
{
326  
    {
324  
    {
327 -
        shutdown_ = true;
 
328  
        std::unique_lock lock(mutex_);
325  
        std::unique_lock lock(mutex_);
329  

326  

330  
        while (auto* h = completed_ops_.pop())
327  
        while (auto* h = completed_ops_.pop())
331  
        {
328  
        {
332  
            if (h == &task_op_)
329  
            if (h == &task_op_)
333  
                continue;
330  
                continue;
334  
            lock.unlock();
331  
            lock.unlock();
335  
            h->destroy();
332  
            h->destroy();
336  
            lock.lock();
333  
            lock.lock();
337  
        }
334  
        }
338  
    }
335  
    }
339 -
    outstanding_work_.store(0, std::memory_order_release);
 
340 -

 
341  

336  

342  
    if (pipe_fds_[1] >= 0)
337  
    if (pipe_fds_[1] >= 0)
343  
        interrupt_reactor();
338  
        interrupt_reactor();
344  

339  

345  
    wakeup_event_.notify_all();
340  
    wakeup_event_.notify_all();
346  
}
341  
}
347  

342  

348  
inline void
343  
inline void
349  
select_scheduler::post(std::coroutine_handle<> h) const
344  
select_scheduler::post(std::coroutine_handle<> h) const
350  
{
345  
{
351  
    struct post_handler final : scheduler_op
346  
    struct post_handler final : scheduler_op
352  
    {
347  
    {
353  
        std::coroutine_handle<> h_;
348  
        std::coroutine_handle<> h_;
354  

349  

355  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
350  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
356  

351  

357  
        ~post_handler() override = default;
352  
        ~post_handler() override = default;
358  

353  

359  
        void operator()() override
354  
        void operator()() override
360  
        {
355  
        {
361  
            auto h = h_;
356  
            auto h = h_;
362  
            delete this;
357  
            delete this;
363  
            h.resume();
358  
            h.resume();
364  
        }
359  
        }
365  

360  

366  
        void destroy() override
361  
        void destroy() override
367  
        {
362  
        {
 
363 +
            auto h = h_;
368  
            delete this;
364  
            delete this;
 
365 +
            h.destroy();
369  
        }
366  
        }
370  
    };
367  
    };
371  

368  

372  
    auto ph = std::make_unique<post_handler>(h);
369  
    auto ph = std::make_unique<post_handler>(h);
373  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
370  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
374  

371  

375  
    std::unique_lock lock(mutex_);
372  
    std::unique_lock lock(mutex_);
376  
    completed_ops_.push(ph.release());
373  
    completed_ops_.push(ph.release());
377  
    wake_one_thread_and_unlock(lock);
374  
    wake_one_thread_and_unlock(lock);
378  
}
375  
}
379  

376  

380  
inline void
377  
inline void
381  
select_scheduler::post(scheduler_op* h) const
378  
select_scheduler::post(scheduler_op* h) const
382  
{
379  
{
383  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
380  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
384  

381  

385  
    std::unique_lock lock(mutex_);
382  
    std::unique_lock lock(mutex_);
386  
    completed_ops_.push(h);
383  
    completed_ops_.push(h);
387  
    wake_one_thread_and_unlock(lock);
384  
    wake_one_thread_and_unlock(lock);
388  
}
385  
}
389  

386  

390  
inline bool
387  
inline bool
391  
select_scheduler::running_in_this_thread() const noexcept
388  
select_scheduler::running_in_this_thread() const noexcept
392  
{
389  
{
393  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
390  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
394  
        if (c->key == this)
391  
        if (c->key == this)
395  
            return true;
392  
            return true;
396  
    return false;
393  
    return false;
397  
}
394  
}
398  

395  

399  
inline void
396  
inline void
400  
select_scheduler::stop()
397  
select_scheduler::stop()
401  
{
398  
{
402  
    bool expected = false;
399  
    bool expected = false;
403  
    if (stopped_.compare_exchange_strong(
400  
    if (stopped_.compare_exchange_strong(
404  
            expected, true, std::memory_order_release,
401  
            expected, true, std::memory_order_release,
405  
            std::memory_order_relaxed))
402  
            std::memory_order_relaxed))
406  
    {
403  
    {
407  
        // Wake all threads so they notice stopped_ and exit
404  
        // Wake all threads so they notice stopped_ and exit
408  
        {
405  
        {
409  
            std::lock_guard lock(mutex_);
406  
            std::lock_guard lock(mutex_);
410  
            wakeup_event_.notify_all();
407  
            wakeup_event_.notify_all();
411  
        }
408  
        }
412  
        interrupt_reactor();
409  
        interrupt_reactor();
413  
    }
410  
    }
414  
}
411  
}
415  

412  

416  
inline bool
413  
inline bool
417  
select_scheduler::stopped() const noexcept
414  
select_scheduler::stopped() const noexcept
418  
{
415  
{
419  
    return stopped_.load(std::memory_order_acquire);
416  
    return stopped_.load(std::memory_order_acquire);
420  
}
417  
}
421  

418  

422  
inline void
419  
inline void
423  
select_scheduler::restart()
420  
select_scheduler::restart()
424  
{
421  
{
425  
    stopped_.store(false, std::memory_order_release);
422  
    stopped_.store(false, std::memory_order_release);
426  
}
423  
}
427  

424  

428  
inline std::size_t
425  
inline std::size_t
429  
select_scheduler::run()
426  
select_scheduler::run()
430  
{
427  
{
431  
    if (stopped_.load(std::memory_order_acquire))
428  
    if (stopped_.load(std::memory_order_acquire))
432  
        return 0;
429  
        return 0;
433  

430  

434  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
431  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
435  
    {
432  
    {
436  
        stop();
433  
        stop();
437  
        return 0;
434  
        return 0;
438  
    }
435  
    }
439  

436  

440  
    select::thread_context_guard ctx(this);
437  
    select::thread_context_guard ctx(this);
441  

438  

442  
    std::size_t n = 0;
439  
    std::size_t n = 0;
443  
    while (do_one(-1))
440  
    while (do_one(-1))
444  
        if (n != (std::numeric_limits<std::size_t>::max)())
441  
        if (n != (std::numeric_limits<std::size_t>::max)())
445  
            ++n;
442  
            ++n;
446  
    return n;
443  
    return n;
447  
}
444  
}
448  

445  

449  
inline std::size_t
446  
inline std::size_t
450  
select_scheduler::run_one()
447  
select_scheduler::run_one()
451  
{
448  
{
452  
    if (stopped_.load(std::memory_order_acquire))
449  
    if (stopped_.load(std::memory_order_acquire))
453  
        return 0;
450  
        return 0;
454  

451  

455  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
452  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
456  
    {
453  
    {
457  
        stop();
454  
        stop();
458  
        return 0;
455  
        return 0;
459  
    }
456  
    }
460  

457  

461  
    select::thread_context_guard ctx(this);
458  
    select::thread_context_guard ctx(this);
462  
    return do_one(-1);
459  
    return do_one(-1);
463  
}
460  
}
464  

461  

465  
inline std::size_t
462  
inline std::size_t
466  
select_scheduler::wait_one(long usec)
463  
select_scheduler::wait_one(long usec)
467  
{
464  
{
468  
    if (stopped_.load(std::memory_order_acquire))
465  
    if (stopped_.load(std::memory_order_acquire))
469  
        return 0;
466  
        return 0;
470  

467  

471  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
468  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
472  
    {
469  
    {
473  
        stop();
470  
        stop();
474  
        return 0;
471  
        return 0;
475  
    }
472  
    }
476  

473  

477  
    select::thread_context_guard ctx(this);
474  
    select::thread_context_guard ctx(this);
478  
    return do_one(usec);
475  
    return do_one(usec);
479  
}
476  
}
480  

477  

481  
inline std::size_t
478  
inline std::size_t
482  
select_scheduler::poll()
479  
select_scheduler::poll()
483  
{
480  
{
484  
    if (stopped_.load(std::memory_order_acquire))
481  
    if (stopped_.load(std::memory_order_acquire))
485  
        return 0;
482  
        return 0;
486  

483  

487  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
484  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
488  
    {
485  
    {
489  
        stop();
486  
        stop();
490  
        return 0;
487  
        return 0;
491  
    }
488  
    }
492  

489  

493  
    select::thread_context_guard ctx(this);
490  
    select::thread_context_guard ctx(this);
494  

491  

495  
    std::size_t n = 0;
492  
    std::size_t n = 0;
496  
    while (do_one(0))
493  
    while (do_one(0))
497  
        if (n != (std::numeric_limits<std::size_t>::max)())
494  
        if (n != (std::numeric_limits<std::size_t>::max)())
498  
            ++n;
495  
            ++n;
499  
    return n;
496  
    return n;
500  
}
497  
}
501  

498  

502  
inline std::size_t
499  
inline std::size_t
503  
select_scheduler::poll_one()
500  
select_scheduler::poll_one()
504  
{
501  
{
505  
    if (stopped_.load(std::memory_order_acquire))
502  
    if (stopped_.load(std::memory_order_acquire))
506  
        return 0;
503  
        return 0;
507  

504  

508  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
505  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
509  
    {
506  
    {
510  
        stop();
507  
        stop();
511  
        return 0;
508  
        return 0;
512  
    }
509  
    }
513  

510  

514  
    select::thread_context_guard ctx(this);
511  
    select::thread_context_guard ctx(this);
515  
    return do_one(0);
512  
    return do_one(0);
516  
}
513  
}
517  

514  

518  
inline void
515  
inline void
519  
select_scheduler::register_fd(int fd, select_op* op, int events) const
516  
select_scheduler::register_fd(int fd, select_op* op, int events) const
520  
{
517  
{
521  
    // Validate fd is within select() limits
518  
    // Validate fd is within select() limits
522  
    if (fd < 0 || fd >= FD_SETSIZE)
519  
    if (fd < 0 || fd >= FD_SETSIZE)
523  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
520  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
524  

521  

525  
    {
522  
    {
526  
        std::lock_guard lock(mutex_);
523  
        std::lock_guard lock(mutex_);
527  

524  

528  
        auto& state = registered_fds_[fd];
525  
        auto& state = registered_fds_[fd];
529  
        if (events & event_read)
526  
        if (events & event_read)
530  
            state.read_op = op;
527  
            state.read_op = op;
531  
        if (events & event_write)
528  
        if (events & event_write)
532  
            state.write_op = op;
529  
            state.write_op = op;
533  

530  

534  
        if (fd > max_fd_)
531  
        if (fd > max_fd_)
535  
            max_fd_ = fd;
532  
            max_fd_ = fd;
536  
    }
533  
    }
537  

534  

538  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
535  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
539  
    // with the newly registered fd.
536  
    // with the newly registered fd.
540  
    interrupt_reactor();
537  
    interrupt_reactor();
541  
}
538  
}
542  

539  

543  
inline void
540  
inline void
544  
select_scheduler::deregister_fd(int fd, int events) const
541  
select_scheduler::deregister_fd(int fd, int events) const
545  
{
542  
{
546  
    std::lock_guard lock(mutex_);
543  
    std::lock_guard lock(mutex_);
547  

544  

548  
    auto it = registered_fds_.find(fd);
545  
    auto it = registered_fds_.find(fd);
549  
    if (it == registered_fds_.end())
546  
    if (it == registered_fds_.end())
550  
        return;
547  
        return;
551  

548  

552  
    if (events & event_read)
549  
    if (events & event_read)
553  
        it->second.read_op = nullptr;
550  
        it->second.read_op = nullptr;
554  
    if (events & event_write)
551  
    if (events & event_write)
555  
        it->second.write_op = nullptr;
552  
        it->second.write_op = nullptr;
556  

553  

557  
    // Remove entry if both are null
554  
    // Remove entry if both are null
558  
    if (!it->second.read_op && !it->second.write_op)
555  
    if (!it->second.read_op && !it->second.write_op)
559  
    {
556  
    {
560  
        registered_fds_.erase(it);
557  
        registered_fds_.erase(it);
561  

558  

562  
        // Recalculate max_fd_ if needed
559  
        // Recalculate max_fd_ if needed
563  
        if (fd == max_fd_)
560  
        if (fd == max_fd_)
564  
        {
561  
        {
565  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
562  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
566  
            for (auto& [registered_fd, state] : registered_fds_)
563  
            for (auto& [registered_fd, state] : registered_fds_)
567  
            {
564  
            {
568  
                if (registered_fd > max_fd_)
565  
                if (registered_fd > max_fd_)
569  
                    max_fd_ = registered_fd;
566  
                    max_fd_ = registered_fd;
570  
            }
567  
            }
571  
        }
568  
        }
572  
    }
569  
    }
573  
}
570  
}
574  

571  

575  
inline void
select_scheduler::work_started() noexcept
{
    // Record one more outstanding operation. Relaxed ordering suffices for
    // the increment; the matching work_finished() uses acq_rel on its
    // decrement, which orders the final transition to zero.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
580  

577  

581  
inline void
select_scheduler::work_finished() noexcept
{
    // Decrement the outstanding-work count. fetch_sub returns the previous
    // value, so exactly one thread — the one that drops the count to zero —
    // observes 1 and stops the scheduler, letting run()/poll() callers return.
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
587  

584  

588  
inline void
585  
inline void
589  
select_scheduler::interrupt_reactor() const
586  
select_scheduler::interrupt_reactor() const
590  
{
587  
{
591  
    char byte               = 1;
588  
    char byte               = 1;
592  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
589  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
593  
}
590  
}
594  

591  

595  
inline void
592  
inline void
596  
select_scheduler::wake_one_thread_and_unlock(
593  
select_scheduler::wake_one_thread_and_unlock(
597  
    std::unique_lock<std::mutex>& lock) const
594  
    std::unique_lock<std::mutex>& lock) const
598  
{
595  
{
599  
    if (idle_thread_count_ > 0)
596  
    if (idle_thread_count_ > 0)
600  
    {
597  
    {
601  
        // Idle worker exists - wake it via condvar
598  
        // Idle worker exists - wake it via condvar
602  
        wakeup_event_.notify_one();
599  
        wakeup_event_.notify_one();
603  
        lock.unlock();
600  
        lock.unlock();
604  
    }
601  
    }
605  
    else if (reactor_running_ && !reactor_interrupted_)
602  
    else if (reactor_running_ && !reactor_interrupted_)
606  
    {
603  
    {
607  
        // No idle workers but reactor is running - interrupt it
604  
        // No idle workers but reactor is running - interrupt it
608  
        reactor_interrupted_ = true;
605  
        reactor_interrupted_ = true;
609  
        lock.unlock();
606  
        lock.unlock();
610  
        interrupt_reactor();
607  
        interrupt_reactor();
611  
    }
608  
    }
612  
    else
609  
    else
613  
    {
610  
    {
614  
        // No one to wake
611  
        // No one to wake
615  
        lock.unlock();
612  
        lock.unlock();
616  
    }
613  
    }
617  
}
614  
}
618  

615  

619  
inline long
616  
inline long
620  
select_scheduler::calculate_timeout(long requested_timeout_us) const
617  
select_scheduler::calculate_timeout(long requested_timeout_us) const
621  
{
618  
{
622  
    if (requested_timeout_us == 0)
619  
    if (requested_timeout_us == 0)
623  
        return 0;
620  
        return 0;
624  

621  

625  
    auto nearest = timer_svc_->nearest_expiry();
622  
    auto nearest = timer_svc_->nearest_expiry();
626  
    if (nearest == timer_service::time_point::max())
623  
    if (nearest == timer_service::time_point::max())
627  
        return requested_timeout_us;
624  
        return requested_timeout_us;
628  

625  

629  
    auto now = std::chrono::steady_clock::now();
626  
    auto now = std::chrono::steady_clock::now();
630  
    if (nearest <= now)
627  
    if (nearest <= now)
631  
        return 0;
628  
        return 0;
632  

629  

633  
    auto timer_timeout_us =
630  
    auto timer_timeout_us =
634  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
631  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
635  
            .count();
632  
            .count();
636  

633  

637  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
634  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
638  
    constexpr auto long_max =
635  
    constexpr auto long_max =
639  
        static_cast<long long>((std::numeric_limits<long>::max)());
636  
        static_cast<long long>((std::numeric_limits<long>::max)());
640  
    auto capped_timer_us =
637  
    auto capped_timer_us =
641  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
638  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
642  
                              static_cast<long long>(0)),
639  
                              static_cast<long long>(0)),
643  
                   long_max);
640  
                   long_max);
644  

641  

645  
    if (requested_timeout_us < 0)
642  
    if (requested_timeout_us < 0)
646  
        return static_cast<long>(capped_timer_us);
643  
        return static_cast<long>(capped_timer_us);
647  

644  

648  
    // requested_timeout_us is already long, so min() result fits in long
645  
    // requested_timeout_us is already long, so min() result fits in long
649  
    return static_cast<long>(
646  
    return static_cast<long>(
650  
        (std::min)(static_cast<long long>(requested_timeout_us),
647  
        (std::min)(static_cast<long long>(requested_timeout_us),
651  
                   capped_timer_us));
648  
                   capped_timer_us));
652  
}
649  
}
653  

650  

654  
// Run one iteration of the select() reactor. Entered with `lock` held on
// mutex_; the lock is released around the blocking ::select() call and
// re-acquired before completed_ops_ is touched. Completed operations are
// pushed onto completed_ops_ and waiting workers are notified.
inline void
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
{
    // Calculate timeout considering timers, use 0 if interrupted
    long effective_timeout_us =
        reactor_interrupted_ ? 0 : calculate_timeout(-1);

    // Build fd_sets from registered_fds_ (done under the lock, since
    // register_fd/deregister_fd mutate the map concurrently).
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // Always include the interrupt pipe so interrupt_reactor() can wake us.
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    // Add registered fds
    for (auto& [fd, state] : registered_fds_)
    {
        if (state.read_op)
            FD_SET(fd, &read_fds);
        if (state.write_op)
        {
            FD_SET(fd, &write_fds);
            // Also monitor for errors on connect operations
            FD_SET(fd, &except_fds);
        }
        if (fd > nfds)
            nfds = fd;
    }

    // Convert timeout to timeval; a null pointer means block indefinitely.
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec  = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr     = &tv;
    }

    lock.unlock();

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
    // Snapshot errno immediately — the calls below may overwrite it.
    int saved_errno = errno;

    // Process timers outside the lock
    timer_svc_->process_expired();

    // EINTR is a benign interruption; anything else is a real failure.
    if (ready < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "select");

    // Re-acquire lock before modifying completed_ops_
    lock.lock();

    // Drain the interrupt pipe if readable, so its level-triggered readiness
    // does not wake every subsequent select() call.
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
    {
        char buf[256];
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
        {
        }
    }

    // Process I/O completions
    int completions_queued = 0;
    if (ready > 0)
    {
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
        std::vector<int> fds_to_check;
        fds_to_check.reserve(registered_fds_.size());
        for (auto& [fd, state] : registered_fds_)
            fds_to_check.push_back(fd);

        for (int fd : fds_to_check)
        {
            // Re-find each fd: earlier iterations may have erased entries.
            auto it = registered_fds_.find(fd);
            if (it == registered_fds_.end())
                continue;

            auto& state = it->second;

            // Check for errors (especially for connect operations)
            bool has_error = FD_ISSET(fd, &except_fds);

            // Process read readiness
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
            {
                auto* op = state.read_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete. A racing
                // canceller that already claimed it leaves prev == unregistered.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.read_op = nullptr;

                    if (has_error)
                    {
                        // Fetch the socket-level error; fall back to errno if
                        // getsockopt itself fails, and to EIO if no error is
                        // reported despite the exception condition.
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Process write readiness
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
            {
                auto* op = state.write_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.write_op = nullptr;

                    if (has_error)
                    {
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Clean up empty entries
            if (!state.read_op && !state.write_op)
                registered_fds_.erase(it);
        }
    }

    // Wake exactly as many workers as needed to drain the new completions.
    if (completions_queued > 0)
    {
        if (completions_queued == 1)
            wakeup_event_.notify_one();
        else
            wakeup_event_.notify_all();
    }
}
822  

819  

823  
// Execute at most one ready handler, waiting up to timeout_us microseconds
// (negative = wait forever, zero = poll). Returns 1 if a handler ran, else 0.
//
// The queue contains a sentinel, task_op_, which represents "run the
// reactor": whichever thread pops it becomes the reactor thread, runs one
// select() iteration, then pushes the sentinel back so another thread (or
// itself, on the next loop) can take a turn.
inline std::size_t
select_scheduler::do_one(long timeout_us)
{
    std::unique_lock lock(mutex_);

    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Popped the reactor sentinel: this thread runs the reactor.
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            if (!more_handlers)
            {
                // Nothing queued and no outstanding work, or a pure poll:
                // return the sentinel and bail without running the reactor.
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
                if (timeout_us == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
            }

            // If handlers remain queued (or we must not block), the reactor
            // should return immediately rather than sleep in select().
            reactor_interrupted_ = more_handlers || timeout_us == 0;
            reactor_running_     = true;

            // Let an idle worker start on the queued handlers while this
            // thread is busy inside the reactor.
            if (more_handlers && idle_thread_count_ > 0)
                wakeup_event_.notify_one();

            // run_reactor releases and re-acquires `lock` internally.
            run_reactor(lock);

            reactor_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        // A real handler: run it outside the lock. work_guard keeps the
        // scheduler's work count balanced across the invocation.
        if (op != nullptr)
        {
            lock.unlock();
            select::work_guard g{this};
            (*op)();
            return 1;
        }

        // Queue empty: nothing can arrive once outstanding work hits zero.
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
            return 0;

        if (timeout_us == 0)
            return 0;

        // Park this thread until new work is signalled (or the timeout
        // elapses), then loop to retry. idle_thread_count_ is what
        // wake_one_thread_and_unlock consults to pick a wake strategy.
        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
        else
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
        --idle_thread_count_;
    }
}
888  

885  

889  
} // namespace boost::corosio::detail
886  
} // namespace boost::corosio::detail
890  

887  

891  
#endif // BOOST_COROSIO_HAS_SELECT
888  
#endif // BOOST_COROSIO_HAS_SELECT
892  

889  

893  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
890  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP