TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/native_scheduler.hpp>
21 : #include <boost/corosio/detail/scheduler_op.hpp>
22 :
23 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
24 : #include <boost/corosio/detail/timer_service.hpp>
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 :
29 : #include <boost/corosio/detail/except.hpp>
30 : #include <boost/corosio/detail/thread_local_ptr.hpp>
31 :
32 : #include <atomic>
33 : #include <chrono>
34 : #include <condition_variable>
35 : #include <cstddef>
36 : #include <cstdint>
37 : #include <limits>
38 : #include <mutex>
39 : #include <utility>
40 :
41 : #include <errno.h>
42 : #include <fcntl.h>
43 : #include <sys/epoll.h>
44 : #include <sys/eventfd.h>
45 : #include <sys/socket.h>
46 : #include <sys/timerfd.h>
47 : #include <unistd.h>
48 :
49 : namespace boost::corosio::detail {
50 :
51 : struct epoll_op;
52 : struct descriptor_state;
53 : namespace epoll {
54 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55 : } // namespace epoll
56 :
57 : /** Linux scheduler using epoll for I/O multiplexing.
58 :
59 : This scheduler implements the scheduler interface using Linux epoll
60 : for efficient I/O event notification. It uses a single reactor model
61 : where one thread runs epoll_wait while other threads
62 : wait on a condition variable for handler work. This design provides:
63 :
64 : - Handler parallelism: N posted handlers can execute on N threads
65 : - No thundering herd: condition_variable wakes exactly one thread
66 : - IOCP parity: Behavior matches Windows I/O completion port semantics
67 :
68 : When threads call run(), they first try to execute queued handlers.
69 : If the queue is empty and no reactor is running, one thread becomes
70 : the reactor and runs epoll_wait. Other threads wait on a condition
71 : variable until handlers are available.
72 :
73 : @par Thread Safety
74 : All public member functions are thread-safe.
75 : */
76 : class BOOST_COROSIO_DECL epoll_scheduler final
77 : : public native_scheduler
78 : , public capy::execution_context::service
79 : {
80 : public:
81 : using key_type = scheduler;
82 :
83 : /** Construct the scheduler.
84 :
85 : Creates an epoll instance, eventfd for reactor interruption,
86 : and timerfd for kernel-managed timer expiry.
87 :
88 : @param ctx Reference to the owning execution_context.
89 : @param concurrency_hint Hint for expected thread count (unused).
90 : */
91 : epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92 :
93 : /// Destroy the scheduler.
94 : ~epoll_scheduler() override;
95 :
96 : epoll_scheduler(epoll_scheduler const&) = delete;
97 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98 :
99 : void shutdown() override;
100 : void post(std::coroutine_handle<> h) const override;
101 : void post(scheduler_op* h) const override;
102 : bool running_in_this_thread() const noexcept override;
103 : void stop() override;
104 : bool stopped() const noexcept override;
105 : void restart() override;
106 : std::size_t run() override;
107 : std::size_t run_one() override;
108 : std::size_t wait_one(long usec) override;
109 : std::size_t poll() override;
110 : std::size_t poll_one() override;
111 :
112 : /** Return the epoll file descriptor.
113 :
114 : Used by socket services to register file descriptors
115 : for I/O event notification.
116 :
117 : @return The epoll file descriptor.
118 : */
119 : int epoll_fd() const noexcept
120 : {
121 : return epoll_fd_;
122 : }
123 :
124 : /** Reset the thread's inline completion budget.
125 :
126 : Called at the start of each posted completion handler to
127 : grant a fresh budget for speculative inline completions.
128 : */
129 : void reset_inline_budget() const noexcept;
130 :
131 : /** Consume one unit of inline budget if available.
132 :
133 : @return True if budget was available and consumed.
134 : */
135 : bool try_consume_inline_budget() const noexcept;
136 :
137 : /** Register a descriptor for persistent monitoring.
138 :
139 : The fd is registered once and stays registered until explicitly
140 : deregistered. Events are dispatched via descriptor_state which
141 : tracks pending read/write/connect operations.
142 :
143 : @param fd The file descriptor to register.
144 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
145 : */
146 : void register_descriptor(int fd, descriptor_state* desc) const;
147 :
148 : /** Deregister a persistently registered descriptor.
149 :
150 : @param fd The file descriptor to deregister.
151 : */
152 : void deregister_descriptor(int fd) const;
153 :
154 : void work_started() noexcept override;
155 : void work_finished() noexcept override;
156 :
157 : /** Offset a forthcoming work_finished from work_cleanup.
158 :
159 : Called by descriptor_state when all I/O returned EAGAIN and no
160 : handler will be executed. Must be called from a scheduler thread.
161 : */
162 : void compensating_work_started() const noexcept;
163 :
164 : /** Drain work from thread context's private queue to global queue.
165 :
166 : Called by thread_context_guard destructor when a thread exits run().
167 : Transfers pending work to the global queue under mutex protection.
168 :
169 : @param queue The private queue to drain.
170 : @param count Item count for wakeup decisions (wakes other threads if positive).
171 : */
172 : void drain_thread_queue(op_queue& queue, long count) const;
173 :
174 : /** Post completed operations for deferred invocation.
175 :
176 : If called from a thread running this scheduler, operations go to
177 : the thread's private queue (fast path). Otherwise, operations are
178 : added to the global queue under mutex and a waiter is signaled.
179 :
180 : @par Preconditions
181 : work_started() must have been called for each operation.
182 :
183 : @param ops Queue of operations to post.
184 : */
185 : void post_deferred_completions(op_queue& ops) const;
186 :
187 : private:
188 : struct work_cleanup
189 : {
190 : epoll_scheduler* scheduler;
191 : std::unique_lock<std::mutex>* lock;
192 : epoll::scheduler_context* ctx;
193 : ~work_cleanup();
194 : };
195 :
196 : struct task_cleanup
197 : {
198 : epoll_scheduler const* scheduler;
199 : std::unique_lock<std::mutex>* lock;
200 : epoll::scheduler_context* ctx;
201 : ~task_cleanup();
202 : };
203 :
204 : std::size_t do_one(
205 : std::unique_lock<std::mutex>& lock,
206 : long timeout_us,
207 : epoll::scheduler_context* ctx);
208 : void
209 : run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
210 : void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
211 : void interrupt_reactor() const;
212 : void update_timerfd() const;
213 :
214 : /** Set the signaled state and wake all waiting threads.
215 :
216 : @par Preconditions
217 : Mutex must be held.
218 :
219 : @param lock The held mutex lock.
220 : */
221 : void signal_all(std::unique_lock<std::mutex>& lock) const;
222 :
223 : /** Set the signaled state and wake one waiter if any exist.
224 :
225 : Only unlocks and signals if at least one thread is waiting.
226 : Use this when the caller needs to perform a fallback action
227 : (such as interrupting the reactor) when no waiters exist.
228 :
229 : @par Preconditions
230 : Mutex must be held.
231 :
232 : @param lock The held mutex lock.
233 :
234 : @return `true` if unlocked and signaled, `false` if lock still held.
235 : */
236 : bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
237 :
238 : /** Set the signaled state, unlock, and wake one waiter if any exist.
239 :
240 : Always unlocks the mutex. Use this when the caller will release
241 : the lock regardless of whether a waiter exists.
242 :
243 : @par Preconditions
244 : Mutex must be held.
245 :
246 : @param lock The held mutex lock.
247 :
248 : @return `true` if a waiter was signaled, `false` otherwise.
249 : */
250 : bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
251 :
252 : /** Clear the signaled state before waiting.
253 :
254 : @par Preconditions
255 : Mutex must be held.
256 : */
257 : void clear_signal() const;
258 :
259 : /** Block until the signaled state is set.
260 :
261 : Returns immediately if already signaled (fast-path). Otherwise
262 : increments the waiter count, waits on the condition variable,
263 : and decrements the waiter count upon waking.
264 :
265 : @par Preconditions
266 : Mutex must be held.
267 :
268 : @param lock The held mutex lock.
269 : */
270 : void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
271 :
272 : /** Block until signaled or timeout expires.
273 :
274 : @par Preconditions
275 : Mutex must be held.
276 :
277 : @param lock The held mutex lock.
278 : @param timeout_us Maximum time to wait in microseconds.
279 : */
280 : void wait_for_signal_for(
281 : std::unique_lock<std::mutex>& lock, long timeout_us) const;
282 :
283 : int epoll_fd_;
284 : int event_fd_; // for interrupting reactor
285 : int timer_fd_; // timerfd for kernel-managed timer expiry
286 : mutable std::mutex mutex_;
287 : mutable std::condition_variable cond_;
288 : mutable op_queue completed_ops_;
289 : mutable std::atomic<long> outstanding_work_;
290 : bool stopped_;
291 :
292 : // True while a thread is blocked in epoll_wait. Used by
293 : // wake_one_thread_and_unlock and work_finished to know when
294 : // an eventfd interrupt is needed instead of a condvar signal.
295 : mutable std::atomic<bool> task_running_{false};
296 :
297 : // True when the reactor has been told to do a non-blocking poll
298 : // (more handlers queued or poll mode). Prevents redundant eventfd
299 : // writes and controls the epoll_wait timeout.
300 : mutable bool task_interrupted_ = false;
301 :
302 : // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
303 : mutable std::size_t state_ = 0;
304 :
305 : // Edge-triggered eventfd state
306 : mutable std::atomic<bool> eventfd_armed_{false};
307 :
308 : // Set when the earliest timer changes; flushed before epoll_wait
309 : // blocks. Avoids timerfd_settime syscalls for timers that are
310 : // scheduled then cancelled without being waited on.
311 : mutable std::atomic<bool> timerfd_stale_{false};
312 :
313 : // Sentinel operation for interleaving reactor runs with handler execution.
314 : // Ensures the reactor runs periodically even when handlers are continuously
315 : // posted, preventing starvation of I/O events, timers, and signals.
316 : struct task_op final : scheduler_op
317 : {
318 MIS 0 : void operator()() override {}
319 0 : void destroy() override {}
320 : };
321 : task_op task_op_;
322 : };
323 :
324 : //--------------------------------------------------------------------------
325 : //
326 : // Implementation
327 : //
328 : //--------------------------------------------------------------------------
329 :
330 : /*
331 : epoll Scheduler - Single Reactor Model
332 : ======================================
333 :
334 : This scheduler uses a thread coordination strategy to provide handler
335 : parallelism and avoid the thundering herd problem.
336 : Instead of all threads blocking on epoll_wait(), one thread becomes the
337 : "reactor" while others wait on a condition variable for handler work.
338 :
339 : Thread Model
340 : ------------
341 : - ONE thread runs epoll_wait() at a time (the reactor thread)
342 : - OTHER threads wait on cond_ (condition variable) for handlers
343 : - When work is posted, exactly one waiting thread wakes via notify_one()
344 : - This matches Windows IOCP semantics where N posted items wake N threads
345 :
346 : Event Loop Structure (do_one)
347 : -----------------------------
348 : 1. Lock mutex, try to pop handler from queue
349 : 2. If got handler: execute it (unlocked), return
350 : 3. If queue empty and no reactor running: become reactor
351 : - Run epoll_wait (unlocked), queue I/O completions, loop back
352 : 4. If queue empty and reactor running: wait on condvar for work
353 :
354 : The task_running_ flag ensures only one thread owns epoll_wait().
355 : After the reactor queues I/O completions, it loops back to try getting
356 : a handler, giving priority to handler execution over more I/O polling.
357 :
358 : Signaling State (state_)
359 : ------------------------
360 : The state_ variable encodes two pieces of information:
361 : - Bit 0: signaled flag (1 = signaled, persists until cleared)
362 : - Upper bits: waiter count (each waiter adds 2 before blocking)
363 :
364 : This allows efficient coordination:
365 : - Signalers only call notify when waiters exist (state_ > 1)
366 : - Waiters check if already signaled before blocking (fast-path)
367 :
368 : Wake Coordination (wake_one_thread_and_unlock)
369 : ----------------------------------------------
370 : When posting work:
371 : - If waiters exist (state_ > 1): signal and notify_one()
372 : - Else if reactor running: interrupt via eventfd write
373 : - Else: no-op (thread will find work when it checks queue)
374 :
375 : This avoids waking threads unnecessarily. With cascading wakes,
376 : each handler execution wakes at most one additional thread if
377 : more work exists in the queue.
378 :
379 : Work Counting
380 : -------------
381 : outstanding_work_ tracks pending operations. When it hits zero, run()
382 : returns. Each operation increments on start, decrements on completion.
383 :
384 : Timer Integration
385 : -----------------
386 : Timers are handled by timer_service; expiry is delivered through a
387 : timerfd registered with epoll. When the earliest timer changes, the
388 : timer_service callback marks the timerfd stale and, if a reactor
389 : thread is running, calls interrupt_reactor() so the change is seen.
390 : */
391 :
392 : namespace epoll {
393 :
394 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
395 : {
396 : epoll_scheduler const* key;
397 : scheduler_context* next;
398 : op_queue private_queue;
399 : long private_outstanding_work;
400 : int inline_budget;
401 : int inline_budget_max;
402 : bool unassisted;
403 :
404 HIT 192 : scheduler_context(epoll_scheduler const* k, scheduler_context* n)
405 192 : : key(k)
406 192 : , next(n)
407 192 : , private_outstanding_work(0)
408 192 : , inline_budget(0)
409 192 : , inline_budget_max(2)
410 192 : , unassisted(false)
411 : {
412 192 : }
413 : };
414 :
415 : inline thread_local_ptr<scheduler_context> context_stack;
416 :
417 : struct thread_context_guard
418 : {
419 : scheduler_context frame_;
420 :
421 192 : explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
422 192 : : frame_(ctx, context_stack.get())
423 : {
424 192 : context_stack.set(&frame_);
425 192 : }
426 :
427 192 : ~thread_context_guard() noexcept
428 : {
429 192 : if (!frame_.private_queue.empty())
430 MIS 0 : frame_.key->drain_thread_queue(
431 0 : frame_.private_queue, frame_.private_outstanding_work);
432 HIT 192 : context_stack.set(frame_.next);
433 192 : }
434 : };
435 :
436 : inline scheduler_context*
437 428484 : find_context(epoll_scheduler const* self) noexcept
438 : {
439 428484 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
440 426782 : if (c->key == self)
441 426782 : return c;
442 1702 : return nullptr;
443 : }
444 :
445 : } // namespace epoll
446 :
447 : inline void
448 62171 : epoll_scheduler::reset_inline_budget() const noexcept
449 : {
450 62171 : if (auto* ctx = epoll::find_context(this))
451 : {
452 : // Cap when no other thread absorbed queued work. A moderate
453 : // cap (4) amortizes scheduling for small buffers while avoiding
454 : // bursty I/O that fills socket buffers and stalls large transfers.
455 62171 : if (ctx->unassisted)
456 : {
457 62171 : ctx->inline_budget_max = 4;
458 62171 : ctx->inline_budget = 4;
459 62171 : return;
460 : }
461 : // Ramp up when previous cycle fully consumed budget.
462 : // Reset on partial consumption (EAGAIN hit or peer got scheduled).
463 MIS 0 : if (ctx->inline_budget == 0)
464 0 : ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
465 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
466 0 : ctx->inline_budget_max = 2;
467 0 : ctx->inline_budget = ctx->inline_budget_max;
468 : }
469 : }
470 :
471 : inline bool
472 HIT 265165 : epoll_scheduler::try_consume_inline_budget() const noexcept
473 : {
474 265165 : if (auto* ctx = epoll::find_context(this))
475 : {
476 265165 : if (ctx->inline_budget > 0)
477 : {
478 212206 : --ctx->inline_budget;
479 212206 : return true;
480 : }
481 : }
482 52959 : return false;
483 : }
484 :
485 : inline void
486 45652 : descriptor_state::operator()()
487 : {
488 45652 : is_enqueued_.store(false, std::memory_order_relaxed);
489 :
490 : // Take ownership of impl ref set by close_socket() to prevent
491 : // the owning impl from being freed while we're executing
492 45652 : auto prevent_impl_destruction = std::move(impl_ref_);
493 :
494 45652 : std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
495 45652 : if (ev == 0)
496 : {
497 MIS 0 : scheduler_->compensating_work_started();
498 0 : return;
499 : }
500 :
501 HIT 45652 : op_queue local_ops;
502 :
503 45652 : int err = 0;
504 45652 : if (ev & EPOLLERR)
505 : {
506 1 : socklen_t len = sizeof(err);
507 1 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
508 MIS 0 : err = errno;
509 HIT 1 : if (err == 0)
510 MIS 0 : err = EIO;
511 : }
512 :
513 : {
514 HIT 45652 : std::lock_guard lock(mutex);
515 45652 : if (ev & EPOLLIN)
516 : {
517 14527 : if (read_op)
518 : {
519 4502 : auto* rd = read_op;
520 4502 : if (err)
521 MIS 0 : rd->complete(err, 0);
522 : else
523 HIT 4502 : rd->perform_io();
524 :
525 4502 : if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
526 : {
527 MIS 0 : rd->errn = 0;
528 : }
529 : else
530 : {
531 HIT 4502 : read_op = nullptr;
532 4502 : local_ops.push(rd);
533 : }
534 : }
535 : else
536 : {
537 10025 : read_ready = true;
538 : }
539 : }
540 45652 : if (ev & EPOLLOUT)
541 : {
542 41154 : bool had_write_op = (connect_op || write_op);
543 41154 : if (connect_op)
544 : {
545 4502 : auto* cn = connect_op;
546 4502 : if (err)
547 1 : cn->complete(err, 0);
548 : else
549 4501 : cn->perform_io();
550 4502 : connect_op = nullptr;
551 4502 : local_ops.push(cn);
552 : }
553 41154 : if (write_op)
554 : {
555 MIS 0 : auto* wr = write_op;
556 0 : if (err)
557 0 : wr->complete(err, 0);
558 : else
559 0 : wr->perform_io();
560 :
561 0 : if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
562 : {
563 0 : wr->errn = 0;
564 : }
565 : else
566 : {
567 0 : write_op = nullptr;
568 0 : local_ops.push(wr);
569 : }
570 : }
571 HIT 41154 : if (!had_write_op)
572 36652 : write_ready = true;
573 : }
574 45652 : if (err)
575 : {
576 1 : if (read_op)
577 : {
578 MIS 0 : read_op->complete(err, 0);
579 0 : local_ops.push(std::exchange(read_op, nullptr));
580 : }
581 HIT 1 : if (write_op)
582 : {
583 MIS 0 : write_op->complete(err, 0);
584 0 : local_ops.push(std::exchange(write_op, nullptr));
585 : }
586 HIT 1 : if (connect_op)
587 : {
588 MIS 0 : connect_op->complete(err, 0);
589 0 : local_ops.push(std::exchange(connect_op, nullptr));
590 : }
591 : }
592 HIT 45652 : }
593 :
594 : // Execute first handler inline — the scheduler's work_cleanup
595 : // accounts for this as the "consumed" work item
596 45652 : scheduler_op* first = local_ops.pop();
597 45652 : if (first)
598 : {
599 9004 : scheduler_->post_deferred_completions(local_ops);
600 9004 : (*first)();
601 : }
602 : else
603 : {
604 36648 : scheduler_->compensating_work_started();
605 : }
606 45652 : }
607 :
608 210 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
609 210 : : epoll_fd_(-1)
610 210 : , event_fd_(-1)
611 210 : , timer_fd_(-1)
612 210 : , outstanding_work_(0)
613 210 : , stopped_(false)
614 210 : , task_running_{false}
615 210 : , task_interrupted_(false)
616 420 : , state_(0)
617 : {
618 210 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
619 210 : if (epoll_fd_ < 0)
620 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
621 :
622 HIT 210 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
623 210 : if (event_fd_ < 0)
624 : {
625 MIS 0 : int errn = errno;
626 0 : ::close(epoll_fd_);
627 0 : detail::throw_system_error(make_err(errn), "eventfd");
628 : }
629 :
630 HIT 210 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
631 210 : if (timer_fd_ < 0)
632 : {
633 MIS 0 : int errn = errno;
634 0 : ::close(event_fd_);
635 0 : ::close(epoll_fd_);
636 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
637 : }
638 :
639 HIT 210 : epoll_event ev{};
640 210 : ev.events = EPOLLIN | EPOLLET;
641 210 : ev.data.ptr = nullptr;
642 210 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
643 : {
644 MIS 0 : int errn = errno;
645 0 : ::close(timer_fd_);
646 0 : ::close(event_fd_);
647 0 : ::close(epoll_fd_);
648 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
649 : }
650 :
651 HIT 210 : epoll_event timer_ev{};
652 210 : timer_ev.events = EPOLLIN | EPOLLERR;
653 210 : timer_ev.data.ptr = &timer_fd_;
654 210 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
655 : {
656 MIS 0 : int errn = errno;
657 0 : ::close(timer_fd_);
658 0 : ::close(event_fd_);
659 0 : ::close(epoll_fd_);
660 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
661 : }
662 :
663 HIT 210 : timer_svc_ = &get_timer_service(ctx, *this);
664 210 : timer_svc_->set_on_earliest_changed(
665 4926 : timer_service::callback(this, [](void* p) {
666 4716 : auto* self = static_cast<epoll_scheduler*>(p);
667 4716 : self->timerfd_stale_.store(true, std::memory_order_release);
668 4716 : if (self->task_running_.load(std::memory_order_acquire))
669 MIS 0 : self->interrupt_reactor();
670 HIT 4716 : }));
671 :
672 : // Initialize resolver service
673 210 : get_resolver_service(ctx, *this);
674 :
675 : // Initialize signal service
676 210 : get_signal_service(ctx, *this);
677 :
678 : // Push task sentinel to interleave reactor runs with handler execution
679 210 : completed_ops_.push(&task_op_);
680 210 : }
681 :
682 420 : inline epoll_scheduler::~epoll_scheduler()
683 : {
684 210 : if (timer_fd_ >= 0)
685 210 : ::close(timer_fd_);
686 210 : if (event_fd_ >= 0)
687 210 : ::close(event_fd_);
688 210 : if (epoll_fd_ >= 0)
689 210 : ::close(epoll_fd_);
690 420 : }
691 :
692 : inline void
693 210 : epoll_scheduler::shutdown()
694 : {
695 : {
696 210 : std::unique_lock lock(mutex_);
697 :
698 459 : while (auto* h = completed_ops_.pop())
699 : {
700 249 : if (h == &task_op_)
701 210 : continue;
702 39 : lock.unlock();
703 39 : h->destroy();
704 39 : lock.lock();
705 249 : }
706 :
707 210 : signal_all(lock);
708 210 : }
709 :
710 210 : if (event_fd_ >= 0)
711 210 : interrupt_reactor();
712 210 : }
713 :
714 : inline void
715 6565 : epoll_scheduler::post(std::coroutine_handle<> h) const
716 : {
717 : struct post_handler final : scheduler_op
718 : {
719 : std::coroutine_handle<> h_;
720 :
721 6565 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
722 :
723 13130 : ~post_handler() override = default;
724 :
725 6559 : void operator()() override
726 : {
727 6559 : auto h = h_;
728 6559 : delete this;
729 6559 : h.resume();
730 6559 : }
731 :
732 6 : void destroy() override
733 : {
734 6 : auto h = h_;
735 6 : delete this;
736 6 : h.destroy();
737 6 : }
738 : };
739 :
740 6565 : auto ph = std::make_unique<post_handler>(h);
741 :
742 : // Fast path: same thread posts to private queue
743 : // Only count locally; work_cleanup batches to global counter
744 6565 : if (auto* ctx = epoll::find_context(this))
745 : {
746 4893 : ++ctx->private_outstanding_work;
747 4893 : ctx->private_queue.push(ph.release());
748 4893 : return;
749 : }
750 :
751 : // Slow path: cross-thread post requires mutex
752 1672 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
753 :
754 1672 : std::unique_lock lock(mutex_);
755 1672 : completed_ops_.push(ph.release());
756 1672 : wake_one_thread_and_unlock(lock);
757 6565 : }
758 :
759 : inline void
760 57935 : epoll_scheduler::post(scheduler_op* h) const
761 : {
762 : // Fast path: same thread posts to private queue
763 : // Only count locally; work_cleanup batches to global counter
764 57935 : if (auto* ctx = epoll::find_context(this))
765 : {
766 57905 : ++ctx->private_outstanding_work;
767 57905 : ctx->private_queue.push(h);
768 57905 : return;
769 : }
770 :
771 : // Slow path: cross-thread post requires mutex
772 30 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
773 :
774 30 : std::unique_lock lock(mutex_);
775 30 : completed_ops_.push(h);
776 30 : wake_one_thread_and_unlock(lock);
777 30 : }
778 :
779 : inline bool
780 709 : epoll_scheduler::running_in_this_thread() const noexcept
781 : {
782 709 : for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
783 456 : if (c->key == this)
784 456 : return true;
785 253 : return false;
786 : }
787 :
788 : inline void
789 205 : epoll_scheduler::stop()
790 : {
791 205 : std::unique_lock lock(mutex_);
792 205 : if (!stopped_)
793 : {
794 170 : stopped_ = true;
795 170 : signal_all(lock);
796 170 : interrupt_reactor();
797 : }
798 205 : }
799 :
800 : inline bool
801 18 : epoll_scheduler::stopped() const noexcept
802 : {
803 18 : std::unique_lock lock(mutex_);
804 36 : return stopped_;
805 18 : }
806 :
807 : inline void
808 52 : epoll_scheduler::restart()
809 : {
810 52 : std::unique_lock lock(mutex_);
811 52 : stopped_ = false;
812 52 : }
813 :
814 : inline std::size_t
815 188 : epoll_scheduler::run()
816 : {
817 376 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
818 : {
819 30 : stop();
820 30 : return 0;
821 : }
822 :
823 158 : epoll::thread_context_guard ctx(this);
824 158 : std::unique_lock lock(mutex_);
825 :
826 158 : std::size_t n = 0;
827 : for (;;)
828 : {
829 110265 : if (!do_one(lock, -1, &ctx.frame_))
830 158 : break;
831 110107 : if (n != (std::numeric_limits<std::size_t>::max)())
832 110107 : ++n;
833 110107 : if (!lock.owns_lock())
834 52038 : lock.lock();
835 : }
836 158 : return n;
837 158 : }
838 :
839 : inline std::size_t
840 2 : epoll_scheduler::run_one()
841 : {
842 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
843 : {
844 MIS 0 : stop();
845 0 : return 0;
846 : }
847 :
848 HIT 2 : epoll::thread_context_guard ctx(this);
849 2 : std::unique_lock lock(mutex_);
850 2 : return do_one(lock, -1, &ctx.frame_);
851 2 : }
852 :
853 : inline std::size_t
854 34 : epoll_scheduler::wait_one(long usec)
855 : {
856 68 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
857 : {
858 7 : stop();
859 7 : return 0;
860 : }
861 :
862 27 : epoll::thread_context_guard ctx(this);
863 27 : std::unique_lock lock(mutex_);
864 27 : return do_one(lock, usec, &ctx.frame_);
865 27 : }
866 :
867 : inline std::size_t
868 4 : epoll_scheduler::poll()
869 : {
870 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
871 : {
872 1 : stop();
873 1 : return 0;
874 : }
875 :
876 3 : epoll::thread_context_guard ctx(this);
877 3 : std::unique_lock lock(mutex_);
878 :
879 3 : std::size_t n = 0;
880 : for (;;)
881 : {
882 7 : if (!do_one(lock, 0, &ctx.frame_))
883 3 : break;
884 4 : if (n != (std::numeric_limits<std::size_t>::max)())
885 4 : ++n;
886 4 : if (!lock.owns_lock())
887 4 : lock.lock();
888 : }
889 3 : return n;
890 3 : }
891 :
892 : inline std::size_t
893 4 : epoll_scheduler::poll_one()
894 : {
895 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
896 : {
897 2 : stop();
898 2 : return 0;
899 : }
900 :
901 2 : epoll::thread_context_guard ctx(this);
902 2 : std::unique_lock lock(mutex_);
903 2 : return do_one(lock, 0, &ctx.frame_);
904 2 : }
905 :
906 : inline void
907 9077 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
908 : {
909 9077 : epoll_event ev{};
910 9077 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
911 9077 : ev.data.ptr = desc;
912 :
913 9077 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
914 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
915 :
916 HIT 9077 : desc->registered_events = ev.events;
917 9077 : desc->fd = fd;
918 9077 : desc->scheduler_ = this;
919 :
920 9077 : std::lock_guard lock(desc->mutex);
921 9077 : desc->read_ready = false;
922 9077 : desc->write_ready = false;
923 9077 : }
924 :
925 : inline void
926 9077 : epoll_scheduler::deregister_descriptor(int fd) const
927 : {
928 9077 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
929 9077 : }
930 :
931 : inline void
932 14664 : epoll_scheduler::work_started() noexcept
933 : {
934 14664 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
935 14664 : }
936 :
937 : inline void
938 21074 : epoll_scheduler::work_finished() noexcept
939 : {
940 42148 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
941 163 : stop();
942 21074 : }
943 :
944 : inline void
945 36648 : epoll_scheduler::compensating_work_started() const noexcept
946 : {
947 36648 : auto* ctx = epoll::find_context(this);
948 36648 : if (ctx)
949 36648 : ++ctx->private_outstanding_work;
950 36648 : }
951 :
952 : inline void
953 MIS 0 : epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
954 : {
955 : // Note: outstanding_work_ was already incremented when posting
956 0 : std::unique_lock lock(mutex_);
957 0 : completed_ops_.splice(queue);
958 0 : if (count > 0)
959 0 : maybe_unlock_and_signal_one(lock);
960 0 : }
961 :
962 : inline void
963 HIT 9004 : epoll_scheduler::post_deferred_completions(op_queue& ops) const
964 : {
965 9004 : if (ops.empty())
966 9004 : return;
967 :
968 : // Fast path: if on scheduler thread, use private queue
969 MIS 0 : if (auto* ctx = epoll::find_context(this))
970 : {
971 0 : ctx->private_queue.splice(ops);
972 0 : return;
973 : }
974 :
975 : // Slow path: add to global queue and wake a thread
976 0 : std::unique_lock lock(mutex_);
977 0 : completed_ops_.splice(ops);
978 0 : wake_one_thread_and_unlock(lock);
979 0 : }
980 :
981 : inline void
982 HIT 406 : epoll_scheduler::interrupt_reactor() const
983 : {
984 : // Only write if not already armed to avoid redundant writes
985 406 : bool expected = false;
986 406 : if (eventfd_armed_.compare_exchange_strong(
987 : expected, true, std::memory_order_release,
988 : std::memory_order_relaxed))
989 : {
990 280 : std::uint64_t val = 1;
991 280 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
992 : }
993 406 : }
994 :
995 : inline void
996 380 : epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
997 : {
998 380 : state_ |= 1;
999 380 : cond_.notify_all();
1000 380 : }
1001 :
1002 : inline bool
1003 1702 : epoll_scheduler::maybe_unlock_and_signal_one(
1004 : std::unique_lock<std::mutex>& lock) const
1005 : {
1006 1702 : state_ |= 1;
1007 1702 : if (state_ > 1)
1008 : {
1009 MIS 0 : lock.unlock();
1010 0 : cond_.notify_one();
1011 0 : return true;
1012 : }
1013 HIT 1702 : return false;
1014 : }
1015 :
1016 : inline bool
1017 139335 : epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
1018 : {
1019 139335 : state_ |= 1;
1020 139335 : bool have_waiters = state_ > 1;
1021 139335 : lock.unlock();
1022 139335 : if (have_waiters)
1023 MIS 0 : cond_.notify_one();
1024 HIT 139335 : return have_waiters;
1025 : }
1026 :
1027 : inline void
1028 MIS 0 : epoll_scheduler::clear_signal() const
1029 : {
1030 0 : state_ &= ~std::size_t(1);
1031 0 : }
1032 :
1033 : inline void
1034 0 : epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
1035 : {
1036 0 : while ((state_ & 1) == 0)
1037 : {
1038 0 : state_ += 2;
1039 0 : cond_.wait(lock);
1040 0 : state_ -= 2;
1041 : }
1042 0 : }
1043 :
1044 : inline void
1045 0 : epoll_scheduler::wait_for_signal_for(
1046 : std::unique_lock<std::mutex>& lock, long timeout_us) const
1047 : {
1048 0 : if ((state_ & 1) == 0)
1049 : {
1050 0 : state_ += 2;
1051 0 : cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
1052 0 : state_ -= 2;
1053 : }
1054 0 : }
1055 :
1056 : inline void
1057 HIT 1702 : epoll_scheduler::wake_one_thread_and_unlock(
1058 : std::unique_lock<std::mutex>& lock) const
1059 : {
1060 1702 : if (maybe_unlock_and_signal_one(lock))
1061 MIS 0 : return;
1062 :
1063 HIT 1702 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
1064 : {
1065 26 : task_interrupted_ = true;
1066 26 : lock.unlock();
1067 26 : interrupt_reactor();
1068 : }
1069 : else
1070 : {
1071 1676 : lock.unlock();
1072 : }
1073 : }
1074 :
1075 110142 : inline epoll_scheduler::work_cleanup::~work_cleanup()
1076 : {
1077 110142 : if (ctx)
1078 : {
1079 110142 : long produced = ctx->private_outstanding_work;
1080 110142 : if (produced > 1)
1081 7 : scheduler->outstanding_work_.fetch_add(
1082 : produced - 1, std::memory_order_relaxed);
1083 110135 : else if (produced < 1)
1084 15414 : scheduler->work_finished();
1085 110142 : ctx->private_outstanding_work = 0;
1086 :
1087 110142 : if (!ctx->private_queue.empty())
1088 : {
1089 58080 : lock->lock();
1090 58080 : scheduler->completed_ops_.splice(ctx->private_queue);
1091 : }
1092 : }
1093 : else
1094 : {
1095 MIS 0 : scheduler->work_finished();
1096 : }
1097 HIT 110142 : }
1098 :
1099 76854 : inline epoll_scheduler::task_cleanup::~task_cleanup()
1100 : {
1101 38427 : if (!ctx)
1102 MIS 0 : return;
1103 :
1104 HIT 38427 : if (ctx->private_outstanding_work > 0)
1105 : {
1106 4705 : scheduler->outstanding_work_.fetch_add(
1107 4705 : ctx->private_outstanding_work, std::memory_order_relaxed);
1108 4705 : ctx->private_outstanding_work = 0;
1109 : }
1110 :
1111 38427 : if (!ctx->private_queue.empty())
1112 : {
1113 4705 : if (!lock->owns_lock())
1114 MIS 0 : lock->lock();
1115 HIT 4705 : scheduler->completed_ops_.splice(ctx->private_queue);
1116 : }
1117 38427 : }
1118 :
1119 : inline void
1120 9406 : epoll_scheduler::update_timerfd() const
1121 : {
1122 9406 : auto nearest = timer_svc_->nearest_expiry();
1123 :
1124 9406 : itimerspec ts{};
1125 9406 : int flags = 0;
1126 :
1127 9406 : if (nearest == timer_service::time_point::max())
1128 : {
1129 : // No timers - disarm by setting to 0 (relative)
1130 : }
1131 : else
1132 : {
1133 9361 : auto now = std::chrono::steady_clock::now();
1134 9361 : if (nearest <= now)
1135 : {
1136 : // Use 1ns instead of 0 - zero disarms the timerfd
1137 197 : ts.it_value.tv_nsec = 1;
1138 : }
1139 : else
1140 : {
1141 9164 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1142 9164 : nearest - now)
1143 9164 : .count();
1144 9164 : ts.it_value.tv_sec = nsec / 1000000000;
1145 9164 : ts.it_value.tv_nsec = nsec % 1000000000;
1146 : // Ensure non-zero to avoid disarming if duration rounds to 0
1147 9164 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1148 MIS 0 : ts.it_value.tv_nsec = 1;
1149 : }
1150 : }
1151 :
1152 HIT 9406 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1153 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
1154 HIT 9406 : }
1155 :
1156 : inline void
1157 38427 : epoll_scheduler::run_task(
1158 : std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
1159 : {
1160 38427 : int timeout_ms = task_interrupted_ ? 0 : -1;
1161 :
1162 38427 : if (lock.owns_lock())
1163 9234 : lock.unlock();
1164 :
1165 38427 : task_cleanup on_exit{this, &lock, ctx};
1166 :
1167 : // Flush deferred timerfd programming before blocking
1168 38427 : if (timerfd_stale_.exchange(false, std::memory_order_acquire))
1169 4701 : update_timerfd();
1170 :
1171 : // Event loop runs without mutex held
1172 : epoll_event events[128];
1173 38427 : int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
1174 :
1175 38427 : if (nfds < 0 && errno != EINTR)
1176 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_wait");
1177 :
1178 HIT 38427 : bool check_timers = false;
1179 38427 : op_queue local_ops;
1180 :
1181 : // Process events without holding the mutex
1182 88883 : for (int i = 0; i < nfds; ++i)
1183 : {
1184 50456 : if (events[i].data.ptr == nullptr)
1185 : {
1186 : std::uint64_t val;
1187 : // Mutex released above; analyzer can't track unlock via ref
1188 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1189 70 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
1190 70 : eventfd_armed_.store(false, std::memory_order_relaxed);
1191 70 : continue;
1192 70 : }
1193 :
1194 50386 : if (events[i].data.ptr == &timer_fd_)
1195 : {
1196 : std::uint64_t expirations;
1197 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1198 : [[maybe_unused]] auto r =
1199 4705 : ::read(timer_fd_, &expirations, sizeof(expirations));
1200 4705 : check_timers = true;
1201 4705 : continue;
1202 4705 : }
1203 :
1204 : // Deferred I/O: just set ready events and enqueue descriptor
1205 : // No per-descriptor mutex locking in reactor hot path!
1206 45681 : auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
1207 45681 : desc->add_ready_events(events[i].events);
1208 :
1209 : // Only enqueue if not already enqueued
1210 45681 : bool expected = false;
1211 45681 : if (desc->is_enqueued_.compare_exchange_strong(
1212 : expected, true, std::memory_order_release,
1213 : std::memory_order_relaxed))
1214 : {
1215 45681 : local_ops.push(desc);
1216 : }
1217 : }
1218 :
1219 : // Process timers only when timerfd fires
1220 38427 : if (check_timers)
1221 : {
1222 4705 : timer_svc_->process_expired();
1223 4705 : update_timerfd();
1224 : }
1225 :
1226 38427 : lock.lock();
1227 :
1228 38427 : if (!local_ops.empty())
1229 28737 : completed_ops_.splice(local_ops);
1230 38427 : }
1231 :
1232 : inline std::size_t
1233 110303 : epoll_scheduler::do_one(
1234 : std::unique_lock<std::mutex>& lock,
1235 : long timeout_us,
1236 : epoll::scheduler_context* ctx)
1237 : {
1238 : for (;;)
1239 : {
1240 148730 : if (stopped_)
1241 159 : return 0;
1242 :
1243 148571 : scheduler_op* op = completed_ops_.pop();
1244 :
1245 : // Handle reactor sentinel - time to poll for I/O
1246 148571 : if (op == &task_op_)
1247 : {
1248 38429 : bool more_handlers = !completed_ops_.empty();
1249 :
1250 : // Nothing to run the reactor for: no pending work to wait on,
1251 : // or caller requested a non-blocking poll
1252 47665 : if (!more_handlers &&
1253 18472 : (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1254 : timeout_us == 0))
1255 : {
1256 2 : completed_ops_.push(&task_op_);
1257 2 : return 0;
1258 : }
1259 :
1260 38427 : task_interrupted_ = more_handlers || timeout_us == 0;
1261 38427 : task_running_.store(true, std::memory_order_release);
1262 :
1263 38427 : if (more_handlers)
1264 29193 : unlock_and_signal_one(lock);
1265 :
1266 38427 : run_task(lock, ctx);
1267 :
1268 38427 : task_running_.store(false, std::memory_order_relaxed);
1269 38427 : completed_ops_.push(&task_op_);
1270 38427 : continue;
1271 38427 : }
1272 :
1273 : // Handle operation
1274 110142 : if (op != nullptr)
1275 : {
1276 110142 : bool more = !completed_ops_.empty();
1277 :
1278 110142 : if (more)
1279 110142 : ctx->unassisted = !unlock_and_signal_one(lock);
1280 : else
1281 : {
1282 MIS 0 : ctx->unassisted = false;
1283 0 : lock.unlock();
1284 : }
1285 :
1286 HIT 110142 : work_cleanup on_exit{this, &lock, ctx};
1287 :
1288 110142 : (*op)();
1289 110142 : return 1;
1290 110142 : }
1291 :
1292 : // No pending work to wait on, or caller requested non-blocking poll
1293 MIS 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1294 : timeout_us == 0)
1295 0 : return 0;
1296 :
1297 0 : clear_signal();
1298 0 : if (timeout_us < 0)
1299 0 : wait_for_signal(lock);
1300 : else
1301 0 : wait_for_signal_for(lock, timeout_us);
1302 HIT 38427 : }
1303 : }
1304 :
1305 : } // namespace boost::corosio::detail
1306 :
1307 : #endif // BOOST_COROSIO_HAS_EPOLL
1308 :
1309 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|