LCOV - code coverage report
Current view: top level - libs/capy/src/ex/detail - strand_service.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 95.6 % 91 87
Test Date: 2026-01-18 18:26:31 Functions: 91.3 % 23 21

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #include "src/ex/detail/strand_queue.hpp"
      11              : #include <boost/capy/ex/detail/strand_service.hpp>
      12              : #include <boost/capy/ex/any_coro.hpp>
      13              : 
      14              : #include <atomic>
      15              : #include <coroutine>
      16              : #include <mutex>
      17              : #include <thread>
      18              : #include <utility>
      19              : 
      20              : namespace boost {
      21              : namespace capy {
      22              : namespace detail {
      23              : 
      24              : //----------------------------------------------------------
      25              : 
      26              : /** Implementation state for a strand.
      27              : 
      28              :     Each strand_impl provides serialization for coroutines
      29              :     dispatched through strands that share it.
      30              : */
      31              : struct strand_impl
      32              : {
      33              :     std::mutex mutex_;
      34              :     strand_queue pending_;
      35              :     bool locked_ = false;
      36              :     std::atomic<std::thread::id> dispatch_thread_{};
      37              :     void* cached_frame_ = nullptr;
      38              : };
      39              : 
      40              : //----------------------------------------------------------
      41              : 
      42              : /** Invoker coroutine for strand dispatch.
      43              : 
      44              :     Uses custom allocator to recycle frame - one allocation
      45              :     per strand_impl lifetime, stored in trailer for recovery.
      46              : */
      47              : struct strand_invoker
      48              : {
      49              :     struct promise_type
      50              :     {
      51           24 :         void* operator new(std::size_t n, strand_impl& impl)
      52              :         {
      53           24 :             constexpr auto A = alignof(strand_impl*);
      54           24 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      55           24 :             std::size_t total = padded + sizeof(strand_impl*);
      56              : 
      57           24 :             void* p = impl.cached_frame_
      58           24 :                 ? std::exchange(impl.cached_frame_, nullptr)
      59            9 :                 : ::operator new(total);
      60              : 
      61              :             // Trailer lets delete recover impl
      62           24 :             *reinterpret_cast<strand_impl**>(
      63           24 :                 static_cast<char*>(p) + padded) = &impl;
      64           24 :             return p;
      65              :         }
      66              : 
      67           24 :         void operator delete(void* p, std::size_t n) noexcept
      68              :         {
      69           24 :             constexpr auto A = alignof(strand_impl*);
      70           24 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      71              : 
      72           24 :             auto* impl = *reinterpret_cast<strand_impl**>(
      73              :                 static_cast<char*>(p) + padded);
      74              : 
      75           24 :             if (!impl->cached_frame_)
      76           24 :                 impl->cached_frame_ = p;
      77              :             else
      78            0 :                 ::operator delete(p);
      79           24 :         }
      80              : 
      81           24 :         strand_invoker get_return_object() noexcept
      82           24 :         { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
      83              : 
      84           24 :         std::suspend_always initial_suspend() noexcept { return {}; }
      85           24 :         std::suspend_never final_suspend() noexcept { return {}; }
      86           24 :         void return_void() noexcept {}
      87            0 :         void unhandled_exception() { std::terminate(); }
      88              :     };
      89              : 
      90              :     std::coroutine_handle<promise_type> h_;
      91              : };
      92              : 
      93              : //----------------------------------------------------------
      94              : 
      95              : /** Concrete implementation of strand_service.
      96              : 
      97              :     Holds the fixed pool of strand_impl objects.
      98              : */
      99              : class strand_service_impl : public strand_service
     100              : {
     101              :     static constexpr std::size_t num_impls = 211;
     102              : 
     103              :     strand_impl impls_[num_impls];
     104              :     std::size_t salt_ = 0;
     105              :     std::mutex mutex_;
     106              : 
     107              : public:
     108              :     explicit
     109           18 :     strand_service_impl(execution_context&)
     110         3816 :     {
     111           18 :     }
     112              : 
     113              :     strand_impl*
     114           23 :     get_implementation() override
     115              :     {
     116           23 :         std::lock_guard<std::mutex> lock(mutex_);
     117           23 :         std::size_t index = salt_++;
     118           23 :         index = index % num_impls;
     119           23 :         return &impls_[index];
     120           23 :     }
     121              : 
     122              : protected:
     123              :     void
     124           18 :     shutdown() override
     125              :     {
     126         3816 :         for(std::size_t i = 0; i < num_impls; ++i)
     127              :         {
     128         3798 :             std::lock_guard<std::mutex> lock(impls_[i].mutex_);
     129         3798 :             impls_[i].locked_ = true;
     130              : 
     131         3798 :             if(impls_[i].cached_frame_)
     132              :             {
     133            9 :                 ::operator delete(impls_[i].cached_frame_);
     134            9 :                 impls_[i].cached_frame_ = nullptr;
     135              :             }
     136         3798 :         }
     137           18 :     }
     138              : 
     139              : private:
     140              :     static bool
     141          265 :     enqueue(strand_impl& impl, any_coro h)
     142              :     {
     143          265 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     144          265 :         impl.pending_.push(h);
     145          265 :         if(!impl.locked_)
     146              :         {
     147           24 :             impl.locked_ = true;
     148           24 :             return true;
     149              :         }
     150          241 :         return false;
     151          265 :     }
     152              : 
     153              :     static void
     154           24 :     dispatch_pending(strand_impl& impl)
     155              :     {
     156           24 :         strand_queue::taken_batch batch;
     157              :         {
     158           24 :             std::lock_guard<std::mutex> lock(impl.mutex_);
     159           24 :             batch = impl.pending_.take_all();
     160           24 :         }
     161           24 :         impl.pending_.dispatch_batch(batch);
     162           24 :     }
     163              : 
     164              :     static bool
     165           24 :     try_unlock(strand_impl& impl)
     166              :     {
     167           24 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     168           24 :         if(impl.pending_.empty())
     169              :         {
     170           24 :             impl.locked_ = false;
     171           24 :             return true;
     172              :         }
     173            0 :         return false;
     174           24 :     }
     175              : 
     176              :     static void
     177           24 :     set_dispatch_thread(strand_impl& impl) noexcept
     178              :     {
     179           24 :         impl.dispatch_thread_.store(std::this_thread::get_id());
     180           24 :     }
     181              : 
     182              :     static void
     183           24 :     clear_dispatch_thread(strand_impl& impl) noexcept
     184              :     {
     185           24 :         impl.dispatch_thread_.store(std::thread::id{});
     186           24 :     }
     187              : 
     188              :     // Loops until queue empty (aggressive). Alternative: per-batch fairness
     189              :     // (repost after each batch to let other work run) - explore if starvation observed.
     190              :     static strand_invoker
     191           24 :     make_invoker(strand_impl& impl)
     192              :     {
     193              :         strand_impl* p = &impl;
     194              :         for(;;)
     195              :         {
     196              :             set_dispatch_thread(*p);
     197              :             dispatch_pending(*p);
     198              :             if(try_unlock(*p))
     199              :             {
     200              :                 clear_dispatch_thread(*p);
     201              :                 co_return;
     202              :             }
     203              :         }
     204           48 :     }
     205              : 
     206              :     friend class strand_service;
     207              : };
     208              : 
     209              : //----------------------------------------------------------
     210              : 
     211           18 : strand_service::
     212           18 : strand_service()
     213           18 :     : service()
     214              : {
     215           18 : }
     216              : 
     217           18 : strand_service::
     218              : ~strand_service() = default;
     219              : 
     220              : bool
     221            4 : strand_service::
     222              : running_in_this_thread(strand_impl& impl) noexcept
     223              : {
     224            4 :     return impl.dispatch_thread_.load() == std::this_thread::get_id();
     225              : }
     226              : 
     227              : any_coro
     228            3 : strand_service::
     229              : dispatch(strand_impl& impl, any_executor_ref ex, any_coro h)
     230              : {
     231            3 :     if(running_in_this_thread(impl))
     232            0 :         return h;
     233              : 
     234            3 :     if(strand_service_impl::enqueue(impl, h))
     235            3 :         ex.post(strand_service_impl::make_invoker(impl).h_);
     236              : 
     237            3 :     return std::noop_coroutine();
     238              : }
     239              : 
     240              : void
     241          262 : strand_service::
     242              : post(strand_impl& impl, any_executor_ref ex, any_coro h)
     243              : {
     244          262 :     if(strand_service_impl::enqueue(impl, h))
     245           21 :         ex.post(strand_service_impl::make_invoker(impl).h_);
     246          262 : }
     247              : 
     248              : strand_service&
     249           25 : get_strand_service(execution_context& ctx)
     250              : {
     251           25 :     return ctx.use_service<strand_service_impl>();
     252              : }
     253              : 
     254              : } // namespace detail
     255              : } // namespace capy
     256              : } // namespace boost
        

Generated by: LCOV version 2.3