| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/capy | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | ||
| 11 | #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | ||
| 12 | |||
| 13 | #include <boost/capy/detail/config.hpp> | ||
| 14 | |||
| 15 | #include <coroutine> | ||
| 16 | #include <cstddef> | ||
| 17 | #include <exception> | ||
| 18 | |||
| 19 | namespace boost { | ||
| 20 | namespace capy { | ||
| 21 | namespace detail { | ||
| 22 | |||
| 23 | class strand_queue; | ||
| 24 | |||
| 25 | //---------------------------------------------------------- | ||
| 26 | |||
| 27 | // Metadata stored before the coroutine frame | ||
| 28 | struct frame_prefix | ||
| 29 | { | ||
| 30 | frame_prefix* next; | ||
| 31 | strand_queue* queue; | ||
| 32 | std::size_t alloc_size; | ||
| 33 | }; | ||
| 34 | |||
| 35 | //---------------------------------------------------------- | ||
| 36 | |||
| 37 | /** Wrapper coroutine for strand queue dispatch operations. | ||
| 38 | |||
| 39 | This coroutine wraps a target coroutine handle and resumes | ||
| 40 | it when dispatched. The wrapper ensures control returns to | ||
| 41 | the dispatch loop after the target suspends or completes. | ||
| 42 | |||
| 43 | The promise contains an intrusive list node for queue | ||
| 44 | storage and supports a custom allocator that recycles | ||
| 45 | coroutine frames via a free list. | ||
| 46 | */ | ||
| 47 | struct strand_op | ||
| 48 | { | ||
| 49 | struct promise_type | ||
| 50 | { | ||
| 51 | promise_type* next = nullptr; | ||
| 52 | |||
| 53 | void* | ||
| 54 | operator new( | ||
| 55 | std::size_t size, | ||
| 56 | strand_queue& q, | ||
| 57 | std::coroutine_handle<void>); | ||
| 58 | |||
| 59 | void | ||
| 60 | operator delete(void* p, std::size_t); | ||
| 61 | |||
| 62 | strand_op | ||
| 63 | 377 | get_return_object() noexcept | |
| 64 | { | ||
| 65 | 377 | return {std::coroutine_handle<promise_type>::from_promise(*this)}; | |
| 66 | } | ||
| 67 | |||
| 68 | std::suspend_always | ||
| 69 | 377 | initial_suspend() noexcept | |
| 70 | { | ||
| 71 | 377 | return {}; | |
| 72 | } | ||
| 73 | |||
| 74 | std::suspend_always | ||
| 75 | 375 | final_suspend() noexcept | |
| 76 | { | ||
| 77 | 375 | return {}; | |
| 78 | } | ||
| 79 | |||
| 80 | void | ||
| 81 | 375 | return_void() noexcept | |
| 82 | { | ||
| 83 | 375 | } | |
| 84 | |||
| 85 | void | ||
| 86 | ✗ | unhandled_exception() | |
| 87 | { | ||
| 88 | ✗ | std::terminate(); | |
| 89 | } | ||
| 90 | }; | ||
| 91 | |||
| 92 | std::coroutine_handle<promise_type> h_; | ||
| 93 | }; | ||
| 94 | |||
| 95 | //---------------------------------------------------------- | ||
| 96 | |||
| 97 | /** Single-threaded dispatch queue for coroutine handles. | ||
| 98 | |||
| 99 | This queue stores coroutine handles and resumes them | ||
| 100 | sequentially when dispatch() is called. Each pushed | ||
| 101 | handle is wrapped in a strand_op coroutine that ensures | ||
| 102 | control returns to the dispatch loop after the target | ||
| 103 | suspends or completes. | ||
| 104 | |||
| 105 | The queue uses an intrusive singly-linked list through | ||
| 106 | the promise type to avoid separate node allocations. | ||
| 107 | A free list recycles wrapper coroutine frames to reduce | ||
| 108 | allocation overhead during repeated push/dispatch cycles. | ||
| 109 | |||
| 110 | @par Thread Safety | ||
| 111 | This class is not thread-safe. All operations must be | ||
| 112 | called from a single thread. | ||
| 113 | */ | ||
| 114 | class strand_queue | ||
| 115 | { | ||
| 116 | using promise_type = strand_op::promise_type; | ||
| 117 | |||
| 118 | promise_type* head_ = nullptr; | ||
| 119 | promise_type* tail_ = nullptr; | ||
| 120 | frame_prefix* free_list_ = nullptr; | ||
| 121 | |||
| 122 | friend struct strand_op::promise_type; | ||
| 123 | |||
| 124 | static | ||
| 125 | strand_op | ||
| 126 |
1/1✓ Branch 1 taken 377 times.
|
377 | make_strand_op( |
| 127 | strand_queue& q, | ||
| 128 | std::coroutine_handle<void> target) | ||
| 129 | { | ||
| 130 | (void)q; | ||
| 131 | target.resume(); | ||
| 132 | co_return; | ||
| 133 | 754 | } | |
| 134 | |||
| 135 | public: | ||
| 136 | 3798 | strand_queue() = default; | |
| 137 | |||
| 138 | strand_queue(strand_queue const&) = delete; | ||
| 139 | strand_queue& operator=(strand_queue const&) = delete; | ||
| 140 | |||
| 141 | /** Destructor. | ||
| 142 | |||
| 143 | Destroys any pending wrappers without resuming them, | ||
| 144 | then frees all memory in the free list. | ||
| 145 | */ | ||
| 146 | 3806 | ~strand_queue() | |
| 147 | { | ||
| 148 | // Destroy pending wrappers | ||
| 149 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3806 times.
|
3808 | while(head_) |
| 150 | { | ||
| 151 | 2 | promise_type* p = head_; | |
| 152 | 2 | head_ = p->next; | |
| 153 | |||
| 154 | 2 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | |
| 155 | 2 | h.destroy(); | |
| 156 | } | ||
| 157 | |||
| 158 | // Free the free list memory | ||
| 159 |
2/2✓ Branch 0 taken 110 times.
✓ Branch 1 taken 3806 times.
|
3916 | while(free_list_) |
| 160 | { | ||
| 161 | 110 | frame_prefix* prefix = free_list_; | |
| 162 | 110 | free_list_ = prefix->next; | |
| 163 | 110 | ::operator delete(prefix); | |
| 164 | } | ||
| 165 | 3806 | } | |
| 166 | |||
| 167 | /** Returns true if there are no pending operations. | ||
| 168 | */ | ||
| 169 | bool | ||
| 170 | 34 | empty() const noexcept | |
| 171 | { | ||
| 172 | 34 | return head_ == nullptr; | |
| 173 | } | ||
| 174 | |||
| 175 | /** Push a coroutine handle to the queue. | ||
| 176 | |||
| 177 | Creates a wrapper coroutine and appends it to the | ||
| 178 | queue. The wrapper will resume the target handle | ||
| 179 | when dispatch() processes it. | ||
| 180 | |||
| 181 | @param h The coroutine handle to dispatch. | ||
| 182 | */ | ||
| 183 | void | ||
| 184 | 377 | push(std::coroutine_handle<void> h) | |
| 185 | { | ||
| 186 |
1/1✓ Branch 1 taken 377 times.
|
377 | strand_op op = make_strand_op(*this, h); |
| 187 | |||
| 188 | 377 | promise_type* p = &op.h_.promise(); | |
| 189 | 377 | p->next = nullptr; | |
| 190 | |||
| 191 |
2/2✓ Branch 0 taken 345 times.
✓ Branch 1 taken 32 times.
|
377 | if(tail_) |
| 192 | 345 | tail_->next = p; | |
| 193 | else | ||
| 194 | 32 | head_ = p; | |
| 195 | 377 | tail_ = p; | |
| 196 | 377 | } | |
| 197 | |||
| 198 | /** Resume all queued coroutines in sequence. | ||
| 199 | |||
| 200 | Processes each wrapper in FIFO order, resuming its | ||
| 201 | target coroutine. After each target suspends or | ||
| 202 | completes, the wrapper is destroyed and its frame | ||
| 203 | is added to the free list for reuse. | ||
| 204 | |||
| 205 | Coroutines resumed during dispatch may push new | ||
| 206 | handles, which will also be processed in the same | ||
| 207 | dispatch call. | ||
| 208 | |||
| 209 | @warning Not thread-safe. Do not call while another | ||
| 210 | thread may be calling push(). | ||
| 211 | */ | ||
| 212 | void | ||
| 213 | 7 | dispatch() | |
| 214 | { | ||
| 215 |
2/2✓ Branch 0 taken 110 times.
✓ Branch 1 taken 7 times.
|
117 | while(head_) |
| 216 | { | ||
| 217 | 110 | promise_type* p = head_; | |
| 218 | 110 | head_ = p->next; | |
| 219 |
2/2✓ Branch 0 taken 7 times.
✓ Branch 1 taken 103 times.
|
110 | if(!head_) |
| 220 | 7 | tail_ = nullptr; | |
| 221 | |||
| 222 | 110 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | |
| 223 |
1/1✓ Branch 1 taken 110 times.
|
110 | h.resume(); |
| 224 |
1/1✓ Branch 1 taken 110 times.
|
110 | h.destroy(); |
| 225 | } | ||
| 226 | 7 | } | |
| 227 | |||
| 228 | /** Batch of taken items for thread-safe dispatch. */ | ||
| 229 | struct taken_batch | ||
| 230 | { | ||
| 231 | promise_type* head = nullptr; | ||
| 232 | promise_type* tail = nullptr; | ||
| 233 | }; | ||
| 234 | |||
| 235 | /** Take all pending items atomically. | ||
| 236 | |||
| 237 | Removes all items from the queue and returns them | ||
| 238 | as a batch. The queue is left empty. | ||
| 239 | |||
| 240 | @return The batch of taken items. | ||
| 241 | */ | ||
| 242 | taken_batch | ||
| 243 | 24 | take_all() noexcept | |
| 244 | { | ||
| 245 | 24 | taken_batch batch{head_, tail_}; | |
| 246 | 24 | head_ = tail_ = nullptr; | |
| 247 | 24 | return batch; | |
| 248 | } | ||
| 249 | |||
| 250 | /** Dispatch a batch of taken items. | ||
| 251 | |||
| 252 | @param batch The batch to dispatch. | ||
| 253 | |||
| 254 | @note This is thread-safe w.r.t. push() because it doesn't | ||
| 255 | access the queue's free_list_. Frames are deleted directly | ||
| 256 | rather than recycled. | ||
| 257 | */ | ||
| 258 | static | ||
| 259 | void | ||
| 260 | 24 | dispatch_batch(taken_batch& batch) | |
| 261 | { | ||
| 262 |
2/2✓ Branch 0 taken 265 times.
✓ Branch 1 taken 24 times.
|
289 | while(batch.head) |
| 263 | { | ||
| 264 | 265 | promise_type* p = batch.head; | |
| 265 | 265 | batch.head = p->next; | |
| 266 | |||
| 267 | 265 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | |
| 268 |
1/1✓ Branch 1 taken 265 times.
|
265 | h.resume(); |
| 269 | // Don't use h.destroy() - it would call operator delete which | ||
| 270 | // accesses the queue's free_list_ (race with push). | ||
| 271 | // Instead, manually free the frame without recycling. | ||
| 272 | // h.address() returns the frame base (what operator new returned). | ||
| 273 | 265 | frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; | |
| 274 | 265 | ::operator delete(prefix); | |
| 275 | } | ||
| 276 | 24 | batch.tail = nullptr; | |
| 277 | 24 | } | |
| 278 | }; | ||
| 279 | |||
| 280 | //---------------------------------------------------------- | ||
| 281 | |||
| 282 | inline | ||
| 283 | void* | ||
| 284 | 377 | strand_op::promise_type::operator new( | |
| 285 | std::size_t size, | ||
| 286 | strand_queue& q, | ||
| 287 | std::coroutine_handle<void>) | ||
| 288 | { | ||
| 289 | // Total size includes prefix | ||
| 290 | 377 | std::size_t alloc_size = size + sizeof(frame_prefix); | |
| 291 | void* raw; | ||
| 292 | |||
| 293 | // Try to reuse from free list | ||
| 294 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 375 times.
|
377 | if(q.free_list_) |
| 295 | { | ||
| 296 | 2 | frame_prefix* prefix = q.free_list_; | |
| 297 | 2 | q.free_list_ = prefix->next; | |
| 298 | 2 | raw = prefix; | |
| 299 | } | ||
| 300 | else | ||
| 301 | { | ||
| 302 | 375 | raw = ::operator new(alloc_size); | |
| 303 | } | ||
| 304 | |||
| 305 | // Initialize prefix | ||
| 306 | 377 | frame_prefix* prefix = static_cast<frame_prefix*>(raw); | |
| 307 | 377 | prefix->next = nullptr; | |
| 308 | 377 | prefix->queue = &q; | |
| 309 | 377 | prefix->alloc_size = alloc_size; | |
| 310 | |||
| 311 | // Return pointer AFTER the prefix (this is where coroutine frame goes) | ||
| 312 | 377 | return prefix + 1; | |
| 313 | } | ||
| 314 | |||
| 315 | inline | ||
| 316 | void | ||
| 317 | 112 | strand_op::promise_type::operator delete(void* p, std::size_t) | |
| 318 | { | ||
| 319 | // Calculate back to get the prefix | ||
| 320 | 112 | frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1; | |
| 321 | |||
| 322 | // Add to free list | ||
| 323 | 112 | prefix->next = prefix->queue->free_list_; | |
| 324 | 112 | prefix->queue->free_list_ = prefix; | |
| 325 | 112 | } | |
| 326 | |||
| 327 | } // namespace detail | ||
| 328 | } // namespace capy | ||
| 329 | } // namespace boost | ||
| 330 | |||
| 331 | #endif | ||
| 332 |