//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/native_scheduler.hpp>
20  
#include <boost/corosio/native/native_scheduler.hpp>
21  
#include <boost/corosio/detail/scheduler_op.hpp>
21  
#include <boost/corosio/detail/scheduler_op.hpp>
22  

22  

23  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28  

28  

29  
#include <boost/corosio/detail/except.hpp>
29  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/thread_local_ptr.hpp>
30  
#include <boost/corosio/detail/thread_local_ptr.hpp>
31  

31  

32  
#include <atomic>
32  
#include <atomic>
33  
#include <chrono>
33  
#include <chrono>
34  
#include <condition_variable>
34  
#include <condition_variable>
35  
#include <cstddef>
35  
#include <cstddef>
36  
#include <cstdint>
36  
#include <cstdint>
37  
#include <limits>
37  
#include <limits>
38  
#include <mutex>
38  
#include <mutex>
39  
#include <utility>
39  
#include <utility>
40  

40  

41  
#include <errno.h>
41  
#include <errno.h>
42  
#include <fcntl.h>
42  
#include <fcntl.h>
43  
#include <sys/epoll.h>
43  
#include <sys/epoll.h>
44  
#include <sys/eventfd.h>
44  
#include <sys/eventfd.h>
45  
#include <sys/socket.h>
45  
#include <sys/socket.h>
46  
#include <sys/timerfd.h>
46  
#include <sys/timerfd.h>
47  
#include <unistd.h>
47  
#include <unistd.h>
48  

48  

49  
namespace boost::corosio::detail {
49  
namespace boost::corosio::detail {
50  

50  

51  
struct epoll_op;
51  
struct epoll_op;
52  
struct descriptor_state;
52  
struct descriptor_state;
53  
namespace epoll {
53  
namespace epoll {
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55  
} // namespace epoll
55  
} // namespace epoll
56  

56  

57  
/** Linux scheduler using epoll for I/O multiplexing.
57  
/** Linux scheduler using epoll for I/O multiplexing.
58  

58  

59  
    This scheduler implements the scheduler interface using Linux epoll
59  
    This scheduler implements the scheduler interface using Linux epoll
60  
    for efficient I/O event notification. It uses a single reactor model
60  
    for efficient I/O event notification. It uses a single reactor model
61  
    where one thread runs epoll_wait while other threads
61  
    where one thread runs epoll_wait while other threads
62  
    wait on a condition variable for handler work. This design provides:
62  
    wait on a condition variable for handler work. This design provides:
63  

63  

64  
    - Handler parallelism: N posted handlers can execute on N threads
64  
    - Handler parallelism: N posted handlers can execute on N threads
65  
    - No thundering herd: condition_variable wakes exactly one thread
65  
    - No thundering herd: condition_variable wakes exactly one thread
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
67  

67  

68  
    When threads call run(), they first try to execute queued handlers.
68  
    When threads call run(), they first try to execute queued handlers.
69  
    If the queue is empty and no reactor is running, one thread becomes
69  
    If the queue is empty and no reactor is running, one thread becomes
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
71  
    variable until handlers are available.
71  
    variable until handlers are available.
72  

72  

73  
    @par Thread Safety
73  
    @par Thread Safety
74  
    All public member functions are thread-safe.
74  
    All public member functions are thread-safe.
75  
*/
75  
*/
76  
class BOOST_COROSIO_DECL epoll_scheduler final
76  
class BOOST_COROSIO_DECL epoll_scheduler final
77  
    : public native_scheduler
77  
    : public native_scheduler
78  
    , public capy::execution_context::service
78  
    , public capy::execution_context::service
79  
{
79  
{
80  
public:
80  
public:
81  
    using key_type = scheduler;
81  
    using key_type = scheduler;
82  

82  

83  
    /** Construct the scheduler.
83  
    /** Construct the scheduler.
84  

84  

85  
        Creates an epoll instance, eventfd for reactor interruption,
85  
        Creates an epoll instance, eventfd for reactor interruption,
86  
        and timerfd for kernel-managed timer expiry.
86  
        and timerfd for kernel-managed timer expiry.
87  

87  

88  
        @param ctx Reference to the owning execution_context.
88  
        @param ctx Reference to the owning execution_context.
89  
        @param concurrency_hint Hint for expected thread count (unused).
89  
        @param concurrency_hint Hint for expected thread count (unused).
90  
    */
90  
    */
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92  

92  

93  
    /// Destroy the scheduler.
93  
    /// Destroy the scheduler.
94  
    ~epoll_scheduler() override;
94  
    ~epoll_scheduler() override;
95  

95  

96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98  

98  

99  
    void shutdown() override;
99  
    void shutdown() override;
100  
    void post(std::coroutine_handle<> h) const override;
100  
    void post(std::coroutine_handle<> h) const override;
101  
    void post(scheduler_op* h) const override;
101  
    void post(scheduler_op* h) const override;
102  
    bool running_in_this_thread() const noexcept override;
102  
    bool running_in_this_thread() const noexcept override;
103  
    void stop() override;
103  
    void stop() override;
104  
    bool stopped() const noexcept override;
104  
    bool stopped() const noexcept override;
105  
    void restart() override;
105  
    void restart() override;
106  
    std::size_t run() override;
106  
    std::size_t run() override;
107  
    std::size_t run_one() override;
107  
    std::size_t run_one() override;
108  
    std::size_t wait_one(long usec) override;
108  
    std::size_t wait_one(long usec) override;
109  
    std::size_t poll() override;
109  
    std::size_t poll() override;
110  
    std::size_t poll_one() override;
110  
    std::size_t poll_one() override;
111  

111  

112  
    /** Return the epoll file descriptor.
112  
    /** Return the epoll file descriptor.
113  

113  

114  
        Used by socket services to register file descriptors
114  
        Used by socket services to register file descriptors
115  
        for I/O event notification.
115  
        for I/O event notification.
116  

116  

117  
        @return The epoll file descriptor.
117  
        @return The epoll file descriptor.
118  
    */
118  
    */
119  
    int epoll_fd() const noexcept
119  
    int epoll_fd() const noexcept
120  
    {
120  
    {
121  
        return epoll_fd_;
121  
        return epoll_fd_;
122  
    }
122  
    }
123  

123  

124  
    /** Reset the thread's inline completion budget.
124  
    /** Reset the thread's inline completion budget.
125  

125  

126  
        Called at the start of each posted completion handler to
126  
        Called at the start of each posted completion handler to
127  
        grant a fresh budget for speculative inline completions.
127  
        grant a fresh budget for speculative inline completions.
128  
    */
128  
    */
129  
    void reset_inline_budget() const noexcept;
129  
    void reset_inline_budget() const noexcept;
130  

130  

131  
    /** Consume one unit of inline budget if available.
131  
    /** Consume one unit of inline budget if available.
132  

132  

133  
        @return True if budget was available and consumed.
133  
        @return True if budget was available and consumed.
134  
    */
134  
    */
135  
    bool try_consume_inline_budget() const noexcept;
135  
    bool try_consume_inline_budget() const noexcept;
136  

136  

137  
    /** Register a descriptor for persistent monitoring.
137  
    /** Register a descriptor for persistent monitoring.
138  

138  

139  
        The fd is registered once and stays registered until explicitly
139  
        The fd is registered once and stays registered until explicitly
140  
        deregistered. Events are dispatched via descriptor_state which
140  
        deregistered. Events are dispatched via descriptor_state which
141  
        tracks pending read/write/connect operations.
141  
        tracks pending read/write/connect operations.
142  

142  

143  
        @param fd The file descriptor to register.
143  
        @param fd The file descriptor to register.
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
145  
    */
145  
    */
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
147  

147  

148  
    /** Deregister a persistently registered descriptor.
148  
    /** Deregister a persistently registered descriptor.
149  

149  

150  
        @param fd The file descriptor to deregister.
150  
        @param fd The file descriptor to deregister.
151  
    */
151  
    */
152  
    void deregister_descriptor(int fd) const;
152  
    void deregister_descriptor(int fd) const;
153  

153  

154  
    void work_started() noexcept override;
154  
    void work_started() noexcept override;
155  
    void work_finished() noexcept override;
155  
    void work_finished() noexcept override;
156  

156  

157  
    /** Offset a forthcoming work_finished from work_cleanup.
157  
    /** Offset a forthcoming work_finished from work_cleanup.
158  

158  

159  
        Called by descriptor_state when all I/O returned EAGAIN and no
159  
        Called by descriptor_state when all I/O returned EAGAIN and no
160  
        handler will be executed. Must be called from a scheduler thread.
160  
        handler will be executed. Must be called from a scheduler thread.
161  
    */
161  
    */
162  
    void compensating_work_started() const noexcept;
162  
    void compensating_work_started() const noexcept;
163  

163  

164  
    /** Drain work from thread context's private queue to global queue.
164  
    /** Drain work from thread context's private queue to global queue.
165  

165  

166  
        Called by thread_context_guard destructor when a thread exits run().
166  
        Called by thread_context_guard destructor when a thread exits run().
167  
        Transfers pending work to the global queue under mutex protection.
167  
        Transfers pending work to the global queue under mutex protection.
168  

168  

169  
        @param queue The private queue to drain.
169  
        @param queue The private queue to drain.
170  
        @param count Item count for wakeup decisions (wakes other threads if positive).
170  
        @param count Item count for wakeup decisions (wakes other threads if positive).
171  
    */
171  
    */
172  
    void drain_thread_queue(op_queue& queue, long count) const;
172  
    void drain_thread_queue(op_queue& queue, long count) const;
173  

173  

174  
    /** Post completed operations for deferred invocation.
174  
    /** Post completed operations for deferred invocation.
175  

175  

176  
        If called from a thread running this scheduler, operations go to
176  
        If called from a thread running this scheduler, operations go to
177  
        the thread's private queue (fast path). Otherwise, operations are
177  
        the thread's private queue (fast path). Otherwise, operations are
178  
        added to the global queue under mutex and a waiter is signaled.
178  
        added to the global queue under mutex and a waiter is signaled.
179  

179  

180  
        @par Preconditions
180  
        @par Preconditions
181  
        work_started() must have been called for each operation.
181  
        work_started() must have been called for each operation.
182  

182  

183  
        @param ops Queue of operations to post.
183  
        @param ops Queue of operations to post.
184  
    */
184  
    */
185  
    void post_deferred_completions(op_queue& ops) const;
185  
    void post_deferred_completions(op_queue& ops) const;
186  

186  

187  
private:
187  
private:
188  
    struct work_cleanup
188  
    struct work_cleanup
189  
    {
189  
    {
190  
        epoll_scheduler* scheduler;
190  
        epoll_scheduler* scheduler;
191  
        std::unique_lock<std::mutex>* lock;
191  
        std::unique_lock<std::mutex>* lock;
192  
        epoll::scheduler_context* ctx;
192  
        epoll::scheduler_context* ctx;
193  
        ~work_cleanup();
193  
        ~work_cleanup();
194  
    };
194  
    };
195  

195  

196  
    struct task_cleanup
196  
    struct task_cleanup
197  
    {
197  
    {
198  
        epoll_scheduler const* scheduler;
198  
        epoll_scheduler const* scheduler;
199  
        std::unique_lock<std::mutex>* lock;
199  
        std::unique_lock<std::mutex>* lock;
200  
        epoll::scheduler_context* ctx;
200  
        epoll::scheduler_context* ctx;
201  
        ~task_cleanup();
201  
        ~task_cleanup();
202  
    };
202  
    };
203  

203  

204  
    std::size_t do_one(
204  
    std::size_t do_one(
205  
        std::unique_lock<std::mutex>& lock,
205  
        std::unique_lock<std::mutex>& lock,
206  
        long timeout_us,
206  
        long timeout_us,
207  
        epoll::scheduler_context* ctx);
207  
        epoll::scheduler_context* ctx);
208  
    void
208  
    void
209  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
209  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
210  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
210  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
211  
    void interrupt_reactor() const;
211  
    void interrupt_reactor() const;
212  
    void update_timerfd() const;
212  
    void update_timerfd() const;
213  

213  

214  
    /** Set the signaled state and wake all waiting threads.
214  
    /** Set the signaled state and wake all waiting threads.
215  

215  

216  
        @par Preconditions
216  
        @par Preconditions
217  
        Mutex must be held.
217  
        Mutex must be held.
218  

218  

219  
        @param lock The held mutex lock.
219  
        @param lock The held mutex lock.
220  
    */
220  
    */
221  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
221  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
222  

222  

223  
    /** Set the signaled state and wake one waiter if any exist.
223  
    /** Set the signaled state and wake one waiter if any exist.
224  

224  

225  
        Only unlocks and signals if at least one thread is waiting.
225  
        Only unlocks and signals if at least one thread is waiting.
226  
        Use this when the caller needs to perform a fallback action
226  
        Use this when the caller needs to perform a fallback action
227  
        (such as interrupting the reactor) when no waiters exist.
227  
        (such as interrupting the reactor) when no waiters exist.
228  

228  

229  
        @par Preconditions
229  
        @par Preconditions
230  
        Mutex must be held.
230  
        Mutex must be held.
231  

231  

232  
        @param lock The held mutex lock.
232  
        @param lock The held mutex lock.
233  

233  

234  
        @return `true` if unlocked and signaled, `false` if lock still held.
234  
        @return `true` if unlocked and signaled, `false` if lock still held.
235  
    */
235  
    */
236  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
236  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
237  

237  

238  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
238  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
239  

239  

240  
        Always unlocks the mutex. Use this when the caller will release
240  
        Always unlocks the mutex. Use this when the caller will release
241  
        the lock regardless of whether a waiter exists.
241  
        the lock regardless of whether a waiter exists.
242  

242  

243  
        @par Preconditions
243  
        @par Preconditions
244  
        Mutex must be held.
244  
        Mutex must be held.
245  

245  

246  
        @param lock The held mutex lock.
246  
        @param lock The held mutex lock.
247  

247  

248  
        @return `true` if a waiter was signaled, `false` otherwise.
248  
        @return `true` if a waiter was signaled, `false` otherwise.
249  
    */
249  
    */
250  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
250  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
251  

251  

252  
    /** Clear the signaled state before waiting.
252  
    /** Clear the signaled state before waiting.
253  

253  

254  
        @par Preconditions
254  
        @par Preconditions
255  
        Mutex must be held.
255  
        Mutex must be held.
256  
    */
256  
    */
257  
    void clear_signal() const;
257  
    void clear_signal() const;
258  

258  

259  
    /** Block until the signaled state is set.
259  
    /** Block until the signaled state is set.
260  

260  

261  
        Returns immediately if already signaled (fast-path). Otherwise
261  
        Returns immediately if already signaled (fast-path). Otherwise
262  
        increments the waiter count, waits on the condition variable,
262  
        increments the waiter count, waits on the condition variable,
263  
        and decrements the waiter count upon waking.
263  
        and decrements the waiter count upon waking.
264  

264  

265  
        @par Preconditions
265  
        @par Preconditions
266  
        Mutex must be held.
266  
        Mutex must be held.
267  

267  

268  
        @param lock The held mutex lock.
268  
        @param lock The held mutex lock.
269  
    */
269  
    */
270  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
270  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
271  

271  

272  
    /** Block until signaled or timeout expires.
272  
    /** Block until signaled or timeout expires.
273  

273  

274  
        @par Preconditions
274  
        @par Preconditions
275  
        Mutex must be held.
275  
        Mutex must be held.
276  

276  

277  
        @param lock The held mutex lock.
277  
        @param lock The held mutex lock.
278  
        @param timeout_us Maximum time to wait in microseconds.
278  
        @param timeout_us Maximum time to wait in microseconds.
279  
    */
279  
    */
280  
    void wait_for_signal_for(
280  
    void wait_for_signal_for(
281  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
281  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
282  

282  

283  
    int epoll_fd_;
283  
    int epoll_fd_;
284  
    int event_fd_; // for interrupting reactor
284  
    int event_fd_; // for interrupting reactor
285  
    int timer_fd_; // timerfd for kernel-managed timer expiry
285  
    int timer_fd_; // timerfd for kernel-managed timer expiry
286  
    mutable std::mutex mutex_;
286  
    mutable std::mutex mutex_;
287  
    mutable std::condition_variable cond_;
287  
    mutable std::condition_variable cond_;
288  
    mutable op_queue completed_ops_;
288  
    mutable op_queue completed_ops_;
289  
    mutable std::atomic<long> outstanding_work_;
289  
    mutable std::atomic<long> outstanding_work_;
290 -
    bool shutdown_;
 
291  
    bool stopped_;
290  
    bool stopped_;
292  

291  

293  
    // True while a thread is blocked in epoll_wait. Used by
292  
    // True while a thread is blocked in epoll_wait. Used by
294  
    // wake_one_thread_and_unlock and work_finished to know when
293  
    // wake_one_thread_and_unlock and work_finished to know when
295  
    // an eventfd interrupt is needed instead of a condvar signal.
294  
    // an eventfd interrupt is needed instead of a condvar signal.
296  
    mutable std::atomic<bool> task_running_{false};
295  
    mutable std::atomic<bool> task_running_{false};
297  

296  

298  
    // True when the reactor has been told to do a non-blocking poll
297  
    // True when the reactor has been told to do a non-blocking poll
299  
    // (more handlers queued or poll mode). Prevents redundant eventfd
298  
    // (more handlers queued or poll mode). Prevents redundant eventfd
300  
    // writes and controls the epoll_wait timeout.
299  
    // writes and controls the epoll_wait timeout.
301  
    mutable bool task_interrupted_ = false;
300  
    mutable bool task_interrupted_ = false;
302  

301  

303  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
302  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
304  
    mutable std::size_t state_ = 0;
303  
    mutable std::size_t state_ = 0;
305  

304  

306  
    // Edge-triggered eventfd state
305  
    // Edge-triggered eventfd state
307  
    mutable std::atomic<bool> eventfd_armed_{false};
306  
    mutable std::atomic<bool> eventfd_armed_{false};
308  

307  

309  
    // Set when the earliest timer changes; flushed before epoll_wait
308  
    // Set when the earliest timer changes; flushed before epoll_wait
310  
    // blocks. Avoids timerfd_settime syscalls for timers that are
309  
    // blocks. Avoids timerfd_settime syscalls for timers that are
311  
    // scheduled then cancelled without being waited on.
310  
    // scheduled then cancelled without being waited on.
312  
    mutable std::atomic<bool> timerfd_stale_{false};
311  
    mutable std::atomic<bool> timerfd_stale_{false};
313  

312  

314  
    // Sentinel operation for interleaving reactor runs with handler execution.
313  
    // Sentinel operation for interleaving reactor runs with handler execution.
315  
    // Ensures the reactor runs periodically even when handlers are continuously
314  
    // Ensures the reactor runs periodically even when handlers are continuously
316  
    // posted, preventing starvation of I/O events, timers, and signals.
315  
    // posted, preventing starvation of I/O events, timers, and signals.
317  
    struct task_op final : scheduler_op
316  
    struct task_op final : scheduler_op
318  
    {
317  
    {
319  
        void operator()() override {}
318  
        void operator()() override {}
320  
        void destroy() override {}
319  
        void destroy() override {}
321  
    };
320  
    };
322  
    task_op task_op_;
321  
    task_op task_op_;
323  
};
322  
};
324  

323  

325  
//--------------------------------------------------------------------------
//
// Implementation
//
//--------------------------------------------------------------------------

/*
    epoll Scheduler - Single Reactor Model
    ======================================

    This scheduler uses a thread coordination strategy to provide handler
    parallelism and avoid the thundering herd problem.
    Instead of all threads blocking on epoll_wait(), one thread becomes the
    "reactor" while others wait on a condition variable for handler work.

    Thread Model
    ------------
    - ONE thread runs epoll_wait() at a time (the reactor thread)
    - OTHER threads wait on cond_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()
    - This matches Windows IOCP semantics where N posted items wake N threads

    Event Loop Structure (do_one)
    -----------------------------
    1. Lock mutex, try to pop handler from queue
    2. If got handler: execute it (unlocked), return
    3. If queue empty and no reactor running: become reactor
       - Run epoll_wait (unlocked), queue I/O completions, loop back
    4. If queue empty and reactor running: wait on condvar for work

    The task_running_ flag ensures only one thread owns epoll_wait().
    After the reactor queues I/O completions, it loops back to try getting
    a handler, giving priority to handler execution over more I/O polling.

    Signaling State (state_)
    ------------------------
    The state_ variable encodes two pieces of information:
    - Bit 0: signaled flag (1 = signaled, persists until cleared)
    - Upper bits: waiter count (each waiter adds 2 before blocking)

    This allows efficient coordination:
    - Signalers only call notify when waiters exist (state_ > 1)
    - Waiters check if already signaled before blocking (fast-path)

    Wake Coordination (wake_one_thread_and_unlock)
    ----------------------------------------------
    When posting work:
    - If waiters exist (state_ > 1): signal and notify_one()
    - Else if reactor running: interrupt via eventfd write
    - Else: no-op (thread will find work when it checks queue)

    This avoids waking threads unnecessarily. With cascading wakes,
    each handler execution wakes at most one additional thread if
    more work exists in the queue.

    Work Counting
    -------------
    outstanding_work_ tracks pending operations. When it hits zero, run()
    returns. Each operation increments on start, decrements on completion.

    Timer Integration
    -----------------
    Timers are handled by timer_service. The reactor adjusts epoll_wait
    timeout to wake for the nearest timer expiry. When a new timer is
    scheduled earlier than current, timer_service calls interrupt_reactor()
    to re-evaluate the timeout.
*/
392  

391  

393  
namespace epoll {
392  
namespace epoll {
394  

393  

395  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
394  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
396  
{
395  
{
397  
    epoll_scheduler const* key;
396  
    epoll_scheduler const* key;
398  
    scheduler_context* next;
397  
    scheduler_context* next;
399  
    op_queue private_queue;
398  
    op_queue private_queue;
400  
    long private_outstanding_work;
399  
    long private_outstanding_work;
401  
    int inline_budget;
400  
    int inline_budget;
402  
    int inline_budget_max;
401  
    int inline_budget_max;
403  
    bool unassisted;
402  
    bool unassisted;
404  

403  

405  
    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
404  
    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
406  
        : key(k)
405  
        : key(k)
407  
        , next(n)
406  
        , next(n)
408  
        , private_outstanding_work(0)
407  
        , private_outstanding_work(0)
409  
        , inline_budget(0)
408  
        , inline_budget(0)
410  
        , inline_budget_max(2)
409  
        , inline_budget_max(2)
411  
        , unassisted(false)
410  
        , unassisted(false)
412  
    {
411  
    {
413  
    }
412  
    }
414  
};
413  
};
415  

414  

416  
// Per-thread stack of scheduler frames; one frame per nested run() call.
inline thread_local_ptr<scheduler_context> context_stack;
417  

416  

418  
struct thread_context_guard
417  
struct thread_context_guard
419  
{
418  
{
420  
    scheduler_context frame_;
419  
    scheduler_context frame_;
421  

420  

422  
    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
421  
    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
423  
        : frame_(ctx, context_stack.get())
422  
        : frame_(ctx, context_stack.get())
424  
    {
423  
    {
425  
        context_stack.set(&frame_);
424  
        context_stack.set(&frame_);
426  
    }
425  
    }
427  

426  

428  
    ~thread_context_guard() noexcept
427  
    ~thread_context_guard() noexcept
429  
    {
428  
    {
430  
        if (!frame_.private_queue.empty())
429  
        if (!frame_.private_queue.empty())
431  
            frame_.key->drain_thread_queue(
430  
            frame_.key->drain_thread_queue(
432  
                frame_.private_queue, frame_.private_outstanding_work);
431  
                frame_.private_queue, frame_.private_outstanding_work);
433  
        context_stack.set(frame_.next);
432  
        context_stack.set(frame_.next);
434  
    }
433  
    }
435  
};
434  
};
436  

435  

437  
inline scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    // Walk this thread's stack of nested run() frames and return the
    // one owned by the given scheduler, or nullptr if this thread is
    // not currently running it.
    auto* frame = context_stack.get();
    while (frame != nullptr && frame->key != self)
        frame = frame->next;
    return frame;
}
445  

444  

446  
} // namespace epoll
445  
} // namespace epoll
447  

446  

448  
inline void
447  
inline void
449  
epoll_scheduler::reset_inline_budget() const noexcept
448  
epoll_scheduler::reset_inline_budget() const noexcept
450  
{
449  
{
451  
    if (auto* ctx = epoll::find_context(this))
450  
    if (auto* ctx = epoll::find_context(this))
452  
    {
451  
    {
453  
        // Cap when no other thread absorbed queued work. A moderate
452  
        // Cap when no other thread absorbed queued work. A moderate
454  
        // cap (4) amortizes scheduling for small buffers while avoiding
453  
        // cap (4) amortizes scheduling for small buffers while avoiding
455  
        // bursty I/O that fills socket buffers and stalls large transfers.
454  
        // bursty I/O that fills socket buffers and stalls large transfers.
456  
        if (ctx->unassisted)
455  
        if (ctx->unassisted)
457  
        {
456  
        {
458  
            ctx->inline_budget_max = 4;
457  
            ctx->inline_budget_max = 4;
459  
            ctx->inline_budget     = 4;
458  
            ctx->inline_budget     = 4;
460  
            return;
459  
            return;
461  
        }
460  
        }
462  
        // Ramp up when previous cycle fully consumed budget.
461  
        // Ramp up when previous cycle fully consumed budget.
463  
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
462  
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
464  
        if (ctx->inline_budget == 0)
463  
        if (ctx->inline_budget == 0)
465  
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
464  
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
466  
        else if (ctx->inline_budget < ctx->inline_budget_max)
465  
        else if (ctx->inline_budget < ctx->inline_budget_max)
467  
            ctx->inline_budget_max = 2;
466  
            ctx->inline_budget_max = 2;
468  
        ctx->inline_budget = ctx->inline_budget_max;
467  
        ctx->inline_budget = ctx->inline_budget_max;
469  
    }
468  
    }
470  
}
469  
}
471  

470  

472  
inline bool
471  
inline bool
473  
epoll_scheduler::try_consume_inline_budget() const noexcept
472  
epoll_scheduler::try_consume_inline_budget() const noexcept
474  
{
473  
{
475  
    if (auto* ctx = epoll::find_context(this))
474  
    if (auto* ctx = epoll::find_context(this))
476  
    {
475  
    {
477  
        if (ctx->inline_budget > 0)
476  
        if (ctx->inline_budget > 0)
478  
        {
477  
        {
479  
            --ctx->inline_budget;
478  
            --ctx->inline_budget;
480  
            return true;
479  
            return true;
481  
        }
480  
        }
482  
    }
481  
    }
483  
    return false;
482  
    return false;
484  
}
483  
}
485  

484  

486  
inline void
485  
inline void
487  
descriptor_state::operator()()
486  
descriptor_state::operator()()
488  
{
487  
{
489  
    is_enqueued_.store(false, std::memory_order_relaxed);
488  
    is_enqueued_.store(false, std::memory_order_relaxed);
490  

489  

491  
    // Take ownership of impl ref set by close_socket() to prevent
490  
    // Take ownership of impl ref set by close_socket() to prevent
492  
    // the owning impl from being freed while we're executing
491  
    // the owning impl from being freed while we're executing
493  
    auto prevent_impl_destruction = std::move(impl_ref_);
492  
    auto prevent_impl_destruction = std::move(impl_ref_);
494  

493  

495  
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
494  
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
496  
    if (ev == 0)
495  
    if (ev == 0)
497  
    {
496  
    {
498  
        scheduler_->compensating_work_started();
497  
        scheduler_->compensating_work_started();
499  
        return;
498  
        return;
500  
    }
499  
    }
501  

500  

502  
    op_queue local_ops;
501  
    op_queue local_ops;
503  

502  

504  
    int err = 0;
503  
    int err = 0;
505  
    if (ev & EPOLLERR)
504  
    if (ev & EPOLLERR)
506  
    {
505  
    {
507  
        socklen_t len = sizeof(err);
506  
        socklen_t len = sizeof(err);
508  
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
507  
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
509  
            err = errno;
508  
            err = errno;
510  
        if (err == 0)
509  
        if (err == 0)
511  
            err = EIO;
510  
            err = EIO;
512  
    }
511  
    }
513  

512  

514  
    {
513  
    {
515  
        std::lock_guard lock(mutex);
514  
        std::lock_guard lock(mutex);
516  
        if (ev & EPOLLIN)
515  
        if (ev & EPOLLIN)
517  
        {
516  
        {
518  
            if (read_op)
517  
            if (read_op)
519  
            {
518  
            {
520  
                auto* rd = read_op;
519  
                auto* rd = read_op;
521  
                if (err)
520  
                if (err)
522  
                    rd->complete(err, 0);
521  
                    rd->complete(err, 0);
523  
                else
522  
                else
524  
                    rd->perform_io();
523  
                    rd->perform_io();
525  

524  

526  
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
525  
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
527  
                {
526  
                {
528  
                    rd->errn = 0;
527  
                    rd->errn = 0;
529  
                }
528  
                }
530  
                else
529  
                else
531  
                {
530  
                {
532  
                    read_op = nullptr;
531  
                    read_op = nullptr;
533  
                    local_ops.push(rd);
532  
                    local_ops.push(rd);
534  
                }
533  
                }
535  
            }
534  
            }
536  
            else
535  
            else
537  
            {
536  
            {
538  
                read_ready = true;
537  
                read_ready = true;
539  
            }
538  
            }
540  
        }
539  
        }
541  
        if (ev & EPOLLOUT)
540  
        if (ev & EPOLLOUT)
542  
        {
541  
        {
543  
            bool had_write_op = (connect_op || write_op);
542  
            bool had_write_op = (connect_op || write_op);
544  
            if (connect_op)
543  
            if (connect_op)
545  
            {
544  
            {
546  
                auto* cn = connect_op;
545  
                auto* cn = connect_op;
547  
                if (err)
546  
                if (err)
548  
                    cn->complete(err, 0);
547  
                    cn->complete(err, 0);
549  
                else
548  
                else
550  
                    cn->perform_io();
549  
                    cn->perform_io();
551  
                connect_op = nullptr;
550  
                connect_op = nullptr;
552  
                local_ops.push(cn);
551  
                local_ops.push(cn);
553  
            }
552  
            }
554  
            if (write_op)
553  
            if (write_op)
555  
            {
554  
            {
556  
                auto* wr = write_op;
555  
                auto* wr = write_op;
557  
                if (err)
556  
                if (err)
558  
                    wr->complete(err, 0);
557  
                    wr->complete(err, 0);
559  
                else
558  
                else
560  
                    wr->perform_io();
559  
                    wr->perform_io();
561  

560  

562  
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
561  
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
563  
                {
562  
                {
564  
                    wr->errn = 0;
563  
                    wr->errn = 0;
565  
                }
564  
                }
566  
                else
565  
                else
567  
                {
566  
                {
568  
                    write_op = nullptr;
567  
                    write_op = nullptr;
569  
                    local_ops.push(wr);
568  
                    local_ops.push(wr);
570  
                }
569  
                }
571  
            }
570  
            }
572  
            if (!had_write_op)
571  
            if (!had_write_op)
573  
                write_ready = true;
572  
                write_ready = true;
574  
        }
573  
        }
575  
        if (err)
574  
        if (err)
576  
        {
575  
        {
577  
            if (read_op)
576  
            if (read_op)
578  
            {
577  
            {
579  
                read_op->complete(err, 0);
578  
                read_op->complete(err, 0);
580  
                local_ops.push(std::exchange(read_op, nullptr));
579  
                local_ops.push(std::exchange(read_op, nullptr));
581  
            }
580  
            }
582  
            if (write_op)
581  
            if (write_op)
583  
            {
582  
            {
584  
                write_op->complete(err, 0);
583  
                write_op->complete(err, 0);
585  
                local_ops.push(std::exchange(write_op, nullptr));
584  
                local_ops.push(std::exchange(write_op, nullptr));
586  
            }
585  
            }
587  
            if (connect_op)
586  
            if (connect_op)
588  
            {
587  
            {
589  
                connect_op->complete(err, 0);
588  
                connect_op->complete(err, 0);
590  
                local_ops.push(std::exchange(connect_op, nullptr));
589  
                local_ops.push(std::exchange(connect_op, nullptr));
591  
            }
590  
            }
592  
        }
591  
        }
593  
    }
592  
    }
594  

593  

595  
    // Execute first handler inline — the scheduler's work_cleanup
594  
    // Execute first handler inline — the scheduler's work_cleanup
596  
    // accounts for this as the "consumed" work item
595  
    // accounts for this as the "consumed" work item
597  
    scheduler_op* first = local_ops.pop();
596  
    scheduler_op* first = local_ops.pop();
598  
    if (first)
597  
    if (first)
599  
    {
598  
    {
600  
        scheduler_->post_deferred_completions(local_ops);
599  
        scheduler_->post_deferred_completions(local_ops);
601  
        (*first)();
600  
        (*first)();
602  
    }
601  
    }
603  
    else
602  
    else
604  
    {
603  
    {
605  
        scheduler_->compensating_work_started();
604  
        scheduler_->compensating_work_started();
606  
    }
605  
    }
607  
}
606  
}
608  

607  

609  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
608  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
610  
    : epoll_fd_(-1)
609  
    : epoll_fd_(-1)
611  
    , event_fd_(-1)
610  
    , event_fd_(-1)
612  
    , timer_fd_(-1)
611  
    , timer_fd_(-1)
613  
    , outstanding_work_(0)
612  
    , outstanding_work_(0)
614 -
    , shutdown_(false)
 
615  
    , stopped_(false)
613  
    , stopped_(false)
616  
    , task_running_{false}
614  
    , task_running_{false}
617  
    , task_interrupted_(false)
615  
    , task_interrupted_(false)
618  
    , state_(0)
616  
    , state_(0)
619  
{
617  
{
620  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
618  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
621  
    if (epoll_fd_ < 0)
619  
    if (epoll_fd_ < 0)
622  
        detail::throw_system_error(make_err(errno), "epoll_create1");
620  
        detail::throw_system_error(make_err(errno), "epoll_create1");
623  

621  

624  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
622  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
625  
    if (event_fd_ < 0)
623  
    if (event_fd_ < 0)
626  
    {
624  
    {
627  
        int errn = errno;
625  
        int errn = errno;
628  
        ::close(epoll_fd_);
626  
        ::close(epoll_fd_);
629  
        detail::throw_system_error(make_err(errn), "eventfd");
627  
        detail::throw_system_error(make_err(errn), "eventfd");
630  
    }
628  
    }
631  

629  

632  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
630  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
633  
    if (timer_fd_ < 0)
631  
    if (timer_fd_ < 0)
634  
    {
632  
    {
635  
        int errn = errno;
633  
        int errn = errno;
636  
        ::close(event_fd_);
634  
        ::close(event_fd_);
637  
        ::close(epoll_fd_);
635  
        ::close(epoll_fd_);
638  
        detail::throw_system_error(make_err(errn), "timerfd_create");
636  
        detail::throw_system_error(make_err(errn), "timerfd_create");
639  
    }
637  
    }
640  

638  

641  
    epoll_event ev{};
639  
    epoll_event ev{};
642  
    ev.events   = EPOLLIN | EPOLLET;
640  
    ev.events   = EPOLLIN | EPOLLET;
643  
    ev.data.ptr = nullptr;
641  
    ev.data.ptr = nullptr;
644  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
642  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
645  
    {
643  
    {
646  
        int errn = errno;
644  
        int errn = errno;
647  
        ::close(timer_fd_);
645  
        ::close(timer_fd_);
648  
        ::close(event_fd_);
646  
        ::close(event_fd_);
649  
        ::close(epoll_fd_);
647  
        ::close(epoll_fd_);
650  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
648  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
651  
    }
649  
    }
652  

650  

653  
    epoll_event timer_ev{};
651  
    epoll_event timer_ev{};
654  
    timer_ev.events   = EPOLLIN | EPOLLERR;
652  
    timer_ev.events   = EPOLLIN | EPOLLERR;
655  
    timer_ev.data.ptr = &timer_fd_;
653  
    timer_ev.data.ptr = &timer_fd_;
656  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
654  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
657  
    {
655  
    {
658  
        int errn = errno;
656  
        int errn = errno;
659  
        ::close(timer_fd_);
657  
        ::close(timer_fd_);
660  
        ::close(event_fd_);
658  
        ::close(event_fd_);
661  
        ::close(epoll_fd_);
659  
        ::close(epoll_fd_);
662  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
660  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
663  
    }
661  
    }
664  

662  

665  
    timer_svc_ = &get_timer_service(ctx, *this);
663  
    timer_svc_ = &get_timer_service(ctx, *this);
666  
    timer_svc_->set_on_earliest_changed(
664  
    timer_svc_->set_on_earliest_changed(
667  
        timer_service::callback(this, [](void* p) {
665  
        timer_service::callback(this, [](void* p) {
668  
            auto* self = static_cast<epoll_scheduler*>(p);
666  
            auto* self = static_cast<epoll_scheduler*>(p);
669  
            self->timerfd_stale_.store(true, std::memory_order_release);
667  
            self->timerfd_stale_.store(true, std::memory_order_release);
670  
            if (self->task_running_.load(std::memory_order_acquire))
668  
            if (self->task_running_.load(std::memory_order_acquire))
671  
                self->interrupt_reactor();
669  
                self->interrupt_reactor();
672  
        }));
670  
        }));
673  

671  

674  
    // Initialize resolver service
672  
    // Initialize resolver service
675  
    get_resolver_service(ctx, *this);
673  
    get_resolver_service(ctx, *this);
676  

674  

677  
    // Initialize signal service
675  
    // Initialize signal service
678  
    get_signal_service(ctx, *this);
676  
    get_signal_service(ctx, *this);
679  

677  

680  
    // Push task sentinel to interleave reactor runs with handler execution
678  
    // Push task sentinel to interleave reactor runs with handler execution
681  
    completed_ops_.push(&task_op_);
679  
    completed_ops_.push(&task_op_);
682  
}
680  
}
683  

681  

684  
inline epoll_scheduler::~epoll_scheduler()
{
    // Close in reverse order of acquisition; a value of -1 means the
    // constructor failed before that descriptor was created.
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}
693  

691  

694  
inline void
692  
inline void
695  
epoll_scheduler::shutdown()
693  
epoll_scheduler::shutdown()
696  
{
694  
{
697  
    {
695  
    {
698 -
        shutdown_ = true;
 
699  
        std::unique_lock lock(mutex_);
696  
        std::unique_lock lock(mutex_);
700  

697  

701  
        while (auto* h = completed_ops_.pop())
698  
        while (auto* h = completed_ops_.pop())
702  
        {
699  
        {
703  
            if (h == &task_op_)
700  
            if (h == &task_op_)
704  
                continue;
701  
                continue;
705  
            lock.unlock();
702  
            lock.unlock();
706  
            h->destroy();
703  
            h->destroy();
707  
            lock.lock();
704  
            lock.lock();
708  
        }
705  
        }
709  

706  

710  
        signal_all(lock);
707  
        signal_all(lock);
711  
    }
708  
    }
712 -
    outstanding_work_.store(0, std::memory_order_release);
 
713 -

 
714  

709  

715  
    if (event_fd_ >= 0)
710  
    if (event_fd_ >= 0)
716  
        interrupt_reactor();
711  
        interrupt_reactor();
717  
}
712  
}
718  

713  

719  
inline void
714  
inline void
720  
epoll_scheduler::post(std::coroutine_handle<> h) const
715  
epoll_scheduler::post(std::coroutine_handle<> h) const
721  
{
716  
{
722  
    struct post_handler final : scheduler_op
717  
    struct post_handler final : scheduler_op
723  
    {
718  
    {
724  
        std::coroutine_handle<> h_;
719  
        std::coroutine_handle<> h_;
725  

720  

726  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
721  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
727  

722  

728  
        ~post_handler() override = default;
723  
        ~post_handler() override = default;
729  

724  

730  
        void operator()() override
725  
        void operator()() override
731  
        {
726  
        {
732  
            auto h = h_;
727  
            auto h = h_;
733  
            delete this;
728  
            delete this;
734  
            h.resume();
729  
            h.resume();
735  
        }
730  
        }
736  

731  

737  
        void destroy() override
732  
        void destroy() override
738  
        {
733  
        {
 
734 +
            auto h = h_;
739  
            delete this;
735  
            delete this;
 
736 +
            h.destroy();
740  
        }
737  
        }
741  
    };
738  
    };
742  

739  

743  
    auto ph = std::make_unique<post_handler>(h);
740  
    auto ph = std::make_unique<post_handler>(h);
744  

741  

745  
    // Fast path: same thread posts to private queue
742  
    // Fast path: same thread posts to private queue
746  
    // Only count locally; work_cleanup batches to global counter
743  
    // Only count locally; work_cleanup batches to global counter
747  
    if (auto* ctx = epoll::find_context(this))
744  
    if (auto* ctx = epoll::find_context(this))
748  
    {
745  
    {
749  
        ++ctx->private_outstanding_work;
746  
        ++ctx->private_outstanding_work;
750  
        ctx->private_queue.push(ph.release());
747  
        ctx->private_queue.push(ph.release());
751  
        return;
748  
        return;
752  
    }
749  
    }
753  

750  

754  
    // Slow path: cross-thread post requires mutex
751  
    // Slow path: cross-thread post requires mutex
755  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
752  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
756  

753  

757  
    std::unique_lock lock(mutex_);
754  
    std::unique_lock lock(mutex_);
758  
    completed_ops_.push(ph.release());
755  
    completed_ops_.push(ph.release());
759  
    wake_one_thread_and_unlock(lock);
756  
    wake_one_thread_and_unlock(lock);
760  
}
757  
}
761  

758  

762  
inline void
759  
inline void
763  
epoll_scheduler::post(scheduler_op* h) const
760  
epoll_scheduler::post(scheduler_op* h) const
764  
{
761  
{
765  
    // Fast path: same thread posts to private queue
762  
    // Fast path: same thread posts to private queue
766  
    // Only count locally; work_cleanup batches to global counter
763  
    // Only count locally; work_cleanup batches to global counter
767  
    if (auto* ctx = epoll::find_context(this))
764  
    if (auto* ctx = epoll::find_context(this))
768  
    {
765  
    {
769  
        ++ctx->private_outstanding_work;
766  
        ++ctx->private_outstanding_work;
770  
        ctx->private_queue.push(h);
767  
        ctx->private_queue.push(h);
771  
        return;
768  
        return;
772  
    }
769  
    }
773  

770  

774  
    // Slow path: cross-thread post requires mutex
771  
    // Slow path: cross-thread post requires mutex
775  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
772  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
776  

773  

777  
    std::unique_lock lock(mutex_);
774  
    std::unique_lock lock(mutex_);
778  
    completed_ops_.push(h);
775  
    completed_ops_.push(h);
779  
    wake_one_thread_and_unlock(lock);
776  
    wake_one_thread_and_unlock(lock);
780  
}
777  
}
781  

778  

782  
inline bool
779  
inline bool
783  
epoll_scheduler::running_in_this_thread() const noexcept
780  
epoll_scheduler::running_in_this_thread() const noexcept
784  
{
781  
{
785  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
782  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
786  
        if (c->key == this)
783  
        if (c->key == this)
787  
            return true;
784  
            return true;
788  
    return false;
785  
    return false;
789  
}
786  
}
790  

787  

791  
inline void
788  
inline void
792  
epoll_scheduler::stop()
789  
epoll_scheduler::stop()
793  
{
790  
{
794  
    std::unique_lock lock(mutex_);
791  
    std::unique_lock lock(mutex_);
795  
    if (!stopped_)
792  
    if (!stopped_)
796  
    {
793  
    {
797  
        stopped_ = true;
794  
        stopped_ = true;
798  
        signal_all(lock);
795  
        signal_all(lock);
799  
        interrupt_reactor();
796  
        interrupt_reactor();
800  
    }
797  
    }
801  
}
798  
}
802  

799  

803  
inline bool
800  
inline bool
804  
epoll_scheduler::stopped() const noexcept
801  
epoll_scheduler::stopped() const noexcept
805  
{
802  
{
806  
    std::unique_lock lock(mutex_);
803  
    std::unique_lock lock(mutex_);
807  
    return stopped_;
804  
    return stopped_;
808  
}
805  
}
809  

806  

810  
inline void
807  
inline void
811  
epoll_scheduler::restart()
808  
epoll_scheduler::restart()
812  
{
809  
{
813  
    std::unique_lock lock(mutex_);
810  
    std::unique_lock lock(mutex_);
814  
    stopped_ = false;
811  
    stopped_ = false;
815  
}
812  
}
816  

813  

817  
inline std::size_t
814  
inline std::size_t
818  
epoll_scheduler::run()
815  
epoll_scheduler::run()
819  
{
816  
{
820  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
817  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
821  
    {
818  
    {
822  
        stop();
819  
        stop();
823  
        return 0;
820  
        return 0;
824  
    }
821  
    }
825  

822  

826  
    epoll::thread_context_guard ctx(this);
823  
    epoll::thread_context_guard ctx(this);
827  
    std::unique_lock lock(mutex_);
824  
    std::unique_lock lock(mutex_);
828  

825  

829  
    std::size_t n = 0;
826  
    std::size_t n = 0;
830  
    for (;;)
827  
    for (;;)
831  
    {
828  
    {
832  
        if (!do_one(lock, -1, &ctx.frame_))
829  
        if (!do_one(lock, -1, &ctx.frame_))
833  
            break;
830  
            break;
834  
        if (n != (std::numeric_limits<std::size_t>::max)())
831  
        if (n != (std::numeric_limits<std::size_t>::max)())
835  
            ++n;
832  
            ++n;
836  
        if (!lock.owns_lock())
833  
        if (!lock.owns_lock())
837  
            lock.lock();
834  
            lock.lock();
838  
    }
835  
    }
839  
    return n;
836  
    return n;
840  
}
837  
}
841  

838  

842  
inline std::size_t
839  
inline std::size_t
843  
epoll_scheduler::run_one()
840  
epoll_scheduler::run_one()
844  
{
841  
{
845  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
842  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
846  
    {
843  
    {
847  
        stop();
844  
        stop();
848  
        return 0;
845  
        return 0;
849  
    }
846  
    }
850  

847  

851  
    epoll::thread_context_guard ctx(this);
848  
    epoll::thread_context_guard ctx(this);
852  
    std::unique_lock lock(mutex_);
849  
    std::unique_lock lock(mutex_);
853  
    return do_one(lock, -1, &ctx.frame_);
850  
    return do_one(lock, -1, &ctx.frame_);
854  
}
851  
}
855  

852  

856  
inline std::size_t
853  
inline std::size_t
857  
epoll_scheduler::wait_one(long usec)
854  
epoll_scheduler::wait_one(long usec)
858  
{
855  
{
859  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
856  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
860  
    {
857  
    {
861  
        stop();
858  
        stop();
862  
        return 0;
859  
        return 0;
863  
    }
860  
    }
864  

861  

865  
    epoll::thread_context_guard ctx(this);
862  
    epoll::thread_context_guard ctx(this);
866  
    std::unique_lock lock(mutex_);
863  
    std::unique_lock lock(mutex_);
867  
    return do_one(lock, usec, &ctx.frame_);
864  
    return do_one(lock, usec, &ctx.frame_);
868  
}
865  
}
869  

866  

870  
inline std::size_t
867  
inline std::size_t
871  
epoll_scheduler::poll()
868  
epoll_scheduler::poll()
872  
{
869  
{
873  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
870  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
874  
    {
871  
    {
875  
        stop();
872  
        stop();
876  
        return 0;
873  
        return 0;
877  
    }
874  
    }
878  

875  

879  
    epoll::thread_context_guard ctx(this);
876  
    epoll::thread_context_guard ctx(this);
880  
    std::unique_lock lock(mutex_);
877  
    std::unique_lock lock(mutex_);
881  

878  

882  
    std::size_t n = 0;
879  
    std::size_t n = 0;
883  
    for (;;)
880  
    for (;;)
884  
    {
881  
    {
885  
        if (!do_one(lock, 0, &ctx.frame_))
882  
        if (!do_one(lock, 0, &ctx.frame_))
886  
            break;
883  
            break;
887  
        if (n != (std::numeric_limits<std::size_t>::max)())
884  
        if (n != (std::numeric_limits<std::size_t>::max)())
888  
            ++n;
885  
            ++n;
889  
        if (!lock.owns_lock())
886  
        if (!lock.owns_lock())
890  
            lock.lock();
887  
            lock.lock();
891  
    }
888  
    }
892  
    return n;
889  
    return n;
893  
}
890  
}
894  

891  

895  
inline std::size_t
892  
inline std::size_t
896  
epoll_scheduler::poll_one()
893  
epoll_scheduler::poll_one()
897  
{
894  
{
898  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
895  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
899  
    {
896  
    {
900  
        stop();
897  
        stop();
901  
        return 0;
898  
        return 0;
902  
    }
899  
    }
903  

900  

904  
    epoll::thread_context_guard ctx(this);
901  
    epoll::thread_context_guard ctx(this);
905  
    std::unique_lock lock(mutex_);
902  
    std::unique_lock lock(mutex_);
906  
    return do_one(lock, 0, &ctx.frame_);
903  
    return do_one(lock, 0, &ctx.frame_);
907  
}
904  
}
908  

905  

909  
inline void
906  
inline void
910  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
907  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
911  
{
908  
{
912  
    epoll_event ev{};
909  
    epoll_event ev{};
913  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
910  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
914  
    ev.data.ptr = desc;
911  
    ev.data.ptr = desc;
915  

912  

916  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
913  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
917  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
914  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
918  

915  

919  
    desc->registered_events = ev.events;
916  
    desc->registered_events = ev.events;
920  
    desc->fd                = fd;
917  
    desc->fd                = fd;
921  
    desc->scheduler_        = this;
918  
    desc->scheduler_        = this;
922  

919  

923  
    std::lock_guard lock(desc->mutex);
920  
    std::lock_guard lock(desc->mutex);
924  
    desc->read_ready  = false;
921  
    desc->read_ready  = false;
925  
    desc->write_ready = false;
922  
    desc->write_ready = false;
926  
}
923  
}
927  

924  

928  
inline void
925  
inline void
929  
epoll_scheduler::deregister_descriptor(int fd) const
926  
epoll_scheduler::deregister_descriptor(int fd) const
930  
{
927  
{
931  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
928  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
932  
}
929  
}
933  

930  

934  
inline void
epoll_scheduler::work_started() noexcept
{
    // Record one more unit of outstanding work. A plain increment needs
    // no ordering; the matching decrement in work_finished() uses acq_rel.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
939  

936  

940  
inline void
epoll_scheduler::work_finished() noexcept
{
    // Drop one unit of outstanding work; the thread that removes the
    // last unit (previous value was 1) stops the scheduler.
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
946  

943  

947  
inline void
944  
inline void
948  
epoll_scheduler::compensating_work_started() const noexcept
945  
epoll_scheduler::compensating_work_started() const noexcept
949  
{
946  
{
950  
    auto* ctx = epoll::find_context(this);
947  
    auto* ctx = epoll::find_context(this);
951  
    if (ctx)
948  
    if (ctx)
952  
        ++ctx->private_outstanding_work;
949  
        ++ctx->private_outstanding_work;
953  
}
950  
}
954  

951  

955  
inline void
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
{
    // Move a batch of thread-local completions onto the shared queue.
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    // Only signal another thread when the batch actually contained work.
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}
964  

961  

965  
inline void
epoll_scheduler::post_deferred_completions(op_queue& ops) const
{
    // Schedule a queue of ready-to-run operations for later execution.
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    // (drained later without touching the scheduler mutex).
    if (auto* ctx = epoll::find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}
983  

980  

984  
inline void
epoll_scheduler::interrupt_reactor() const
{
    // Wake the reactor out of epoll_wait by writing to its eventfd.
    // Only write if not already armed to avoid redundant writes;
    // the flag is cleared by the reactor when it drains the eventfd.
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        std::uint64_t val       = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
997  

994  

998  
inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    // The unused lock parameter documents that the scheduler mutex is
    // held. Bit 0 of state_ is the "signalled" flag; all waiters in
    // wait_for_signal() re-check it after waking.
    state_ |= 1;
    cond_.notify_all();
}
1004  

1001  

1005  
inline bool
epoll_scheduler::maybe_unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    // Set the signalled bit; if at least one thread is waiting (each
    // waiter adds 2 to state_ in wait_for_signal), unlock before
    // notifying so the woken thread can acquire the mutex immediately.
    // Returns true if a waiter was notified (lock released), false
    // otherwise (lock still held).
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}
1018  

1015  

1019  
inline bool
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    // Like maybe_unlock_and_signal_one(), but always releases the mutex;
    // notify_one() is issued only when a waiter was present (state_ > 1).
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}
1029  

1026  

1030  
inline void
epoll_scheduler::clear_signal() const
{
    // Reset the "signalled" flag (bit 0); the waiter count stored in the
    // higher bits is left untouched.
    state_ &= ~std::size_t(1);
}
1035  

1032  

1036  
inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    // Block until the signalled bit is set. Each waiter registers itself
    // by adding 2 to state_ (the waiter count lives above bit 0), so the
    // signalling side can tell whether a notify is needed.
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}
1046  

1043  

1047  
inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    // Timed variant of wait_for_signal(). Waits at most timeout_us
    // microseconds and deliberately does not loop: a spurious or
    // timed-out wake-up returns to the caller, which re-checks its own
    // state.
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
1058  

1055  

1059  
inline void
epoll_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    // Prefer waking a thread blocked on the condition variable; if that
    // succeeded the lock has already been released.
    if (maybe_unlock_and_signal_one(lock))
        return;

    // No condvar waiters: if the reactor task is running and has not yet
    // been interrupted, poke its eventfd so epoll_wait returns and the
    // reactor can pick up the new work. task_interrupted_ is only
    // modified while the mutex is held.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
1077  

1074  

1078  
inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    // Runs after a handler has executed (see do_one). Reconciles the
    // thread-private work counter with the shared one and flushes any
    // privately queued completions back to the scheduler.
    if (ctx)
    {
        // The handler itself consumed one unit of work, hence the
        // "produced - 1" / "produced < 1" arithmetic below: exactly 1
        // is neutral.
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            scheduler->work_finished();
        ctx->private_outstanding_work = 0;

        // Publish privately queued ops under the scheduler mutex.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just account for the completed handler.
        scheduler->work_finished();
    }
}
1101  

1098  

1102  
inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    // Runs when run_task() exits (normally or via exception). Publishes
    // thread-private work accounting and queued completions back to the
    // shared scheduler state.
    if (!ctx)
        return;

    // Transfer privately counted work to the shared atomic counter.
    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    // Move privately queued completions onto the shared queue, taking
    // the scheduler mutex if the caller does not already hold it.
    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}
1121  

1118  

1122  
inline void
1119  
inline void
1123  
epoll_scheduler::update_timerfd() const
1120  
epoll_scheduler::update_timerfd() const
1124  
{
1121  
{
1125  
    auto nearest = timer_svc_->nearest_expiry();
1122  
    auto nearest = timer_svc_->nearest_expiry();
1126  

1123  

1127  
    itimerspec ts{};
1124  
    itimerspec ts{};
1128  
    int flags = 0;
1125  
    int flags = 0;
1129  

1126  

1130  
    if (nearest == timer_service::time_point::max())
1127  
    if (nearest == timer_service::time_point::max())
1131  
    {
1128  
    {
1132  
        // No timers - disarm by setting to 0 (relative)
1129  
        // No timers - disarm by setting to 0 (relative)
1133  
    }
1130  
    }
1134  
    else
1131  
    else
1135  
    {
1132  
    {
1136  
        auto now = std::chrono::steady_clock::now();
1133  
        auto now = std::chrono::steady_clock::now();
1137  
        if (nearest <= now)
1134  
        if (nearest <= now)
1138  
        {
1135  
        {
1139  
            // Use 1ns instead of 0 - zero disarms the timerfd
1136  
            // Use 1ns instead of 0 - zero disarms the timerfd
1140  
            ts.it_value.tv_nsec = 1;
1137  
            ts.it_value.tv_nsec = 1;
1141  
        }
1138  
        }
1142  
        else
1139  
        else
1143  
        {
1140  
        {
1144  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1141  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1145  
                            nearest - now)
1142  
                            nearest - now)
1146  
                            .count();
1143  
                            .count();
1147  
            ts.it_value.tv_sec  = nsec / 1000000000;
1144  
            ts.it_value.tv_sec  = nsec / 1000000000;
1148  
            ts.it_value.tv_nsec = nsec % 1000000000;
1145  
            ts.it_value.tv_nsec = nsec % 1000000000;
1149  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1146  
            // Ensure non-zero to avoid disarming if duration rounds to 0
1150  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1147  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1151  
                ts.it_value.tv_nsec = 1;
1148  
                ts.it_value.tv_nsec = 1;
1152  
        }
1149  
        }
1153  
    }
1150  
    }
1154  

1151  

1155  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1152  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1156  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1153  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1157  
}
1154  
}
1158  

1155  

1159  
inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    // One reactor iteration: wait on epoll, translate raw events into
    // scheduler operations, then re-acquire the lock and publish them.
    // When already interrupted, poll (timeout 0) instead of blocking.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // On scope exit, flush ctx's private work/queue back to the scheduler.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR (signal delivery) is benign; any other failure is fatal.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // A null data pointer marks the wakeup eventfd: drain it and
        // re-open the "armed" gate used by interrupt_reactor().
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // The timer fd identifies itself via the address of timer_fd_.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    // Publish the whole gathered batch in a single splice under the mutex.
    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
1234  

1231  

1235  
inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    // Run at most one handler. Returns 1 if a handler executed, 0 if the
    // scheduler is stopped or no work became runnable (timeout_us == 0
    // means non-blocking poll, < 0 means wait indefinitely).
    // Precondition: `lock` holds mutex_ on entry.
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                // Put the sentinel back for the next caller.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Run the reactor in polling mode when other handlers are
            // ready, so this thread can return to them quickly.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread pick up the remaining handlers meanwhile.
            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            // run_task() re-acquired the lock; restore the sentinel and
            // loop to process whatever the reactor produced.
            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more work remains, try to wake a second thread and
            // record whether anyone could assist this run.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Reconciles work counts and flushes the private queue even
            // if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};

            // Execute the handler without holding the mutex.
            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Queue empty: wait until another thread signals new work.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1307  

1304  

1308  
} // namespace boost::corosio::detail
1305  
} // namespace boost::corosio::detail
1309  

1306  

1310  
#endif // BOOST_COROSIO_HAS_EPOLL
1307  
#endif // BOOST_COROSIO_HAS_EPOLL
1311  

1308  

1312  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
1309  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP