CPPuddle
Loading...
Searching...
No Matches
executor_pools_management.hpp
Go to the documentation of this file.
1// Copyright (c) 2020-2024 Gregor Daiß
2//
3// Distributed under the Boost Software License, Version 1.0. (See accompanying
4// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6#ifndef EXECUTOR_POOLS_MANAGEMENT_HPP
7#define EXECUTOR_POOLS_MANAGEMENT_HPP
8
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <tuple>
#include <type_traits>
#include <vector>
19
21
22// Need to cuda/hip definitions for default params when NOT
23// drawing from an executor pool
24#if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
25#include <hpx/config.hpp>
26#if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
27#include <hpx/async_cuda/cuda_executor.hpp>
28#endif
29#endif
30
31// Redefinition required for non-recycling executors
32// Without it, default constructing the executors (independent) would not work
33#if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
34// Do only define if Kokkos is not found
35#ifndef KOKKOS_ENABLE_SERIAL
36namespace hpx { namespace kokkos {
37enum class execution_space_mode { global, independent };
38}}
39#endif
40#endif
41
42namespace cppuddle {
43namespace executor_recycling {
44namespace detail {
45
/// Turns a std::array of mutexes into one scoped lock over all of them.
///
/// Expands the array into a parameter pack via std::apply and locks every
/// mutex atomically (std::scoped_lock's deadlock-avoidance algorithm).
/// All mutexes are released when the returned lock leaves scope.
/// Relies on C++17 guaranteed copy elision: std::scoped_lock is neither
/// copyable nor movable, but returning the prvalue through the lambda and
/// std::apply is well-formed.
template <typename mutex_array_t>
auto make_scoped_lock_from_array(mutex_array_t &mutexes) {
  return std::apply(
      [](auto &...mutexes) { return std::scoped_lock{mutexes...}; }, mutexes);
}
53
/// Round-robin pool of executor interfaces.
///
/// Hands out interfaces in cyclic order while tracking a per-interface
/// reference count, so callers can query the pool's current (minimal) load.
/// Not internally synchronized — callers serialize access (see the
/// per-GPU mutexes in executor_pool).
template <typename Interface> class round_robin_pool_impl {
private:
  std::deque<Interface> pool{};       // the pooled executors
  std::vector<size_t> ref_counters{}; // outstanding users per executor
  size_t current_interface{0};        // index handed out next

public:
  /// Build number_of_executors instances, each constructed from
  /// executor_args.
  template <typename... Ts>
  round_robin_pool_impl(size_t number_of_executors, Ts... executor_args) {
    ref_counters.reserve(number_of_executors);
    // size_t loop index: avoids the signed/unsigned comparison the
    // original `int i` produced against number_of_executors
    for (size_t i = 0; i < number_of_executors; i++) {
      pool.emplace_back(executor_args...);
      ref_counters.emplace_back(0);
    }
  }
  /// Return the next interface (round robin) together with its index
  /// (the index is needed to release it later).
  std::tuple<Interface &, size_t> get_interface() {
    assert(!(pool.empty()));
    size_t last_interface = current_interface;
    current_interface = (current_interface + 1) % pool.size();
    ref_counters[last_interface]++;
    std::tuple<Interface &, size_t> ret(pool[last_interface], last_interface);
    return ret;
  }
  /// Drop one user from the interface at index.
  void release_interface(size_t index) { ref_counters[index]--; }
  /// True if at least one executor has fewer than load_limit users.
  bool interface_available(size_t load_limit) {
    return *(std::min_element(std::begin(ref_counters),
                              std::end(ref_counters))) < load_limit;
  }
  /// Smallest reference count across all executors.
  /// (Signature restored: the scrape dropped original line 83.)
  size_t get_current_load() {
    return *(
        std::min_element(std::begin(ref_counters), std::end(ref_counters)));
  }
};
92
/// Priority pool of executor interfaces.
///
/// Always hands out the least-loaded interface: the index vector
/// `priorities` is kept heap-ordered so that priorities[0] is the executor
/// with the smallest reference count (make_heap with a `>` comparator
/// yields a min-heap on the counts). Not internally synchronized —
/// callers serialize access.
template <typename Interface> class priority_pool_impl {
private:
  std::deque<Interface> pool{};       // the pooled executors
  std::vector<size_t> ref_counters{}; // outstanding users per executor
  std::vector<size_t> priorities{};   // executor indices, heap-ordered by load
public:
  /// Build number_of_executors instances, each constructed from
  /// executor_args.
  template <typename... Ts>
  priority_pool_impl(size_t number_of_executors, Ts... executor_args) {
    ref_counters.reserve(number_of_executors);
    priorities.reserve(number_of_executors);
    // size_t loop index avoids the signed/unsigned comparison of the
    // original `auto i = 0`
    for (size_t i = 0; i < number_of_executors; i++) {
      pool.emplace_back(executor_args...);
      ref_counters.emplace_back(0);
      priorities.emplace_back(i);
    }
  }
  /// Return the least-loaded interface together with its index
  /// (the index is needed to release it later).
  std::tuple<Interface &, size_t> get_interface() {
    auto &interface = pool[priorities[0]];
    ref_counters[priorities[0]]++;
    std::tuple<Interface &, size_t> ret(interface, priorities[0]);
    // Re-heapify so priorities[0] again names the least-loaded executor
    std::make_heap(std::begin(priorities), std::end(priorities),
                   [this](const size_t &first, const size_t &second) -> bool {
                     return ref_counters[first] > ref_counters[second];
                   });
    return ret;
  }
  /// Drop one user from the interface at index and re-heapify.
  void release_interface(size_t index) {
    ref_counters[index]--;
    std::make_heap(std::begin(priorities), std::end(priorities),
                   [this](const size_t &first, const size_t &second) -> bool {
                     return ref_counters[first] > ref_counters[second];
                   });
  }
  /// True if the least-loaded executor has fewer than load_limit users.
  bool interface_available(size_t load_limit) {
    return ref_counters[priorities[0]] < load_limit;
  }
  /// Reference count of the least-loaded executor.
  size_t get_current_load() { return ref_counters[priorities[0]]; }
};
136
139public:
140 template <typename Interface, typename Pool, typename... Ts>
141 static void init(size_t number_of_executors, Ts ... executor_args) {
142 executor_pool_implementation<Interface, Pool>::init(number_of_executors,
143 executor_args...);
144 }
145 template <typename Interface, typename Pool, typename... Ts>
146 static void init_all_executor_pools(size_t number_of_executors, Ts ... executor_args) {
147 executor_pool_implementation<Interface, Pool>::init_all_executor_pools(number_of_executors,
148 executor_args...);
149 }
150 template <typename Interface, typename Pool, typename... Ts>
151 static void init_executor_pool(size_t pool_id, size_t number_of_executors, Ts ... executor_args) {
152 executor_pool_implementation<Interface, Pool>::init_executor_pool(pool_id, number_of_executors,
153 executor_args...);
154 }
155 template <typename Interface, typename Pool> static void cleanup() {
156 executor_pool_implementation<Interface, Pool>::cleanup();
157 }
158 template <typename Interface, typename Pool>
159 static std::tuple<Interface &, size_t> get_interface(const size_t gpu_id) {
160 return executor_pool_implementation<Interface, Pool>::get_interface(gpu_id);
161 }
162 template <typename Interface, typename Pool>
163 static void release_interface(size_t index, const size_t gpu_id) noexcept {
164 executor_pool_implementation<Interface, Pool>::release_interface(index,
165 gpu_id);
166 }
167 template <typename Interface, typename Pool>
168 static bool interface_available(size_t load_limit, const size_t gpu_id) noexcept {
169 return executor_pool_implementation<Interface, Pool>::interface_available(
170 load_limit, gpu_id);
171 }
172 template <typename Interface, typename Pool>
173 static size_t get_current_load(const size_t gpu_id = 0) noexcept {
174 return executor_pool_implementation<Interface, Pool>::get_current_load(
175 gpu_id);
176 }
177 template <typename Interface, typename Pool>
178 static size_t get_next_device_id(const size_t number_gpus) noexcept {
179 // TODO add round robin and min strategy
180 return cppuddle::get_device_id(number_gpus);
181 }
182
183 template <typename Interface, typename Pool>
184 static void set_device_selector(std::function<void(size_t)> select_gpu_function) {
185 executor_pool_implementation<Interface, Pool>::set_device_selector(select_gpu_function);
186 }
187
188 template <typename Interface, typename Pool>
189 static void select_device(size_t gpu_id) {
190 executor_pool_implementation<Interface, Pool>::select_device(gpu_id);
191 }
192
193private:
194 executor_pool() = default;
195
196private:
197 template <typename Interface, typename Pool> class executor_pool_implementation {
198 public:
200 template <typename... Ts>
201 static void init(size_t number_of_executors, Ts ... executor_args) {
202 /* static_assert(sizeof...(Ts) == sizeof...(Ts) && cppuddle::max_number_gpus == 1, */
203 /* "deprecated executor_pool::init does not support multigpu"); */
204 auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
205 instance().executorpools.emplace_back(number_of_executors, executor_args...);
206 assert(instance().executorpools.size() <= cppuddle::max_number_gpus);
207 }
208
210 template <typename... Ts>
211 static void init_all_executor_pools(size_t number_of_executors, Ts ... executor_args) {
212 auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
213 if (number_of_executors > 0) {
214 for (size_t gpu_id = 0; gpu_id < cppuddle::max_number_gpus; gpu_id++) {
215 instance().select_gpu_function(gpu_id);
216 instance().executorpools.emplace_back(number_of_executors,
217 executor_args...);
218 }
219 }
220 assert(instance().executorpools.size() <= cppuddle::max_number_gpus);
221 }
222
225 template <typename... Ts>
226 static void init_executor_pool(size_t gpu_id, size_t number_of_executors, Ts ... executor_args) {
227 auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
228 if (number_of_executors > 0) {
229 instance().select_gpu_function(gpu_id);
230 instance().executorpools.emplace_back(number_of_executors,
231 executor_args...);
232 }
233 assert(instance().executorpools.size() <= cppuddle::max_number_gpus);
234 }
235
236 // TODO add/rename into finalize?
237 static void cleanup() {
238 auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
239 assert(instance().executorpools.size() == cppuddle::max_number_gpus);
240 instance().executorpools.clear();
241 }
242
243 static std::tuple<Interface &, size_t> get_interface(const size_t gpu_id = 0) {
244 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
245 assert(gpu_id < instance().executorpools.size());
246 return instance().executorpools[gpu_id].get_interface();
247 }
248 static void release_interface(size_t index, const size_t gpu_id = 0) {
249 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
250 assert(gpu_id < instance().executorpools.size());
251 instance().executorpools[gpu_id].release_interface(index);
252 }
253 static bool interface_available(size_t load_limit, const size_t gpu_id = 0) {
254 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
255 assert(gpu_id < instance().executorpools.size());
256 return instance().executorpools[gpu_id].interface_available(load_limit);
257 }
258 static size_t get_current_load(const size_t gpu_id = 0) {
259 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
260 assert(gpu_id < instance().executorpools.size());
261 return instance().executorpools[gpu_id].get_current_load();
262 }
263 // TODO deprecated! Remove...
264 /* static size_t get_next_device_id(const size_t gpu_id = 0) { */
265 /* std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]); */
266 /* assert(instance().executorpools.size() == cppuddle::max_number_gpus); */
267 /* return instance().executorpools[gpu_id].get_next_device_id(); */
268 /* } */
269
270 static void set_device_selector(std::function<void(size_t)> select_gpu_function) {
271 auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
272 instance().select_gpu_function = select_gpu_function;
273 }
274
275 static void select_device(size_t gpu_id) {
276 instance().select_gpu_function(gpu_id);
277 }
278
279 private:
280 executor_pool_implementation() = default;
281 cppuddle::mutex_t pool_mut{};
282 std::function<void(size_t)> select_gpu_function = [](size_t gpu_id) {
283 // By default no multi gpu support
284 assert(cppuddle::max_number_gpus == 1 || instance().executorpools.size() == 1);
285 assert(gpu_id == 0);
286 };
287
288 std::deque<Pool> executorpools{};
289 std::array<cppuddle::mutex_t, cppuddle::max_number_gpus> gpu_mutexes;
290
291 static executor_pool_implementation& instance(void) {
292 static executor_pool_implementation pool_instance{};
293 return pool_instance;
294 }
295
296 public:
297 ~executor_pool_implementation() = default;
298 // Bunch of constructors we don't need
299 executor_pool_implementation(executor_pool_implementation const &other) =
300 delete;
301 executor_pool_implementation &
302 operator=(executor_pool_implementation const &other) = delete;
303 executor_pool_implementation(executor_pool_implementation &&other) = delete;
304 executor_pool_implementation &
305 operator=(executor_pool_implementation &&other) = delete;
306 };
307
308public:
309 ~executor_pool() = default;
310 // Bunch of constructors we don't need
311 executor_pool(executor_pool const &other) = delete;
312 executor_pool &operator=(executor_pool const &other) = delete;
313 executor_pool(executor_pool &&other) = delete;
315};
316
317#if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
318
319// Warn about suboptimal performance without recycling
320#pragma message \
321"Warning: Building without executor recycling! Use only for performance testing! \
322For better performance configure CPPuddle with CPPUDDLE_WITH_EXECUTOR_RECYCLING=ON!"
323
327template <typename Interface, typename Pool> class executor_interface {
328public:
329
330 template <typename Dummy = Interface>
331 explicit executor_interface(size_t gpu_id,
332 std::enable_if_t<std::is_same<hpx::cuda::experimental::cuda_executor, Dummy>::value, size_t> = 0)
333 : gpu_id(gpu_id), interface(gpu_id) {}
334 template <typename Dummy = Interface>
335 explicit executor_interface(std::enable_if_t<!std::is_same<hpx::cuda::experimental::cuda_executor, Dummy>::value, size_t> = 0)
336 : gpu_id(gpu_id), interface(hpx::kokkos::execution_space_mode::independent) {}
337
338 executor_interface(const executor_interface &other) = delete;
339 executor_interface &operator=(const executor_interface &other) = delete;
340 executor_interface(executor_interface &&other) = delete;
341 executor_interface &operator=(executor_interface &&other) = delete;
343 }
344
345 template <typename F, typename... Ts>
346 inline decltype(auto) post(F &&f, Ts &&... ts) {
347 return interface.post(std::forward<F>(f), std::forward<Ts>(ts)...);
348 }
349
350 template <typename F, typename... Ts>
351 inline decltype(auto) async_execute(F &&f, Ts &&... ts) {
352 return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
353 }
354
355 inline decltype(auto) get_future() {
356 return interface.get_future();
357 }
358
359 // allow implict conversion
360 operator Interface &() { // NOLINT
361 return interface;
362 }
363
364private:
365 size_t gpu_id;
366
367public:
368 Interface interface;
369};
370#else
374template <typename Interface, typename Pool> class executor_interface {
375public:
376 explicit executor_interface(size_t gpu_id)
377 : t(executor_pool::get_interface<Interface, Pool>(gpu_id)),
378 interface(std::get<0>(t)), interface_index(std::get<1>(t)), gpu_id(gpu_id) {}
379
385 executor_pool::release_interface<Interface, Pool>(interface_index, gpu_id);
386 }
387
388 template <typename F, typename... Ts>
389 inline decltype(auto) post(F &&f, Ts &&... ts) {
390 return interface.post(std::forward<F>(f), std::forward<Ts>(ts)...);
391 }
392
393 template <typename F, typename... Ts>
394 inline decltype(auto) async_execute(F &&f, Ts &&... ts) {
395 return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
396 }
397
398 inline decltype(auto) get_future() {
399 return interface.get_future();
400 }
401
402 // allow implict conversion
403 operator Interface &() { // NOLINT
404 return interface;
405 }
406
407private:
408 std::tuple<Interface &, size_t> t;
409 size_t interface_index;
410 size_t gpu_id;
411
412public:
413 Interface &interface;
414};
415#endif
416
417} // namespace detail
418} // namespace executor_recycling
419} // namespace cppuddle
420
421#endif
Definition executor_pools_management.hpp:374
decltype(auto) async_execute(F &&f, Ts &&... ts)
Definition executor_pools_management.hpp:394
executor_interface(size_t gpu_id)
Definition executor_pools_management.hpp:376
decltype(auto) get_future()
Definition executor_pools_management.hpp:398
Interface & interface
Definition executor_pools_management.hpp:413
executor_interface & operator=(const executor_interface &other)=delete
executor_interface & operator=(executor_interface &&other)=delete
executor_interface(executor_interface &&other)=delete
decltype(auto) post(F &&f, Ts &&... ts)
Definition executor_pools_management.hpp:389
~executor_interface()
Definition executor_pools_management.hpp:384
executor_interface(const executor_interface &other)=delete
Access/Concurrency Control for executor pool implementation.
Definition executor_pools_management.hpp:138
static void set_device_selector(std::function< void(size_t)> select_gpu_function)
Definition executor_pools_management.hpp:184
executor_pool(executor_pool const &other)=delete
static void init(size_t number_of_executors, Ts ... executor_args)
Definition executor_pools_management.hpp:141
static bool interface_available(size_t load_limit, const size_t gpu_id) noexcept
Definition executor_pools_management.hpp:168
static size_t get_next_device_id(const size_t number_gpus) noexcept
Definition executor_pools_management.hpp:178
static void select_device(size_t gpu_id)
Definition executor_pools_management.hpp:189
static void init_executor_pool(size_t pool_id, size_t number_of_executors, Ts ... executor_args)
Definition executor_pools_management.hpp:151
static void cleanup()
Definition executor_pools_management.hpp:155
static size_t get_current_load(const size_t gpu_id=0) noexcept
Definition executor_pools_management.hpp:173
executor_pool & operator=(executor_pool const &other)=delete
static std::tuple< Interface &, size_t > get_interface(const size_t gpu_id)
Definition executor_pools_management.hpp:159
static void release_interface(size_t index, const size_t gpu_id) noexcept
Definition executor_pools_management.hpp:163
executor_pool & operator=(executor_pool &&other)=delete
static void init_all_executor_pools(size_t number_of_executors, Ts ... executor_args)
Definition executor_pools_management.hpp:146
Definition executor_pools_management.hpp:93
priority_pool_impl(size_t number_of_executors, Ts... executor_args)
Definition executor_pools_management.hpp:100
std::tuple< Interface &, size_t > get_interface()
Definition executor_pools_management.hpp:110
size_t get_current_load()
Definition executor_pools_management.hpp:130
bool interface_available(size_t load_limit)
Definition executor_pools_management.hpp:127
void release_interface(size_t index)
Definition executor_pools_management.hpp:120
Definition executor_pools_management.hpp:54
void release_interface(size_t index)
Definition executor_pools_management.hpp:78
std::tuple< Interface &, size_t > get_interface()
Definition executor_pools_management.hpp:70
size_t get_current_load()
Definition executor_pools_management.hpp:83
bool interface_available(size_t load_limit)
Definition executor_pools_management.hpp:79
round_robin_pool_impl(size_t number_of_executors, Ts... executor_args)
Definition executor_pools_management.hpp:62
auto make_scoped_lock_from_array(mutex_array_t &mutexes)
Turns a std::array_mutex into an scoped lock.
Definition executor_pools_management.hpp:48
Definition config.hpp:31
size_t get_device_id(const size_t number_gpus)
Uses HPX thread information to determine which GPU should be used.
Definition config.hpp:59
constexpr size_t max_number_gpus
Definition config.hpp:52
std::mutex mutex_t
Definition config.hpp:36
void cleanup()
Deletes all buffers currently marked as unused.
Definition buffer_manager.hpp:48