LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 77.7 % 358 278 80
Test Date: 2026-02-25 22:29:39 Functions: 88.6 % 35 31 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : 
      20                 : #include <boost/corosio/native/native_scheduler.hpp>
      21                 : #include <boost/corosio/detail/scheduler_op.hpp>
      22                 : 
      23                 : #include <boost/corosio/native/detail/select/select_op.hpp>
      24                 : #include <boost/corosio/detail/timer_service.hpp>
      25                 : #include <boost/corosio/detail/make_err.hpp>
      26                 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
      27                 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
      28                 : 
      29                 : #include <boost/corosio/detail/except.hpp>
      30                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      31                 : 
      32                 : #include <sys/select.h>
      33                 : #include <sys/socket.h>
      34                 : #include <unistd.h>
      35                 : #include <errno.h>
      36                 : #include <fcntl.h>
      37                 : 
      38                 : #include <algorithm>
      39                 : #include <atomic>
      40                 : #include <chrono>
      41                 : #include <condition_variable>
      42                 : #include <cstddef>
      43                 : #include <limits>
      44                 : #include <mutex>
      45                 : #include <unordered_map>
      46                 : 
      47                 : namespace boost::corosio::detail {
      48                 : 
      49                 : struct select_op;
      50                 : 
      51                 : /** POSIX scheduler using select() for I/O multiplexing.
      52                 : 
      53                 :     This scheduler implements the scheduler interface using the POSIX select()
      54                 :     call for I/O event notification. It uses a single reactor model
      55                 :     where one thread runs select() while other threads wait on a condition
      56                 :     variable for handler work. This design provides:
      57                 : 
      58                 :     - Handler parallelism: N posted handlers can execute on N threads
      59                 :     - No thundering herd: condition_variable wakes exactly one thread
      60                 :     - Portability: Works on all POSIX systems
      61                 : 
      62                 :     The design mirrors epoll_scheduler for behavioral consistency:
      63                 :     - Same single-reactor thread coordination model
      64                 :     - Same work counting semantics
      65                 :     - Same timer integration pattern
      66                 : 
      67                 :     Known Limitations:
      68                 :     - FD_SETSIZE (~1024) limits maximum concurrent connections
      69                 :     - O(n) scanning: rebuilds fd_sets each iteration
      70                 :     - Level-triggered only (no edge-triggered mode)
      71                 : 
      72                 :     @par Thread Safety
      73                 :     All public member functions are thread-safe.
      74                 : */
      75                 : class BOOST_COROSIO_DECL select_scheduler final
      76                 :     : public native_scheduler
      77                 :     , public capy::execution_context::service
      78                 : {
      79                 : public:
      80                 :     using key_type = scheduler;
      81                 : 
      82                 :     /** Construct the scheduler.
      83                 : 
      84                 :         Creates a self-pipe for reactor interruption.
      85                 : 
      86                 :         @param ctx Reference to the owning execution_context.
      87                 :         @param concurrency_hint Hint for expected thread count (unused).
      88                 :     */
      89                 :     select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
      90                 : 
      91                 :     ~select_scheduler() override;
      92                 : 
      93                 :     select_scheduler(select_scheduler const&)            = delete;
      94                 :     select_scheduler& operator=(select_scheduler const&) = delete;
      95                 : 
      96                 :     void shutdown() override;
      97                 :     void post(std::coroutine_handle<> h) const override;
      98                 :     void post(scheduler_op* h) const override;
      99                 :     bool running_in_this_thread() const noexcept override;
     100                 :     void stop() override;
     101                 :     bool stopped() const noexcept override;
     102                 :     void restart() override;
     103                 :     std::size_t run() override;
     104                 :     std::size_t run_one() override;
     105                 :     std::size_t wait_one(long usec) override;
     106                 :     std::size_t poll() override;
     107                 :     std::size_t poll_one() override;
     108                 : 
     109                 :     /** Return the maximum file descriptor value supported.
     110                 : 
     111                 :         Returns FD_SETSIZE - 1, the maximum fd value that can be
     112                 :         monitored by select(). Operations with fd >= FD_SETSIZE
     113                 :         will fail with EINVAL.
     114                 : 
     115                 :         @return The maximum supported file descriptor value.
     116                 :     */
     117                 :     static constexpr int max_fd() noexcept
     118                 :     {
     119                 :         return FD_SETSIZE - 1;
     120                 :     }
     121                 : 
     122                 :     /** Register a file descriptor for monitoring.
     123                 : 
     124                 :         @param fd The file descriptor to register.
     125                 :         @param op The operation associated with this fd.
     126                 :         @param events Event mask: 1 = read, 2 = write, 3 = both.
     127                 :     */
     128                 :     void register_fd(int fd, select_op* op, int events) const;
     129                 : 
     130                 :     /** Unregister a file descriptor from monitoring.
     131                 : 
     132                 :         @param fd The file descriptor to unregister.
     133                 :         @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
     134                 :     */
     135                 :     void deregister_fd(int fd, int events) const;
     136                 : 
     137                 :     void work_started() noexcept override;
     138                 :     void work_finished() noexcept override;
     139                 : 
     140                 :     // Event flags for register_fd/deregister_fd
     141                 :     static constexpr int event_read  = 1;
     142                 :     static constexpr int event_write = 2;
     143                 : 
     144                 : private:
     145                 :     std::size_t do_one(long timeout_us);
     146                 :     void run_reactor(std::unique_lock<std::mutex>& lock);
     147                 :     void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
     148                 :     void interrupt_reactor() const;
     149                 :     long calculate_timeout(long requested_timeout_us) const;
     150                 : 
     151                 :     // Self-pipe for interrupting select()
     152                 :     int pipe_fds_[2]; // [0]=read, [1]=write
     153                 : 
     154                 :     mutable std::mutex mutex_;
     155                 :     mutable std::condition_variable wakeup_event_;
     156                 :     mutable op_queue completed_ops_;
     157                 :     mutable std::atomic<long> outstanding_work_;
     158                 :     std::atomic<bool> stopped_;
     159                 : 
     160                 :     // Per-fd state for tracking registered operations
     161                 :     struct fd_state
     162                 :     {
     163                 :         select_op* read_op  = nullptr;
     164                 :         select_op* write_op = nullptr;
     165                 :     };
     166                 :     mutable std::unordered_map<int, fd_state> registered_fds_;
     167                 :     mutable int max_fd_ = -1;
     168                 : 
     169                 :     // Single reactor thread coordination
     170                 :     mutable bool reactor_running_     = false;
     171                 :     mutable bool reactor_interrupted_ = false;
     172                 :     mutable int idle_thread_count_    = 0;
     173                 : 
     174                 :     // Sentinel operation for interleaving reactor runs with handler execution.
     175                 :     // Ensures the reactor runs periodically even when handlers are continuously
     176                 :     // posted, preventing timer starvation.
     177                 :     struct task_op final : scheduler_op
     178                 :     {
     179 MIS           0 :         void operator()() override {}
     180               0 :         void destroy() override {}
     181                 :     };
     182                 :     task_op task_op_;
     183                 : };
     184                 : 
     185                 : /*
     186                 :     select Scheduler - Single Reactor Model
     187                 :     =======================================
     188                 : 
     189                 :     This scheduler mirrors the epoll_scheduler design but uses select() instead
     190                 :     of epoll for I/O multiplexing. The thread coordination strategy is identical:
     191                 :     one thread becomes the "reactor" while others wait on a condition variable.
     192                 : 
     193                 :     Thread Model
     194                 :     ------------
     195                 :     - ONE thread runs select() at a time (the reactor thread)
     196                 :     - OTHER threads wait on wakeup_event_ (condition variable) for handlers
     197                 :     - When work is posted, exactly one waiting thread wakes via notify_one()
     198                 : 
     199                 :     Key Differences from epoll
     200                 :     --------------------------
     201                 :     - Uses self-pipe instead of eventfd for interruption (more portable)
     202                 :     - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
     203                 :     - FD_SETSIZE limit (~1024 fds on most systems)
     204                 :     - Level-triggered only (no edge-triggered mode)
     205                 : 
     206                 :     Self-Pipe Pattern
     207                 :     -----------------
     208                 :     To interrupt a blocking select() call (e.g., when work is posted or a timer
     209                 :     expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
     210                 :     always in the read_fds set, so select() returns immediately. We drain the
     211                 :     pipe to clear the readable state.
     212                 : 
     213                 :     fd-to-op Mapping
     214                 :     ----------------
     215                 :     We use an unordered_map<int, fd_state> to track which operations are
     216                 :     registered for each fd. This allows O(1) lookup when select() returns
     217                 :     ready fds. Each fd can have at most one read op and one write op registered.
     218                 : */
     219                 : 
     220                 : namespace select {
     221                 : 
     222                 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
     223                 : {
     224                 :     select_scheduler const* key;
     225                 :     scheduler_context* next;
     226                 : };
     227                 : 
     228                 : inline thread_local_ptr<scheduler_context> context_stack;
     229                 : 
     230                 : struct thread_context_guard
     231                 : {
     232                 :     scheduler_context frame_;
     233                 : 
     234 HIT         127 :     explicit thread_context_guard(select_scheduler const* ctx) noexcept
     235             127 :         : frame_{ctx, context_stack.get()}
     236                 :     {
     237             127 :         context_stack.set(&frame_);
     238             127 :     }
     239                 : 
     240             127 :     ~thread_context_guard() noexcept
     241                 :     {
     242             127 :         context_stack.set(frame_.next);
     243             127 :     }
     244                 : };
     245                 : 
     246                 : struct work_guard
     247                 : {
     248                 :     select_scheduler* self;
     249          153835 :     ~work_guard()
     250                 :     {
     251          153835 :         self->work_finished();
     252          153835 :     }
     253                 : };
     254                 : 
     255                 : } // namespace select
     256                 : 
     257             139 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
     258             139 :     : pipe_fds_{-1, -1}
     259             139 :     , outstanding_work_(0)
     260             139 :     , stopped_(false)
     261             139 :     , max_fd_(-1)
     262             139 :     , reactor_running_(false)
     263             139 :     , reactor_interrupted_(false)
     264             278 :     , idle_thread_count_(0)
     265                 : {
     266                 :     // Create self-pipe for interrupting select()
     267             139 :     if (::pipe(pipe_fds_) < 0)
     268 MIS           0 :         detail::throw_system_error(make_err(errno), "pipe");
     269                 : 
     270                 :     // Set both ends to non-blocking and close-on-exec
     271 HIT         417 :     for (int i = 0; i < 2; ++i)
     272                 :     {
     273             278 :         int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
     274             278 :         if (flags == -1)
     275                 :         {
     276 MIS           0 :             int errn = errno;
     277               0 :             ::close(pipe_fds_[0]);
     278               0 :             ::close(pipe_fds_[1]);
     279               0 :             detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
     280                 :         }
     281 HIT         278 :         if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
     282                 :         {
     283 MIS           0 :             int errn = errno;
     284               0 :             ::close(pipe_fds_[0]);
     285               0 :             ::close(pipe_fds_[1]);
     286               0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
     287                 :         }
     288 HIT         278 :         if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
     289                 :         {
     290 MIS           0 :             int errn = errno;
     291               0 :             ::close(pipe_fds_[0]);
     292               0 :             ::close(pipe_fds_[1]);
     293               0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
     294                 :         }
     295                 :     }
     296                 : 
     297 HIT         139 :     timer_svc_ = &get_timer_service(ctx, *this);
     298             139 :     timer_svc_->set_on_earliest_changed(
     299            3071 :         timer_service::callback(this, [](void* p) {
     300            2932 :             static_cast<select_scheduler*>(p)->interrupt_reactor();
     301            2932 :         }));
     302                 : 
     303                 :     // Initialize resolver service
     304             139 :     get_resolver_service(ctx, *this);
     305                 : 
     306                 :     // Initialize signal service
     307             139 :     get_signal_service(ctx, *this);
     308                 : 
     309                 :     // Push task sentinel to interleave reactor runs with handler execution
     310             139 :     completed_ops_.push(&task_op_);
     311             139 : }
     312                 : 
     313             278 : inline select_scheduler::~select_scheduler()
     314                 : {
     315             139 :     if (pipe_fds_[0] >= 0)
     316             139 :         ::close(pipe_fds_[0]);
     317             139 :     if (pipe_fds_[1] >= 0)
     318             139 :         ::close(pipe_fds_[1]);
     319             278 : }
     320                 : 
     321                 : inline void
     322             139 : select_scheduler::shutdown()
     323                 : {
     324                 :     {
     325             139 :         std::unique_lock lock(mutex_);
     326                 : 
     327             285 :         while (auto* h = completed_ops_.pop())
     328                 :         {
     329             146 :             if (h == &task_op_)
     330             139 :                 continue;
     331               7 :             lock.unlock();
     332               7 :             h->destroy();
     333               7 :             lock.lock();
     334             146 :         }
     335             139 :     }
     336                 : 
     337             139 :     if (pipe_fds_[1] >= 0)
     338             139 :         interrupt_reactor();
     339                 : 
     340             139 :     wakeup_event_.notify_all();
     341             139 : }
     342                 : 
     343                 : inline void
     344            3280 : select_scheduler::post(std::coroutine_handle<> h) const
     345                 : {
     346                 :     struct post_handler final : scheduler_op
     347                 :     {
     348                 :         std::coroutine_handle<> h_;
     349                 : 
     350            3280 :         explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
     351                 : 
     352            6560 :         ~post_handler() override = default;
     353                 : 
     354            3277 :         void operator()() override
     355                 :         {
     356            3277 :             auto h = h_;
     357            3277 :             delete this;
     358            3277 :             h.resume();
     359            3277 :         }
     360                 : 
     361               3 :         void destroy() override
     362                 :         {
     363               3 :             auto h = h_;
     364               3 :             delete this;
     365               3 :             h.destroy();
     366               3 :         }
     367                 :     };
     368                 : 
     369            3280 :     auto ph = std::make_unique<post_handler>(h);
     370            3280 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     371                 : 
     372            3280 :     std::unique_lock lock(mutex_);
     373            3280 :     completed_ops_.push(ph.release());
     374            3280 :     wake_one_thread_and_unlock(lock);
     375            3280 : }
     376                 : 
     377                 : inline void
     378          145006 : select_scheduler::post(scheduler_op* h) const
     379                 : {
     380          145006 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     381                 : 
     382          145006 :     std::unique_lock lock(mutex_);
     383          145006 :     completed_ops_.push(h);
     384          145006 :     wake_one_thread_and_unlock(lock);
     385          145006 : }
     386                 : 
     387                 : inline bool
     388             555 : select_scheduler::running_in_this_thread() const noexcept
     389                 : {
     390             555 :     for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
     391             358 :         if (c->key == this)
     392             358 :             return true;
     393             197 :     return false;
     394                 : }
     395                 : 
     396                 : inline void
     397             105 : select_scheduler::stop()
     398                 : {
     399             105 :     bool expected = false;
     400             105 :     if (stopped_.compare_exchange_strong(
     401                 :             expected, true, std::memory_order_release,
     402                 :             std::memory_order_relaxed))
     403                 :     {
     404                 :         // Wake all threads so they notice stopped_ and exit
     405                 :         {
     406             105 :             std::lock_guard lock(mutex_);
     407             105 :             wakeup_event_.notify_all();
     408             105 :         }
     409             105 :         interrupt_reactor();
     410                 :     }
     411             105 : }
     412                 : 
     413                 : inline bool
     414               3 : select_scheduler::stopped() const noexcept
     415                 : {
     416               3 :     return stopped_.load(std::memory_order_acquire);
     417                 : }
     418                 : 
     419                 : inline void
     420              37 : select_scheduler::restart()
     421                 : {
     422              37 :     stopped_.store(false, std::memory_order_release);
     423              37 : }
     424                 : 
     425                 : inline std::size_t
     426             101 : select_scheduler::run()
     427                 : {
     428             101 :     if (stopped_.load(std::memory_order_acquire))
     429 MIS           0 :         return 0;
     430                 : 
     431 HIT         202 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     432                 :     {
     433 MIS           0 :         stop();
     434               0 :         return 0;
     435                 :     }
     436                 : 
     437 HIT         101 :     select::thread_context_guard ctx(this);
     438                 : 
     439             101 :     std::size_t n = 0;
     440          153910 :     while (do_one(-1))
     441          153809 :         if (n != (std::numeric_limits<std::size_t>::max)())
     442          153809 :             ++n;
     443             101 :     return n;
     444             101 : }
     445                 : 
     446                 : inline std::size_t
     447 MIS           0 : select_scheduler::run_one()
     448                 : {
     449               0 :     if (stopped_.load(std::memory_order_acquire))
     450               0 :         return 0;
     451                 : 
     452               0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     453                 :     {
     454               0 :         stop();
     455               0 :         return 0;
     456                 :     }
     457                 : 
     458               0 :     select::thread_context_guard ctx(this);
     459               0 :     return do_one(-1);
     460               0 : }
     461                 : 
     462                 : inline std::size_t
     463 HIT          27 : select_scheduler::wait_one(long usec)
     464                 : {
     465              27 :     if (stopped_.load(std::memory_order_acquire))
     466               3 :         return 0;
     467                 : 
     468              48 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     469                 :     {
     470 MIS           0 :         stop();
     471               0 :         return 0;
     472                 :     }
     473                 : 
     474 HIT          24 :     select::thread_context_guard ctx(this);
     475              24 :     return do_one(usec);
     476              24 : }
     477                 : 
     478                 : inline std::size_t
     479               2 : select_scheduler::poll()
     480                 : {
     481               2 :     if (stopped_.load(std::memory_order_acquire))
     482 MIS           0 :         return 0;
     483                 : 
     484 HIT           4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     485                 :     {
     486 MIS           0 :         stop();
     487               0 :         return 0;
     488                 :     }
     489                 : 
     490 HIT           2 :     select::thread_context_guard ctx(this);
     491                 : 
     492               2 :     std::size_t n = 0;
     493               4 :     while (do_one(0))
     494               2 :         if (n != (std::numeric_limits<std::size_t>::max)())
     495               2 :             ++n;
     496               2 :     return n;
     497               2 : }
     498                 : 
     499                 : inline std::size_t
     500 MIS           0 : select_scheduler::poll_one()
     501                 : {
     502               0 :     if (stopped_.load(std::memory_order_acquire))
     503               0 :         return 0;
     504                 : 
     505               0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     506                 :     {
     507               0 :         stop();
     508               0 :         return 0;
     509                 :     }
     510                 : 
     511               0 :     select::thread_context_guard ctx(this);
     512               0 :     return do_one(0);
     513               0 : }
     514                 : 
     515                 : inline void
     516 HIT        5714 : select_scheduler::register_fd(int fd, select_op* op, int events) const
     517                 : {
     518                 :     // Validate fd is within select() limits
     519            5714 :     if (fd < 0 || fd >= FD_SETSIZE)
     520 MIS           0 :         detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
     521                 : 
     522                 :     {
     523 HIT        5714 :         std::lock_guard lock(mutex_);
     524                 : 
     525            5714 :         auto& state = registered_fds_[fd];
     526            5714 :         if (events & event_read)
     527            2994 :             state.read_op = op;
     528            5714 :         if (events & event_write)
     529            2720 :             state.write_op = op;
     530                 : 
     531            5714 :         if (fd > max_fd_)
     532             228 :             max_fd_ = fd;
     533            5714 :     }
     534                 : 
     535                 :     // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
     536                 :     // with the newly registered fd.
     537            5714 :     interrupt_reactor();
     538            5714 : }
     539                 : 
     540                 : inline void
     541            5651 : select_scheduler::deregister_fd(int fd, int events) const
     542                 : {
     543            5651 :     std::lock_guard lock(mutex_);
     544                 : 
     545            5651 :     auto it = registered_fds_.find(fd);
     546            5651 :     if (it == registered_fds_.end())
     547            5493 :         return;
     548                 : 
     549             158 :     if (events & event_read)
     550             158 :         it->second.read_op = nullptr;
     551             158 :     if (events & event_write)
     552 MIS           0 :         it->second.write_op = nullptr;
     553                 : 
     554                 :     // Remove entry if both are null
     555 HIT         158 :     if (!it->second.read_op && !it->second.write_op)
     556                 :     {
     557             158 :         registered_fds_.erase(it);
     558                 : 
     559                 :         // Recalculate max_fd_ if needed
     560             158 :         if (fd == max_fd_)
     561                 :         {
     562             157 :             max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
     563             157 :             for (auto& [registered_fd, state] : registered_fds_)
     564                 :             {
     565 MIS           0 :                 if (registered_fd > max_fd_)
     566               0 :                     max_fd_ = registered_fd;
     567                 :             }
     568                 :         }
     569                 :     }
     570 HIT        5651 : }
     571                 : 
     572                 : inline void
     573            9225 : select_scheduler::work_started() noexcept
     574                 : {
     575            9225 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     576            9225 : }
     577                 : 
     578                 : inline void
     579          157504 : select_scheduler::work_finished() noexcept
     580                 : {
     581          315008 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     582             104 :         stop();
     583          157504 : }
     584                 : 
     585                 : inline void
     586           11809 : select_scheduler::interrupt_reactor() const
     587                 : {
     588           11809 :     char byte               = 1;
     589           11809 :     [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
     590           11809 : }
     591                 : 
     592                 : inline void
     593          148286 : select_scheduler::wake_one_thread_and_unlock(
     594                 :     std::unique_lock<std::mutex>& lock) const
     595                 : {
     596          148286 :     if (idle_thread_count_ > 0)
     597                 :     {
     598                 :         // Idle worker exists - wake it via condvar
     599 MIS           0 :         wakeup_event_.notify_one();
     600               0 :         lock.unlock();
     601                 :     }
     602 HIT      148286 :     else if (reactor_running_ && !reactor_interrupted_)
     603                 :     {
     604                 :         // No idle workers but reactor is running - interrupt it
     605            2919 :         reactor_interrupted_ = true;
     606            2919 :         lock.unlock();
     607            2919 :         interrupt_reactor();
     608                 :     }
     609                 :     else
     610                 :     {
     611                 :         // No one to wake
     612          145367 :         lock.unlock();
     613                 :     }
     614          148286 : }
     615                 : 
     616                 : inline long
     617            8330 : select_scheduler::calculate_timeout(long requested_timeout_us) const
     618                 : {
     619            8330 :     if (requested_timeout_us == 0)
     620 MIS           0 :         return 0;
     621                 : 
     622 HIT        8330 :     auto nearest = timer_svc_->nearest_expiry();
     623            8330 :     if (nearest == timer_service::time_point::max())
     624              37 :         return requested_timeout_us;
     625                 : 
     626            8293 :     auto now = std::chrono::steady_clock::now();
     627            8293 :     if (nearest <= now)
     628             165 :         return 0;
     629                 : 
     630                 :     auto timer_timeout_us =
     631            8128 :         std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
     632            8128 :             .count();
     633                 : 
     634                 :     // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
     635            8128 :     constexpr auto long_max =
     636                 :         static_cast<long long>((std::numeric_limits<long>::max)());
     637                 :     auto capped_timer_us =
     638            8128 :         (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
     639            8128 :                               static_cast<long long>(0)),
     640            8128 :                    long_max);
     641                 : 
     642            8128 :     if (requested_timeout_us < 0)
     643            8128 :         return static_cast<long>(capped_timer_us);
     644                 : 
     645                 :     // requested_timeout_us is already long, so min() result fits in long
     646                 :     return static_cast<long>(
     647 MIS           0 :         (std::min)(static_cast<long long>(requested_timeout_us),
     648               0 :                    capped_timer_us));
     649                 : }
     650                 : 
     651                 : inline void
     652 HIT       82782 : select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
     653                 : {
     654                 :     // Calculate timeout considering timers, use 0 if interrupted
     655                 :     long effective_timeout_us =
     656           82782 :         reactor_interrupted_ ? 0 : calculate_timeout(-1);
     657                 : 
     658                 :     // Build fd_sets from registered_fds_
     659                 :     fd_set read_fds, write_fds, except_fds;
     660         1407294 :     FD_ZERO(&read_fds);
     661         1407294 :     FD_ZERO(&write_fds);
     662         1407294 :     FD_ZERO(&except_fds);
     663                 : 
     664                 :     // Always include the interrupt pipe
     665           82782 :     FD_SET(pipe_fds_[0], &read_fds);
     666           82782 :     int nfds = pipe_fds_[0];
     667                 : 
     668                 :     // Add registered fds
     669           96285 :     for (auto& [fd, state] : registered_fds_)
     670                 :     {
     671           13503 :         if (state.read_op)
     672           10783 :             FD_SET(fd, &read_fds);
     673           13503 :         if (state.write_op)
     674                 :         {
     675            2720 :             FD_SET(fd, &write_fds);
     676                 :             // Also monitor for errors on connect operations
     677            2720 :             FD_SET(fd, &except_fds);
     678                 :         }
     679           13503 :         if (fd > nfds)
     680           10787 :             nfds = fd;
     681                 :     }
     682                 : 
     683                 :     // Convert timeout to timeval
     684                 :     struct timeval tv;
     685           82782 :     struct timeval* tv_ptr = nullptr;
     686           82782 :     if (effective_timeout_us >= 0)
     687                 :     {
     688           82745 :         tv.tv_sec  = effective_timeout_us / 1000000;
     689           82745 :         tv.tv_usec = effective_timeout_us % 1000000;
     690           82745 :         tv_ptr     = &tv;
     691                 :     }
     692                 : 
     693           82782 :     lock.unlock();
     694                 : 
     695           82782 :     int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
     696           82782 :     int saved_errno = errno;
     697                 : 
     698                 :     // Process timers outside the lock
     699           82782 :     timer_svc_->process_expired();
     700                 : 
     701           82782 :     if (ready < 0 && saved_errno != EINTR)
     702 MIS           0 :         detail::throw_system_error(make_err(saved_errno), "select");
     703                 : 
     704                 :     // Re-acquire lock before modifying completed_ops_
     705 HIT       82782 :     lock.lock();
     706                 : 
     707                 :     // Drain the interrupt pipe if readable
     708           82782 :     if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
     709                 :     {
     710                 :         char buf[256];
     711           17372 :         while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
     712                 :         {
     713                 :         }
     714                 :     }
     715                 : 
     716                 :     // Process I/O completions
     717           82782 :     int completions_queued = 0;
     718           82782 :     if (ready > 0)
     719                 :     {
     720                 :         // Iterate over registered fds (copy keys to avoid iterator invalidation)
     721            8686 :         std::vector<int> fds_to_check;
     722            8686 :         fds_to_check.reserve(registered_fds_.size());
     723           19506 :         for (auto& [fd, state] : registered_fds_)
     724           10820 :             fds_to_check.push_back(fd);
     725                 : 
     726           19506 :         for (int fd : fds_to_check)
     727                 :         {
     728           10820 :             auto it = registered_fds_.find(fd);
     729           10820 :             if (it == registered_fds_.end())
     730 MIS           0 :                 continue;
     731                 : 
     732 HIT       10820 :             auto& state = it->second;
     733                 : 
     734                 :             // Check for errors (especially for connect operations)
     735           10820 :             bool has_error = FD_ISSET(fd, &except_fds);
     736                 : 
     737                 :             // Process read readiness
     738           10820 :             if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
     739                 :             {
     740            2836 :                 auto* op = state.read_op;
     741                 :                 // Claim the op by exchanging to unregistered. Both registering and
     742                 :                 // registered states mean the op is ours to complete.
     743            2836 :                 auto prev = op->registered.exchange(
     744                 :                     select_registration_state::unregistered,
     745                 :                     std::memory_order_acq_rel);
     746            2836 :                 if (prev != select_registration_state::unregistered)
     747                 :                 {
     748            2836 :                     state.read_op = nullptr;
     749                 : 
     750            2836 :                     if (has_error)
     751                 :                     {
     752 MIS           0 :                         int errn      = 0;
     753               0 :                         socklen_t len = sizeof(errn);
     754               0 :                         if (::getsockopt(
     755               0 :                                 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
     756               0 :                             errn = errno;
     757               0 :                         if (errn == 0)
     758               0 :                             errn = EIO;
     759               0 :                         op->complete(errn, 0);
     760                 :                     }
     761                 :                     else
     762                 :                     {
     763 HIT        2836 :                         op->perform_io();
     764                 :                     }
     765                 : 
     766            2836 :                     completed_ops_.push(op);
     767            2836 :                     ++completions_queued;
     768                 :                 }
     769                 :             }
     770                 : 
     771                 :             // Process write readiness
     772           10820 :             if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
     773                 :             {
     774            2720 :                 auto* op = state.write_op;
     775                 :                 // Claim the op by exchanging to unregistered. Both registering and
     776                 :                 // registered states mean the op is ours to complete.
     777            2720 :                 auto prev = op->registered.exchange(
     778                 :                     select_registration_state::unregistered,
     779                 :                     std::memory_order_acq_rel);
     780            2720 :                 if (prev != select_registration_state::unregistered)
     781                 :                 {
     782            2720 :                     state.write_op = nullptr;
     783                 : 
     784            2720 :                     if (has_error)
     785                 :                     {
     786 MIS           0 :                         int errn      = 0;
     787               0 :                         socklen_t len = sizeof(errn);
     788               0 :                         if (::getsockopt(
     789               0 :                                 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
     790               0 :                             errn = errno;
     791               0 :                         if (errn == 0)
     792               0 :                             errn = EIO;
     793               0 :                         op->complete(errn, 0);
     794                 :                     }
     795                 :                     else
     796                 :                     {
     797 HIT        2720 :                         op->perform_io();
     798                 :                     }
     799                 : 
     800            2720 :                     completed_ops_.push(op);
     801            2720 :                     ++completions_queued;
     802                 :                 }
     803                 :             }
     804                 : 
     805                 :             // Clean up empty entries
     806           10820 :             if (!state.read_op && !state.write_op)
     807            5556 :                 registered_fds_.erase(it);
     808                 :         }
     809            8686 :     }
     810                 : 
     811           82782 :     if (completions_queued > 0)
     812                 :     {
     813            2840 :         if (completions_queued == 1)
     814             124 :             wakeup_event_.notify_one();
     815                 :         else
     816            2716 :             wakeup_event_.notify_all();
     817                 :     }
     818           82782 : }
     819                 : 
     820                 : inline std::size_t
     821          153938 : select_scheduler::do_one(long timeout_us)
     822                 : {
     823          153938 :     std::unique_lock lock(mutex_);
     824                 : 
     825                 :     for (;;)
     826                 :     {
     827          236720 :         if (stopped_.load(std::memory_order_acquire))
     828             101 :             return 0;
     829                 : 
     830          236619 :         scheduler_op* op = completed_ops_.pop();
     831                 : 
     832          236619 :         if (op == &task_op_)
     833                 :         {
     834           82784 :             bool more_handlers = !completed_ops_.empty();
     835                 : 
     836           82784 :             if (!more_handlers)
     837                 :             {
     838           16664 :                 if (outstanding_work_.load(std::memory_order_acquire) == 0)
     839                 :                 {
     840 MIS           0 :                     completed_ops_.push(&task_op_);
     841               0 :                     return 0;
     842                 :                 }
     843 HIT        8332 :                 if (timeout_us == 0)
     844                 :                 {
     845               2 :                     completed_ops_.push(&task_op_);
     846               2 :                     return 0;
     847                 :                 }
     848                 :             }
     849                 : 
     850           82782 :             reactor_interrupted_ = more_handlers || timeout_us == 0;
     851           82782 :             reactor_running_     = true;
     852                 : 
     853           82782 :             if (more_handlers && idle_thread_count_ > 0)
     854 MIS           0 :                 wakeup_event_.notify_one();
     855                 : 
     856 HIT       82782 :             run_reactor(lock);
     857                 : 
     858           82782 :             reactor_running_ = false;
     859           82782 :             completed_ops_.push(&task_op_);
     860           82782 :             continue;
     861           82782 :         }
     862                 : 
     863          153835 :         if (op != nullptr)
     864                 :         {
     865          153835 :             lock.unlock();
     866          153835 :             select::work_guard g{this};
     867          153835 :             (*op)();
     868          153835 :             return 1;
     869          153835 :         }
     870                 : 
     871 MIS           0 :         if (outstanding_work_.load(std::memory_order_acquire) == 0)
     872               0 :             return 0;
     873                 : 
     874               0 :         if (timeout_us == 0)
     875               0 :             return 0;
     876                 : 
     877               0 :         ++idle_thread_count_;
     878               0 :         if (timeout_us < 0)
     879               0 :             wakeup_event_.wait(lock);
     880                 :         else
     881               0 :             wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
     882               0 :         --idle_thread_count_;
     883 HIT       82782 :     }
     884          153938 : }
     885                 : 
     886                 : } // namespace boost::corosio::detail
     887                 : 
     888                 : #endif // BOOST_COROSIO_HAS_SELECT
     889                 : 
     890                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
        

Generated by: LCOV version 2.3