include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp
80.1% Lines (395/493)
87.5% Functions (42/48)
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2026 Steve Gerbino | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/corosio | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | ||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | ||
| 12 | |||
| 13 | #include <boost/corosio/detail/platform.hpp> | ||
| 14 | |||
| 15 | #if BOOST_COROSIO_HAS_EPOLL | ||
| 16 | |||
| 17 | #include <boost/corosio/detail/config.hpp> | ||
| 18 | #include <boost/capy/ex/execution_context.hpp> | ||
| 19 | |||
| 20 | #include <boost/corosio/native/native_scheduler.hpp> | ||
| 21 | #include <boost/corosio/detail/scheduler_op.hpp> | ||
| 22 | |||
| 23 | #include <boost/corosio/native/detail/epoll/epoll_op.hpp> | ||
| 24 | #include <boost/corosio/detail/timer_service.hpp> | ||
| 25 | #include <boost/corosio/detail/make_err.hpp> | ||
| 26 | #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp> | ||
| 27 | #include <boost/corosio/native/detail/posix/posix_signal_service.hpp> | ||
| 28 | |||
| 29 | #include <boost/corosio/detail/except.hpp> | ||
| 30 | #include <boost/corosio/detail/thread_local_ptr.hpp> | ||
| 31 | |||
| 32 | #include <atomic> | ||
| 33 | #include <chrono> | ||
| 34 | #include <condition_variable> | ||
| 35 | #include <cstddef> | ||
| 36 | #include <cstdint> | ||
| 37 | #include <limits> | ||
| 38 | #include <mutex> | ||
| 39 | #include <utility> | ||
| 40 | |||
| 41 | #include <errno.h> | ||
| 42 | #include <fcntl.h> | ||
| 43 | #include <sys/epoll.h> | ||
| 44 | #include <sys/eventfd.h> | ||
| 45 | #include <sys/socket.h> | ||
| 46 | #include <sys/timerfd.h> | ||
| 47 | #include <unistd.h> | ||
| 48 | |||
| 49 | namespace boost::corosio::detail { | ||
| 50 | |||
| 51 | struct epoll_op; | ||
| 52 | struct descriptor_state; | ||
| 53 | namespace epoll { | ||
| 54 | struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context; | ||
| 55 | } // namespace epoll | ||
| 56 | |||
| 57 | /** Linux scheduler using epoll for I/O multiplexing. | ||
| 58 | |||
| 59 | This scheduler implements the scheduler interface using Linux epoll | ||
| 60 | for efficient I/O event notification. It uses a single reactor model | ||
| 61 | where one thread runs epoll_wait while other threads | ||
| 62 | wait on a condition variable for handler work. This design provides: | ||
| 63 | |||
| 64 | - Handler parallelism: N posted handlers can execute on N threads | ||
| 65 | - No thundering herd: condition_variable wakes exactly one thread | ||
| 66 | - IOCP parity: Behavior matches Windows I/O completion port semantics | ||
| 67 | |||
| 68 | When threads call run(), they first try to execute queued handlers. | ||
| 69 | If the queue is empty and no reactor is running, one thread becomes | ||
| 70 | the reactor and runs epoll_wait. Other threads wait on a condition | ||
| 71 | variable until handlers are available. | ||
| 72 | |||
| 73 | @par Thread Safety | ||
| 74 | All public member functions are thread-safe. | ||
| 75 | */ | ||
| 76 | class BOOST_COROSIO_DECL epoll_scheduler final | ||
| 77 | : public native_scheduler | ||
| 78 | , public capy::execution_context::service | ||
| 79 | { | ||
| 80 | public: | ||
| 81 | using key_type = scheduler; | ||
| 82 | |||
| 83 | /** Construct the scheduler. | ||
| 84 | |||
| 85 | Creates an epoll instance, eventfd for reactor interruption, | ||
| 86 | and timerfd for kernel-managed timer expiry. | ||
| 87 | |||
| 88 | @param ctx Reference to the owning execution_context. | ||
| 89 | @param concurrency_hint Hint for expected thread count (unused). | ||
| 90 | */ | ||
| 91 | epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1); | ||
| 92 | |||
| 93 | /// Destroy the scheduler. | ||
| 94 | ~epoll_scheduler() override; | ||
| 95 | |||
| 96 | epoll_scheduler(epoll_scheduler const&) = delete; | ||
| 97 | epoll_scheduler& operator=(epoll_scheduler const&) = delete; | ||
| 98 | |||
| 99 | void shutdown() override; | ||
| 100 | void post(std::coroutine_handle<> h) const override; | ||
| 101 | void post(scheduler_op* h) const override; | ||
| 102 | bool running_in_this_thread() const noexcept override; | ||
| 103 | void stop() override; | ||
| 104 | bool stopped() const noexcept override; | ||
| 105 | void restart() override; | ||
| 106 | std::size_t run() override; | ||
| 107 | std::size_t run_one() override; | ||
| 108 | std::size_t wait_one(long usec) override; | ||
| 109 | std::size_t poll() override; | ||
| 110 | std::size_t poll_one() override; | ||
| 111 | |||
| 112 | /** Return the epoll file descriptor. | ||
| 113 | |||
| 114 | Used by socket services to register file descriptors | ||
| 115 | for I/O event notification. | ||
| 116 | |||
| 117 | @return The epoll file descriptor. | ||
| 118 | */ | ||
| 119 | int epoll_fd() const noexcept | ||
| 120 | { | ||
| 121 | return epoll_fd_; | ||
| 122 | } | ||
| 123 | |||
| 124 | /** Reset the thread's inline completion budget. | ||
| 125 | |||
| 126 | Called at the start of each posted completion handler to | ||
| 127 | grant a fresh budget for speculative inline completions. | ||
| 128 | */ | ||
| 129 | void reset_inline_budget() const noexcept; | ||
| 130 | |||
| 131 | /** Consume one unit of inline budget if available. | ||
| 132 | |||
| 133 | @return True if budget was available and consumed. | ||
| 134 | */ | ||
| 135 | bool try_consume_inline_budget() const noexcept; | ||
| 136 | |||
| 137 | /** Register a descriptor for persistent monitoring. | ||
| 138 | |||
| 139 | The fd is registered once and stays registered until explicitly | ||
| 140 | deregistered. Events are dispatched via descriptor_state which | ||
| 141 | tracks pending read/write/connect operations. | ||
| 142 | |||
| 143 | @param fd The file descriptor to register. | ||
| 144 | @param desc Pointer to descriptor data (stored in epoll_event.data.ptr). | ||
| 145 | */ | ||
| 146 | void register_descriptor(int fd, descriptor_state* desc) const; | ||
| 147 | |||
| 148 | /** Deregister a persistently registered descriptor. | ||
| 149 | |||
| 150 | @param fd The file descriptor to deregister. | ||
| 151 | */ | ||
| 152 | void deregister_descriptor(int fd) const; | ||
| 153 | |||
| 154 | void work_started() noexcept override; | ||
| 155 | void work_finished() noexcept override; | ||
| 156 | |||
| 157 | /** Offset a forthcoming work_finished from work_cleanup. | ||
| 158 | |||
| 159 | Called by descriptor_state when all I/O returned EAGAIN and no | ||
| 160 | handler will be executed. Must be called from a scheduler thread. | ||
| 161 | */ | ||
| 162 | void compensating_work_started() const noexcept; | ||
| 163 | |||
| 164 | /** Drain work from thread context's private queue to global queue. | ||
| 165 | |||
| 166 | Called by thread_context_guard destructor when a thread exits run(). | ||
| 167 | Transfers pending work to the global queue under mutex protection. | ||
| 168 | |||
| 169 | @param queue The private queue to drain. | ||
| 170 | @param count Item count for wakeup decisions (wakes other threads if positive). | ||
| 171 | */ | ||
| 172 | void drain_thread_queue(op_queue& queue, long count) const; | ||
| 173 | |||
| 174 | /** Post completed operations for deferred invocation. | ||
| 175 | |||
| 176 | If called from a thread running this scheduler, operations go to | ||
| 177 | the thread's private queue (fast path). Otherwise, operations are | ||
| 178 | added to the global queue under mutex and a waiter is signaled. | ||
| 179 | |||
| 180 | @par Preconditions | ||
| 181 | work_started() must have been called for each operation. | ||
| 182 | |||
| 183 | @param ops Queue of operations to post. | ||
| 184 | */ | ||
| 185 | void post_deferred_completions(op_queue& ops) const; | ||
| 186 | |||
| 187 | private: | ||
| 188 | struct work_cleanup | ||
| 189 | { | ||
| 190 | epoll_scheduler* scheduler; | ||
| 191 | std::unique_lock<std::mutex>* lock; | ||
| 192 | epoll::scheduler_context* ctx; | ||
| 193 | ~work_cleanup(); | ||
| 194 | }; | ||
| 195 | |||
| 196 | struct task_cleanup | ||
| 197 | { | ||
| 198 | epoll_scheduler const* scheduler; | ||
| 199 | std::unique_lock<std::mutex>* lock; | ||
| 200 | epoll::scheduler_context* ctx; | ||
| 201 | ~task_cleanup(); | ||
| 202 | }; | ||
| 203 | |||
| 204 | std::size_t do_one( | ||
| 205 | std::unique_lock<std::mutex>& lock, | ||
| 206 | long timeout_us, | ||
| 207 | epoll::scheduler_context* ctx); | ||
| 208 | void | ||
| 209 | run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx); | ||
| 210 | void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const; | ||
| 211 | void interrupt_reactor() const; | ||
| 212 | void update_timerfd() const; | ||
| 213 | |||
| 214 | /** Set the signaled state and wake all waiting threads. | ||
| 215 | |||
| 216 | @par Preconditions | ||
| 217 | Mutex must be held. | ||
| 218 | |||
| 219 | @param lock The held mutex lock. | ||
| 220 | */ | ||
| 221 | void signal_all(std::unique_lock<std::mutex>& lock) const; | ||
| 222 | |||
| 223 | /** Set the signaled state and wake one waiter if any exist. | ||
| 224 | |||
| 225 | Only unlocks and signals if at least one thread is waiting. | ||
| 226 | Use this when the caller needs to perform a fallback action | ||
| 227 | (such as interrupting the reactor) when no waiters exist. | ||
| 228 | |||
| 229 | @par Preconditions | ||
| 230 | Mutex must be held. | ||
| 231 | |||
| 232 | @param lock The held mutex lock. | ||
| 233 | |||
| 234 | @return `true` if unlocked and signaled, `false` if lock still held. | ||
| 235 | */ | ||
| 236 | bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const; | ||
| 237 | |||
| 238 | /** Set the signaled state, unlock, and wake one waiter if any exist. | ||
| 239 | |||
| 240 | Always unlocks the mutex. Use this when the caller will release | ||
| 241 | the lock regardless of whether a waiter exists. | ||
| 242 | |||
| 243 | @par Preconditions | ||
| 244 | Mutex must be held. | ||
| 245 | |||
| 246 | @param lock The held mutex lock. | ||
| 247 | |||
| 248 | @return `true` if a waiter was signaled, `false` otherwise. | ||
| 249 | */ | ||
| 250 | bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const; | ||
| 251 | |||
| 252 | /** Clear the signaled state before waiting. | ||
| 253 | |||
| 254 | @par Preconditions | ||
| 255 | Mutex must be held. | ||
| 256 | */ | ||
| 257 | void clear_signal() const; | ||
| 258 | |||
| 259 | /** Block until the signaled state is set. | ||
| 260 | |||
| 261 | Returns immediately if already signaled (fast-path). Otherwise | ||
| 262 | increments the waiter count, waits on the condition variable, | ||
| 263 | and decrements the waiter count upon waking. | ||
| 264 | |||
| 265 | @par Preconditions | ||
| 266 | Mutex must be held. | ||
| 267 | |||
| 268 | @param lock The held mutex lock. | ||
| 269 | */ | ||
| 270 | void wait_for_signal(std::unique_lock<std::mutex>& lock) const; | ||
| 271 | |||
| 272 | /** Block until signaled or timeout expires. | ||
| 273 | |||
| 274 | @par Preconditions | ||
| 275 | Mutex must be held. | ||
| 276 | |||
| 277 | @param lock The held mutex lock. | ||
| 278 | @param timeout_us Maximum time to wait in microseconds. | ||
| 279 | */ | ||
| 280 | void wait_for_signal_for( | ||
| 281 | std::unique_lock<std::mutex>& lock, long timeout_us) const; | ||
| 282 | |||
| 283 | int epoll_fd_; | ||
| 284 | int event_fd_; // for interrupting reactor | ||
| 285 | int timer_fd_; // timerfd for kernel-managed timer expiry | ||
| 286 | mutable std::mutex mutex_; | ||
| 287 | mutable std::condition_variable cond_; | ||
| 288 | mutable op_queue completed_ops_; | ||
| 289 | mutable std::atomic<long> outstanding_work_; | ||
| 290 | bool stopped_; | ||
| 291 | |||
| 292 | // True while a thread is blocked in epoll_wait. Used by | ||
| 293 | // wake_one_thread_and_unlock and work_finished to know when | ||
| 294 | // an eventfd interrupt is needed instead of a condvar signal. | ||
| 295 | mutable std::atomic<bool> task_running_{false}; | ||
| 296 | |||
| 297 | // True when the reactor has been told to do a non-blocking poll | ||
| 298 | // (more handlers queued or poll mode). Prevents redundant eventfd | ||
| 299 | // writes and controls the epoll_wait timeout. | ||
| 300 | mutable bool task_interrupted_ = false; | ||
| 301 | |||
| 302 | // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2) | ||
| 303 | mutable std::size_t state_ = 0; | ||
| 304 | |||
| 305 | // Edge-triggered eventfd state | ||
| 306 | mutable std::atomic<bool> eventfd_armed_{false}; | ||
| 307 | |||
| 308 | // Set when the earliest timer changes; flushed before epoll_wait | ||
| 309 | // blocks. Avoids timerfd_settime syscalls for timers that are | ||
| 310 | // scheduled then cancelled without being waited on. | ||
| 311 | mutable std::atomic<bool> timerfd_stale_{false}; | ||
| 312 | |||
| 313 | // Sentinel operation for interleaving reactor runs with handler execution. | ||
| 314 | // Ensures the reactor runs periodically even when handlers are continuously | ||
| 315 | // posted, preventing starvation of I/O events, timers, and signals. | ||
| 316 | struct task_op final : scheduler_op | ||
| 317 | { | ||
| 318 | ✗ | void operator()() override {} | |
| 319 | ✗ | void destroy() override {} | |
| 320 | }; | ||
| 321 | task_op task_op_; | ||
| 322 | }; | ||
| 323 | |||
| 324 | //-------------------------------------------------------------------------- | ||
| 325 | // | ||
| 326 | // Implementation | ||
| 327 | // | ||
| 328 | //-------------------------------------------------------------------------- | ||
| 329 | |||
| 330 | /* | ||
| 331 | epoll Scheduler - Single Reactor Model | ||
| 332 | ====================================== | ||
| 333 | |||
| 334 | This scheduler uses a thread coordination strategy to provide handler | ||
| 335 | parallelism and avoid the thundering herd problem. | ||
| 336 | Instead of all threads blocking on epoll_wait(), one thread becomes the | ||
| 337 | "reactor" while others wait on a condition variable for handler work. | ||
| 338 | |||
| 339 | Thread Model | ||
| 340 | ------------ | ||
| 341 | - ONE thread runs epoll_wait() at a time (the reactor thread) | ||
| 342 | - OTHER threads wait on cond_ (condition variable) for handlers | ||
| 343 | - When work is posted, exactly one waiting thread wakes via notify_one() | ||
| 344 | - This matches Windows IOCP semantics where N posted items wake N threads | ||
| 345 | |||
| 346 | Event Loop Structure (do_one) | ||
| 347 | ----------------------------- | ||
| 348 | 1. Lock mutex, try to pop handler from queue | ||
| 349 | 2. If got handler: execute it (unlocked), return | ||
| 350 | 3. If queue empty and no reactor running: become reactor | ||
| 351 | - Run epoll_wait (unlocked), queue I/O completions, loop back | ||
| 352 | 4. If queue empty and reactor running: wait on condvar for work | ||
| 353 | |||
| 354 | The task_running_ flag ensures only one thread owns epoll_wait(). | ||
| 355 | After the reactor queues I/O completions, it loops back to try getting | ||
| 356 | a handler, giving priority to handler execution over more I/O polling. | ||
| 357 | |||
| 358 | Signaling State (state_) | ||
| 359 | ------------------------ | ||
| 360 | The state_ variable encodes two pieces of information: | ||
| 361 | - Bit 0: signaled flag (1 = signaled, persists until cleared) | ||
| 362 | - Upper bits: waiter count (each waiter adds 2 before blocking) | ||
| 363 | |||
| 364 | This allows efficient coordination: | ||
| 365 | - Signalers only call notify when waiters exist (state_ > 1) | ||
| 366 | - Waiters check if already signaled before blocking (fast-path) | ||
| 367 | |||
| 368 | Wake Coordination (wake_one_thread_and_unlock) | ||
| 369 | ---------------------------------------------- | ||
| 370 | When posting work: | ||
| 371 | - If waiters exist (state_ > 1): signal and notify_one() | ||
| 372 | - Else if reactor running: interrupt via eventfd write | ||
| 373 | - Else: no-op (thread will find work when it checks queue) | ||
| 374 | |||
| 375 | This avoids waking threads unnecessarily. With cascading wakes, | ||
| 376 | each handler execution wakes at most one additional thread if | ||
| 377 | more work exists in the queue. | ||
| 378 | |||
| 379 | Work Counting | ||
| 380 | ------------- | ||
| 381 | outstanding_work_ tracks pending operations. When it hits zero, run() | ||
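| 382 | returns. Each operation increments on start, decrements on completion; posts made from a scheduler thread are counted in that thread's private_outstanding_work and batched into outstanding_work_ later. | ||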
| 383 | |||
| 384 | Timer Integration | ||
| 385 | ----------------- | ||
| 386 | Timers are handled by timer_service; expiry is delivered by a timerfd | ||
| 387 | registered with epoll. When the earliest timer changes, timer_service's | ||
| 388 | callback marks timerfd_stale_ and, if a thread is blocked in epoll_wait, | ||
| 389 | calls interrupt_reactor() so the timerfd is updated before blocking again. | ||
| 390 | */ | ||
| 391 | |||
| 392 | namespace epoll { | ||
| 393 | |||
| 394 | struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context | ||
| 395 | { | ||
| 396 | epoll_scheduler const* key; | ||
| 397 | scheduler_context* next; | ||
| 398 | op_queue private_queue; | ||
| 399 | long private_outstanding_work; | ||
| 400 | int inline_budget; | ||
| 401 | int inline_budget_max; | ||
| 402 | bool unassisted; | ||
| 403 | |||
| 404 | 192 | scheduler_context(epoll_scheduler const* k, scheduler_context* n) | |
| 405 | 192 | : key(k) | |
| 406 | 192 | , next(n) | |
| 407 | 192 | , private_outstanding_work(0) | |
| 408 | 192 | , inline_budget(0) | |
| 409 | 192 | , inline_budget_max(2) | |
| 410 | 192 | , unassisted(false) | |
| 411 | { | ||
| 412 | 192 | } | |
| 413 | }; | ||
| 414 | |||
| 415 | inline thread_local_ptr<scheduler_context> context_stack; | ||
| 416 | |||
| 417 | struct thread_context_guard | ||
| 418 | { | ||
| 419 | scheduler_context frame_; | ||
| 420 | |||
| 421 | 192 | explicit thread_context_guard(epoll_scheduler const* ctx) noexcept | |
| 422 | 192 | : frame_(ctx, context_stack.get()) | |
| 423 | { | ||
| 424 | 192 | context_stack.set(&frame_); | |
| 425 | 192 | } | |
| 426 | |||
| 427 | 192 | ~thread_context_guard() noexcept | |
| 428 | { | ||
| 429 | 192 | if (!frame_.private_queue.empty()) | |
| 430 | ✗ | frame_.key->drain_thread_queue( | |
| 431 | ✗ | frame_.private_queue, frame_.private_outstanding_work); | |
| 432 | 192 | context_stack.set(frame_.next); | |
| 433 | 192 | } | |
| 434 | }; | ||
| 435 | |||
| 436 | inline scheduler_context* | ||
| 437 | 428484 | find_context(epoll_scheduler const* self) noexcept | |
| 438 | { | ||
| 439 | 428484 | for (auto* c = context_stack.get(); c != nullptr; c = c->next) | |
| 440 | 426782 | if (c->key == self) | |
| 441 | 426782 | return c; | |
| 442 | 1702 | return nullptr; | |
| 443 | } | ||
| 444 | |||
| 445 | } // namespace epoll | ||
| 446 | |||
| 447 | inline void | ||
| 448 | 62171 | epoll_scheduler::reset_inline_budget() const noexcept | |
| 449 | { | ||
| 450 | 62171 | if (auto* ctx = epoll::find_context(this)) | |
| 451 | { | ||
| 452 | // Cap when no other thread absorbed queued work. A moderate | ||
| 453 | // cap (4) amortizes scheduling for small buffers while avoiding | ||
| 454 | // bursty I/O that fills socket buffers and stalls large transfers. | ||
| 455 | 62171 | if (ctx->unassisted) | |
| 456 | { | ||
| 457 | 62171 | ctx->inline_budget_max = 4; | |
| 458 | 62171 | ctx->inline_budget = 4; | |
| 459 | 62171 | return; | |
| 460 | } | ||
| 461 | // Ramp up when previous cycle fully consumed budget. | ||
| 462 | // Reset on partial consumption (EAGAIN hit or peer got scheduled). | ||
| 463 | ✗ | if (ctx->inline_budget == 0) | |
| 464 | ✗ | ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16); | |
| 465 | ✗ | else if (ctx->inline_budget < ctx->inline_budget_max) | |
| 466 | ✗ | ctx->inline_budget_max = 2; | |
| 467 | ✗ | ctx->inline_budget = ctx->inline_budget_max; | |
| 468 | } | ||
| 469 | } | ||
| 470 | |||
| 471 | inline bool | ||
| 472 | 265165 | epoll_scheduler::try_consume_inline_budget() const noexcept | |
| 473 | { | ||
| 474 | 265165 | if (auto* ctx = epoll::find_context(this)) | |
| 475 | { | ||
| 476 | 265165 | if (ctx->inline_budget > 0) | |
| 477 | { | ||
| 478 | 212206 | --ctx->inline_budget; | |
| 479 | 212206 | return true; | |
| 480 | } | ||
| 481 | } | ||
| 482 | 52959 | return false; | |
| 483 | } | ||
| 484 | |||
| 485 | inline void | ||
| 486 | 45652 | descriptor_state::operator()() | |
| 487 | { | ||
| 488 | 45652 | is_enqueued_.store(false, std::memory_order_relaxed); | |
| 489 | |||
| 490 | // Take ownership of impl ref set by close_socket() to prevent | ||
| 491 | // the owning impl from being freed while we're executing | ||
| 492 | 45652 | auto prevent_impl_destruction = std::move(impl_ref_); | |
| 493 | |||
| 494 | 45652 | std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire); | |
| 495 | 45652 | if (ev == 0) | |
| 496 | { | ||
| 497 | ✗ | scheduler_->compensating_work_started(); | |
| 498 | ✗ | return; | |
| 499 | } | ||
| 500 | |||
| 501 | 45652 | op_queue local_ops; | |
| 502 | |||
| 503 | 45652 | int err = 0; | |
| 504 | 45652 | if (ev & EPOLLERR) | |
| 505 | { | ||
| 506 | 1 | socklen_t len = sizeof(err); | |
| 507 | 1 | if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) | |
| 508 | ✗ | err = errno; | |
| 509 | 1 | if (err == 0) | |
| 510 | ✗ | err = EIO; | |
| 511 | } | ||
| 512 | |||
| 513 | { | ||
| 514 | 45652 | std::lock_guard lock(mutex); | |
| 515 | 45652 | if (ev & EPOLLIN) | |
| 516 | { | ||
| 517 | 14527 | if (read_op) | |
| 518 | { | ||
| 519 | 4502 | auto* rd = read_op; | |
| 520 | 4502 | if (err) | |
| 521 | ✗ | rd->complete(err, 0); | |
| 522 | else | ||
| 523 | 4502 | rd->perform_io(); | |
| 524 | |||
| 525 | 4502 | if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK) | |
| 526 | { | ||
| 527 | ✗ | rd->errn = 0; | |
| 528 | } | ||
| 529 | else | ||
| 530 | { | ||
| 531 | 4502 | read_op = nullptr; | |
| 532 | 4502 | local_ops.push(rd); | |
| 533 | } | ||
| 534 | } | ||
| 535 | else | ||
| 536 | { | ||
| 537 | 10025 | read_ready = true; | |
| 538 | } | ||
| 539 | } | ||
| 540 | 45652 | if (ev & EPOLLOUT) | |
| 541 | { | ||
| 542 | 41154 | bool had_write_op = (connect_op || write_op); | |
| 543 | 41154 | if (connect_op) | |
| 544 | { | ||
| 545 | 4502 | auto* cn = connect_op; | |
| 546 | 4502 | if (err) | |
| 547 | 1 | cn->complete(err, 0); | |
| 548 | else | ||
| 549 | 4501 | cn->perform_io(); | |
| 550 | 4502 | connect_op = nullptr; | |
| 551 | 4502 | local_ops.push(cn); | |
| 552 | } | ||
| 553 | 41154 | if (write_op) | |
| 554 | { | ||
| 555 | ✗ | auto* wr = write_op; | |
| 556 | ✗ | if (err) | |
| 557 | ✗ | wr->complete(err, 0); | |
| 558 | else | ||
| 559 | ✗ | wr->perform_io(); | |
| 560 | |||
| 561 | ✗ | if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK) | |
| 562 | { | ||
| 563 | ✗ | wr->errn = 0; | |
| 564 | } | ||
| 565 | else | ||
| 566 | { | ||
| 567 | ✗ | write_op = nullptr; | |
| 568 | ✗ | local_ops.push(wr); | |
| 569 | } | ||
| 570 | } | ||
| 571 | 41154 | if (!had_write_op) | |
| 572 | 36652 | write_ready = true; | |
| 573 | } | ||
| 574 | 45652 | if (err) | |
| 575 | { | ||
| 576 | 1 | if (read_op) | |
| 577 | { | ||
| 578 | ✗ | read_op->complete(err, 0); | |
| 579 | ✗ | local_ops.push(std::exchange(read_op, nullptr)); | |
| 580 | } | ||
| 581 | 1 | if (write_op) | |
| 582 | { | ||
| 583 | ✗ | write_op->complete(err, 0); | |
| 584 | ✗ | local_ops.push(std::exchange(write_op, nullptr)); | |
| 585 | } | ||
| 586 | 1 | if (connect_op) | |
| 587 | { | ||
| 588 | ✗ | connect_op->complete(err, 0); | |
| 589 | ✗ | local_ops.push(std::exchange(connect_op, nullptr)); | |
| 590 | } | ||
| 591 | } | ||
| 592 | 45652 | } | |
| 593 | |||
| 594 | // Execute first handler inline — the scheduler's work_cleanup | ||
| 595 | // accounts for this as the "consumed" work item | ||
| 596 | 45652 | scheduler_op* first = local_ops.pop(); | |
| 597 | 45652 | if (first) | |
| 598 | { | ||
| 599 | 9004 | scheduler_->post_deferred_completions(local_ops); | |
| 600 | 9004 | (*first)(); | |
| 601 | } | ||
| 602 | else | ||
| 603 | { | ||
| 604 | 36648 | scheduler_->compensating_work_started(); | |
| 605 | } | ||
| 606 | 45652 | } | |
| 607 | |||
| 608 | 210 | inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int) | |
| 609 | 210 | : epoll_fd_(-1) | |
| 610 | 210 | , event_fd_(-1) | |
| 611 | 210 | , timer_fd_(-1) | |
| 612 | 210 | , outstanding_work_(0) | |
| 613 | 210 | , stopped_(false) | |
| 614 | 210 | , task_running_{false} | |
| 615 | 210 | , task_interrupted_(false) | |
| 616 | 420 | , state_(0) | |
| 617 | { | ||
| 618 | 210 | epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); | |
| 619 | 210 | if (epoll_fd_ < 0) | |
| 620 | ✗ | detail::throw_system_error(make_err(errno), "epoll_create1"); | |
| 621 | |||
| 622 | 210 | event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); | |
| 623 | 210 | if (event_fd_ < 0) | |
| 624 | { | ||
| 625 | ✗ | int errn = errno; | |
| 626 | ✗ | ::close(epoll_fd_); | |
| 627 | ✗ | detail::throw_system_error(make_err(errn), "eventfd"); | |
| 628 | } | ||
| 629 | |||
| 630 | 210 | timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); | |
| 631 | 210 | if (timer_fd_ < 0) | |
| 632 | { | ||
| 633 | ✗ | int errn = errno; | |
| 634 | ✗ | ::close(event_fd_); | |
| 635 | ✗ | ::close(epoll_fd_); | |
| 636 | ✗ | detail::throw_system_error(make_err(errn), "timerfd_create"); | |
| 637 | } | ||
| 638 | |||
| 639 | 210 | epoll_event ev{}; | |
| 640 | 210 | ev.events = EPOLLIN | EPOLLET; | |
| 641 | 210 | ev.data.ptr = nullptr; | |
| 642 | 210 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0) | |
| 643 | { | ||
| 644 | ✗ | int errn = errno; | |
| 645 | ✗ | ::close(timer_fd_); | |
| 646 | ✗ | ::close(event_fd_); | |
| 647 | ✗ | ::close(epoll_fd_); | |
| 648 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl"); | |
| 649 | } | ||
| 650 | |||
| 651 | 210 | epoll_event timer_ev{}; | |
| 652 | 210 | timer_ev.events = EPOLLIN | EPOLLERR; | |
| 653 | 210 | timer_ev.data.ptr = &timer_fd_; | |
| 654 | 210 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0) | |
| 655 | { | ||
| 656 | ✗ | int errn = errno; | |
| 657 | ✗ | ::close(timer_fd_); | |
| 658 | ✗ | ::close(event_fd_); | |
| 659 | ✗ | ::close(epoll_fd_); | |
| 660 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)"); | |
| 661 | } | ||
| 662 | |||
| 663 | 210 | timer_svc_ = &get_timer_service(ctx, *this); | |
| 664 | 210 | timer_svc_->set_on_earliest_changed( | |
| 665 | 4926 | timer_service::callback(this, [](void* p) { | |
| 666 | 4716 | auto* self = static_cast<epoll_scheduler*>(p); | |
| 667 | 4716 | self->timerfd_stale_.store(true, std::memory_order_release); | |
| 668 | 4716 | if (self->task_running_.load(std::memory_order_acquire)) | |
| 669 | ✗ | self->interrupt_reactor(); | |
| 670 | 4716 | })); | |
| 671 | |||
| 672 | // Initialize resolver service | ||
| 673 | 210 | get_resolver_service(ctx, *this); | |
| 674 | |||
| 675 | // Initialize signal service | ||
| 676 | 210 | get_signal_service(ctx, *this); | |
| 677 | |||
| 678 | // Push task sentinel to interleave reactor runs with handler execution | ||
| 679 | 210 | completed_ops_.push(&task_op_); | |
| 680 | 210 | } | |
| 681 | |||
| 682 | 420 | inline epoll_scheduler::~epoll_scheduler() | |
| 683 | { | ||
| 684 | 210 | if (timer_fd_ >= 0) | |
| 685 | 210 | ::close(timer_fd_); | |
| 686 | 210 | if (event_fd_ >= 0) | |
| 687 | 210 | ::close(event_fd_); | |
| 688 | 210 | if (epoll_fd_ >= 0) | |
| 689 | 210 | ::close(epoll_fd_); | |
| 690 | 420 | } | |
| 691 | |||
| 692 | inline void | ||
| 693 | 210 | epoll_scheduler::shutdown() | |
| 694 | { | ||
| 695 | { | ||
| 696 | 210 | std::unique_lock lock(mutex_); | |
| 697 | |||
| 698 | 459 | while (auto* h = completed_ops_.pop()) | |
| 699 | { | ||
| 700 | 249 | if (h == &task_op_) | |
| 701 | 210 | continue; | |
| 702 | 39 | lock.unlock(); | |
| 703 | 39 | h->destroy(); | |
| 704 | 39 | lock.lock(); | |
| 705 | 249 | } | |
| 706 | |||
| 707 | 210 | signal_all(lock); | |
| 708 | 210 | } | |
| 709 | |||
| 710 | 210 | if (event_fd_ >= 0) | |
| 711 | 210 | interrupt_reactor(); | |
| 712 | 210 | } | |
| 713 | |||
| 714 | inline void | ||
| 715 | 6565 | epoll_scheduler::post(std::coroutine_handle<> h) const | |
| 716 | { | ||
| 717 | struct post_handler final : scheduler_op | ||
| 718 | { | ||
| 719 | std::coroutine_handle<> h_; | ||
| 720 | |||
| 721 | 6565 | explicit post_handler(std::coroutine_handle<> h) : h_(h) {} | |
| 722 | |||
| 723 | 13130 | ~post_handler() override = default; | |
| 724 | |||
| 725 | 6559 | void operator()() override | |
| 726 | { | ||
| 727 | 6559 | auto h = h_; | |
| 728 | 6559 | delete this; | |
| 729 | 6559 | h.resume(); | |
| 730 | 6559 | } | |
| 731 | |||
| 732 | 6 | void destroy() override | |
| 733 | { | ||
| 734 | 6 | auto h = h_; | |
| 735 | 6 | delete this; | |
| 736 | 6 | h.destroy(); | |
| 737 | 6 | } | |
| 738 | }; | ||
| 739 | |||
| 740 | 6565 | auto ph = std::make_unique<post_handler>(h); | |
| 741 | |||
| 742 | // Fast path: same thread posts to private queue | ||
| 743 | // Only count locally; work_cleanup batches to global counter | ||
| 744 | 6565 | if (auto* ctx = epoll::find_context(this)) | |
| 745 | { | ||
| 746 | 4893 | ++ctx->private_outstanding_work; | |
| 747 | 4893 | ctx->private_queue.push(ph.release()); | |
| 748 | 4893 | return; | |
| 749 | } | ||
| 750 | |||
| 751 | // Slow path: cross-thread post requires mutex | ||
| 752 | 1672 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 753 | |||
| 754 | 1672 | std::unique_lock lock(mutex_); | |
| 755 | 1672 | completed_ops_.push(ph.release()); | |
| 756 | 1672 | wake_one_thread_and_unlock(lock); | |
| 757 | 6565 | } | |
| 758 | |||
| 759 | inline void | ||
| 760 | 57935 | epoll_scheduler::post(scheduler_op* h) const | |
| 761 | { | ||
| 762 | // Fast path: same thread posts to private queue | ||
| 763 | // Only count locally; work_cleanup batches to global counter | ||
| 764 | 57935 | if (auto* ctx = epoll::find_context(this)) | |
| 765 | { | ||
| 766 | 57905 | ++ctx->private_outstanding_work; | |
| 767 | 57905 | ctx->private_queue.push(h); | |
| 768 | 57905 | return; | |
| 769 | } | ||
| 770 | |||
| 771 | // Slow path: cross-thread post requires mutex | ||
| 772 | 30 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 773 | |||
| 774 | 30 | std::unique_lock lock(mutex_); | |
| 775 | 30 | completed_ops_.push(h); | |
| 776 | 30 | wake_one_thread_and_unlock(lock); | |
| 777 | 30 | } | |
| 778 | |||
| 779 | inline bool | ||
| 780 | 709 | epoll_scheduler::running_in_this_thread() const noexcept | |
| 781 | { | ||
| 782 | 709 | for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next) | |
| 783 | 456 | if (c->key == this) | |
| 784 | 456 | return true; | |
| 785 | 253 | return false; | |
| 786 | } | ||
| 787 | |||
| 788 | inline void | ||
| 789 | 205 | epoll_scheduler::stop() | |
| 790 | { | ||
| 791 | 205 | std::unique_lock lock(mutex_); | |
| 792 | 205 | if (!stopped_) | |
| 793 | { | ||
| 794 | 170 | stopped_ = true; | |
| 795 | 170 | signal_all(lock); | |
| 796 | 170 | interrupt_reactor(); | |
| 797 | } | ||
| 798 | 205 | } | |
| 799 | |||
| 800 | inline bool | ||
| 801 | 18 | epoll_scheduler::stopped() const noexcept | |
| 802 | { | ||
| 803 | 18 | std::unique_lock lock(mutex_); | |
| 804 | 36 | return stopped_; | |
| 805 | 18 | } | |
| 806 | |||
| 807 | inline void | ||
| 808 | 52 | epoll_scheduler::restart() | |
| 809 | { | ||
| 810 | 52 | std::unique_lock lock(mutex_); | |
| 811 | 52 | stopped_ = false; | |
| 812 | 52 | } | |
| 813 | |||
| 814 | inline std::size_t | ||
| 815 | 188 | epoll_scheduler::run() | |
| 816 | { | ||
| 817 | 376 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 818 | { | ||
| 819 | 30 | stop(); | |
| 820 | 30 | return 0; | |
| 821 | } | ||
| 822 | |||
| 823 | 158 | epoll::thread_context_guard ctx(this); | |
| 824 | 158 | std::unique_lock lock(mutex_); | |
| 825 | |||
| 826 | 158 | std::size_t n = 0; | |
| 827 | for (;;) | ||
| 828 | { | ||
| 829 | 110265 | if (!do_one(lock, -1, &ctx.frame_)) | |
| 830 | 158 | break; | |
| 831 | 110107 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
| 832 | 110107 | ++n; | |
| 833 | 110107 | if (!lock.owns_lock()) | |
| 834 | 52038 | lock.lock(); | |
| 835 | } | ||
| 836 | 158 | return n; | |
| 837 | 158 | } | |
| 838 | |||
| 839 | inline std::size_t | ||
| 840 | 2 | epoll_scheduler::run_one() | |
| 841 | { | ||
| 842 | 4 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 843 | { | ||
| 844 | ✗ | stop(); | |
| 845 | ✗ | return 0; | |
| 846 | } | ||
| 847 | |||
| 848 | 2 | epoll::thread_context_guard ctx(this); | |
| 849 | 2 | std::unique_lock lock(mutex_); | |
| 850 | 2 | return do_one(lock, -1, &ctx.frame_); | |
| 851 | 2 | } | |
| 852 | |||
| 853 | inline std::size_t | ||
| 854 | 34 | epoll_scheduler::wait_one(long usec) | |
| 855 | { | ||
| 856 | 68 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 857 | { | ||
| 858 | 7 | stop(); | |
| 859 | 7 | return 0; | |
| 860 | } | ||
| 861 | |||
| 862 | 27 | epoll::thread_context_guard ctx(this); | |
| 863 | 27 | std::unique_lock lock(mutex_); | |
| 864 | 27 | return do_one(lock, usec, &ctx.frame_); | |
| 865 | 27 | } | |
| 866 | |||
| 867 | inline std::size_t | ||
| 868 | 4 | epoll_scheduler::poll() | |
| 869 | { | ||
| 870 | 8 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 871 | { | ||
| 872 | 1 | stop(); | |
| 873 | 1 | return 0; | |
| 874 | } | ||
| 875 | |||
| 876 | 3 | epoll::thread_context_guard ctx(this); | |
| 877 | 3 | std::unique_lock lock(mutex_); | |
| 878 | |||
| 879 | 3 | std::size_t n = 0; | |
| 880 | for (;;) | ||
| 881 | { | ||
| 882 | 7 | if (!do_one(lock, 0, &ctx.frame_)) | |
| 883 | 3 | break; | |
| 884 | 4 | if (n != (std::numeric_limits<std::size_t>::max)()) | |
| 885 | 4 | ++n; | |
| 886 | 4 | if (!lock.owns_lock()) | |
| 887 | 4 | lock.lock(); | |
| 888 | } | ||
| 889 | 3 | return n; | |
| 890 | 3 | } | |
| 891 | |||
| 892 | inline std::size_t | ||
| 893 | 4 | epoll_scheduler::poll_one() | |
| 894 | { | ||
| 895 | 8 | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 896 | { | ||
| 897 | 2 | stop(); | |
| 898 | 2 | return 0; | |
| 899 | } | ||
| 900 | |||
| 901 | 2 | epoll::thread_context_guard ctx(this); | |
| 902 | 2 | std::unique_lock lock(mutex_); | |
| 903 | 2 | return do_one(lock, 0, &ctx.frame_); | |
| 904 | 2 | } | |
| 905 | |||
| 906 | inline void | ||
| 907 | 9077 | epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const | |
| 908 | { | ||
| 909 | 9077 | epoll_event ev{}; | |
| 910 | 9077 | ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; | |
| 911 | 9077 | ev.data.ptr = desc; | |
| 912 | |||
| 913 | 9077 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) | |
| 914 | ✗ | detail::throw_system_error(make_err(errno), "epoll_ctl (register)"); | |
| 915 | |||
| 916 | 9077 | desc->registered_events = ev.events; | |
| 917 | 9077 | desc->fd = fd; | |
| 918 | 9077 | desc->scheduler_ = this; | |
| 919 | |||
| 920 | 9077 | std::lock_guard lock(desc->mutex); | |
| 921 | 9077 | desc->read_ready = false; | |
| 922 | 9077 | desc->write_ready = false; | |
| 923 | 9077 | } | |
| 924 | |||
| 925 | inline void | ||
| 926 | 9077 | epoll_scheduler::deregister_descriptor(int fd) const | |
| 927 | { | ||
| 928 | 9077 | ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); | |
| 929 | 9077 | } | |
| 930 | |||
| 931 | inline void | ||
| 932 | 14664 | epoll_scheduler::work_started() noexcept | |
| 933 | { | ||
| 934 | 14664 | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 935 | 14664 | } | |
| 936 | |||
| 937 | inline void | ||
| 938 | 21074 | epoll_scheduler::work_finished() noexcept | |
| 939 | { | ||
| 940 | 42148 | if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) | |
| 941 | 163 | stop(); | |
| 942 | 21074 | } | |
| 943 | |||
| 944 | inline void | ||
| 945 | 36648 | epoll_scheduler::compensating_work_started() const noexcept | |
| 946 | { | ||
| 947 | 36648 | auto* ctx = epoll::find_context(this); | |
| 948 | 36648 | if (ctx) | |
| 949 | 36648 | ++ctx->private_outstanding_work; | |
| 950 | 36648 | } | |
| 951 | |||
| 952 | inline void | ||
| 953 | ✗ | epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const | |
| 954 | { | ||
| 955 | // Note: outstanding_work_ was already incremented when posting | ||
| 956 | ✗ | std::unique_lock lock(mutex_); | |
| 957 | ✗ | completed_ops_.splice(queue); | |
| 958 | ✗ | if (count > 0) | |
| 959 | ✗ | maybe_unlock_and_signal_one(lock); | |
| 960 | ✗ | } | |
| 961 | |||
| 962 | inline void | ||
| 963 | 9004 | epoll_scheduler::post_deferred_completions(op_queue& ops) const | |
| 964 | { | ||
| 965 | 9004 | if (ops.empty()) | |
| 966 | 9004 | return; | |
| 967 | |||
| 968 | // Fast path: if on scheduler thread, use private queue | ||
| 969 | ✗ | if (auto* ctx = epoll::find_context(this)) | |
| 970 | { | ||
| 971 | ✗ | ctx->private_queue.splice(ops); | |
| 972 | ✗ | return; | |
| 973 | } | ||
| 974 | |||
| 975 | // Slow path: add to global queue and wake a thread | ||
| 976 | ✗ | std::unique_lock lock(mutex_); | |
| 977 | ✗ | completed_ops_.splice(ops); | |
| 978 | ✗ | wake_one_thread_and_unlock(lock); | |
| 979 | ✗ | } | |
| 980 | |||
| 981 | inline void | ||
| 982 | 406 | epoll_scheduler::interrupt_reactor() const | |
| 983 | { | ||
| 984 | // Only write if not already armed to avoid redundant writes | ||
| 985 | 406 | bool expected = false; | |
| 986 | 406 | if (eventfd_armed_.compare_exchange_strong( | |
| 987 | expected, true, std::memory_order_release, | ||
| 988 | std::memory_order_relaxed)) | ||
| 989 | { | ||
| 990 | 280 | std::uint64_t val = 1; | |
| 991 | 280 | [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val)); | |
| 992 | } | ||
| 993 | 406 | } | |
| 994 | |||
| 995 | inline void | ||
| 996 | 380 | epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const | |
| 997 | { | ||
| 998 | 380 | state_ |= 1; | |
| 999 | 380 | cond_.notify_all(); | |
| 1000 | 380 | } | |
| 1001 | |||
| 1002 | inline bool | ||
| 1003 | 1702 | epoll_scheduler::maybe_unlock_and_signal_one( | |
| 1004 | std::unique_lock<std::mutex>& lock) const | ||
| 1005 | { | ||
| 1006 | 1702 | state_ |= 1; | |
| 1007 | 1702 | if (state_ > 1) | |
| 1008 | { | ||
| 1009 | ✗ | lock.unlock(); | |
| 1010 | ✗ | cond_.notify_one(); | |
| 1011 | ✗ | return true; | |
| 1012 | } | ||
| 1013 | 1702 | return false; | |
| 1014 | } | ||
| 1015 | |||
| 1016 | inline bool | ||
| 1017 | 139335 | epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const | |
| 1018 | { | ||
| 1019 | 139335 | state_ |= 1; | |
| 1020 | 139335 | bool have_waiters = state_ > 1; | |
| 1021 | 139335 | lock.unlock(); | |
| 1022 | 139335 | if (have_waiters) | |
| 1023 | ✗ | cond_.notify_one(); | |
| 1024 | 139335 | return have_waiters; | |
| 1025 | } | ||
| 1026 | |||
| 1027 | inline void | ||
| 1028 | ✗ | epoll_scheduler::clear_signal() const | |
| 1029 | { | ||
| 1030 | ✗ | state_ &= ~std::size_t(1); | |
| 1031 | ✗ | } | |
| 1032 | |||
| 1033 | inline void | ||
| 1034 | ✗ | epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const | |
| 1035 | { | ||
| 1036 | ✗ | while ((state_ & 1) == 0) | |
| 1037 | { | ||
| 1038 | ✗ | state_ += 2; | |
| 1039 | ✗ | cond_.wait(lock); | |
| 1040 | ✗ | state_ -= 2; | |
| 1041 | } | ||
| 1042 | ✗ | } | |
| 1043 | |||
| 1044 | inline void | ||
| 1045 | ✗ | epoll_scheduler::wait_for_signal_for( | |
| 1046 | std::unique_lock<std::mutex>& lock, long timeout_us) const | ||
| 1047 | { | ||
| 1048 | ✗ | if ((state_ & 1) == 0) | |
| 1049 | { | ||
| 1050 | ✗ | state_ += 2; | |
| 1051 | ✗ | cond_.wait_for(lock, std::chrono::microseconds(timeout_us)); | |
| 1052 | ✗ | state_ -= 2; | |
| 1053 | } | ||
| 1054 | ✗ | } | |
| 1055 | |||
| 1056 | inline void | ||
| 1057 | 1702 | epoll_scheduler::wake_one_thread_and_unlock( | |
| 1058 | std::unique_lock<std::mutex>& lock) const | ||
| 1059 | { | ||
| 1060 | 1702 | if (maybe_unlock_and_signal_one(lock)) | |
| 1061 | ✗ | return; | |
| 1062 | |||
| 1063 | 1702 | if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_) | |
| 1064 | { | ||
| 1065 | 26 | task_interrupted_ = true; | |
| 1066 | 26 | lock.unlock(); | |
| 1067 | 26 | interrupt_reactor(); | |
| 1068 | } | ||
| 1069 | else | ||
| 1070 | { | ||
| 1071 | 1676 | lock.unlock(); | |
| 1072 | } | ||
| 1073 | } | ||
| 1074 | |||
| 1075 | 110142 | inline epoll_scheduler::work_cleanup::~work_cleanup() | |
| 1076 | { | ||
| 1077 | 110142 | if (ctx) | |
| 1078 | { | ||
| 1079 | 110142 | long produced = ctx->private_outstanding_work; | |
| 1080 | 110142 | if (produced > 1) | |
| 1081 | 7 | scheduler->outstanding_work_.fetch_add( | |
| 1082 | produced - 1, std::memory_order_relaxed); | ||
| 1083 | 110135 | else if (produced < 1) | |
| 1084 | 15414 | scheduler->work_finished(); | |
| 1085 | 110142 | ctx->private_outstanding_work = 0; | |
| 1086 | |||
| 1087 | 110142 | if (!ctx->private_queue.empty()) | |
| 1088 | { | ||
| 1089 | 58080 | lock->lock(); | |
| 1090 | 58080 | scheduler->completed_ops_.splice(ctx->private_queue); | |
| 1091 | } | ||
| 1092 | } | ||
| 1093 | else | ||
| 1094 | { | ||
| 1095 | ✗ | scheduler->work_finished(); | |
| 1096 | } | ||
| 1097 | 110142 | } | |
| 1098 | |||
| 1099 | 76854 | inline epoll_scheduler::task_cleanup::~task_cleanup() | |
| 1100 | { | ||
| 1101 | 38427 | if (!ctx) | |
| 1102 | ✗ | return; | |
| 1103 | |||
| 1104 | 38427 | if (ctx->private_outstanding_work > 0) | |
| 1105 | { | ||
| 1106 | 4705 | scheduler->outstanding_work_.fetch_add( | |
| 1107 | 4705 | ctx->private_outstanding_work, std::memory_order_relaxed); | |
| 1108 | 4705 | ctx->private_outstanding_work = 0; | |
| 1109 | } | ||
| 1110 | |||
| 1111 | 38427 | if (!ctx->private_queue.empty()) | |
| 1112 | { | ||
| 1113 | 4705 | if (!lock->owns_lock()) | |
| 1114 | ✗ | lock->lock(); | |
| 1115 | 4705 | scheduler->completed_ops_.splice(ctx->private_queue); | |
| 1116 | } | ||
| 1117 | 38427 | } | |
| 1118 | |||
| 1119 | inline void | ||
| 1120 | 9406 | epoll_scheduler::update_timerfd() const | |
| 1121 | { | ||
| 1122 | 9406 | auto nearest = timer_svc_->nearest_expiry(); | |
| 1123 | |||
| 1124 | 9406 | itimerspec ts{}; | |
| 1125 | 9406 | int flags = 0; | |
| 1126 | |||
| 1127 | 9406 | if (nearest == timer_service::time_point::max()) | |
| 1128 | { | ||
| 1129 | // No timers - disarm by setting to 0 (relative) | ||
| 1130 | } | ||
| 1131 | else | ||
| 1132 | { | ||
| 1133 | 9361 | auto now = std::chrono::steady_clock::now(); | |
| 1134 | 9361 | if (nearest <= now) | |
| 1135 | { | ||
| 1136 | // Use 1ns instead of 0 - zero disarms the timerfd | ||
| 1137 | 197 | ts.it_value.tv_nsec = 1; | |
| 1138 | } | ||
| 1139 | else | ||
| 1140 | { | ||
| 1141 | 9164 | auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>( | |
| 1142 | 9164 | nearest - now) | |
| 1143 | 9164 | .count(); | |
| 1144 | 9164 | ts.it_value.tv_sec = nsec / 1000000000; | |
| 1145 | 9164 | ts.it_value.tv_nsec = nsec % 1000000000; | |
| 1146 | // Ensure non-zero to avoid disarming if duration rounds to 0 | ||
| 1147 | 9164 | if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0) | |
| 1148 | ✗ | ts.it_value.tv_nsec = 1; | |
| 1149 | } | ||
| 1150 | } | ||
| 1151 | |||
| 1152 | 9406 | if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0) | |
| 1153 | ✗ | detail::throw_system_error(make_err(errno), "timerfd_settime"); | |
| 1154 | 9406 | } | |
| 1155 | |||
| 1156 | inline void | ||
| 1157 | 38427 | epoll_scheduler::run_task( | |
| 1158 | std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx) | ||
| 1159 | { | ||
| 1160 | 38427 | int timeout_ms = task_interrupted_ ? 0 : -1; | |
| 1161 | |||
| 1162 | 38427 | if (lock.owns_lock()) | |
| 1163 | 9234 | lock.unlock(); | |
| 1164 | |||
| 1165 | 38427 | task_cleanup on_exit{this, &lock, ctx}; | |
| 1166 | |||
| 1167 | // Flush deferred timerfd programming before blocking | ||
| 1168 | 38427 | if (timerfd_stale_.exchange(false, std::memory_order_acquire)) | |
| 1169 | 4701 | update_timerfd(); | |
| 1170 | |||
| 1171 | // Event loop runs without mutex held | ||
| 1172 | epoll_event events[128]; | ||
| 1173 | 38427 | int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms); | |
| 1174 | |||
| 1175 | 38427 | if (nfds < 0 && errno != EINTR) | |
| 1176 | ✗ | detail::throw_system_error(make_err(errno), "epoll_wait"); | |
| 1177 | |||
| 1178 | 38427 | bool check_timers = false; | |
| 1179 | 38427 | op_queue local_ops; | |
| 1180 | |||
| 1181 | // Process events without holding the mutex | ||
| 1182 | 88883 | for (int i = 0; i < nfds; ++i) | |
| 1183 | { | ||
| 1184 | 50456 | if (events[i].data.ptr == nullptr) | |
| 1185 | { | ||
| 1186 | std::uint64_t val; | ||
| 1187 | // Mutex released above; analyzer can't track unlock via ref | ||
| 1188 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | ||
| 1189 | 70 | [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val)); | |
| 1190 | 70 | eventfd_armed_.store(false, std::memory_order_relaxed); | |
| 1191 | 70 | continue; | |
| 1192 | 70 | } | |
| 1193 | |||
| 1194 | 50386 | if (events[i].data.ptr == &timer_fd_) | |
| 1195 | { | ||
| 1196 | std::uint64_t expirations; | ||
| 1197 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | ||
| 1198 | [[maybe_unused]] auto r = | ||
| 1199 | 4705 | ::read(timer_fd_, &expirations, sizeof(expirations)); | |
| 1200 | 4705 | check_timers = true; | |
| 1201 | 4705 | continue; | |
| 1202 | 4705 | } | |
| 1203 | |||
| 1204 | // Deferred I/O: just set ready events and enqueue descriptor | ||
| 1205 | // No per-descriptor mutex locking in reactor hot path! | ||
| 1206 | 45681 | auto* desc = static_cast<descriptor_state*>(events[i].data.ptr); | |
| 1207 | 45681 | desc->add_ready_events(events[i].events); | |
| 1208 | |||
| 1209 | // Only enqueue if not already enqueued | ||
| 1210 | 45681 | bool expected = false; | |
| 1211 | 45681 | if (desc->is_enqueued_.compare_exchange_strong( | |
| 1212 | expected, true, std::memory_order_release, | ||
| 1213 | std::memory_order_relaxed)) | ||
| 1214 | { | ||
| 1215 | 45681 | local_ops.push(desc); | |
| 1216 | } | ||
| 1217 | } | ||
| 1218 | |||
| 1219 | // Process timers only when timerfd fires | ||
| 1220 | 38427 | if (check_timers) | |
| 1221 | { | ||
| 1222 | 4705 | timer_svc_->process_expired(); | |
| 1223 | 4705 | update_timerfd(); | |
| 1224 | } | ||
| 1225 | |||
| 1226 | 38427 | lock.lock(); | |
| 1227 | |||
| 1228 | 38427 | if (!local_ops.empty()) | |
| 1229 | 28737 | completed_ops_.splice(local_ops); | |
| 1230 | 38427 | } | |
| 1231 | |||
| 1232 | inline std::size_t | ||
| 1233 | 110303 | epoll_scheduler::do_one( | |
| 1234 | std::unique_lock<std::mutex>& lock, | ||
| 1235 | long timeout_us, | ||
| 1236 | epoll::scheduler_context* ctx) | ||
| 1237 | { | ||
| 1238 | for (;;) | ||
| 1239 | { | ||
| 1240 | 148730 | if (stopped_) | |
| 1241 | 159 | return 0; | |
| 1242 | |||
| 1243 | 148571 | scheduler_op* op = completed_ops_.pop(); | |
| 1244 | |||
| 1245 | // Handle reactor sentinel - time to poll for I/O | ||
| 1246 | 148571 | if (op == &task_op_) | |
| 1247 | { | ||
| 1248 | 38429 | bool more_handlers = !completed_ops_.empty(); | |
| 1249 | |||
| 1250 | // Nothing to run the reactor for: no pending work to wait on, | ||
| 1251 | // or caller requested a non-blocking poll | ||
| 1252 | 47665 | if (!more_handlers && | |
| 1253 | 18472 | (outstanding_work_.load(std::memory_order_acquire) == 0 || | |
| 1254 | timeout_us == 0)) | ||
| 1255 | { | ||
| 1256 | 2 | completed_ops_.push(&task_op_); | |
| 1257 | 2 | return 0; | |
| 1258 | } | ||
| 1259 | |||
| 1260 | 38427 | task_interrupted_ = more_handlers || timeout_us == 0; | |
| 1261 | 38427 | task_running_.store(true, std::memory_order_release); | |
| 1262 | |||
| 1263 | 38427 | if (more_handlers) | |
| 1264 | 29193 | unlock_and_signal_one(lock); | |
| 1265 | |||
| 1266 | 38427 | run_task(lock, ctx); | |
| 1267 | |||
| 1268 | 38427 | task_running_.store(false, std::memory_order_relaxed); | |
| 1269 | 38427 | completed_ops_.push(&task_op_); | |
| 1270 | 38427 | continue; | |
| 1271 | 38427 | } | |
| 1272 | |||
| 1273 | // Handle operation | ||
| 1274 | 110142 | if (op != nullptr) | |
| 1275 | { | ||
| 1276 | 110142 | bool more = !completed_ops_.empty(); | |
| 1277 | |||
| 1278 | 110142 | if (more) | |
| 1279 | 110142 | ctx->unassisted = !unlock_and_signal_one(lock); | |
| 1280 | else | ||
| 1281 | { | ||
| 1282 | ✗ | ctx->unassisted = false; | |
| 1283 | ✗ | lock.unlock(); | |
| 1284 | } | ||
| 1285 | |||
| 1286 | 110142 | work_cleanup on_exit{this, &lock, ctx}; | |
| 1287 | |||
| 1288 | 110142 | (*op)(); | |
| 1289 | 110142 | return 1; | |
| 1290 | 110142 | } | |
| 1291 | |||
| 1292 | // No pending work to wait on, or caller requested non-blocking poll | ||
| 1293 | ✗ | if (outstanding_work_.load(std::memory_order_acquire) == 0 || | |
| 1294 | timeout_us == 0) | ||
| 1295 | ✗ | return 0; | |
| 1296 | |||
| 1297 | ✗ | clear_signal(); | |
| 1298 | ✗ | if (timeout_us < 0) | |
| 1299 | ✗ | wait_for_signal(lock); | |
| 1300 | else | ||
| 1301 | ✗ | wait_for_signal_for(lock, timeout_us); | |
| 1302 | 38427 | } | |
| 1303 | } | ||
| 1304 | |||
| 1305 | } // namespace boost::corosio::detail | ||
| 1306 | |||
| 1307 | #endif // BOOST_COROSIO_HAS_EPOLL | ||
| 1308 | |||
| 1309 | #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | ||
| 1310 |