Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #include "src/ex/detail/strand_queue.hpp"
11 : #include <boost/capy/ex/detail/strand_service.hpp>
12 : #include <boost/capy/ex/any_coro.hpp>
13 :
14 : #include <atomic>
15 : #include <coroutine>
16 : #include <mutex>
17 : #include <thread>
18 : #include <utility>
19 :
20 : namespace boost {
21 : namespace capy {
22 : namespace detail {
23 :
24 : //----------------------------------------------------------
25 :
26 : /** Implementation state for a strand.
27 :
28 : Each strand_impl provides serialization for coroutines
29 : dispatched through strands that share it.
30 : */
31 : struct strand_impl
32 : {
33 : std::mutex mutex_;
34 : strand_queue pending_;
35 : bool locked_ = false;
36 : std::atomic<std::thread::id> dispatch_thread_{};
37 : void* cached_frame_ = nullptr;
38 : };
39 :
40 : //----------------------------------------------------------
41 :
42 : /** Invoker coroutine for strand dispatch.
43 :
44 : Uses custom allocator to recycle frame - one allocation
45 : per strand_impl lifetime, stored in trailer for recovery.
46 : */
47 : struct strand_invoker
48 : {
49 : struct promise_type
50 : {
51 24 : void* operator new(std::size_t n, strand_impl& impl)
52 : {
53 24 : constexpr auto A = alignof(strand_impl*);
54 24 : std::size_t padded = (n + A - 1) & ~(A - 1);
55 24 : std::size_t total = padded + sizeof(strand_impl*);
56 :
57 24 : void* p = impl.cached_frame_
58 24 : ? std::exchange(impl.cached_frame_, nullptr)
59 9 : : ::operator new(total);
60 :
61 : // Trailer lets delete recover impl
62 24 : *reinterpret_cast<strand_impl**>(
63 24 : static_cast<char*>(p) + padded) = &impl;
64 24 : return p;
65 : }
66 :
67 24 : void operator delete(void* p, std::size_t n) noexcept
68 : {
69 24 : constexpr auto A = alignof(strand_impl*);
70 24 : std::size_t padded = (n + A - 1) & ~(A - 1);
71 :
72 24 : auto* impl = *reinterpret_cast<strand_impl**>(
73 : static_cast<char*>(p) + padded);
74 :
75 24 : if (!impl->cached_frame_)
76 24 : impl->cached_frame_ = p;
77 : else
78 0 : ::operator delete(p);
79 24 : }
80 :
81 24 : strand_invoker get_return_object() noexcept
82 24 : { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
83 :
84 24 : std::suspend_always initial_suspend() noexcept { return {}; }
85 24 : std::suspend_never final_suspend() noexcept { return {}; }
86 24 : void return_void() noexcept {}
87 0 : void unhandled_exception() { std::terminate(); }
88 : };
89 :
90 : std::coroutine_handle<promise_type> h_;
91 : };
92 :
93 : //----------------------------------------------------------
94 :
95 : /** Concrete implementation of strand_service.
96 :
97 : Holds the fixed pool of strand_impl objects.
98 : */
99 : class strand_service_impl : public strand_service
100 : {
101 : static constexpr std::size_t num_impls = 211;
102 :
103 : strand_impl impls_[num_impls];
104 : std::size_t salt_ = 0;
105 : std::mutex mutex_;
106 :
107 : public:
108 : explicit
109 18 : strand_service_impl(execution_context&)
110 3816 : {
111 18 : }
112 :
113 : strand_impl*
114 23 : get_implementation() override
115 : {
116 23 : std::lock_guard<std::mutex> lock(mutex_);
117 23 : std::size_t index = salt_++;
118 23 : index = index % num_impls;
119 23 : return &impls_[index];
120 23 : }
121 :
122 : protected:
123 : void
124 18 : shutdown() override
125 : {
126 3816 : for(std::size_t i = 0; i < num_impls; ++i)
127 : {
128 3798 : std::lock_guard<std::mutex> lock(impls_[i].mutex_);
129 3798 : impls_[i].locked_ = true;
130 :
131 3798 : if(impls_[i].cached_frame_)
132 : {
133 9 : ::operator delete(impls_[i].cached_frame_);
134 9 : impls_[i].cached_frame_ = nullptr;
135 : }
136 3798 : }
137 18 : }
138 :
139 : private:
140 : static bool
141 265 : enqueue(strand_impl& impl, any_coro h)
142 : {
143 265 : std::lock_guard<std::mutex> lock(impl.mutex_);
144 265 : impl.pending_.push(h);
145 265 : if(!impl.locked_)
146 : {
147 24 : impl.locked_ = true;
148 24 : return true;
149 : }
150 241 : return false;
151 265 : }
152 :
153 : static void
154 24 : dispatch_pending(strand_impl& impl)
155 : {
156 24 : strand_queue::taken_batch batch;
157 : {
158 24 : std::lock_guard<std::mutex> lock(impl.mutex_);
159 24 : batch = impl.pending_.take_all();
160 24 : }
161 24 : impl.pending_.dispatch_batch(batch);
162 24 : }
163 :
164 : static bool
165 24 : try_unlock(strand_impl& impl)
166 : {
167 24 : std::lock_guard<std::mutex> lock(impl.mutex_);
168 24 : if(impl.pending_.empty())
169 : {
170 24 : impl.locked_ = false;
171 24 : return true;
172 : }
173 0 : return false;
174 24 : }
175 :
176 : static void
177 24 : set_dispatch_thread(strand_impl& impl) noexcept
178 : {
179 24 : impl.dispatch_thread_.store(std::this_thread::get_id());
180 24 : }
181 :
182 : static void
183 24 : clear_dispatch_thread(strand_impl& impl) noexcept
184 : {
185 24 : impl.dispatch_thread_.store(std::thread::id{});
186 24 : }
187 :
188 : // Loops until queue empty (aggressive). Alternative: per-batch fairness
189 : // (repost after each batch to let other work run) - explore if starvation observed.
190 : static strand_invoker
191 24 : make_invoker(strand_impl& impl)
192 : {
193 : strand_impl* p = &impl;
194 : for(;;)
195 : {
196 : set_dispatch_thread(*p);
197 : dispatch_pending(*p);
198 : if(try_unlock(*p))
199 : {
200 : clear_dispatch_thread(*p);
201 : co_return;
202 : }
203 : }
204 48 : }
205 :
206 : friend class strand_service;
207 : };
208 :
209 : //----------------------------------------------------------
210 :
211 18 : strand_service::
212 18 : strand_service()
213 18 : : service()
214 : {
215 18 : }
216 :
217 18 : strand_service::
218 : ~strand_service() = default;
219 :
220 : bool
221 4 : strand_service::
222 : running_in_this_thread(strand_impl& impl) noexcept
223 : {
224 4 : return impl.dispatch_thread_.load() == std::this_thread::get_id();
225 : }
226 :
227 : any_coro
228 3 : strand_service::
229 : dispatch(strand_impl& impl, any_executor_ref ex, any_coro h)
230 : {
231 3 : if(running_in_this_thread(impl))
232 0 : return h;
233 :
234 3 : if(strand_service_impl::enqueue(impl, h))
235 3 : ex.post(strand_service_impl::make_invoker(impl).h_);
236 :
237 3 : return std::noop_coroutine();
238 : }
239 :
240 : void
241 262 : strand_service::
242 : post(strand_impl& impl, any_executor_ref ex, any_coro h)
243 : {
244 262 : if(strand_service_impl::enqueue(impl, h))
245 21 : ex.post(strand_service_impl::make_invoker(impl).h_);
246 262 : }
247 :
248 : strand_service&
249 25 : get_strand_service(execution_context& ctx)
250 : {
251 25 : return ctx.use_service<strand_service_impl>();
252 : }
253 :
254 : } // namespace detail
255 : } // namespace capy
256 : } // namespace boost
|