LCOV - code coverage report
Current view: top level - corosio/detail - timer_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 89.3 % 365 326 39
Test Date: 2026-02-25 22:54:20 Functions: 97.8 % 45 44 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/corosio
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      12                 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      13                 : 
      14                 : #include <boost/corosio/timer.hpp>
      15                 : #include <boost/corosio/io_context.hpp>
      16                 : #include <boost/corosio/detail/scheduler_op.hpp>
      17                 : #include <boost/corosio/native/native_scheduler.hpp>
      18                 : #include <boost/corosio/detail/intrusive.hpp>
      19                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      20                 : #include <boost/capy/error.hpp>
      21                 : #include <boost/capy/ex/execution_context.hpp>
      22                 : #include <boost/capy/ex/executor_ref.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <atomic>
      26                 : #include <chrono>
      27                 : #include <coroutine>
      28                 : #include <cstddef>
      29                 : #include <limits>
      30                 : #include <mutex>
      31                 : #include <optional>
      32                 : #include <stop_token>
      33                 : #include <utility>
      34                 : #include <vector>
      35                 : 
      36                 : namespace boost::corosio::detail {
      37                 : 
      38                 : struct scheduler;
      39                 : 
      40                 : /*
      41                 :     Timer Service
      42                 :     =============
      43                 : 
      44                 :     Data Structures
      45                 :     ---------------
      46                 :     waiter_node holds per-waiter state: coroutine handle, executor,
      47                 :     error output, stop_token, embedded completion_op. Each concurrent
      48                 :     co_await t.wait() allocates one waiter_node.
      49                 : 
      50                 :     timer_service::implementation holds per-timer state: expiry,
      51                 :     heap index, and an intrusive_list of waiter_nodes. Multiple
      52                 :     coroutines can wait on the same timer simultaneously.
      53                 : 
      54                 :     timer_service owns a min-heap of active timers, a free list
      55                 :     of recycled impls, and a free list of recycled waiter_nodes. The
      56                 :     heap is ordered by expiry time; the scheduler queries
      57                 :     nearest_expiry() to set the epoll/timerfd timeout.
      58                 : 
      59                 :     Optimization Strategy
      60                 :     ---------------------
      61                 :     1. Deferred heap insertion — expires_after() stores the expiry
      62                 :        but does not insert into the heap. Insertion happens in wait().
      63                 :     2. Thread-local impl cache — single-slot per-thread cache.
      64                 :     3. Embedded completion_op — eliminates heap allocation per fire/cancel.
      65                 :     4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
      66                 :     5. might_have_pending_waits_ flag — skips lock when no wait issued.
      67                 :     6. Thread-local waiter cache — single-slot per-thread cache.
      68                 : 
      69                 :     Concurrency
      70                 :     -----------
      71                 :     stop_token callbacks can fire from any thread. The impl_
      72                 :     pointer on waiter_node is used as a "still in list" marker.
      73                 : */
      74                 : 
      75                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
      76                 : 
      77                 : inline void timer_service_invalidate_cache() noexcept;
      78                 : 
      79                 : // timer_service class body — member function definitions are
      80                 : // out-of-class (after implementation and waiter_node are complete)
      81                 : class BOOST_COROSIO_DECL timer_service final
      82                 :     : public capy::execution_context::service
      83                 :     , public io_object::io_service
      84                 : {
      85                 : public:
      86                 :     using clock_type = std::chrono::steady_clock;
      87                 :     using time_point = clock_type::time_point;
      88                 : 
      89                 :     class callback
      90                 :     {
      91                 :         void* ctx_         = nullptr;
      92                 :         void (*fn_)(void*) = nullptr;
      93                 : 
      94                 :     public:
      95 HIT         349 :         callback() = default;
      96             349 :         callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
      97                 : 
      98                 :         explicit operator bool() const noexcept
      99                 :         {
     100                 :             return fn_ != nullptr;
     101                 :         }
     102            8261 :         void operator()() const
     103                 :         {
     104            8261 :             if (fn_)
     105            8261 :                 fn_(ctx_);
     106            8261 :         }
     107                 :     };
     108                 : 
     109                 :     struct implementation;
     110                 : 
     111                 : private:
     112                 :     struct heap_entry
     113                 :     {
     114                 :         time_point time_;
     115                 :         implementation* timer_;
     116                 :     };
     117                 : 
     118                 :     scheduler* sched_ = nullptr;
     119                 :     mutable std::mutex mutex_;
     120                 :     std::vector<heap_entry> heap_;
     121                 :     implementation* free_list_     = nullptr;
     122                 :     waiter_node* waiter_free_list_ = nullptr;
     123                 :     callback on_earliest_changed_;
     124                 :     // Avoids mutex in nearest_expiry() and empty()
     125                 :     mutable std::atomic<std::int64_t> cached_nearest_ns_{
     126                 :         (std::numeric_limits<std::int64_t>::max)()};
     127                 : 
     128                 : public:
     129             349 :     inline timer_service(capy::execution_context&, scheduler& sched)
     130             349 :         : sched_(&sched)
     131                 :     {
     132             349 :     }
     133                 : 
     134           16584 :     inline scheduler& get_scheduler() noexcept
     135                 :     {
     136           16584 :         return *sched_;
     137                 :     }
     138                 : 
     139             698 :     ~timer_service() override = default;
     140                 : 
     141                 :     timer_service(timer_service const&)            = delete;
     142                 :     timer_service& operator=(timer_service const&) = delete;
     143                 : 
     144             349 :     inline void set_on_earliest_changed(callback cb)
     145                 :     {
     146             349 :         on_earliest_changed_ = cb;
     147             349 :     }
     148                 : 
     149                 :     inline bool empty() const noexcept
     150                 :     {
     151                 :         return cached_nearest_ns_.load(std::memory_order_acquire) ==
     152                 :             (std::numeric_limits<std::int64_t>::max)();
     153                 :     }
     154                 : 
     155           19555 :     inline time_point nearest_expiry() const noexcept
     156                 :     {
     157           19555 :         auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
     158           19555 :         return time_point(time_point::duration(ns));
     159                 :     }
     160                 : 
     161                 :     inline void shutdown() override;
     162                 :     inline io_object::implementation* construct() override;
     163                 :     inline void destroy(io_object::implementation* p) override;
     164                 :     inline void destroy_impl(implementation& impl);
     165                 :     inline waiter_node* create_waiter();
     166                 :     inline void destroy_waiter(waiter_node* w);
     167                 :     inline std::size_t update_timer(implementation& impl, time_point new_time);
     168                 :     inline void insert_waiter(implementation& impl, waiter_node* w);
     169                 :     inline std::size_t cancel_timer(implementation& impl);
     170                 :     inline void cancel_waiter(waiter_node* w);
     171                 :     inline std::size_t cancel_one_waiter(implementation& impl);
     172                 :     inline std::size_t process_expired();
     173                 : 
     174                 : private:
     175          100066 :     inline void refresh_cached_nearest() noexcept
     176                 :     {
     177          100066 :         auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
     178           99667 :                                 : heap_[0].time_.time_since_epoch().count();
     179          100066 :         cached_nearest_ns_.store(ns, std::memory_order_release);
     180          100066 :     }
     181                 : 
     182                 :     inline void remove_timer_impl(implementation& impl);
     183                 :     inline void up_heap(std::size_t index);
     184                 :     inline void down_heap(std::size_t index);
     185                 :     inline void swap_heap(std::size_t i1, std::size_t i2);
     186                 : };
     187                 : 
     188                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
     189                 :     : intrusive_list<waiter_node>::node
     190                 : {
     191                 :     // Embedded completion op — avoids heap allocation per fire/cancel
     192                 :     struct completion_op final : scheduler_op
     193                 :     {
     194                 :         waiter_node* waiter_ = nullptr;
     195                 : 
     196                 :         static void do_complete(
     197                 :             void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
     198                 : 
     199             152 :         completion_op() noexcept : scheduler_op(&do_complete) {}
     200                 : 
     201                 :         void operator()() override;
     202                 :         void destroy() override;
     203                 :     };
     204                 : 
     205                 :     // Per-waiter stop_token cancellation
     206                 :     struct canceller
     207                 :     {
     208                 :         waiter_node* waiter_;
     209                 :         void operator()() const;
     210                 :     };
     211                 : 
     212                 :     // nullptr once removed from timer's waiter list (concurrency marker)
     213                 :     timer_service::implementation* impl_ = nullptr;
     214                 :     timer_service* svc_                  = nullptr;
     215                 :     std::coroutine_handle<> h_;
     216                 :     capy::executor_ref d_;
     217                 :     std::error_code* ec_out_ = nullptr;
     218                 :     std::stop_token token_;
     219                 :     std::optional<std::stop_callback<canceller>> stop_cb_;
     220                 :     completion_op op_;
     221                 :     std::error_code ec_value_;
     222                 :     waiter_node* next_free_ = nullptr;
     223                 : 
     224             152 :     waiter_node() noexcept
     225             152 :     {
     226             152 :         op_.waiter_ = this;
     227             152 :     }
     228                 : };
     229                 : 
     230                 : struct timer_service::implementation final : timer::implementation
     231                 : {
     232                 :     using clock_type = std::chrono::steady_clock;
     233                 :     using time_point = clock_type::time_point;
     234                 :     using duration   = clock_type::duration;
     235                 : 
     236                 :     timer_service* svc_ = nullptr;
     237                 :     intrusive_list<waiter_node> waiters_;
     238                 : 
     239                 :     // Free list linkage (reused when impl is on free_list)
     240                 :     implementation* next_free_ = nullptr;
     241                 : 
     242                 :     inline explicit implementation(timer_service& svc) noexcept;
     243                 : 
     244                 :     inline std::coroutine_handle<> wait(
     245                 :         std::coroutine_handle<>,
     246                 :         capy::executor_ref,
     247                 :         std::stop_token,
     248                 :         std::error_code*) override;
     249                 : };
     250                 : 
     251                 : // Thread-local caches avoid hot-path mutex acquisitions:
     252                 : // 1. Impl cache — single-slot, validated by comparing svc_
     253                 : // 2. Waiter cache — single-slot, no service affinity
     254                 : // All caches are cleared by timer_service_invalidate_cache() during shutdown.
     255                 : 
     256                 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
     257                 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
     258                 : 
     259                 : inline timer_service::implementation*
     260            8577 : try_pop_tl_cache(timer_service* svc) noexcept
     261                 : {
     262            8577 :     auto* impl = tl_cached_impl.get();
     263            8577 :     if (impl)
     264                 :     {
     265            8394 :         tl_cached_impl.set(nullptr);
     266            8394 :         if (impl->svc_ == svc)
     267            8394 :             return impl;
     268                 :         // Stale impl from a destroyed service
     269 MIS           0 :         delete impl;
     270                 :     }
     271 HIT         183 :     return nullptr;
     272                 : }
     273                 : 
     274                 : inline bool
     275            8575 : try_push_tl_cache(timer_service::implementation* impl) noexcept
     276                 : {
     277            8575 :     if (!tl_cached_impl.get())
     278                 :     {
     279            8523 :         tl_cached_impl.set(impl);
     280            8523 :         return true;
     281                 :     }
     282              52 :     return false;
     283                 : }
     284                 : 
     285                 : inline waiter_node*
     286            8293 : try_pop_waiter_tl_cache() noexcept
     287                 : {
     288            8293 :     auto* w = tl_cached_waiter.get();
     289            8293 :     if (w)
     290                 :     {
     291            8141 :         tl_cached_waiter.set(nullptr);
     292            8141 :         return w;
     293                 :     }
     294             152 :     return nullptr;
     295                 : }
     296                 : 
     297                 : inline bool
     298            8283 : try_push_waiter_tl_cache(waiter_node* w) noexcept
     299                 : {
     300            8283 :     if (!tl_cached_waiter.get())
     301                 :     {
     302            8225 :         tl_cached_waiter.set(w);
     303            8225 :         return true;
     304                 :     }
     305              58 :     return false;
     306                 : }
     307                 : 
     308                 : inline void
     309             349 : timer_service_invalidate_cache() noexcept
     310                 : {
     311             349 :     delete tl_cached_impl.get();
     312             349 :     tl_cached_impl.set(nullptr);
     313                 : 
     314             349 :     delete tl_cached_waiter.get();
     315             349 :     tl_cached_waiter.set(nullptr);
     316             349 : }
     317                 : 
     318                 : // timer_service out-of-class member function definitions
     319                 : 
     320             183 : inline timer_service::implementation::implementation(
     321             183 :     timer_service& svc) noexcept
     322             183 :     : svc_(&svc)
     323                 : {
     324             183 : }
     325                 : 
     326                 : inline void
     327             349 : timer_service::shutdown()
     328                 : {
     329             349 :     timer_service_invalidate_cache();
     330                 : 
     331                 :     // Cancel waiting timers still in the heap.
     332                 :     // Each waiter called work_started() in implementation::wait().
     333                 :     // On IOCP the scheduler shutdown loop exits when outstanding_work_
     334                 :     // reaches zero, so we must call work_finished() here to balance it.
     335                 :     // On other backends this is harmless (their drain loops exit when
     336                 :     // the queue is empty, not based on outstanding_work_).
     337             351 :     for (auto& entry : heap_)
     338                 :     {
     339               2 :         auto* impl = entry.timer_;
     340               4 :         while (auto* w = impl->waiters_.pop_front())
     341                 :         {
     342               2 :             w->stop_cb_.reset();
     343               2 :             auto h = std::exchange(w->h_, {});
     344               2 :             sched_->work_finished();
     345               2 :             if (h)
     346               2 :                 h.destroy();
     347               2 :             delete w;
     348               2 :         }
     349               2 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     350               2 :         delete impl;
     351                 :     }
     352             349 :     heap_.clear();
     353             349 :     cached_nearest_ns_.store(
     354                 :         (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
     355                 : 
     356                 :     // Delete free-listed impls
     357             401 :     while (free_list_)
     358                 :     {
     359              52 :         auto* next = free_list_->next_free_;
     360              52 :         delete free_list_;
     361              52 :         free_list_ = next;
     362                 :     }
     363                 : 
     364                 :     // Delete free-listed waiters
     365             407 :     while (waiter_free_list_)
     366                 :     {
     367              58 :         auto* next = waiter_free_list_->next_free_;
     368              58 :         delete waiter_free_list_;
     369              58 :         waiter_free_list_ = next;
     370                 :     }
     371             349 : }
     372                 : 
     373                 : inline io_object::implementation*
     374            8577 : timer_service::construct()
     375                 : {
     376            8577 :     implementation* impl = try_pop_tl_cache(this);
     377            8577 :     if (impl)
     378                 :     {
     379            8394 :         impl->svc_        = this;
     380            8394 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     381            8394 :         impl->might_have_pending_waits_ = false;
     382            8394 :         return impl;
     383                 :     }
     384                 : 
     385             183 :     std::lock_guard lock(mutex_);
     386             183 :     if (free_list_)
     387                 :     {
     388 MIS           0 :         impl              = free_list_;
     389               0 :         free_list_        = impl->next_free_;
     390               0 :         impl->next_free_  = nullptr;
     391               0 :         impl->svc_        = this;
     392               0 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     393               0 :         impl->might_have_pending_waits_ = false;
     394                 :     }
     395                 :     else
     396                 :     {
     397 HIT         183 :         impl = new implementation(*this);
     398                 :     }
     399             183 :     return impl;
     400             183 : }
     401                 : 
     402                 : inline void
     403            8575 : timer_service::destroy(io_object::implementation* p)
     404                 : {
     405            8575 :     destroy_impl(static_cast<implementation&>(*p));
     406            8575 : }
     407                 : 
     408                 : inline void
     409            8575 : timer_service::destroy_impl(implementation& impl)
     410                 : {
     411            8575 :     cancel_timer(impl);
     412                 : 
     413            8575 :     if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
     414                 :     {
     415 MIS           0 :         std::lock_guard lock(mutex_);
     416               0 :         remove_timer_impl(impl);
     417               0 :         refresh_cached_nearest();
     418               0 :     }
     419                 : 
     420 HIT        8575 :     if (try_push_tl_cache(&impl))
     421            8523 :         return;
     422                 : 
     423              52 :     std::lock_guard lock(mutex_);
     424              52 :     impl.next_free_ = free_list_;
     425              52 :     free_list_      = &impl;
     426              52 : }
     427                 : 
     428                 : inline waiter_node*
     429            8293 : timer_service::create_waiter()
     430                 : {
     431            8293 :     if (auto* w = try_pop_waiter_tl_cache())
     432            8141 :         return w;
     433                 : 
     434             152 :     std::lock_guard lock(mutex_);
     435             152 :     if (waiter_free_list_)
     436                 :     {
     437 MIS           0 :         auto* w           = waiter_free_list_;
     438               0 :         waiter_free_list_ = w->next_free_;
     439               0 :         w->next_free_     = nullptr;
     440               0 :         return w;
     441                 :     }
     442                 : 
     443 HIT         152 :     return new waiter_node();
     444             152 : }
     445                 : 
     446                 : inline void
     447            8283 : timer_service::destroy_waiter(waiter_node* w)
     448                 : {
     449            8283 :     if (try_push_waiter_tl_cache(w))
     450            8225 :         return;
     451                 : 
     452              58 :     std::lock_guard lock(mutex_);
     453              58 :     w->next_free_     = waiter_free_list_;
     454              58 :     waiter_free_list_ = w;
     455              58 : }
     456                 : 
     457                 : inline std::size_t
     458               6 : timer_service::update_timer(implementation& impl, time_point new_time)
     459                 : {
     460                 :     bool in_heap =
     461               6 :         (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
     462               6 :     if (!in_heap && impl.waiters_.empty())
     463 MIS           0 :         return 0;
     464                 : 
     465 HIT           6 :     bool notify = false;
     466               6 :     intrusive_list<waiter_node> canceled;
     467                 : 
     468                 :     {
     469               6 :         std::lock_guard lock(mutex_);
     470                 : 
     471              16 :         while (auto* w = impl.waiters_.pop_front())
     472                 :         {
     473              10 :             w->impl_ = nullptr;
     474              10 :             canceled.push_back(w);
     475              10 :         }
     476                 : 
     477               6 :         if (impl.heap_index_ < heap_.size())
     478                 :         {
     479               6 :             time_point old_time           = heap_[impl.heap_index_].time_;
     480               6 :             heap_[impl.heap_index_].time_ = new_time;
     481                 : 
     482               6 :             if (new_time < old_time)
     483               6 :                 up_heap(impl.heap_index_);
     484                 :             else
     485 MIS           0 :                 down_heap(impl.heap_index_);
     486                 : 
     487 HIT           6 :             notify = (impl.heap_index_ == 0);
     488                 :         }
     489                 : 
     490               6 :         refresh_cached_nearest();
     491               6 :     }
     492                 : 
     493               6 :     std::size_t count = 0;
     494              16 :     while (auto* w = canceled.pop_front())
     495                 :     {
     496              10 :         w->ec_value_ = make_error_code(capy::error::canceled);
     497              10 :         sched_->post(&w->op_);
     498              10 :         ++count;
     499              10 :     }
     500                 : 
     501               6 :     if (notify)
     502               6 :         on_earliest_changed_();
     503                 : 
     504               6 :     return count;
     505                 : }
     506                 : 
     507                 : inline void
     508            8293 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
     509                 : {
     510            8293 :     bool notify = false;
     511                 :     {
     512            8293 :         std::lock_guard lock(mutex_);
     513            8293 :         if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
     514                 :         {
     515            8271 :             impl.heap_index_ = heap_.size();
     516            8271 :             heap_.push_back({impl.expiry_, &impl});
     517            8271 :             up_heap(heap_.size() - 1);
     518            8271 :             notify = (impl.heap_index_ == 0);
     519            8271 :             refresh_cached_nearest();
     520                 :         }
     521            8293 :         impl.waiters_.push_back(w);
     522            8293 :     }
     523            8293 :     if (notify)
     524            8255 :         on_earliest_changed_();
     525            8293 : }
     526                 : 
     527                 : inline std::size_t
     528            8583 : timer_service::cancel_timer(implementation& impl)
     529                 : {
     530            8583 :     if (!impl.might_have_pending_waits_)
     531            8559 :         return 0;
     532                 : 
     533                 :     // Not in heap and no waiters — just clear the flag
     534              24 :     if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
     535 MIS           0 :         impl.waiters_.empty())
     536                 :     {
     537               0 :         impl.might_have_pending_waits_ = false;
     538               0 :         return 0;
     539                 :     }
     540                 : 
     541 HIT          24 :     intrusive_list<waiter_node> canceled;
     542                 : 
     543                 :     {
     544              24 :         std::lock_guard lock(mutex_);
     545              24 :         remove_timer_impl(impl);
     546              52 :         while (auto* w = impl.waiters_.pop_front())
     547                 :         {
     548              28 :             w->impl_ = nullptr;
     549              28 :             canceled.push_back(w);
     550              28 :         }
     551              24 :         refresh_cached_nearest();
     552              24 :     }
     553                 : 
     554              24 :     impl.might_have_pending_waits_ = false;
     555                 : 
     556              24 :     std::size_t count = 0;
     557              52 :     while (auto* w = canceled.pop_front())
     558                 :     {
     559              28 :         w->ec_value_ = make_error_code(capy::error::canceled);
     560              28 :         sched_->post(&w->op_);
     561              28 :         ++count;
     562              28 :     }
     563                 : 
     564              24 :     return count;
     565                 : }
     566                 : 
     567                 : inline void
     568               4 : timer_service::cancel_waiter(waiter_node* w)
     569                 : {
                               // Cancels the single pending wait represented by `w`; this is
                               // what waiter_node::canceller invokes. While holding mutex_,
                               // `w` is detached from its timer, and if it was that timer's
                               // last waiter the timer is also taken out of the heap. The
                               // canceled completion is posted after the lock scope ends.
     570                 :     {
     571               4 :         std::lock_guard lock(mutex_);
     572                 :         // Already detached by cancel_timer, cancel_one_waiter or
                                   // process_expired (each clears impl_ and posts the op itself)
     573               4 :         if (!w->impl_)
     574 MIS           0 :             return;
     575 HIT           4 :         auto* impl = w->impl_;
     576               4 :         w->impl_   = nullptr;
     577               4 :         impl->waiters_.remove(w);
     578               4 :         if (impl->waiters_.empty())
     579                 :         {
     580               2 :             remove_timer_impl(*impl);
     581               2 :             impl->might_have_pending_waits_ = false;
     582                 :         }
     583               4 :         refresh_cached_nearest();
     584               4 :     }
     585                 : 
                               // Only reached when this call detached `w` above.
     586               4 :     w->ec_value_ = make_error_code(capy::error::canceled);
     587               4 :     sched_->post(&w->op_);
     588                 : }
     589                 : 
     590                 : inline std::size_t
     591               2 : timer_service::cancel_one_waiter(implementation& impl)
     592                 : {
                               // Cancels the waiter at the front of impl.waiters_, if there is
                               // one. Returns the number of waits canceled: 0 or 1.
                               //
                               // The flag below is read before mutex_ is taken, so a false
                               // value returns without locking.
     593               2 :     if (!impl.might_have_pending_waits_)
     594 MIS           0 :         return 0;
     595                 : 
     596 HIT           2 :     waiter_node* w = nullptr;
     597                 : 
     598                 :     {
     599               2 :         std::lock_guard lock(mutex_);
     600               2 :         w = impl.waiters_.pop_front();
     601               2 :         if (!w)
     602 MIS           0 :             return 0;
                                   // Mark the waiter as detached so a later cancel_waiter() on
                                   // it returns early instead of posting a second completion.
     603 HIT           2 :         w->impl_ = nullptr;
     604               2 :         if (impl.waiters_.empty())
     605                 :         {
                                       // That was the last waiter: drop the timer from the heap.
     606 MIS           0 :             remove_timer_impl(impl);
     607               0 :             impl.might_have_pending_waits_ = false;
     608                 :         }
     609 HIT           2 :         refresh_cached_nearest();
     610               2 :     }
     611                 : 
                               // Posted after the lock scope has ended.
     612               2 :     w->ec_value_ = make_error_code(capy::error::canceled);
     613               2 :     sched_->post(&w->op_);
     614               2 :     return 1;
     615                 : }
     616                 : 
     617                 : inline std::size_t
     618           91759 : timer_service::process_expired()
     619                 : {
                               // Completes every wait whose timer has reached its expiry.
                               // Under mutex_, timers are taken from the heap root while their
                               // time_ is <= now, and their waiters are moved to a local list.
                               // The waiters' ops are posted to the scheduler once the lock
                               // scope has ended. Returns the number of waiters posted.
     620           91759 :     intrusive_list<waiter_node> expired;
     621                 : 
     622                 :     {
     623           91759 :         std::lock_guard lock(mutex_);
                                   // `now` is sampled once and used for the whole sweep.
     624           91759 :         auto now = clock_type::now();
     625                 : 
     626          100002 :         while (!heap_.empty() && heap_[0].time_ <= now)
     627                 :         {
     628            8243 :             implementation* t = heap_[0].timer_;
     629            8243 :             remove_timer_impl(*t);
     630           16490 :             while (auto* w = t->waiters_.pop_front())
     631                 :             {
                                           // Detach the waiter and record a default-constructed
                                           // (no-error) result for a normal expiry.
     632            8247 :                 w->impl_     = nullptr;
     633            8247 :                 w->ec_value_ = {};
     634            8247 :                 expired.push_back(w);
     635            8247 :             }
     636            8243 :             t->might_have_pending_waits_ = false;
     637                 :         }
     638                 : 
     639           91759 :         refresh_cached_nearest();
     640           91759 :     }
     641                 : 
     642           91759 :     std::size_t count = 0;
     643          100006 :     while (auto* w = expired.pop_front())
     644                 :     {
     645            8247 :         sched_->post(&w->op_);
     646            8247 :         ++count;
     647            8247 :     }
     648                 : 
     649           91759 :     return count;
     650                 : }
     651                 : 
     652                 : inline void
     653            8269 : timer_service::remove_timer_impl(implementation& impl)
     654                 : {
                               // Removes impl's entry from heap_, found through
                               // impl.heap_index_, and resets heap_index_ to
                               // numeric_limits<size_t>::max(). Does nothing when the index is
                               // out of range, which includes that max() value. This function
                               // takes no lock itself; the callers visible in this file invoke
                               // it while holding mutex_.
     655            8269 :     std::size_t index = impl.heap_index_;
     656            8269 :     if (index >= heap_.size())
     657 MIS           0 :         return; // Not in heap
     658                 : 
                               // index < size() here, so size() - 1 cannot wrap.
     659 HIT        8269 :     if (index == heap_.size() - 1)
     660                 :     {
     661                 :         // Last element, just pop
     662             111 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     663             111 :         heap_.pop_back();
     664                 :     }
     665                 :     else
     666                 :     {
     667                 :         // Swap with last and reheapify
     668            8158 :         swap_heap(index, heap_.size() - 1);
                                   // swap_heap set impl.heap_index_ to the last slot; overwrite
                                   // it with the "not in heap" value before popping that slot.
     669            8158 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     670            8158 :         heap_.pop_back();
     671                 : 
                                   // The entry moved into `index` may belong above or below
                                   // this position: sift up if it is earlier than its parent,
                                   // otherwise sift down.
     672            8158 :         if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     673 MIS           0 :             up_heap(index);
     674                 :         else
     675 HIT        8158 :             down_heap(index);
     676                 :     }
     677                 : }
     678                 : 
     679                 : inline void
     680            8277 : timer_service::up_heap(std::size_t index)
     681                 : {
                               // Moves the entry at `index` toward the root (index 0) for as
                               // long as its time_ is strictly less than its parent's time_.
     682           16425 :     while (index > 0)
     683                 :     {
     684            8164 :         std::size_t parent = (index - 1) / 2;
     685            8164 :         if (!(heap_[index].time_ < heap_[parent].time_))
     686              16 :             break;
     687            8148 :         swap_heap(index, parent);
     688            8148 :         index = parent;
     689                 :     }
     690            8277 : }
     691                 : 
     692                 : inline void
     693            8158 : timer_service::down_heap(std::size_t index)
     694                 : {
                               // Moves the entry at `index` toward the leaves until its time_
                               // is strictly less than that of the child chosen below, or it
                               // has no children. Children of slot i are 2*i + 1 and 2*i + 2.
     695            8158 :     std::size_t child = index * 2 + 1;
     696            8158 :     while (child < heap_.size())
     697                 :     {
                                   // Take the left child when it is the only child or strictly
                                   // earlier than the right one; otherwise (including equal
                                   // times) take the right child.
     698               4 :         std::size_t min_child = (child + 1 == heap_.size() ||
     699 MIS           0 :                                  heap_[child].time_ < heap_[child + 1].time_)
     700 HIT           4 :             ? child
     701               4 :             : child + 1;
     702                 : 
                                   // Strict comparison: an entry equal to the chosen child is
                                   // still swapped downward.
     703               4 :         if (heap_[index].time_ < heap_[min_child].time_)
     704               4 :             break;
     705                 : 
     706 MIS           0 :         swap_heap(index, min_child);
     707               0 :         index = min_child;
     708               0 :         child = index * 2 + 1;
     709                 :     }
     710 HIT        8158 : }
     711                 : 
     712                 : inline void
     713           16306 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
     714                 : {
                               // Exchanges the heap entries at i1 and i2, then stores each
                               // slot's new position in its timer's heap_index_ so the
                               // back-pointers stay in sync with heap_.
     715           16306 :     heap_entry tmp                = heap_[i1];
     716           16306 :     heap_[i1]                     = heap_[i2];
     717           16306 :     heap_[i2]                     = tmp;
     718           16306 :     heap_[i1].timer_->heap_index_ = i1;
     719           16306 :     heap_[i2].timer_->heap_index_ = i2;
     720           16306 : }
     721                 : 
     722                 : // waiter_node out-of-class member function definitions
     723                 : 
     724                 : inline void
     725               4 : waiter_node::canceller::operator()() const
     726                 : {
                               // Stop-callback body (installed by implementation::wait() via
                               // stop_cb_): asks the waiter's service to cancel this waiter.
     727               4 :     waiter_->svc_->cancel_waiter(waiter_);
     728               4 : }
     729                 : 
     730                 : inline void
     731 MIS           0 : waiter_node::completion_op::do_complete(
     732                 :     [[maybe_unused]] void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
     733                 : {
     734                 :     // owner is always non-null here. The destroy path (owner == nullptr)
     735                 :     // is unreachable because completion_op overrides destroy() directly,
     736                 :     // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
     737               0 :     BOOST_COROSIO_ASSERT(owner);
     738               0 :     static_cast<completion_op*>(base)->operator()();
     739               0 : }
     740                 : 
     741                 : inline void
     742 HIT        8283 : waiter_node::completion_op::operator()()
     743                 : {
                               // Delivers the result of a finished wait (expired or canceled)
                               // and resumes the waiting coroutine through its executor.
     744            8283 :     auto* w = waiter_;
                               // Drop the stop callback before the waiter is released.
     745            8283 :     w->stop_cb_.reset();
                               // ec_out_ is the caller-supplied error_code pointer; may be null.
     746            8283 :     if (w->ec_out_)
     747            8283 :         *w->ec_out_ = w->ec_value_;
     748                 : 
                               // Copy everything still needed out of the waiter: `w` is handed
                               // to destroy_waiter() below and is not touched afterwards.
     749            8283 :     auto h      = w->h_;
     750            8283 :     auto d      = w->d_;
     751            8283 :     auto* svc   = w->svc_;
     752            8283 :     auto& sched = svc->get_scheduler();
     753                 : 
     754            8283 :     svc->destroy_waiter(w);
     755                 : 
                               // Post the coroutine handle via the executor captured in wait().
                               // work_finished() presumably pairs with the work_started() call
                               // in implementation::wait() - confirm.
     756            8283 :     d.post(h);
     757            8283 :     sched.work_finished();
     758            8283 : }
     759                 : 
     760                 : inline void
     761               8 : waiter_node::completion_op::destroy()
     762                 : {
     763                 :     // Called during scheduler shutdown drain when this completion_op is
     764                 :     // in the scheduler's ready queue (posted by cancel_timer() or
     765                 :     // process_expired()). Balances the work_started() from
     766                 :     // implementation::wait(). The scheduler drain loop separately
     767                 :     // balances the work_started() from post(). On IOCP both decrements
     768                 :     // are required for outstanding_work_ to reach zero; on other
     769                 :     // backends this is harmless.
     770                 :     //
     771                 :     // This override also prevents scheduler_op::destroy() from calling
     772                 :     // do_complete(nullptr, ...). See also: timer_service::shutdown()
     773                 :     // which drains waiters still in the timer heap (the other path).
     774               8 :     auto* w = waiter_;
     775               8 :     w->stop_cb_.reset();
                               // Take the coroutine handle and the scheduler reference out of
                               // the waiter before it is deleted.
     776               8 :     auto h = std::exchange(w->h_, {});
     777               8 :     auto& sched = w->svc_->get_scheduler();
                               // NOTE(review): the waiter is freed with `delete` here, whereas
                               // operator()() releases it through svc->destroy_waiter() -
                               // confirm the difference is intentional for the shutdown path.
     778               8 :     delete w;
     779               8 :     sched.work_finished();
                               // The coroutine is never resumed on this path; destroy its
                               // frame instead.
     780               8 :     if (h)
     781               8 :         h.destroy();
     782               8 : }
     783                 : 
     784                 : inline std::coroutine_handle<>
     785            8293 : timer_service::implementation::wait(
     786                 :     std::coroutine_handle<> h,
     787                 :     capy::executor_ref d,
     788                 :     std::stop_token token,
     789                 :     std::error_code* ec)
     790                 : {
                               // Starts a wait on this timer for coroutine `h`.
                               //   h     - coroutine to post through `d` when the wait finishes
                               //   d     - executor used to post `h`
                               //   token - if stop is possible, a stop request cancels the wait
                               //   ec    - optional out-parameter receiving the result
                               // Returns std::noop_coroutine() on every path.
                               //
     791                 :     // Already-expired fast path — no waiter_node, no mutex.
     792                 :     // Post instead of dispatch so the coroutine yields to the
     793                 :     // scheduler, allowing other queued work to run.
                               // heap_index_ == max() means this timer is not in the heap.
     794            8293 :     if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
     795                 :     {
     796            8271 :         if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
     797                 :         {
     798 MIS           0 :             if (ec)
     799               0 :                 *ec = {};
     800               0 :             d.post(h);
     801               0 :             return std::noop_coroutine();
     802                 :         }
     803                 :     }
     804                 : 
                               // Slow path: record everything completion_op needs in a waiter
                               // node and register it with the service.
     805 HIT        8293 :     auto* w    = svc_->create_waiter();
     806            8293 :     w->impl_   = this;
     807            8293 :     w->svc_    = svc_;
     808            8293 :     w->h_      = h;
     809            8293 :     w->d_      = d;
     810            8293 :     w->token_  = std::move(token);
     811            8293 :     w->ec_out_ = ec;
     812                 : 
     813            8293 :     svc_->insert_waiter(*this, w);
     814            8293 :     might_have_pending_waits_ = true;
     815            8293 :     svc_->get_scheduler().work_started();
     816                 : 
                               // NOTE(review): `w` is dereferenced here after insert_waiter()
                               // has published it to the service. If another thread can expire
                               // or cancel the waiter and run its completion_op (which releases
                               // `w`) in this window, these accesses would be use-after-free -
                               // confirm the threading contract for wait().
     817            8293 :     if (w->token_.stop_possible())
     818               4 :         w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
     819                 : 
     820            8293 :     return std::noop_coroutine();
     821                 : }
     822                 : 
     823                 : // Free functions
     824                 : 
     825                 : struct timer_service_access
     826                 : {
                               // Returns the scheduler that ctx.sched_ points to, cast to
                               // native_scheduler.
                               // NOTE(review): presumably this struct is befriended by
                               // io_context to reach sched_, and the static_cast assumes the
                               // scheduler is always a native_scheduler - confirm.
     827            8577 :     static native_scheduler& get_scheduler(io_context& ctx) noexcept
     828                 :     {
     829            8577 :         return static_cast<native_scheduler&>(*ctx.sched_);
     830                 :     }
     831                 : };
     832                 : 
     833                 : // Bypass find_service() mutex by reading the scheduler's cached pointer
     834                 : inline io_object::io_service&
     835            8577 : timer_service_direct(capy::execution_context& ctx) noexcept
     836                 : {
                               // NOTE(review): `ctx` is downcast to io_context and timer_svc_
                               // is dereferenced without a null check - confirm both hold for
                               // every caller.
     837            8577 :     return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
     838            8577 :                 .timer_svc_;
     839                 : }
     840                 : 
     841                 : inline std::size_t
     842               6 : timer_service_update_expiry(timer::implementation& base)
     843                 : {
                               // Downcasts `base` to the service's implementation type and
                               // passes it, with its current expiry_, to update_timer() on the
                               // owning service. Returns update_timer()'s result (presumably
                               // the number of waits it canceled - confirm).
     844               6 :     auto& impl = static_cast<timer_service::implementation&>(base);
     845               6 :     return impl.svc_->update_timer(impl, impl.expiry_);
     846                 : }
     847                 : 
     848                 : inline std::size_t
     849               8 : timer_service_cancel(timer::implementation& base) noexcept
     850                 : {
                               // Downcasts `base` and forwards to cancel_timer() on the owning
                               // service, returning its count of canceled waits.
     851               8 :     auto& impl = static_cast<timer_service::implementation&>(base);
     852               8 :     return impl.svc_->cancel_timer(impl);
     853                 : }
     854                 : 
     855                 : inline std::size_t
     856               2 : timer_service_cancel_one(timer::implementation& base) noexcept
     857                 : {
                               // Downcasts `base` and forwards to cancel_one_waiter() on the
                               // owning service; the result is 0 or 1.
     858               2 :     auto& impl = static_cast<timer_service::implementation&>(base);
     859               2 :     return impl.svc_->cancel_one_waiter(impl);
     860                 : }
     861                 : 
     862                 : inline timer_service&
     863             349 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     864                 : {
                               // Returns the timer_service that ctx.make_service() yields,
                               // passing `sched` through as the constructor argument.
     865             349 :     return ctx.make_service<timer_service>(sched);
     866                 : }
     867                 : 
     868                 : } // namespace boost::corosio::detail
     869                 : 
     870                 : #endif
        

Generated by: LCOV version 2.3