Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11 : #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 :
15 : #include <coroutine>
16 : #include <cstddef>
17 : #include <exception>
18 :
19 : namespace boost {
20 : namespace capy {
21 : namespace detail {
22 :
23 : class strand_queue;
24 :
25 : //----------------------------------------------------------
26 :
27 : // Metadata stored before the coroutine frame
28 : struct alignas(__STDCPP_DEFAULT_NEW_ALIGNMENT__) frame_prefix // Bookkeeping header placed immediately before each wrapper frame; the alignas pads sizeof() to a multiple of the default new alignment so that `prefix + 1` (the address handed to the coroutine) is as aligned as the pointer ::operator new returned
29 : {
30 : frame_prefix* next; // next recycled block while this block sits on a queue's free list
31 : strand_queue* queue; // queue that allocated the frame; operator delete returns the block to its free list
32 : std::size_t alloc_size; // total bytes obtained from ::operator new (prefix + coroutine frame)
33 : };
34 :
35 : //----------------------------------------------------------
36 :
37 : /** Wrapper coroutine for strand queue dispatch operations.
38 :
39 : This coroutine wraps a target coroutine handle and resumes
40 : it when dispatched. The wrapper ensures control returns to
41 : the dispatch loop after the target suspends or completes.
42 :
43 : The promise contains an intrusive list node for queue
44 : storage and supports a custom allocator that recycles
45 : coroutine frames via a free list.
46 : */
47 : struct strand_op // Return object of the wrapper coroutine; it only carries the coroutine handle (h_)
48 : {
49 : struct promise_type // Promise of the wrapper coroutine; also serves as the intrusive queue node
50 : {
51 : promise_type* next = nullptr; // link to the next queued wrapper; nullptr when last or not queued
52 :
53 : void* // frame allocation hook; receives the coroutine's own arguments after the size
54 : operator new(
55 : std::size_t size, // bytes the compiler requests for the coroutine frame
56 : strand_queue& q, // queue whose free list is tried before the global heap
57 : std::coroutine_handle<void>); // target handle; not used by the allocator
58 :
59 : void // frame deallocation hook; pushes the block onto the owning queue's free list
60 : operator delete(void* p, std::size_t);
61 :
62 : strand_op // builds the return object from this promise's handle
63 377 : get_return_object() noexcept
64 : {
65 377 : return {std::coroutine_handle<promise_type>::from_promise(*this)};
66 : }
67 :
68 : std::suspend_always // start suspended: the body runs only when the handle is resumed
69 377 : initial_suspend() noexcept
70 : {
71 377 : return {};
72 : }
73 :
74 : std::suspend_always // stay suspended at the end: the frame must be released explicitly by the resumer
75 375 : final_suspend() noexcept
76 : {
77 375 : return {};
78 : }
79 :
80 : void // the wrapper produces no value
81 375 : return_void() noexcept
82 : {
83 375 : }
84 :
85 : void // an exception escaping the wrapper body is fatal
86 0 : unhandled_exception()
87 : {
88 0 : std::terminate();
89 : }
90 : };
91 :
92 : std::coroutine_handle<promise_type> h_; // handle to the wrapper coroutine; strand_op itself never destroys it
93 : };
94 :
95 : //----------------------------------------------------------
96 :
97 : /** Single-threaded dispatch queue for coroutine handles.
98 :
99 : This queue stores coroutine handles and resumes them
100 : sequentially when dispatch() is called. Each pushed
101 : handle is wrapped in a strand_op coroutine that ensures
102 : control returns to the dispatch loop after the target
103 : suspends or completes.
104 :
105 : The queue uses an intrusive singly-linked list through
106 : the promise type to avoid separate node allocations.
107 : A free list recycles wrapper coroutine frames to reduce
108 : allocation overhead during repeated push/dispatch cycles.
109 :
110 : @par Thread Safety
111 : This class is not thread-safe. All operations must be
112 : called from a single thread.
113 : */
114 : class strand_queue // FIFO of wrapper coroutines linked through promise_type::next, plus a free list of recycled frames
115 : {
116 : using promise_type = strand_op::promise_type;
117 :
118 : promise_type* head_ = nullptr; // first pending wrapper, or nullptr when the queue is empty
119 : promise_type* tail_ = nullptr; // last pending wrapper; push() appends here
120 : frame_prefix* free_list_ = nullptr; // recycled frame blocks, linked through frame_prefix::next
121 :
122 : friend struct strand_op::promise_type; // operator new/delete of the promise touch free_list_
123 :
124 : static
125 : strand_op
126 377 : make_strand_op( // wrapper coroutine: starts suspended, resumes `target` once when resumed, then completes
127 : strand_queue& q,
128 : std::coroutine_handle<void> target)
129 : {
130 : (void)q; // q is only consumed by promise_type::operator new, not by the body
131 : target.resume(); // returns here once the target suspends or completes
132 : co_return;
133 754 : }
134 :
135 : public:
136 3798 : strand_queue() = default;
137 :
138 : strand_queue(strand_queue const&) = delete; // non-copyable: queued promises and frame prefixes point back at this object
139 : strand_queue& operator=(strand_queue const&) = delete;
140 :
141 : /** Destructor.
142 :
143 : Destroys any pending wrappers without resuming them,
144 : then frees all memory in the free list.
145 : */
146 3806 : ~strand_queue()
147 : {
148 : // Destroy pending wrappers (their targets are never resumed)
149 3808 : while(head_)
150 : {
151 2 : promise_type* p = head_;
152 2 : head_ = p->next;
153 :
154 2 : auto h = std::coroutine_handle<promise_type>::from_promise(*p);
155 2 : h.destroy(); // promise_type::operator delete pushes the frame onto free_list_, released by the loop below
156 : }
157 :
158 : // Free the free list memory (each node is the pointer ::operator new originally returned)
159 3916 : while(free_list_)
160 : {
161 110 : frame_prefix* prefix = free_list_;
162 110 : free_list_ = prefix->next;
163 110 : ::operator delete(prefix);
164 : }
165 3806 : }
166 :
167 : /** Returns true if there are no pending operations.
168 : */
169 : bool
170 34 : empty() const noexcept
171 : {
172 34 : return head_ == nullptr;
173 : }
174 :
175 : /** Push a coroutine handle to the queue.
176 :
177 : Creates a wrapper coroutine and appends it to the
178 : queue. The wrapper will resume the target handle
179 : when dispatch() processes it.
180 :
181 : @param h The coroutine handle to dispatch.
182 : */
183 : void
184 377 : push(std::coroutine_handle<void> h)
185 : {
186 377 : strand_op op = make_strand_op(*this, h); // allocates (or recycles) a frame; the wrapper starts suspended
187 :
188 377 : promise_type* p = &op.h_.promise();
189 377 : p->next = nullptr;
190 :
191 377 : if(tail_)
192 345 : tail_->next = p; // non-empty queue: link after the current tail
193 : else
194 32 : head_ = p; // empty queue: the new node is also the head
195 377 : tail_ = p;
196 377 : }
197 :
198 : /** Resume all queued coroutines in sequence.
199 :
200 : Processes each wrapper in FIFO order, resuming its
201 : target coroutine. After each target suspends or
202 : completes, the wrapper is destroyed and its frame
203 : is added to the free list for reuse.
204 :
205 : Coroutines resumed during dispatch may push new
206 : handles, which will also be processed in the same
207 : dispatch call.
208 :
209 : @warning Not thread-safe. Do not call while another
210 : thread may be calling push().
211 : */
212 : void
213 7 : dispatch()
214 : {
215 117 : while(head_)
216 : {
217 110 : promise_type* p = head_;
218 110 : head_ = p->next; // unlink before resuming so a push() made by the target sees a consistent list
219 110 : if(!head_)
220 7 : tail_ = nullptr;
221 :
222 110 : auto h = std::coroutine_handle<promise_type>::from_promise(*p);
223 110 : h.resume(); // runs the wrapper body, which resumes the target once
224 110 : h.destroy(); // frame goes back to free_list_ through promise_type::operator delete
225 : }
226 7 : }
227 :
228 : /** Batch of taken items for thread-safe dispatch. */
229 : struct taken_batch
230 : {
231 : promise_type* head = nullptr; // first wrapper of the detached list
232 : promise_type* tail = nullptr; // last wrapper of the detached list
233 : };
234 :
235 : /** Take all pending items in one step (plain pointer swap; no atomics or locking are used here).
236 :
237 : Removes all items from the queue and returns them
238 : as a batch. The queue is left empty.
239 :
240 : @return The batch of taken items.
241 : */
242 : taken_batch
243 24 : take_all() noexcept
244 : {
245 24 : taken_batch batch{head_, tail_};
246 24 : head_ = tail_ = nullptr;
247 24 : return batch;
248 : }
249 :
250 : /** Dispatch a batch of taken items.
251 :
252 : @param batch The batch to dispatch.
253 :
254 : @note This is thread-safe w.r.t. push() because it doesn't
255 : access the queue's free_list_. Frames are deleted directly
256 : rather than recycled.
257 : */
258 : static
259 : void
260 24 : dispatch_batch(taken_batch& batch)
261 : {
262 289 : while(batch.head)
263 : {
264 265 : promise_type* p = batch.head;
265 265 : batch.head = p->next; // advance first: the frame holding p is freed below
266 :
267 265 : auto h = std::coroutine_handle<promise_type>::from_promise(*p);
268 265 : h.resume();
269 : // Don't use h.destroy() - it would call operator delete which
270 : // accesses the queue's free_list_ (race with push).
271 : // Instead, manually free the frame without recycling.
272 : // h.address() returns the frame base (what operator new returned).
273 265 : frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; // NOTE(review): relies on h.address() being the pointer promise operator new returned, and skips frame/promise destructors - confirm both stay trivial
274 265 : ::operator delete(prefix); // the prefix address is what ::operator new handed out
275 : }
276 24 : batch.tail = nullptr; // batch is now empty (head is already nullptr)
277 24 : }
278 : };
279 :
280 : //----------------------------------------------------------
281 :
282 : inline
283 : void* // returns the address where the coroutine frame is constructed (just past the prefix)
284 377 : strand_op::promise_type::operator new(
285 : std::size_t size, // bytes requested for the coroutine frame
286 : strand_queue& q, // queue providing the free list and recorded as the frame's owner
287 : std::coroutine_handle<void>) // target handle; irrelevant to allocation
288 : {
289 : // Total size includes prefix
290 377 : std::size_t alloc_size = size + sizeof(frame_prefix);
291 : void* raw;
292 :
293 : // Try to reuse from free list, but only when the recycled block is large enough for this request; otherwise leave it listed and allocate fresh
294 377 : if(q.free_list_ && q.free_list_->alloc_size >= alloc_size)
295 : {
296 2 : frame_prefix* prefix = q.free_list_;
297 2 : q.free_list_ = prefix->next;
298 2 : raw = prefix;
299 : }
300 : else
301 : {
302 375 : raw = ::operator new(alloc_size); // may throw std::bad_alloc
303 : }
304 :
305 : // Initialize prefix (owner and size are rewritten on every allocation, recycled or not)
306 377 : frame_prefix* prefix = static_cast<frame_prefix*>(raw);
307 377 : prefix->next = nullptr;
308 377 : prefix->queue = &q;
309 377 : prefix->alloc_size = alloc_size;
310 :
311 : // Return pointer AFTER the prefix (this is where coroutine frame goes)
312 377 : return prefix + 1;
313 : }
314 :
315 : inline
316 : void // recycles a wrapper frame instead of returning it to the heap
317 112 : strand_op::promise_type::operator delete(void* p, std::size_t)
318 : {
319 : // Calculate back to get the prefix (p is the frame address operator new returned, one prefix past the block start)
320 112 : frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
321 :
322 : // Add to free list of the queue recorded at allocation time; that queue must still be alive here
323 112 : prefix->next = prefix->queue->free_list_;
324 112 : prefix->queue->free_list_ = prefix;
325 112 : }
326 :
327 : } // namespace detail
328 : } // namespace capy
329 : } // namespace boost
330 :
331 : #endif
|