src/ex/detail/strand_service.cpp

97.8% Lines (89/91) 95.5% Functions (21/22)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <atomic>
13 #include <coroutine>
14 #include <mutex>
15 #include <thread>
16 #include <utility>
17
18 namespace boost {
19 namespace capy {
20 namespace detail {
21
22 //----------------------------------------------------------
23
24 /** Implementation state for a strand.
25
26 Each strand_impl provides serialization for coroutines
27 dispatched through strands that share it.
28 */
29 // Sentinel stored in cached_frame_ after shutdown to prevent
30 // in-flight invokers from repopulating a freed cache slot.
31 inline void* const kCacheClosed = reinterpret_cast<void*>(1);
32
33 struct strand_impl
34 {
35 std::mutex mutex_;
36 strand_queue pending_;
37 bool locked_ = false;
38 std::atomic<std::thread::id> dispatch_thread_{};
39 std::atomic<void*> cached_frame_{nullptr};
40 };
41
42 //----------------------------------------------------------
43
44 /** Invoker coroutine for strand dispatch.
45
46 Uses custom allocator to recycle frame - one allocation
47 per strand_impl lifetime, stored in trailer for recovery.
48 */
49 struct strand_invoker
50 {
51 struct promise_type
52 {
53 10x void* operator new(std::size_t n, strand_impl& impl)
54 {
55 10x constexpr auto A = alignof(strand_impl*);
56 10x std::size_t padded = (n + A - 1) & ~(A - 1);
57 10x std::size_t total = padded + sizeof(strand_impl*);
58
59 10x void* p = impl.cached_frame_.exchange(
60 nullptr, std::memory_order_acquire);
61 10x if(!p || p == kCacheClosed)
62 9x p = ::operator new(total);
63
64 // Trailer lets delete recover impl
65 10x *reinterpret_cast<strand_impl**>(
66 10x static_cast<char*>(p) + padded) = &impl;
67 10x return p;
68 }
69
70 10x void operator delete(void* p, std::size_t n) noexcept
71 {
72 10x constexpr auto A = alignof(strand_impl*);
73 10x std::size_t padded = (n + A - 1) & ~(A - 1);
74
75 10x auto* impl = *reinterpret_cast<strand_impl**>(
76 static_cast<char*>(p) + padded);
77
78 10x void* expected = nullptr;
79 10x if(!impl->cached_frame_.compare_exchange_strong(
80 expected, p, std::memory_order_release))
81 ::operator delete(p);
82 10x }
83
84 10x strand_invoker get_return_object() noexcept
85 10x { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
86
87 10x std::suspend_always initial_suspend() noexcept { return {}; }
88 10x std::suspend_never final_suspend() noexcept { return {}; }
89 10x void return_void() noexcept {}
90 void unhandled_exception() { std::terminate(); }
91 };
92
93 std::coroutine_handle<promise_type> h_;
94 };
95
96 //----------------------------------------------------------
97
98 /** Concrete implementation of strand_service.
99
100 Holds the fixed pool of strand_impl objects.
101 */
102 class strand_service_impl : public strand_service
103 {
104 static constexpr std::size_t num_impls = 211;
105
106 strand_impl impls_[num_impls];
107 std::size_t salt_ = 0;
108 std::mutex mutex_;
109
110 public:
111 explicit
112 21x strand_service_impl(execution_context&)
113 4452x {
114 21x }
115
116 strand_impl*
117 25x get_implementation() override
118 {
119 25x std::lock_guard<std::mutex> lock(mutex_);
120 25x std::size_t index = salt_++;
121 25x index = index % num_impls;
122 25x return &impls_[index];
123 25x }
124
125 protected:
126 void
127 21x shutdown() override
128 {
129 4452x for(std::size_t i = 0; i < num_impls; ++i)
130 {
131 4431x std::lock_guard<std::mutex> lock(impls_[i].mutex_);
132 4431x impls_[i].locked_ = true;
133
134 4431x void* p = impls_[i].cached_frame_.exchange(
135 kCacheClosed, std::memory_order_acquire);
136 4431x if(p)
137 9x ::operator delete(p);
138 4431x }
139 21x }
140
141 private:
142 static bool
143 328x enqueue(strand_impl& impl, std::coroutine_handle<> h)
144 {
145 328x std::lock_guard<std::mutex> lock(impl.mutex_);
146 328x impl.pending_.push(h);
147 328x if(!impl.locked_)
148 {
149 10x impl.locked_ = true;
150 10x return true;
151 }
152 318x return false;
153 328x }
154
155 static void
156 22x dispatch_pending(strand_impl& impl)
157 {
158 22x strand_queue::taken_batch batch;
159 {
160 22x std::lock_guard<std::mutex> lock(impl.mutex_);
161 22x batch = impl.pending_.take_all();
162 22x }
163 22x impl.pending_.dispatch_batch(batch);
164 22x }
165
166 static bool
167 22x try_unlock(strand_impl& impl)
168 {
169 22x std::lock_guard<std::mutex> lock(impl.mutex_);
170 22x if(impl.pending_.empty())
171 {
172 10x impl.locked_ = false;
173 10x return true;
174 }
175 12x return false;
176 22x }
177
178 static void
179 22x set_dispatch_thread(strand_impl& impl) noexcept
180 {
181 22x impl.dispatch_thread_.store(std::this_thread::get_id());
182 22x }
183
184 static void
185 10x clear_dispatch_thread(strand_impl& impl) noexcept
186 {
187 10x impl.dispatch_thread_.store(std::thread::id{});
188 10x }
189
190 // Loops until queue empty (aggressive). Alternative: per-batch fairness
191 // (repost after each batch to let other work run) - explore if starvation observed.
192 static strand_invoker
193 10x make_invoker(strand_impl& impl)
194 {
195 strand_impl* p = &impl;
196 for(;;)
197 {
198 set_dispatch_thread(*p);
199 dispatch_pending(*p);
200 if(try_unlock(*p))
201 {
202 clear_dispatch_thread(*p);
203 co_return;
204 }
205 }
206 20x }
207
208 friend class strand_service;
209 };
210
211 //----------------------------------------------------------
212
213 21x strand_service::
214 21x strand_service()
215 21x : service()
216 {
217 21x }
218
219 21x strand_service::
220 ~strand_service() = default;
221
222 bool
223 6x strand_service::
224 running_in_this_thread(strand_impl& impl) noexcept
225 {
226 6x return impl.dispatch_thread_.load() == std::this_thread::get_id();
227 }
228
229 std::coroutine_handle<>
230 5x strand_service::
231 dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
232 {
233 5x if(running_in_this_thread(impl))
234 2x return h;
235
236 3x if(strand_service_impl::enqueue(impl, h))
237 3x ex.post(strand_service_impl::make_invoker(impl).h_);
238 3x return std::noop_coroutine();
239 }
240
241 void
242 325x strand_service::
243 post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
244 {
245 325x if(strand_service_impl::enqueue(impl, h))
246 7x ex.post(strand_service_impl::make_invoker(impl).h_);
247 325x }
248
249 strand_service&
250 25x get_strand_service(execution_context& ctx)
251 {
252 25x return ctx.use_service<strand_service_impl>();
253 }
254
255 } // namespace detail
256 } // namespace capy
257 } // namespace boost
258