CPPuddle
aggregation_executor_pools.hpp
// Copyright (c) 2022-2024 Gregor Daiß
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef AGGREGATION_EXECUTOR_POOL_HPP
#define AGGREGATION_EXECUTOR_POOL_HPP

namespace cppuddle {
namespace kernel_aggregation {
namespace detail {

template <const char *kernelname, class Interface, class Pool>
class aggregation_pool {
public:
  /// interface
  template <typename... Ts>
  static void init(size_t number_of_executors, size_t slices_per_executor,
                   aggregated_executor_modes mode, size_t num_devices = 1) {
    if (is_initialized) {
      throw std::runtime_error(
          std::string("Trying to initialize cppuddle aggregation pool twice") +
          " Agg pool name: " + std::string(kernelname));
    }
    if (num_devices > cppuddle::max_number_gpus) {
      throw std::runtime_error(
          std::string(
              "Trying to initialize aggregation with more devices than the "
              "maximum number of GPUs given at compiletime") +
          " Agg pool name: " + std::string(kernelname));
    }
    number_devices = num_devices;
    for (size_t gpu_id = 0; gpu_id < number_devices; gpu_id++) {

      std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
      assert(instance()[gpu_id].aggregation_executor_pool.empty());
      for (int i = 0; i < number_of_executors; i++) {
        instance()[gpu_id].aggregation_executor_pool.emplace_back(slices_per_executor,
                                                                  mode, gpu_id);
      }
      instance()[gpu_id].slices_per_executor = slices_per_executor;
      instance()[gpu_id].mode = mode;
    }
    is_initialized = true;
  }

  /// Will always return a valid executor slice
  static decltype(auto) request_executor_slice(void) {
    if (!is_initialized) {
      throw std::runtime_error(
          std::string("ERROR: Trying to use cppuddle aggregation pool without first calling init!\n") +
          " Agg poolname: " + std::string(kernelname));
    }
    const size_t gpu_id = cppuddle::get_device_id(number_devices);
    /* const size_t gpu_id = 1; */
    std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
    assert(!instance()[gpu_id].aggregation_executor_pool.empty());
    std::optional<hpx::lcos::future<
        typename aggregated_executor<Interface>::executor_slice>>
        ret;
    size_t local_id = (instance()[gpu_id].current_interface) %
                      instance()[gpu_id].aggregation_executor_pool.size();
    ret = instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice();
    // Expected case: current aggregation executor is free
    if (ret.has_value()) {
      return ret;
    }
    // current interface is bad -> find free one
    size_t abort_counter = 0;
    const size_t abort_number = instance()[gpu_id].aggregation_executor_pool.size() + 1;
    do {
      local_id = (++(instance()[gpu_id].current_interface)) % // increment interface
                 instance()[gpu_id].aggregation_executor_pool.size();
      ret =
          instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice();
      if (ret.has_value()) {
        return ret;
      }
      abort_counter++;
    } while (abort_counter <= abort_number);
    // Everything's busy -> create new aggregation executor (growing pool) OR
    // return empty optional
    if (instance()[gpu_id].growing_pool) {
      instance()[gpu_id].aggregation_executor_pool.emplace_back(
          instance()[gpu_id].slices_per_executor, instance()[gpu_id].mode, gpu_id);
      instance()[gpu_id].current_interface =
          instance()[gpu_id].aggregation_executor_pool.size() - 1;
      assert(instance()[gpu_id].aggregation_executor_pool.size() < 20480);
      ret = instance()[gpu_id]
                .aggregation_executor_pool[instance()[gpu_id].current_interface]
                .request_executor_slice();
      assert(ret.has_value()); // fresh executor -- should always have slices
                               // available
    }
    return ret;
  }

private:
  std::deque<aggregated_executor<Interface>> aggregation_executor_pool;
  std::atomic<size_t> current_interface{0};
  size_t slices_per_executor;
  aggregated_executor_modes mode;
  bool growing_pool{true};

private:
  // Protects this GPU's executor pool
  aggregation_mutex_t pool_mutex;
  // One static pool instance per GPU (indexed by gpu_id)
  static std::unique_ptr<aggregation_pool[]>& instance(void) {
    static std::unique_ptr<aggregation_pool[]> pool_instances{
        new aggregation_pool[cppuddle::max_number_gpus]};
    return pool_instances;
  }
  static inline size_t number_devices = 1;
  static inline bool is_initialized = false;
  aggregation_pool() = default;

public:
  ~aggregation_pool() = default;
  // Bunch of constructors we don't need
  aggregation_pool(aggregation_pool const &other) = delete;
  aggregation_pool &operator=(aggregation_pool const &other) = delete;
  aggregation_pool(aggregation_pool &&other) = delete;
  aggregation_pool &operator=(aggregation_pool &&other) = delete;
};

template <typename aggregation_region_t>
void init_area_aggregation_pool(
    const size_t max_slices) {
  constexpr size_t number_aggregation_executors = 128;
  constexpr size_t number_gpus = cppuddle::max_number_gpus;
  aggregated_executor_modes executor_mode = aggregated_executor_modes::EAGER;
  if (max_slices == 1) {
    executor_mode = aggregated_executor_modes::STRICT;
  }
  aggregation_region_t::init(
      number_aggregation_executors, max_slices, executor_mode, number_gpus);
}


} // namespace detail
} // namespace kernel_aggregation
} // namespace cppuddle

#endif
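For orientation, a minimal usage sketch follows. It is not part of this header: the executor interface and pool types (my_executor_interface_t, my_executor_pool_t) and the kernel name are hypothetical placeholders for whatever Interface/Pool combination the aggregated_executor from aggregation_executors_and_allocators.hpp accepts, and the namespace qualification of aggregated_executor_modes is assumed. init() must be called once per pool instantiation before the first request_executor_slice().

// Hedged usage sketch -- my_executor_interface_t and my_executor_pool_t are
// hypothetical placeholders for a CPPuddle executor interface and pool type.
static const char my_kernel_name[] = "my_aggregation_kernel";

using my_agg_pool = cppuddle::kernel_aggregation::detail::aggregation_pool<
    my_kernel_name, my_executor_interface_t, my_executor_pool_t>;

void example_usage(void) {
  using cppuddle::kernel_aggregation::detail::aggregated_executor_modes;
  // One-time setup: 8 aggregated executors per GPU, up to 4 slices per executor
  my_agg_pool::init(8, 4, aggregated_executor_modes::EAGER);

  // Per-task use: request a future to an executor slice on the current GPU
  auto slice_fut = my_agg_pool::request_executor_slice();
  if (slice_fut.has_value()) {
    auto done = slice_fut.value().then([](auto &&fut) {
      auto slice = fut.get(); // executor_slice scoped to this task
      // ... launch the aggregated kernel through 'slice' here ...
    });
    done.get();
  }
}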
cppuddle::kernel_aggregation::detail::aggregation_pool
Definition aggregation_executor_pools.hpp:16
static void init(size_t number_of_executors, size_t slices_per_executor, aggregated_executor_modes mode, size_t num_devices=1)
interface
Definition aggregation_executor_pools.hpp:20
static decltype(auto) request_executor_slice(void)
Will always return a valid executor slice.
Definition aggregation_executor_pools.hpp:50
aggregation_pool(aggregation_pool const &other)=delete
aggregation_pool(aggregation_pool &&other)=delete
aggregation_pool & operator=(aggregation_pool const &other)=delete
aggregation_pool & operator=(aggregation_pool &&other)=delete
void init_area_aggregation_pool(const size_t max_slices)
Definition aggregation_executor_pools.hpp:131
aggregated_executor<Interface>::executor_slice
Slice class - meant as a scope interface to the aggregated executor.
Definition aggregation_executors_and_allocators.hpp:420
hpx::mutex aggregation_mutex_t
Definition aggregation_executors_and_allocators.hpp:70
aggregated_executor_modes
Definition aggregation_executors_and_allocators.hpp:383
cppuddle
Definition config.hpp:31
size_t get_device_id(const size_t number_gpus)
Uses HPX thread information to determine which GPU should be used.
Definition config.hpp:59
constexpr size_t max_number_gpus
Definition config.hpp:52
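As the listing shows, init_area_aggregation_pool is a thin convenience wrapper around init: it requests 128 aggregated executors per GPU on all max_number_gpus devices and selects STRICT mode when max_slices == 1, EAGER otherwise. A hedged call sketch, reusing the hypothetical my_agg_pool alias from the example above:

// Somewhere in application setup code (sketch):
cppuddle::kernel_aggregation::detail::init_area_aggregation_pool<my_agg_pool>(4);
// Equivalent to:
// my_agg_pool::init(128, 4, aggregated_executor_modes::EAGER, cppuddle::max_number_gpus);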