TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/native_scheduler.hpp>
21 : #include <boost/corosio/detail/scheduler_op.hpp>
22 :
23 : #include <boost/corosio/native/detail/select/select_op.hpp>
24 : #include <boost/corosio/detail/timer_service.hpp>
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 :
29 : #include <boost/corosio/detail/except.hpp>
30 : #include <boost/corosio/detail/thread_local_ptr.hpp>
31 :
32 : #include <sys/select.h>
33 : #include <sys/socket.h>
34 : #include <unistd.h>
35 : #include <errno.h>
36 : #include <fcntl.h>
37 :
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <limits>
#include <mutex>
#include <unordered_map>
#include <vector>
46 :
47 : namespace boost::corosio::detail {
48 :
49 : struct select_op;
50 :
51 : /** POSIX scheduler using select() for I/O multiplexing.
52 :
53 : This scheduler implements the scheduler interface using the POSIX select()
54 : call for I/O event notification. It uses a single reactor model
55 : where one thread runs select() while other threads wait on a condition
56 : variable for handler work. This design provides:
57 :
58 : - Handler parallelism: N posted handlers can execute on N threads
59 : - No thundering herd: condition_variable wakes exactly one thread
60 : - Portability: Works on all POSIX systems
61 :
62 : The design mirrors epoll_scheduler for behavioral consistency:
63 : - Same single-reactor thread coordination model
64 : - Same work counting semantics
65 : - Same timer integration pattern
66 :
67 : Known Limitations:
68 : - FD_SETSIZE (~1024) limits maximum concurrent connections
69 : - O(n) scanning: rebuilds fd_sets each iteration
70 : - Level-triggered only (no edge-triggered mode)
71 :
72 : @par Thread Safety
73 : All public member functions are thread-safe.
74 : */
class BOOST_COROSIO_DECL select_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    ~select_scheduler() override;

    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a file descriptor for monitoring.

        @param fd The file descriptor to register.
        @param op The operation associated with this fd.
        @param events Event mask: 1 = read, 2 = write, 3 = both.
    */
    void register_fd(int fd, select_op* op, int events) const;

    /** Unregister a file descriptor from monitoring.

        @param fd The file descriptor to unregister.
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
    */
    void deregister_fd(int fd, int events) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    // Event flags for register_fd/deregister_fd
    static constexpr int event_read = 1;
    static constexpr int event_write = 2;

private:
    // Runs at most one handler or one reactor pass; see the definition
    // below for the full loop.
    std::size_t do_one(long timeout_us);
    // One select() pass. Entered with `lock` held; unlocks around the
    // blocking call and returns with the lock re-acquired.
    void run_reactor(std::unique_lock<std::mutex>& lock);
    // Wakes an idle worker or interrupts the reactor; always unlocks.
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    // Writes one byte to the self-pipe so a blocking select() returns.
    void interrupt_reactor() const;
    // Combines the caller's timeout with the nearest timer expiry.
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    mutable std::mutex mutex_;                     // guards queue + fd map + reactor flags
    mutable std::condition_variable wakeup_event_; // wakes idle worker threads
    mutable op_queue completed_ops_;               // ready handlers (+ task_op_ sentinel)
    mutable std::atomic<long> outstanding_work_;   // pending-work counter
    std::atomic<bool> stopped_;                    // set by stop(), cleared by restart()

    // Per-fd state for tracking registered operations
    struct fd_state
    {
        select_op* read_op = nullptr;
        select_op* write_op = nullptr;
    };
    mutable std::unordered_map<int, fd_state> registered_fds_;
    mutable int max_fd_ = -1; // highest registered fd, feeds select()'s nfds

    // Single reactor thread coordination
    mutable bool reactor_running_ = false;     // some thread is inside run_reactor()
    mutable bool reactor_interrupted_ = false; // a wakeup was already requested
    mutable int idle_thread_count_ = 0;        // threads blocked on wakeup_event_

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing timer starvation.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};
184 :
185 : /*
186 : select Scheduler - Single Reactor Model
187 : =======================================
188 :
189 : This scheduler mirrors the epoll_scheduler design but uses select() instead
190 : of epoll for I/O multiplexing. The thread coordination strategy is identical:
191 : one thread becomes the "reactor" while others wait on a condition variable.
192 :
193 : Thread Model
194 : ------------
195 : - ONE thread runs select() at a time (the reactor thread)
196 : - OTHER threads wait on wakeup_event_ (condition variable) for handlers
197 : - When work is posted, exactly one waiting thread wakes via notify_one()
198 :
199 : Key Differences from epoll
200 : --------------------------
201 : - Uses self-pipe instead of eventfd for interruption (more portable)
202 : - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
203 : - FD_SETSIZE limit (~1024 fds on most systems)
204 : - Level-triggered only (no edge-triggered mode)
205 :
206 : Self-Pipe Pattern
207 : -----------------
208 : To interrupt a blocking select() call (e.g., when work is posted or a timer
209 : expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
210 : always in the read_fds set, so select() returns immediately. We drain the
211 : pipe to clear the readable state.
212 :
213 : fd-to-op Mapping
214 : ----------------
215 : We use an unordered_map<int, fd_state> to track which operations are
216 : registered for each fd. This allows O(1) lookup when select() returns
217 : ready fds. Each fd can have at most one read op and one write op registered.
218 : */
219 :
namespace select {

/** Frame in a per-thread stack of schedulers active on the current thread.

    running_in_this_thread() walks this stack to decide whether the
    calling thread is inside one of this scheduler's run functions.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    select_scheduler const* key; // scheduler this frame belongs to
    scheduler_context* next;     // next (outer) frame on this thread
};

// Head of the thread-local frame stack.
inline thread_local_ptr<scheduler_context> context_stack;

/** RAII guard: pushes a scheduler frame on construction, pops on exit. */
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(select_scheduler const* ctx) noexcept
        : frame_{ctx, context_stack.get()}
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        context_stack.set(frame_.next);
    }
};

/** RAII guard that reports handler completion (even if the handler throws). */
struct work_guard
{
    select_scheduler* self;
    ~work_guard()
    {
        self->work_finished();
    }
};

} // namespace select
256 :
// Constructs the scheduler: creates the self-pipe, wires the timer
// service's "earliest expiry changed" callback to interrupt the reactor,
// eagerly constructs the resolver and signal services, and seeds the
// handler queue with the task sentinel.
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
    : pipe_fds_{-1, -1}
    , outstanding_work_(0)
    , stopped_(false)
    , max_fd_(-1)
    , reactor_running_(false)
    , reactor_interrupted_(false)
    , idle_thread_count_(0)
{
    // Create self-pipe for interrupting select()
    if (::pipe(pipe_fds_) < 0)
        detail::throw_system_error(make_err(errno), "pipe");

    // Set both ends to non-blocking and close-on-exec. On any fcntl
    // failure both ends are closed before throwing, since a throwing
    // constructor never runs the destructor.
    for (int i = 0; i < 2; ++i)
    {
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
        if (flags == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
        }
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
        }
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
        }
    }

    // When the nearest timer deadline moves, the reactor's select()
    // timeout becomes stale, so interrupt it to recompute.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            static_cast<select_scheduler*>(p)->interrupt_reactor();
        }));

    // NOTE(review): if any of the service initializations below throw,
    // this constructor exits without running the destructor and the pipe
    // fds leak — confirm these cannot throw, or guard the fds.

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
312 :
313 278 : inline select_scheduler::~select_scheduler()
314 : {
315 139 : if (pipe_fds_[0] >= 0)
316 139 : ::close(pipe_fds_[0]);
317 139 : if (pipe_fds_[1] >= 0)
318 139 : ::close(pipe_fds_[1]);
319 278 : }
320 :
// Service shutdown: destroys (without running) every handler still
// queued, then wakes all threads so they observe the torn-down state.
inline void
select_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);

        // destroy() may run arbitrary user code, so drop the lock
        // around each call and re-acquire before the next pop.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue; // sentinel is a member, never destroyed
            lock.unlock();
            h->destroy();
            lock.lock();
        }
    }

    // Kick any thread still blocked in select()...
    if (pipe_fds_[1] >= 0)
        interrupt_reactor();

    // ...and any threads waiting on the condition variable.
    wakeup_event_.notify_all();
}
342 :
// Queues a coroutine for resumption on a scheduler thread.
inline void
select_scheduler::post(std::coroutine_handle<> h) const
{
    // Heap-allocated adapter: ownership passes to completed_ops_ on
    // push, and the adapter deletes itself when invoked or destroyed.
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}

        ~post_handler() override = default;

        void operator()() override
        {
            // Copy the handle before delete: resuming may re-enter the
            // scheduler while `this` must already be gone.
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            // Called at shutdown for handlers that never ran.
            auto h = h_;
            delete this;
            h.destroy();
        }
    };

    // unique_ptr keeps the adapter exception-safe until it is queued.
    auto ph = std::make_unique<post_handler>(h);
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
376 :
377 : inline void
378 145006 : select_scheduler::post(scheduler_op* h) const
379 : {
380 145006 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
381 :
382 145006 : std::unique_lock lock(mutex_);
383 145006 : completed_ops_.push(h);
384 145006 : wake_one_thread_and_unlock(lock);
385 145006 : }
386 :
387 : inline bool
388 555 : select_scheduler::running_in_this_thread() const noexcept
389 : {
390 555 : for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
391 358 : if (c->key == this)
392 358 : return true;
393 197 : return false;
394 : }
395 :
// Signals all threads to stop. Idempotent: only the first caller
// performs the wakeups.
inline void
select_scheduler::stop()
{
    bool expected = false;
    if (stopped_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        // Wake all threads so they notice stopped_ and exit
        {
            // notify under the lock so a thread between its stopped_
            // check and its wait cannot miss the notification
            std::lock_guard lock(mutex_);
            wakeup_event_.notify_all();
        }
        interrupt_reactor();
    }
}
412 :
413 : inline bool
414 3 : select_scheduler::stopped() const noexcept
415 : {
416 3 : return stopped_.load(std::memory_order_acquire);
417 : }
418 :
419 : inline void
420 37 : select_scheduler::restart()
421 : {
422 37 : stopped_.store(false, std::memory_order_release);
423 37 : }
424 :
425 : inline std::size_t
426 101 : select_scheduler::run()
427 : {
428 101 : if (stopped_.load(std::memory_order_acquire))
429 MIS 0 : return 0;
430 :
431 HIT 202 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
432 : {
433 MIS 0 : stop();
434 0 : return 0;
435 : }
436 :
437 HIT 101 : select::thread_context_guard ctx(this);
438 :
439 101 : std::size_t n = 0;
440 153910 : while (do_one(-1))
441 153809 : if (n != (std::numeric_limits<std::size_t>::max)())
442 153809 : ++n;
443 101 : return n;
444 101 : }
445 :
446 : inline std::size_t
447 MIS 0 : select_scheduler::run_one()
448 : {
449 0 : if (stopped_.load(std::memory_order_acquire))
450 0 : return 0;
451 :
452 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
453 : {
454 0 : stop();
455 0 : return 0;
456 : }
457 :
458 0 : select::thread_context_guard ctx(this);
459 0 : return do_one(-1);
460 0 : }
461 :
462 : inline std::size_t
463 HIT 27 : select_scheduler::wait_one(long usec)
464 : {
465 27 : if (stopped_.load(std::memory_order_acquire))
466 3 : return 0;
467 :
468 48 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
469 : {
470 MIS 0 : stop();
471 0 : return 0;
472 : }
473 :
474 HIT 24 : select::thread_context_guard ctx(this);
475 24 : return do_one(usec);
476 24 : }
477 :
478 : inline std::size_t
479 2 : select_scheduler::poll()
480 : {
481 2 : if (stopped_.load(std::memory_order_acquire))
482 MIS 0 : return 0;
483 :
484 HIT 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
485 : {
486 MIS 0 : stop();
487 0 : return 0;
488 : }
489 :
490 HIT 2 : select::thread_context_guard ctx(this);
491 :
492 2 : std::size_t n = 0;
493 4 : while (do_one(0))
494 2 : if (n != (std::numeric_limits<std::size_t>::max)())
495 2 : ++n;
496 2 : return n;
497 2 : }
498 :
499 : inline std::size_t
500 MIS 0 : select_scheduler::poll_one()
501 : {
502 0 : if (stopped_.load(std::memory_order_acquire))
503 0 : return 0;
504 :
505 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
506 : {
507 0 : stop();
508 0 : return 0;
509 : }
510 :
511 0 : select::thread_context_guard ctx(this);
512 0 : return do_one(0);
513 0 : }
514 :
// Associates `op` with `fd` for the requested event directions and
// wakes the reactor so the fd is included in the next select() pass.
inline void
select_scheduler::register_fd(int fd, select_op* op, int events) const
{
    // Validate fd is within select() limits
    if (fd < 0 || fd >= FD_SETSIZE)
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");

    {
        std::lock_guard lock(mutex_);

        // One slot per direction; registering again for the same
        // direction replaces the previous op.
        auto& state = registered_fds_[fd];
        if (events & event_read)
            state.read_op = op;
        if (events & event_write)
            state.write_op = op;

        if (fd > max_fd_)
            max_fd_ = fd;
    }

    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
    // with the newly registered fd.
    interrupt_reactor();
}
539 :
// Removes `fd` from monitoring for the given event directions. Safe to
// call for fds already removed (e.g. completed by the reactor).
inline void
select_scheduler::deregister_fd(int fd, int events) const
{
    std::lock_guard lock(mutex_);

    auto it = registered_fds_.find(fd);
    if (it == registered_fds_.end())
        return; // nothing registered (reactor may have already erased it)

    if (events & event_read)
        it->second.read_op = nullptr;
    if (events & event_write)
        it->second.write_op = nullptr;

    // Remove entry if both are null
    if (!it->second.read_op && !it->second.write_op)
    {
        registered_fds_.erase(it);

        // Recalculate max_fd_ if needed
        if (fd == max_fd_)
        {
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
            for (auto& [registered_fd, state] : registered_fds_)
            {
                if (registered_fd > max_fd_)
                    max_fd_ = registered_fd;
            }
        }
    }

    // NOTE(review): unlike register_fd(), this does not call
    // interrupt_reactor(); a thread blocked in select() may keep this fd
    // in its fd_sets until its next wakeup. If the caller closes the fd
    // immediately, select() could fail with EBADF — confirm callers'
    // close ordering.
}
571 :
572 : inline void
573 9225 : select_scheduler::work_started() noexcept
574 : {
575 9225 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
576 9225 : }
577 :
578 : inline void
579 157504 : select_scheduler::work_finished() noexcept
580 : {
581 315008 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
582 104 : stop();
583 157504 : }
584 :
585 : inline void
586 11809 : select_scheduler::interrupt_reactor() const
587 : {
588 11809 : char byte = 1;
589 11809 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
590 11809 : }
591 :
// Wakes exactly one consumer for newly queued work. Called with `lock`
// held; always returns with it released.
inline void
select_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (idle_thread_count_ > 0)
    {
        // Idle worker exists - wake it via condvar
        wakeup_event_.notify_one();
        lock.unlock();
    }
    else if (reactor_running_ && !reactor_interrupted_)
    {
        // No idle workers but reactor is running - interrupt it.
        // The flag is set under the lock so at most one interrupt byte
        // is written per reactor pass.
        reactor_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        // No one to wake
        lock.unlock();
    }
}
615 :
616 : inline long
617 8330 : select_scheduler::calculate_timeout(long requested_timeout_us) const
618 : {
619 8330 : if (requested_timeout_us == 0)
620 MIS 0 : return 0;
621 :
622 HIT 8330 : auto nearest = timer_svc_->nearest_expiry();
623 8330 : if (nearest == timer_service::time_point::max())
624 37 : return requested_timeout_us;
625 :
626 8293 : auto now = std::chrono::steady_clock::now();
627 8293 : if (nearest <= now)
628 165 : return 0;
629 :
630 : auto timer_timeout_us =
631 8128 : std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
632 8128 : .count();
633 :
634 : // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
635 8128 : constexpr auto long_max =
636 : static_cast<long long>((std::numeric_limits<long>::max)());
637 : auto capped_timer_us =
638 8128 : (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
639 8128 : static_cast<long long>(0)),
640 8128 : long_max);
641 :
642 8128 : if (requested_timeout_us < 0)
643 8128 : return static_cast<long>(capped_timer_us);
644 :
645 : // requested_timeout_us is already long, so min() result fits in long
646 : return static_cast<long>(
647 MIS 0 : (std::min)(static_cast<long long>(requested_timeout_us),
648 0 : capped_timer_us));
649 : }
650 :
// One reactor pass: build fd_sets, block in select() (lock released),
// process expired timers, then queue completions for ready fds.
// Entered with `lock` held; returns with it re-acquired.
inline void
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
{
    // Calculate timeout considering timers, use 0 if interrupted
    long effective_timeout_us =
        reactor_interrupted_ ? 0 : calculate_timeout(-1);

    // Build fd_sets from registered_fds_
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // Always include the interrupt pipe
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    // Add registered fds
    for (auto& [fd, state] : registered_fds_)
    {
        if (state.read_op)
            FD_SET(fd, &read_fds);
        if (state.write_op)
        {
            FD_SET(fd, &write_fds);
            // Also monitor for errors on connect operations
            FD_SET(fd, &except_fds);
        }
        if (fd > nfds)
            nfds = fd;
    }

    // Convert timeout to timeval
    struct timeval tv;
    struct timeval* tv_ptr = nullptr; // null timeval => block indefinitely
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr = &tv;
    }

    // Drop the lock across the blocking call so other threads can post
    // work and interrupt via the self-pipe.
    lock.unlock();

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
    int saved_errno = errno; // capture before any call can clobber errno

    // Process timers outside the lock
    timer_svc_->process_expired();

    // EINTR is a benign wakeup (signal); anything else is fatal.
    if (ready < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "select");

    // Re-acquire lock before modifying completed_ops_
    lock.lock();

    // Drain the interrupt pipe if readable
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
    {
        char buf[256];
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
        {
        }
    }

    // Process I/O completions
    int completions_queued = 0;
    if (ready > 0)
    {
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
        std::vector<int> fds_to_check;
        fds_to_check.reserve(registered_fds_.size());
        for (auto& [fd, state] : registered_fds_)
            fds_to_check.push_back(fd);

        for (int fd : fds_to_check)
        {
            auto it = registered_fds_.find(fd);
            if (it == registered_fds_.end())
                continue; // erased during an earlier iteration

            auto& state = it->second;

            // Check for errors (especially for connect operations)
            bool has_error = FD_ISSET(fd, &except_fds);

            // Process read readiness
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
            {
                auto* op = state.read_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.read_op = nullptr;

                    if (has_error)
                    {
                        // Fetch the pending socket error; fall back to
                        // EIO so the op never completes with "success".
                        int errn = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Process write readiness
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
            {
                auto* op = state.write_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.write_op = nullptr;

                    if (has_error)
                    {
                        int errn = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Clean up empty entries
            if (!state.read_op && !state.write_op)
                registered_fds_.erase(it);
        }
    }

    // Wake workers to run the completions queued above.
    if (completions_queued > 0)
    {
        if (completions_queued == 1)
            wakeup_event_.notify_one();
        else
            wakeup_event_.notify_all();
    }
}
819 :
// Core dispatch loop: runs at most one handler (returning 1) or returns
// 0 on stop / no work / timeout. Popping the task sentinel turns the
// calling thread into the reactor for one select() pass.
inline std::size_t
select_scheduler::do_one(long timeout_us)
{
    std::unique_lock lock(mutex_);

    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        if (op == &task_op_)
        {
            // This thread becomes the reactor for one pass.
            bool more_handlers = !completed_ops_.empty();

            if (!more_handlers)
            {
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
                if (timeout_us == 0)
                {
                    // Non-blocking call with nothing ready: restore the
                    // sentinel and report no progress.
                    completed_ops_.push(&task_op_);
                    return 0;
                }
            }

            // If handlers remain queued, run select() as a poll so they
            // are not delayed behind a blocking wait.
            reactor_interrupted_ = more_handlers || timeout_us == 0;
            reactor_running_ = true;

            if (more_handlers && idle_thread_count_ > 0)
                wakeup_event_.notify_one();

            run_reactor(lock);

            reactor_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        if (op != nullptr)
        {
            // Run exactly one handler outside the lock; work_guard
            // decrements the work count even if the handler throws.
            lock.unlock();
            select::work_guard g{this};
            (*op)();
            return 1;
        }

        // Queue empty: give up or go idle.
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
            return 0;

        if (timeout_us == 0)
            return 0;

        // NOTE(review): for timeout_us > 0 the wait below does not track
        // time already spent across loop iterations, so spurious wakeups
        // or reactor passes can make wait_one() wait longer than the
        // requested total — confirm whether this is intended.
        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
        else
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
        --idle_thread_count_;
    }
}
885 :
886 : } // namespace boost::corosio::detail
887 :
888 : #endif // BOOST_COROSIO_HAS_SELECT
889 :
890 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
|