CPPuddle
aggregation_executor_pools.hpp
// Copyright (c) 2022-2024 Gregor Daiß
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef AGGREGATION_EXECUTOR_POOL_HPP
#define AGGREGATION_EXECUTOR_POOL_HPP
namespace cppuddle {
namespace kernel_aggregation {
namespace detail {

template <const char *kernelname, class Interface, class Pool>
class aggregation_pool {
public:
  /// interface
  template <typename... Ts>
  static void init(size_t number_of_executors, size_t slices_per_executor,
                   aggregated_executor_modes mode, size_t num_devices = 1) {
    if (is_initialized) {
      throw std::runtime_error(
          std::string("Trying to initialize cppuddle aggregation pool twice") +
          " Agg pool name: " + std::string(kernelname));
    }
    if (num_devices > cppuddle::max_number_gpus) {
      throw std::runtime_error(
          std::string(
              "Trying to initialize aggregation with more devices than the "
              "maximum number of GPUs given at compiletime") +
          " Agg pool name: " + std::string(kernelname));
    }
    number_devices = num_devices;
    for (size_t gpu_id = 0; gpu_id < number_devices; gpu_id++) {
      std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
      assert(instance()[gpu_id].aggregation_executor_pool.empty());
      for (size_t i = 0; i < number_of_executors; i++) {
        instance()[gpu_id].aggregation_executor_pool.emplace_back(
            slices_per_executor, mode, gpu_id);
      }
      instance()[gpu_id].slices_per_executor = slices_per_executor;
      instance()[gpu_id].mode = mode;
    }
    is_initialized = true;
  }
48 
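  // Slice-request strategy (summary of the code below): first try the
  // executor at the current round-robin position; if it has no free slice,
  // scan the whole pool once; if everything is saturated and growing_pool
  // is set, append a fresh aggregated_executor and hand out its slice.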
  /// Will always return a valid executor slice
  static decltype(auto) request_executor_slice(void) {
    if (!is_initialized) {
      throw std::runtime_error(
          std::string("ERROR: Trying to use cppuddle aggregation pool "
                      "without first calling init!\n") +
          " Agg pool name: " + std::string(kernelname));
    }
    const size_t gpu_id = cppuddle::get_device_id(number_devices);
    std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
    assert(!instance()[gpu_id].aggregation_executor_pool.empty());
    std::optional<hpx::lcos::future<
        typename aggregated_executor<Interface>::executor_slice>>
        ret;
    size_t local_id = (instance()[gpu_id].current_interface) %
                      instance()[gpu_id].aggregation_executor_pool.size();
    ret = instance()[gpu_id]
              .aggregation_executor_pool[local_id]
              .request_executor_slice();
    // Expected case: current aggregation executor is free
    if (ret.has_value()) {
      return ret;
    }
    // current interface is busy -> find a free one
    size_t abort_counter = 0;
    const size_t abort_number = instance()[gpu_id].aggregation_executor_pool.size() + 1;
    do {
      local_id = (++(instance()[gpu_id].current_interface)) % // increment interface
                 instance()[gpu_id].aggregation_executor_pool.size();
      ret = instance()[gpu_id]
                .aggregation_executor_pool[local_id]
                .request_executor_slice();
      if (ret.has_value()) {
        return ret;
      }
      abort_counter++;
    } while (abort_counter <= abort_number);
    // Everything's busy -> create a new aggregation executor (growing pool) OR
    // return an empty optional
    if (instance()[gpu_id].growing_pool) {
      instance()[gpu_id].aggregation_executor_pool.emplace_back(
          instance()[gpu_id].slices_per_executor, instance()[gpu_id].mode, gpu_id);
      instance()[gpu_id].current_interface =
          instance()[gpu_id].aggregation_executor_pool.size() - 1;
      assert(instance()[gpu_id].aggregation_executor_pool.size() < 20480);
      ret = instance()[gpu_id]
                .aggregation_executor_pool[instance()[gpu_id].current_interface]
                .request_executor_slice();
      assert(ret.has_value()); // fresh executor -- should always have slices
                               // available
    }
    return ret;
  }

private:
  std::deque<aggregated_executor<Interface>> aggregation_executor_pool;
  std::atomic<size_t> current_interface{0};
  size_t slices_per_executor;
  aggregated_executor_modes mode;
  bool growing_pool{true};

private:
  /// Protects the executor pool of this device from concurrent access
  aggregation_mutex_t pool_mutex;
  /// One aggregation_pool instance per GPU
  static std::unique_ptr<aggregation_pool[]>& instance(void) {
    static std::unique_ptr<aggregation_pool[]> pool_instances{
        new aggregation_pool[cppuddle::max_number_gpus]};
    return pool_instances;
  }
  static inline size_t number_devices = 1;
  static inline bool is_initialized = false;
  aggregation_pool() = default;

public:
  ~aggregation_pool() = default;
  // Bunch of constructors we don't need
  aggregation_pool(aggregation_pool const &other) = delete;
  aggregation_pool &operator=(aggregation_pool const &other) = delete;
  aggregation_pool(aggregation_pool &&other) = delete;
  aggregation_pool &operator=(aggregation_pool &&other) = delete;
};
129 
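// Convenience helper: initializes a given aggregation_pool instantiation
// (aggregation_region_t) with a fixed number of executors on all GPUs known
// at compile time. With only one slice per executor, aggregation is
// effectively disabled, hence STRICT mode is selected in that case.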
template <typename aggregation_region_t>
void init_area_aggregation_pool(
    const size_t max_slices) {
  constexpr size_t number_aggregation_executors = 128;
  constexpr size_t number_gpus = cppuddle::max_number_gpus;
  aggregated_executor_modes executor_mode = aggregated_executor_modes::EAGER;
  if (max_slices == 1) {
    executor_mode = aggregated_executor_modes::STRICT;
  }
  aggregation_region_t::init(
      number_aggregation_executors, max_slices, executor_mode, number_gpus);
}

} // namespace detail
} // namespace kernel_aggregation
} // namespace cppuddle

#endif
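
For orientation, a minimal usage sketch of this pool follows. It is not part
of the header above: the pool name string, the placeholder Interface/Pool
types (device_executor_t, device_pool_t) and the launch code are illustrative
assumptions; only aggregation_pool::init and
aggregation_pool::request_executor_slice are taken from the source.

// Illustrative sketch, not verbatim CPPuddle code.
// device_executor_t / device_pool_t are hypothetical placeholders for the
// Interface and Pool template parameters.
static const char my_kernel_pool_name[] = "my_kernel_agg_pool";
using my_agg_pool = cppuddle::kernel_aggregation::detail::aggregation_pool<
    my_kernel_pool_name, device_executor_t, device_pool_t>;

void example(void) {
  // Once at startup: 8 aggregated executors with up to 4 slices each, 1 GPU.
  // EAGER mode is assumed here to launch aggregated work as soon as possible.
  my_agg_pool::init(
      8, 4,
      cppuddle::kernel_aggregation::detail::aggregated_executor_modes::EAGER,
      1);

  // Per kernel launch: request a slice future; with a growing pool this
  // should always yield a value (see request_executor_slice above).
  auto slice_future = my_agg_pool::request_executor_slice();
  if (slice_future.has_value()) {
    auto slice = slice_future.value().get(); // wait for the slice
    // ... launch the kernel and allocate buffers through `slice` so that
    // work from up to 4 callers is aggregated into one device launch ...
  }
}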