LCOV - code coverage report
Current view: top level - libs/capy/src/ex/detail - strand_queue.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 97.4 % 76 74
Test Date: 2026-01-18 18:26:31 Functions: 93.3 % 15 14

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
      11              : #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : 
      15              : #include <coroutine>
      16              : #include <cstddef>
      17              : #include <exception>
      18              : 
      19              : namespace boost {
      20              : namespace capy {
      21              : namespace detail {
      22              : 
      23              : class strand_queue;
      24              : 
      25              : //----------------------------------------------------------
      26              : 
      27              : // Metadata stored before the coroutine frame
      28              : struct frame_prefix
      29              : {
      30              :     frame_prefix* next;
      31              :     strand_queue* queue;
      32              :     std::size_t alloc_size;
      33              : };
      34              : 
      35              : //----------------------------------------------------------
      36              : 
      37              : /** Wrapper coroutine for strand queue dispatch operations.
      38              : 
      39              :     This coroutine wraps a target coroutine handle and resumes
      40              :     it when dispatched. The wrapper ensures control returns to
      41              :     the dispatch loop after the target suspends or completes.
      42              : 
      43              :     The promise contains an intrusive list node for queue
      44              :     storage and supports a custom allocator that recycles
      45              :     coroutine frames via a free list.
      46              : */
      47              : struct strand_op
      48              : {
      49              :     struct promise_type
      50              :     {
      51              :         promise_type* next = nullptr;
      52              : 
      53              :         void*
      54              :         operator new(
      55              :             std::size_t size,
      56              :             strand_queue& q,
      57              :             std::coroutine_handle<void>);
      58              : 
      59              :         void
      60              :         operator delete(void* p, std::size_t);
      61              : 
      62              :         strand_op
      63          377 :         get_return_object() noexcept
      64              :         {
      65          377 :             return {std::coroutine_handle<promise_type>::from_promise(*this)};
      66              :         }
      67              : 
      68              :         std::suspend_always
      69          377 :         initial_suspend() noexcept
      70              :         {
      71          377 :             return {};
      72              :         }
      73              : 
      74              :         std::suspend_always
      75          375 :         final_suspend() noexcept
      76              :         {
      77          375 :             return {};
      78              :         }
      79              : 
      80              :         void
      81          375 :         return_void() noexcept
      82              :         {
      83          375 :         }
      84              : 
      85              :         void
      86            0 :         unhandled_exception()
      87              :         {
      88            0 :             std::terminate();
      89              :         }
      90              :     };
      91              : 
      92              :     std::coroutine_handle<promise_type> h_;
      93              : };
      94              : 
      95              : //----------------------------------------------------------
      96              : 
      97              : /** Single-threaded dispatch queue for coroutine handles.
      98              : 
      99              :     This queue stores coroutine handles and resumes them
     100              :     sequentially when dispatch() is called. Each pushed
     101              :     handle is wrapped in a strand_op coroutine that ensures
     102              :     control returns to the dispatch loop after the target
     103              :     suspends or completes.
     104              : 
     105              :     The queue uses an intrusive singly-linked list through
     106              :     the promise type to avoid separate node allocations.
     107              :     A free list recycles wrapper coroutine frames to reduce
     108              :     allocation overhead during repeated push/dispatch cycles.
     109              : 
     110              :     @par Thread Safety
     111              :     This class is not thread-safe. All operations must be
     112              :     called from a single thread.
     113              : */
     114              : class strand_queue
     115              : {
     116              :     using promise_type = strand_op::promise_type;
     117              : 
     118              :     promise_type* head_ = nullptr;
     119              :     promise_type* tail_ = nullptr;
     120              :     frame_prefix* free_list_ = nullptr;
     121              : 
     122              :     friend struct strand_op::promise_type;
     123              : 
     124              :     static
     125              :     strand_op
     126          377 :     make_strand_op(
     127              :         strand_queue& q,
     128              :         std::coroutine_handle<void> target)
     129              :     {
     130              :         (void)q;
     131              :         target.resume();
     132              :         co_return;
     133          754 :     }
     134              : 
     135              : public:
     136         3798 :     strand_queue() = default;
     137              : 
     138              :     strand_queue(strand_queue const&) = delete;
     139              :     strand_queue& operator=(strand_queue const&) = delete;
     140              : 
     141              :     /** Destructor.
     142              : 
     143              :         Destroys any pending wrappers without resuming them,
     144              :         then frees all memory in the free list.
     145              :     */
     146         3806 :     ~strand_queue()
     147              :     {
     148              :         // Destroy pending wrappers
     149         3808 :         while(head_)
     150              :         {
     151            2 :             promise_type* p = head_;
     152            2 :             head_ = p->next;
     153              : 
     154            2 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     155            2 :             h.destroy();
     156              :         }
     157              : 
     158              :         // Free the free list memory
     159         3916 :         while(free_list_)
     160              :         {
     161          110 :             frame_prefix* prefix = free_list_;
     162          110 :             free_list_ = prefix->next;
     163          110 :             ::operator delete(prefix);
     164              :         }
     165         3806 :     }
     166              : 
     167              :     /** Returns true if there are no pending operations.
     168              :     */
     169              :     bool
     170           34 :     empty() const noexcept
     171              :     {
     172           34 :         return head_ == nullptr;
     173              :     }
     174              : 
     175              :     /** Push a coroutine handle to the queue.
     176              : 
     177              :         Creates a wrapper coroutine and appends it to the
     178              :         queue. The wrapper will resume the target handle
     179              :         when dispatch() processes it.
     180              : 
     181              :         @param h The coroutine handle to dispatch.
     182              :     */
     183              :     void
     184          377 :     push(std::coroutine_handle<void> h)
     185              :     {
     186          377 :         strand_op op = make_strand_op(*this, h);
     187              : 
     188          377 :         promise_type* p = &op.h_.promise();
     189          377 :         p->next = nullptr;
     190              : 
     191          377 :         if(tail_)
     192          345 :             tail_->next = p;
     193              :         else
     194           32 :             head_ = p;
     195          377 :         tail_ = p;
     196          377 :     }
     197              : 
     198              :     /** Resume all queued coroutines in sequence.
     199              : 
     200              :         Processes each wrapper in FIFO order, resuming its
     201              :         target coroutine. After each target suspends or
     202              :         completes, the wrapper is destroyed and its frame
     203              :         is added to the free list for reuse.
     204              : 
     205              :         Coroutines resumed during dispatch may push new
     206              :         handles, which will also be processed in the same
     207              :         dispatch call.
     208              : 
     209              :         @warning Not thread-safe. Do not call while another
     210              :             thread may be calling push().
     211              :     */
     212              :     void
     213            7 :     dispatch()
     214              :     {
     215          117 :         while(head_)
     216              :         {
     217          110 :             promise_type* p = head_;
     218          110 :             head_ = p->next;
     219          110 :             if(!head_)
     220            7 :                 tail_ = nullptr;
     221              : 
     222          110 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     223          110 :             h.resume();
     224          110 :             h.destroy();
     225              :         }
     226            7 :     }
     227              : 
     228              :     /** Batch of taken items for thread-safe dispatch. */
     229              :     struct taken_batch
     230              :     {
     231              :         promise_type* head = nullptr;
     232              :         promise_type* tail = nullptr;
     233              :     };
     234              : 
     235              :     /** Take all pending items atomically.
     236              : 
     237              :         Removes all items from the queue and returns them
     238              :         as a batch. The queue is left empty.
     239              : 
     240              :         @return The batch of taken items.
     241              :     */
     242              :     taken_batch
     243           24 :     take_all() noexcept
     244              :     {
     245           24 :         taken_batch batch{head_, tail_};
     246           24 :         head_ = tail_ = nullptr;
     247           24 :         return batch;
     248              :     }
     249              : 
     250              :     /** Dispatch a batch of taken items.
     251              : 
     252              :         @param batch The batch to dispatch.
     253              : 
     254              :         @note This is thread-safe w.r.t. push() because it doesn't
     255              :             access the queue's free_list_. Frames are deleted directly
     256              :             rather than recycled.
     257              :     */
     258              :     static
     259              :     void
     260           24 :     dispatch_batch(taken_batch& batch)
     261              :     {
     262          289 :         while(batch.head)
     263              :         {
     264          265 :             promise_type* p = batch.head;
     265          265 :             batch.head = p->next;
     266              : 
     267          265 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     268          265 :             h.resume();
     269              :             // Don't use h.destroy() - it would call operator delete which
     270              :             // accesses the queue's free_list_ (race with push).
     271              :             // Instead, manually free the frame without recycling.
     272              :             // h.address() returns the frame base (what operator new returned).
     273          265 :             frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
     274          265 :             ::operator delete(prefix);
     275              :         }
     276           24 :         batch.tail = nullptr;
     277           24 :     }
     278              : };
     279              : 
     280              : //----------------------------------------------------------
     281              : 
     282              : inline
     283              : void*
     284          377 : strand_op::promise_type::operator new(
     285              :     std::size_t size,
     286              :     strand_queue& q,
     287              :     std::coroutine_handle<void>)
     288              : {
     289              :     // Total size includes prefix
     290          377 :     std::size_t alloc_size = size + sizeof(frame_prefix);
     291              :     void* raw;
     292              :     
     293              :     // Try to reuse from free list
     294          377 :     if(q.free_list_)
     295              :     {
     296            2 :         frame_prefix* prefix = q.free_list_;
     297            2 :         q.free_list_ = prefix->next;
     298            2 :         raw = prefix;
     299              :     }
     300              :     else
     301              :     {
     302          375 :         raw = ::operator new(alloc_size);
     303              :     }
     304              : 
     305              :     // Initialize prefix
     306          377 :     frame_prefix* prefix = static_cast<frame_prefix*>(raw);
     307          377 :     prefix->next = nullptr;
     308          377 :     prefix->queue = &q;
     309          377 :     prefix->alloc_size = alloc_size;
     310              : 
     311              :     // Return pointer AFTER the prefix (this is where coroutine frame goes)
     312          377 :     return prefix + 1;
     313              : }
     314              : 
     315              : inline
     316              : void
     317          112 : strand_op::promise_type::operator delete(void* p, std::size_t)
     318              : {
     319              :     // Calculate back to get the prefix
     320          112 :     frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
     321              : 
     322              :     // Add to free list
     323          112 :     prefix->next = prefix->queue->free_list_;
     324          112 :     prefix->queue->free_list_ = prefix;
     325          112 : }
     326              : 
     327              : } // namespace detail
     328              : } // namespace capy
     329              : } // namespace boost
     330              : 
     331              : #endif
        

Generated by: LCOV version 2.3