GCC Code Coverage Report


Directory: ./
File: libs/capy/src/ex/detail/strand_service.cpp
Date: 2026-01-18 18:26:31
Exec Total Coverage
Lines: 87 91 95.6%
Functions: 21 22 95.5%
Branches: 27 31 87.1%

Line Branch Exec Source
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <boost/capy/ex/any_coro.hpp>
13
14 #include <atomic>
15 #include <coroutine>
16 #include <mutex>
17 #include <thread>
18 #include <utility>
19
20 namespace boost {
21 namespace capy {
22 namespace detail {
23
24 //----------------------------------------------------------
25
26 /** Implementation state for a strand.
27
28 Each strand_impl provides serialization for coroutines
29 dispatched through strands that share it.
30 */
31 struct strand_impl
32 {
33 std::mutex mutex_;
34 strand_queue pending_;
35 bool locked_ = false;
36 std::atomic<std::thread::id> dispatch_thread_{};
37 void* cached_frame_ = nullptr;
38 };
39
40 //----------------------------------------------------------
41
42 /** Invoker coroutine for strand dispatch.
43
44 Uses custom allocator to recycle frame - one allocation
45 per strand_impl lifetime, stored in trailer for recovery.
46 */
47 struct strand_invoker
48 {
49 struct promise_type
50 {
51 24 void* operator new(std::size_t n, strand_impl& impl)
52 {
53 24 constexpr auto A = alignof(strand_impl*);
54 24 std::size_t padded = (n + A - 1) & ~(A - 1);
55 24 std::size_t total = padded + sizeof(strand_impl*);
56
57 24 void* p = impl.cached_frame_
58
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 9 times.
24 ? std::exchange(impl.cached_frame_, nullptr)
59
1/1
✓ Branch 1 taken 9 times.
9 : ::operator new(total);
60
61 // Trailer lets delete recover impl
62 24 *reinterpret_cast<strand_impl**>(
63 24 static_cast<char*>(p) + padded) = &impl;
64 24 return p;
65 }
66
67 24 void operator delete(void* p, std::size_t n) noexcept
68 {
69 24 constexpr auto A = alignof(strand_impl*);
70 24 std::size_t padded = (n + A - 1) & ~(A - 1);
71
72 24 auto* impl = *reinterpret_cast<strand_impl**>(
73 static_cast<char*>(p) + padded);
74
75
1/2
✓ Branch 0 taken 24 times.
✗ Branch 1 not taken.
24 if (!impl->cached_frame_)
76 24 impl->cached_frame_ = p;
77 else
78 ::operator delete(p);
79 24 }
80
81 24 strand_invoker get_return_object() noexcept
82 24 { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
83
84 24 std::suspend_always initial_suspend() noexcept { return {}; }
85 24 std::suspend_never final_suspend() noexcept { return {}; }
86 24 void return_void() noexcept {}
87 void unhandled_exception() { std::terminate(); }
88 };
89
90 std::coroutine_handle<promise_type> h_;
91 };
92
93 //----------------------------------------------------------
94
95 /** Concrete implementation of strand_service.
96
97 Holds the fixed pool of strand_impl objects.
98 */
99 class strand_service_impl : public strand_service
100 {
101 static constexpr std::size_t num_impls = 211;
102
103 strand_impl impls_[num_impls];
104 std::size_t salt_ = 0;
105 std::mutex mutex_;
106
107 public:
108 explicit
109 18 strand_service_impl(execution_context&)
110 3816 {
111 18 }
112
113 strand_impl*
114 23 get_implementation() override
115 {
116
1/1
✓ Branch 1 taken 23 times.
23 std::lock_guard<std::mutex> lock(mutex_);
117 23 std::size_t index = salt_++;
118 23 index = index % num_impls;
119 23 return &impls_[index];
120 23 }
121
122 protected:
123 void
124 18 shutdown() override
125 {
126
2/2
✓ Branch 0 taken 3798 times.
✓ Branch 1 taken 18 times.
3816 for(std::size_t i = 0; i < num_impls; ++i)
127 {
128
1/1
✓ Branch 1 taken 3798 times.
3798 std::lock_guard<std::mutex> lock(impls_[i].mutex_);
129 3798 impls_[i].locked_ = true;
130
131
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 3789 times.
3798 if(impls_[i].cached_frame_)
132 {
133 9 ::operator delete(impls_[i].cached_frame_);
134 9 impls_[i].cached_frame_ = nullptr;
135 }
136 3798 }
137 18 }
138
139 private:
140 static bool
141 265 enqueue(strand_impl& impl, any_coro h)
142 {
143
1/1
✓ Branch 1 taken 265 times.
265 std::lock_guard<std::mutex> lock(impl.mutex_);
144
1/1
✓ Branch 1 taken 265 times.
265 impl.pending_.push(h);
145
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 241 times.
265 if(!impl.locked_)
146 {
147 24 impl.locked_ = true;
148 24 return true;
149 }
150 241 return false;
151 265 }
152
153 static void
154 24 dispatch_pending(strand_impl& impl)
155 {
156 24 strand_queue::taken_batch batch;
157 {
158
1/1
✓ Branch 1 taken 24 times.
24 std::lock_guard<std::mutex> lock(impl.mutex_);
159 24 batch = impl.pending_.take_all();
160 24 }
161
1/1
✓ Branch 1 taken 24 times.
24 impl.pending_.dispatch_batch(batch);
162 24 }
163
164 static bool
165 24 try_unlock(strand_impl& impl)
166 {
167
1/1
✓ Branch 1 taken 24 times.
24 std::lock_guard<std::mutex> lock(impl.mutex_);
168
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 if(impl.pending_.empty())
169 {
170 24 impl.locked_ = false;
171 24 return true;
172 }
173 return false;
174 24 }
175
176 static void
177 24 set_dispatch_thread(strand_impl& impl) noexcept
178 {
179 24 impl.dispatch_thread_.store(std::this_thread::get_id());
180 24 }
181
182 static void
183 24 clear_dispatch_thread(strand_impl& impl) noexcept
184 {
185 24 impl.dispatch_thread_.store(std::thread::id{});
186 24 }
187
188 // Loops until queue empty (aggressive). Alternative: per-batch fairness
189 // (repost after each batch to let other work run) - explore if starvation observed.
190 static strand_invoker
191
1/1
✓ Branch 1 taken 24 times.
24 make_invoker(strand_impl& impl)
192 {
193 strand_impl* p = &impl;
194 for(;;)
195 {
196 set_dispatch_thread(*p);
197 dispatch_pending(*p);
198 if(try_unlock(*p))
199 {
200 clear_dispatch_thread(*p);
201 co_return;
202 }
203 }
204 48 }
205
206 friend class strand_service;
207 };
208
209 //----------------------------------------------------------
210
211 18 strand_service::
212 18 strand_service()
213 18 : service()
214 {
215 18 }
216
217 36 strand_service::
218 ~strand_service() = default;
219
220 bool
221 4 strand_service::
222 running_in_this_thread(strand_impl& impl) noexcept
223 {
224 4 return impl.dispatch_thread_.load() == std::this_thread::get_id();
225 }
226
227 any_coro
228 3 strand_service::
229 dispatch(strand_impl& impl, any_executor_ref ex, any_coro h)
230 {
231
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if(running_in_this_thread(impl))
232 return h;
233
234
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 if(strand_service_impl::enqueue(impl, h))
235
2/2
✓ Branch 1 taken 3 times.
✓ Branch 5 taken 3 times.
3 ex.post(strand_service_impl::make_invoker(impl).h_);
236
237 3 return std::noop_coroutine();
238 }
239
240 void
241 262 strand_service::
242 post(strand_impl& impl, any_executor_ref ex, any_coro h)
243 {
244
2/2
✓ Branch 1 taken 21 times.
✓ Branch 2 taken 241 times.
262 if(strand_service_impl::enqueue(impl, h))
245
2/2
✓ Branch 1 taken 21 times.
✓ Branch 5 taken 21 times.
21 ex.post(strand_service_impl::make_invoker(impl).h_);
246 262 }
247
248 strand_service&
249 25 get_strand_service(execution_context& ctx)
250 {
251 25 return ctx.use_service<strand_service_impl>();
252 }
253
254 } // namespace detail
255 } // namespace capy
256 } // namespace boost
257