CPPuddle
executor_pools_management.hpp
Go to the documentation of this file.
1 // Copyright (c) 2020-2024 Gregor Daiß
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #ifndef EXECUTOR_POOLS_MANAGEMENT_HPP
7 #define EXECUTOR_POOLS_MANAGEMENT_HPP
8 
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <tuple>
#include <type_traits>
#include <vector>
19 
21 
22 // Need the cuda/hip definitions for default params when NOT
23 // drawing from an executor pool
24 #if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
25 #include <hpx/config.hpp>
26 #if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
27 #include <hpx/async_cuda/cuda_executor.hpp>
28 #endif
29 #endif
30 
31 // Redefinition required for non-recycling executors
32 // Without it, default constructing the executors (independent) would not work
33 #if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
34 // Do only define if Kokkos is not found
35 #ifndef KOKKOS_ENABLE_SERIAL
36 namespace hpx { namespace kokkos {
37 enum class execution_space_mode { global, independent };
38 }}
39 #endif
40 #endif
41 
42 namespace cppuddle {
43 namespace executor_recycling {
44 namespace detail {
45 
47 template<typename mutex_array_t>
48 auto make_scoped_lock_from_array(mutex_array_t& mutexes)
49 {
50  return std::apply([](auto&... mutexes) { return std::scoped_lock{mutexes...}; },
51  mutexes);
52 }
53 
54 template <typename Interface> class round_robin_pool_impl {
55 private:
56  std::deque<Interface> pool{};
57  std::vector<size_t> ref_counters{};
58  size_t current_interface{0};
59 
60 public:
61  template <typename... Ts>
62  round_robin_pool_impl(size_t number_of_executors, Ts... executor_args) {
63  ref_counters.reserve(number_of_executors);
64  for (int i = 0; i < number_of_executors; i++) {
65  pool.emplace_back(executor_args...);
66  ref_counters.emplace_back(0);
67  }
68  }
69  // return a tuple with the interface and its index (to release it later)
70  std::tuple<Interface &, size_t> get_interface() {
71  assert(!(pool.empty()));
72  size_t last_interface = current_interface;
73  current_interface = (current_interface + 1) % pool.size();
74  ref_counters[last_interface]++;
75  std::tuple<Interface &, size_t> ret(pool[last_interface], last_interface);
76  return ret;
77  }
78  void release_interface(size_t index) { ref_counters[index]--; }
79  bool interface_available(size_t load_limit) {
80  return *(std::min_element(std::begin(ref_counters),
81  std::end(ref_counters))) < load_limit;
82  }
83  size_t get_current_load() {
84  return *(
85  std::min_element(std::begin(ref_counters), std::end(ref_counters)));
86  }
87  // TODO Remove
88  /* size_t get_next_device_id() { */
89  /* return 0; // single gpu pool */
90  /* } */
91 };
92 
/// Executor pool that always hands out the executor with the fewest
/// outstanding leases. A heap over the executor indices, ordered by
/// reference count, keeps the least-loaded index at priorities[0].
template <typename Interface> class priority_pool_impl {
private:
  std::deque<Interface> pool{};       // the executors themselves
  std::vector<size_t> ref_counters{}; // outstanding leases per executor
  // Executor indices, heap-ordered by load (least-loaded at the front).
  // (Original comment "Ref counters" was a copy-paste error.)
  std::vector<size_t> priorities{};
public:
  /// Creates number_of_executors executors, forwarding executor_args to
  /// each executor's constructor.
  template <typename... Ts>
  priority_pool_impl(size_t number_of_executors, Ts... executor_args) {
    ref_counters.reserve(number_of_executors);
    priorities.reserve(number_of_executors);
    // size_t index instead of the original signed `auto i = 0`
    for (size_t i = 0; i < number_of_executors; i++) {
      pool.emplace_back(executor_args...);
      ref_counters.emplace_back(0);
      priorities.emplace_back(i);
    }
  }
  // return a tuple with the interface and its index (to release it later)
  std::tuple<Interface &, size_t> get_interface() {
    assert(!pool.empty());
    auto &interface = pool[priorities[0]];
    ref_counters[priorities[0]]++;
    std::tuple<Interface &, size_t> ret(interface, priorities[0]);
    // Re-establish the heap after the ref-count change; the ">" comparator
    // keeps the index with the SMALLEST ref count at the heap top.
    std::make_heap(std::begin(priorities), std::end(priorities),
                   [this](const size_t &first, const size_t &second) -> bool {
                     return ref_counters[first] > ref_counters[second];
                   });
    return ret;
  }
  /// Returns a lease obtained from get_interface (by its index).
  void release_interface(size_t index) {
    assert(index < ref_counters.size());
    ref_counters[index]--;
    std::make_heap(std::begin(priorities), std::end(priorities),
                   [this](const size_t &first, const size_t &second) -> bool {
                     return ref_counters[first] > ref_counters[second];
                   });
  }
  /// True iff the least-loaded executor has fewer than load_limit leases.
  bool interface_available(size_t load_limit) {
    return ref_counters[priorities[0]] < load_limit;
  }
  /// Number of outstanding leases on the least-loaded executor.
  size_t get_current_load() { return ref_counters[priorities[0]]; }
};
136 
139 public:
  /// Initialize one pool of number_of_executors executors (single-GPU entry
  /// point); forwards to the per-<Interface, Pool> singleton implementation.
  /// executor_args are forwarded to each executor's constructor.
  template <typename Interface, typename Pool, typename... Ts>
  static void init(size_t number_of_executors, Ts ... executor_args) {
    executor_pool_implementation<Interface, Pool>::init(number_of_executors,
                                                        executor_args...);
  }
  /// Initialize one pool (of number_of_executors executors each) for every
  /// GPU; forwards to the singleton implementation.
  template <typename Interface, typename Pool, typename... Ts>
  static void init_all_executor_pools(size_t number_of_executors, Ts ... executor_args) {
    executor_pool_implementation<Interface, Pool>::init_all_executor_pools(number_of_executors,
                                                                           executor_args...);
  }
  /// Initialize the pool for one specific device; forwards to the singleton
  /// implementation (which names this parameter gpu_id — presumably pool_id
  /// and gpu_id are the same index; confirm with callers).
  template <typename Interface, typename Pool, typename... Ts>
  static void init_executor_pool(size_t pool_id, size_t number_of_executors, Ts ... executor_args) {
    executor_pool_implementation<Interface, Pool>::init_executor_pool(pool_id, number_of_executors,
                                                                      executor_args...);
  }
155  template <typename Interface, typename Pool> static void cleanup() {
157  }
  /// Lease an executor from the pool of device gpu_id.
  /// @return the executor reference and its index (needed to release it).
  template <typename Interface, typename Pool>
  static std::tuple<Interface &, size_t> get_interface(const size_t gpu_id) {
    return executor_pool_implementation<Interface, Pool>::get_interface(gpu_id);
  }
  /// Return a lease obtained from get_interface.
  /// NOTE(review): declared noexcept although the implementation locks a
  /// std::mutex (lock() may throw std::system_error) — a throw here would
  /// terminate; confirm this is intended.
  template <typename Interface, typename Pool>
  static void release_interface(size_t index, const size_t gpu_id) noexcept {
    executor_pool_implementation<Interface, Pool>::release_interface(index,
                                                                     gpu_id);
  }
  /// True iff device gpu_id's least-loaded executor has fewer than
  /// load_limit outstanding leases.
  /// NOTE(review): noexcept over a mutex lock — see release_interface.
  template <typename Interface, typename Pool>
  static bool interface_available(size_t load_limit, const size_t gpu_id) noexcept {
    return executor_pool_implementation<Interface, Pool>::interface_available(
        load_limit, gpu_id);
  }
  /// Smallest number of outstanding leases across device gpu_id's executors.
  /// NOTE(review): noexcept over a mutex lock — see release_interface.
  template <typename Interface, typename Pool>
  static size_t get_current_load(const size_t gpu_id = 0) noexcept {
    return executor_pool_implementation<Interface, Pool>::get_current_load(
        gpu_id);
  }
  /// Pick the device for the next piece of work; currently delegates to
  /// cppuddle::get_device_id (worker-thread based), independent of pool load.
  template <typename Interface, typename Pool>
  static size_t get_next_device_id(const size_t number_gpus) noexcept {
    // TODO add round robin and min strategy
    return cppuddle::get_device_id(number_gpus);
  }
182 
  /// Install the callback used to activate a GPU (e.g. cudaSetDevice) before
  /// pools are constructed on it; forwards to the singleton implementation.
  template <typename Interface, typename Pool>
  static void set_device_selector(std::function<void(size_t)> select_gpu_function) {
    executor_pool_implementation<Interface, Pool>::set_device_selector(select_gpu_function);
  }
187 
  /// Invoke the installed device-selector callback for gpu_id.
  template <typename Interface, typename Pool>
  static void select_device(size_t gpu_id) {
    executor_pool_implementation<Interface, Pool>::select_device(gpu_id);
  }
192 
193 private:
194  executor_pool() = default;
195 
196 private:
  /// Singleton (one instance per <Interface, Pool> combination) that owns
  /// one executor pool per GPU plus the per-GPU mutexes guarding them.
  template <typename Interface, typename Pool> class executor_pool_implementation {
  public:
    /// Add a single pool (deprecated single-GPU entry point, per the
    /// commented-out static_assert below).
    template <typename... Ts>
    static void init(size_t number_of_executors, Ts ... executor_args) {
      /* static_assert(sizeof...(Ts) == sizeof...(Ts) && cppuddle::max_number_gpus == 1, */
      /* "deprecated executor_pool::init does not support multigpu"); */
      auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
      instance().executorpools.emplace_back(number_of_executors, executor_args...);
      assert(instance().executorpools.size() <= cppuddle::max_number_gpus);
    }

    /// Add one pool per GPU (0 .. max_number_gpus-1), activating each device
    /// via select_gpu_function before constructing its pool.
    template <typename... Ts>
    static void init_all_executor_pools(size_t number_of_executors, Ts ... executor_args) {
      auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
      if (number_of_executors > 0) {
        for (size_t gpu_id = 0; gpu_id < cppuddle::max_number_gpus; gpu_id++) {
          instance().select_gpu_function(gpu_id);
          instance().executorpools.emplace_back(number_of_executors,
                                                executor_args...);
        }
      }
      assert(instance().executorpools.size() <= cppuddle::max_number_gpus);
    }

    /// Add the pool for one specific GPU.
    /// NOTE(review): pools are appended in call order, so
    /// executorpools[gpu_id] matches gpu_id only if pools are created for
    /// 0, 1, 2, ... in ascending order — confirm callers respect this.
    template <typename... Ts>
    static void init_executor_pool(size_t gpu_id, size_t number_of_executors, Ts ... executor_args) {
      auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
      if (number_of_executors > 0) {
        instance().select_gpu_function(gpu_id);
        instance().executorpools.emplace_back(number_of_executors,
                                              executor_args...);
      }
      assert(instance().executorpools.size() <= cppuddle::max_number_gpus);
    }

    // TODO add/rename into finalize?
    /// Destroy all pools; asserts that every GPU's pool had been created.
    static void cleanup() {
      auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
      assert(instance().executorpools.size() == cppuddle::max_number_gpus);
      instance().executorpools.clear();
    }

    /// Lease an executor (and its pool index) from device gpu_id's pool.
    static std::tuple<Interface &, size_t> get_interface(const size_t gpu_id = 0) {
      std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
      assert(gpu_id < instance().executorpools.size());
      return instance().executorpools[gpu_id].get_interface();
    }
    /// Return a lease obtained from get_interface.
    static void release_interface(size_t index, const size_t gpu_id = 0) {
      std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
      assert(gpu_id < instance().executorpools.size());
      instance().executorpools[gpu_id].release_interface(index);
    }
    /// True iff device gpu_id's pool has an executor under load_limit leases.
    static bool interface_available(size_t load_limit, const size_t gpu_id = 0) {
      std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
      assert(gpu_id < instance().executorpools.size());
      return instance().executorpools[gpu_id].interface_available(load_limit);
    }
    /// Minimum number of outstanding leases in device gpu_id's pool.
    static size_t get_current_load(const size_t gpu_id = 0) {
      std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
      assert(gpu_id < instance().executorpools.size());
      return instance().executorpools[gpu_id].get_current_load();
    }
    // TODO deprecated! Remove...
    /* static size_t get_next_device_id(const size_t gpu_id = 0) { */
    /* std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]); */
    /* assert(instance().executorpools.size() == cppuddle::max_number_gpus); */
    /* return instance().executorpools[gpu_id].get_next_device_id(); */
    /* } */

    /// Install the device-activation callback (takes all GPU mutexes while
    /// swapping it).
    static void set_device_selector(std::function<void(size_t)> select_gpu_function) {
      auto guard = make_scoped_lock_from_array(instance().gpu_mutexes);
      instance().select_gpu_function = select_gpu_function;
    }

    /// Invoke the device-activation callback for gpu_id (no lock taken).
    static void select_device(size_t gpu_id) {
      instance().select_gpu_function(gpu_id);
    }

  private:
    executor_pool_implementation() = default;
    // NOTE(review): pool_mut appears unused — per-GPU locking goes through
    // gpu_mutexes below; candidate for removal.
    cppuddle::mutex_t pool_mut{};
    // Default selector: valid only for single-GPU builds (asserts gpu_id==0).
    std::function<void(size_t)> select_gpu_function = [](size_t gpu_id) {
      // By default no multi gpu support
      assert(cppuddle::max_number_gpus == 1 || instance().executorpools.size() == 1);
      assert(gpu_id == 0);
    };

    std::deque<Pool> executorpools{}; // one pool per GPU, in creation order
    std::array<cppuddle::mutex_t, cppuddle::max_number_gpus> gpu_mutexes;

    // Meyers singleton: thread-safe initialization on first use.
    static executor_pool_implementation& instance(void) {
      static executor_pool_implementation pool_instance{};
      return pool_instance;
    }

  public:
    ~executor_pool_implementation() = default;
    // Bunch of constructors we don't need
    executor_pool_implementation(executor_pool_implementation const &other) =
        delete;
    executor_pool_implementation &
    operator=(executor_pool_implementation const &other) = delete;
    executor_pool_implementation(executor_pool_implementation &&other) = delete;
    executor_pool_implementation &
    operator=(executor_pool_implementation &&other) = delete;
  };
307 
308 public:
309  ~executor_pool() = default;
310  // Bunch of constructors we don't need
311  executor_pool(executor_pool const &other) = delete;
312  executor_pool &operator=(executor_pool const &other) = delete;
313  executor_pool(executor_pool &&other) = delete;
315 };
316 
317 #if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
318 
319 // Warn about suboptimal performance without recycling
320 #pragma message \
321 "Warning: Building without executor recycling! Use only for performance testing! \
322 For better performance configure CPPuddle with CPPUDDLE_WITH_EXECUTOR_RECYCLING=ON!"
323 
327 template <typename Interface, typename Pool> class executor_interface {
328 public:
329 
  /// Constructor selected (via enable_if) when Interface is
  /// hpx::cuda::experimental::cuda_executor: builds a fresh, non-pooled
  /// executor directly on the given GPU.
  template <typename Dummy = Interface>
  explicit executor_interface(size_t gpu_id,
      std::enable_if_t<std::is_same<hpx::cuda::experimental::cuda_executor, Dummy>::value, size_t> = 0)
      : gpu_id(gpu_id), interface(gpu_id) {}
334  template <typename Dummy = Interface>
335  explicit executor_interface(std::enable_if_t<!std::is_same<hpx::cuda::experimental::cuda_executor, Dummy>::value, size_t> = 0)
336  : gpu_id(gpu_id), interface(hpx::kokkos::execution_space_mode::independent) {}
337 
338  executor_interface(const executor_interface &other) = delete;
339  executor_interface &operator=(const executor_interface &other) = delete;
340  executor_interface(executor_interface &&other) = delete;
343  }
344 
  /// Fire-and-forget dispatch: forwards f and its arguments to the
  /// underlying executor's post().
  template <typename F, typename... Ts>
  inline decltype(auto) post(F &&f, Ts &&... ts) {
    return interface.post(std::forward<F>(f), std::forward<Ts>(ts)...);
  }
349 
  /// Asynchronous dispatch: forwards to the underlying executor's
  /// async_execute() and returns its result (a future, per the executor API).
  template <typename F, typename... Ts>
  inline decltype(auto) async_execute(F &&f, Ts &&... ts) {
    return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
  }
354 
  /// Forwards to the underlying executor's get_future().
  inline decltype(auto) get_future() {
    return interface.get_future();
  }
358 
  // allow implicit conversion to the wrapped executor
  operator Interface &() { // NOLINT
    return interface;
  }
363 
364 private:
365  size_t gpu_id;
366 
367 public:
368  Interface interface;
369 };
370 #else
374 template <typename Interface, typename Pool> class executor_interface {
375 public:
376  explicit executor_interface(size_t gpu_id)
377  : t(executor_pool::get_interface<Interface, Pool>(gpu_id)),
378  interface(std::get<0>(t)), interface_index(std::get<1>(t)), gpu_id(gpu_id) {}
379 
380  executor_interface(const executor_interface &other) = delete;
385  executor_pool::release_interface<Interface, Pool>(interface_index, gpu_id);
386  }
387 
  /// Fire-and-forget dispatch: forwards f and its arguments to the pooled
  /// executor's post().
  template <typename F, typename... Ts>
  inline decltype(auto) post(F &&f, Ts &&... ts) {
    return interface.post(std::forward<F>(f), std::forward<Ts>(ts)...);
  }
392 
  /// Asynchronous dispatch: forwards to the pooled executor's
  /// async_execute() and returns its result (a future, per the executor API).
  template <typename F, typename... Ts>
  inline decltype(auto) async_execute(F &&f, Ts &&... ts) {
    return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
  }
397 
  /// Forwards to the pooled executor's get_future().
  inline decltype(auto) get_future() {
    return interface.get_future();
  }
401 
  // allow implicit conversion to the wrapped (pooled) executor
  operator Interface &() { // NOLINT
    return interface;
  }
406 
407 private:
408  std::tuple<Interface &, size_t> t;
409  size_t interface_index;
410  size_t gpu_id;
411 
412 public:
413  Interface &interface;
414 };
415 #endif
416 
417 } // namespace detail
418 } // namespace executor_recycling
419 } // namespace cppuddle
420 
421 #endif
Definition: executor_pools_management.hpp:374
decltype(auto) async_execute(F &&f, Ts &&... ts)
Definition: executor_pools_management.hpp:394
executor_interface(size_t gpu_id)
Definition: executor_pools_management.hpp:376
executor_interface & operator=(executor_interface &&other)=delete
decltype(auto) get_future()
Definition: executor_pools_management.hpp:398
Interface & interface
Definition: executor_pools_management.hpp:413
executor_interface & operator=(const executor_interface &other)=delete
executor_interface(executor_interface &&other)=delete
decltype(auto) post(F &&f, Ts &&... ts)
Definition: executor_pools_management.hpp:389
~executor_interface()
Definition: executor_pools_management.hpp:384
executor_interface(const executor_interface &other)=delete
Access/Concurrency Control for executor pool implementation.
Definition: executor_pools_management.hpp:138
static void set_device_selector(std::function< void(size_t)> select_gpu_function)
Definition: executor_pools_management.hpp:184
executor_pool(executor_pool const &other)=delete
executor_pool & operator=(executor_pool const &other)=delete
static void init(size_t number_of_executors, Ts ... executor_args)
Definition: executor_pools_management.hpp:141
static bool interface_available(size_t load_limit, const size_t gpu_id) noexcept
Definition: executor_pools_management.hpp:168
static size_t get_next_device_id(const size_t number_gpus) noexcept
Definition: executor_pools_management.hpp:178
static void select_device(size_t gpu_id)
Definition: executor_pools_management.hpp:189
static void init_executor_pool(size_t pool_id, size_t number_of_executors, Ts ... executor_args)
Definition: executor_pools_management.hpp:151
static void cleanup()
Definition: executor_pools_management.hpp:155
executor_pool & operator=(executor_pool &&other)=delete
static size_t get_current_load(const size_t gpu_id=0) noexcept
Definition: executor_pools_management.hpp:173
static std::tuple< Interface &, size_t > get_interface(const size_t gpu_id)
Definition: executor_pools_management.hpp:159
static void release_interface(size_t index, const size_t gpu_id) noexcept
Definition: executor_pools_management.hpp:163
static void init_all_executor_pools(size_t number_of_executors, Ts ... executor_args)
Definition: executor_pools_management.hpp:146
Definition: executor_pools_management.hpp:93
std::tuple< Interface &, size_t > get_interface()
Definition: executor_pools_management.hpp:110
priority_pool_impl(size_t number_of_executors, Ts... executor_args)
Definition: executor_pools_management.hpp:100
size_t get_current_load()
Definition: executor_pools_management.hpp:130
bool interface_available(size_t load_limit)
Definition: executor_pools_management.hpp:127
void release_interface(size_t index)
Definition: executor_pools_management.hpp:120
Definition: executor_pools_management.hpp:54
std::tuple< Interface &, size_t > get_interface()
Definition: executor_pools_management.hpp:70
void release_interface(size_t index)
Definition: executor_pools_management.hpp:78
size_t get_current_load()
Definition: executor_pools_management.hpp:83
bool interface_available(size_t load_limit)
Definition: executor_pools_management.hpp:79
round_robin_pool_impl(size_t number_of_executors, Ts... executor_args)
Definition: executor_pools_management.hpp:62
auto make_scoped_lock_from_array(mutex_array_t &mutexes)
Turns a std::array of mutexes into a scoped lock.
Definition: executor_pools_management.hpp:48
Definition: config.hpp:31
size_t get_device_id(const size_t number_gpus)
Uses HPX thread information to determine which GPU should be used.
Definition: config.hpp:59
constexpr size_t max_number_gpus
Definition: config.hpp:52
std::mutex mutex_t
Definition: config.hpp:36
Definition: aggregation_executors_and_allocators.hpp:1042
void cleanup()
Deletes all buffers currently marked as unused.
Definition: buffer_manager.hpp:48