GCC Code Coverage Report


Directory: ./
File: libs/capy/src/ex/detail/strand_queue.hpp
Date: 2026-01-18 18:26:31
Exec Total Coverage
Lines: 74 76 97.4%
Functions: 14 15 93.3%
Branches: 19 19 100.0%

Line Branch Exec Source
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11 #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14
15 #include <coroutine>
16 #include <cstddef>
17 #include <exception>
18
19 namespace boost {
20 namespace capy {
21 namespace detail {
22
23 class strand_queue;
24
25 //----------------------------------------------------------
26
27 // Header placed immediately before each wrapper coroutine frame's storage
28 struct frame_prefix
29 {
30 frame_prefix* next; // next recycled block in a strand_queue free list
31 strand_queue* queue; // queue whose free list receives this block in promise_type::operator delete
32 std::size_t alloc_size; // frame size + sizeof(frame_prefix), as computed by promise_type::operator new
33 };
34
35 //----------------------------------------------------------
36
37 /** Wrapper coroutine for strand queue dispatch operations.
38
39 This coroutine wraps a target coroutine handle and resumes
40 it when dispatched. The wrapper ensures control returns to
41 the dispatch loop after the target suspends or completes.
42
43 The promise embeds the intrusive list link (`next`) used by
44 strand_queue and declares allocation functions that place a
45 frame_prefix before the frame and recycle frames via a free list.
46 */
47 struct strand_op
48 {
49 struct promise_type
50 {
51 promise_type* next = nullptr; // intrusive FIFO link, written by strand_queue::push
52
53 void*
54 operator new(
55 std::size_t size,
56 strand_queue& q,
57 std::coroutine_handle<void>); // parameters after size mirror make_strand_op's, so the queue is known at allocation time
58
59 void
60 operator delete(void* p, std::size_t); // pushes the block onto a free list; does not release memory
61
62 strand_op
63 377 get_return_object() noexcept
64 {
65 377 return {std::coroutine_handle<promise_type>::from_promise(*this)};
66 }
67
68 std::suspend_always
69 377 initial_suspend() noexcept // wrapper starts suspended; its body runs only when the handle is resumed
70 {
71 377 return {};
72 }
73
74 std::suspend_always
75 375 final_suspend() noexcept // stays suspended when finished; the frame persists until destroyed or freed
76 {
77 375 return {};
78 }
79
80 void
81 375 return_void() noexcept
82 {
83 375 }
84
85 void
86 unhandled_exception()
87 {
88 std::terminate(); // an exception escaping the wrapper body ends the program
89 }
90 };
91
92 std::coroutine_handle<promise_type> h_; // handle to the wrapper; strand_op declares no destructor, so it never destroys it
93 };
94
95 //----------------------------------------------------------
96
97 /** Single-threaded dispatch queue for coroutine handles.
98
99 This queue stores coroutine handles and resumes them
100 sequentially when dispatch() is called. Each pushed
101 handle is wrapped in a strand_op coroutine that ensures
102 control returns to the dispatch loop after the target
103 suspends or completes.
104
105 The queue uses an intrusive singly-linked list through
106 the promise type to avoid separate node allocations.
107 A free list recycles wrapper coroutine frames to reduce
108 allocation overhead during repeated push/dispatch cycles.
109
110 @par Thread Safety
111 This class performs no synchronization of its own; the
112 caller must serialize all operations on a given queue.
113 */
114 class strand_queue
115 {
116 using promise_type = strand_op::promise_type;
117
118 promise_type* head_ = nullptr; // first pending wrapper (FIFO front)
119 promise_type* tail_ = nullptr; // last pending wrapper; nullptr when the queue is empty
120 frame_prefix* free_list_ = nullptr; // recycled frame blocks, linked through frame_prefix::next
121
122 friend struct strand_op::promise_type; // its operator new/delete read and write free_list_
123
124 static
125 strand_op
126
1/1
✓ Branch 1 taken 377 times.
377 make_strand_op(
127 strand_queue& q,
128 std::coroutine_handle<void> target)
129 {
130 (void)q; // q is consumed by promise_type::operator new, not by the body
131 target.resume(); // runs only once the wrapper itself is resumed by a dispatch loop
132 co_return;
133 754 }
134
135 public:
136 3798 strand_queue() = default;
137
138 strand_queue(strand_queue const&) = delete;
139 strand_queue& operator=(strand_queue const&) = delete;
140
141 /** Destructor.
142
143 Destroys any pending wrappers without resuming them,
144 then frees all memory in the free list.
145 */
146 3806 ~strand_queue()
147 {
148 // Destroy pending wrappers; each frame lands on free_list_ via promise_type::operator delete
149
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3806 times.
3808 while(head_)
150 {
151 2 promise_type* p = head_;
152 2 head_ = p->next;
153
154 2 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
155 2 h.destroy();
156 }
157
158 // Release every block on the free list, including those added by the loop above
159
2/2
✓ Branch 0 taken 110 times.
✓ Branch 1 taken 3806 times.
3916 while(free_list_)
160 {
161 110 frame_prefix* prefix = free_list_;
162 110 free_list_ = prefix->next;
163 110 ::operator delete(prefix);
164 }
165 3806 }
166
167 /** Returns true if there are no pending operations.
168 */
169 bool
170 34 empty() const noexcept
171 {
172 34 return head_ == nullptr;
173 }
174
175 /** Push a coroutine handle to the queue.
176
177 Creates a wrapper coroutine and appends it to the
178 queue. The wrapper will resume the target handle
179 when dispatch() or dispatch_batch() processes it.
180
181 @param h The coroutine handle to dispatch.
182 */
183 void
184 377 push(std::coroutine_handle<void> h)
185 {
186
1/1
✓ Branch 1 taken 377 times.
377 strand_op op = make_strand_op(*this, h); // frame comes from free_list_ or ::operator new; wrapper starts suspended
187
188 377 promise_type* p = &op.h_.promise();
189 377 p->next = nullptr;
190
191
2/2
✓ Branch 0 taken 345 times.
✓ Branch 1 taken 32 times.
377 if(tail_)
192 345 tail_->next = p;
193 else
194 32 head_ = p;
195 377 tail_ = p;
196 377 }
197
198 /** Resume all queued coroutines in sequence.
199
200 Processes each wrapper in FIFO order, resuming its
201 target coroutine. After each target suspends or
202 completes, the wrapper is destroyed and its frame
203 is added to the free list for reuse.
204
205 Coroutines resumed during dispatch may push new
206 handles, which will also be processed in the same
207 dispatch call.
208
209 @warning Not thread-safe. Do not call while another
210 thread may be calling push().
211 */
212 void
213 7 dispatch()
214 {
215
2/2
✓ Branch 0 taken 110 times.
✓ Branch 1 taken 7 times.
117 while(head_)
216 {
217 110 promise_type* p = head_;
218 110 head_ = p->next; // unlink before resuming so a push() made by the target sees a consistent list
219
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 103 times.
110 if(!head_)
220 7 tail_ = nullptr;
221
222 110 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
223
1/1
✓ Branch 1 taken 110 times.
110 h.resume();
224
1/1
✓ Branch 1 taken 110 times.
110 h.destroy(); // promise_type::operator delete recycles the frame onto free_list_
225 }
226 7 }
227
228 /** Items detached by take_all(), to be run by dispatch_batch(). */
229 struct taken_batch
230 {
231 promise_type* head = nullptr;
232 promise_type* tail = nullptr;
233 };
234
235 /** Take all pending items.
236
237 Removes all items from the queue and returns them as a batch,
238 leaving the queue empty. Uses plain pointer copies; no lock or atomic is involved.
239
240 @return The batch of taken items.
241 */
242 taken_batch
243 24 take_all() noexcept
244 {
245 24 taken_batch batch{head_, tail_};
246 24 head_ = tail_ = nullptr;
247 24 return batch;
248 }
249
250 /** Dispatch a batch of taken items.
251
252 @param batch The batch to dispatch; it is left empty on return.
253
254 @note Intended to be safe to run concurrently with push(): it
255 never touches the queue's head_, tail_ or free_list_, and frames
256 are released directly instead of being recycled.
257 */
258 static
259 void
260 24 dispatch_batch(taken_batch& batch)
261 {
262
2/2
✓ Branch 0 taken 265 times.
✓ Branch 1 taken 24 times.
289 while(batch.head)
263 {
264 265 promise_type* p = batch.head;
265 265 batch.head = p->next;
266
267 265 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
268
1/1
✓ Branch 1 taken 265 times.
265 h.resume();
269 // Don't use h.destroy() - it would call operator delete which
270 // accesses the queue's free_list_ (race with push).
271 // Instead, manually free the frame without recycling (h.destroy() is never called here).
272 // h.address() returns the frame base (what operator new returned).
273 265 frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
274 265 ::operator delete(prefix);
275 }
276 24 batch.tail = nullptr; // head is already nullptr once the loop exits
277 24 }
278 };
279
280 //----------------------------------------------------------
281
282 inline
283 void*
284 377 strand_op::promise_type::operator new(
285 std::size_t size, // bytes requested for the coroutine frame
286 strand_queue& q, // queue supplying the free list; also recorded in the prefix
287 std::coroutine_handle<void>) // unused
288 {
289 // Total size includes prefix
290 377 std::size_t alloc_size = size + sizeof(frame_prefix);
291 void* raw;
292
293 // Pop a recycled block if one exists; its size is not compared with alloc_size
294 // NOTE(review): assumes every block on this list was allocated for the same frame size - confirm
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 375 times.
377 if(q.free_list_)
295 {
296 2 frame_prefix* prefix = q.free_list_;
297 2 q.free_list_ = prefix->next;
298 2 raw = prefix;
299 }
300 else
301 {
302 375 raw = ::operator new(alloc_size);
303 }
304
305 // Initialize prefix (fresh and recycled blocks alike)
306 377 frame_prefix* prefix = static_cast<frame_prefix*>(raw);
307 377 prefix->next = nullptr;
308 377 prefix->queue = &q;
309 377 prefix->alloc_size = alloc_size; // stored but not read anywhere in this header
310
311 // Return pointer AFTER the prefix (this is where coroutine frame goes)
312 377 return prefix + 1; // NOTE(review): frame sits at raw + sizeof(frame_prefix); confirm this meets the frame's alignment requirement
313 }
314
315 inline
316 void
317 112 strand_op::promise_type::operator delete(void* p, std::size_t)
318 {
319 // Step back over the frame_prefix that operator new placed in front of the frame
320 112 frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
321
322 // Push onto the free list of the queue recorded in the prefix; no memory is released here, and that queue is dereferenced
323 112 prefix->next = prefix->queue->free_list_;
324 112 prefix->queue->free_list_ = prefix;
325 112 }
326
327 } // namespace detail
328 } // namespace capy
329 } // namespace boost
330
331 #endif
332