8 #ifndef AGGREGATION_EXECUTOR_POOL_HPP
9 #define AGGREGATION_EXECUTOR_POOL_HPP
12 namespace kernel_aggregation {
15 template <const
char *kernelname,
class Interface,
class Pool>
19 template <
typename... Ts>
20 static void init(
size_t number_of_executors,
size_t slices_per_executor,
23 throw std::runtime_error(
24 std::string(
"Trying to initialize cppuddle aggregation pool twice") +
25 " Agg pool name: " + std::string(kernelname));
28 throw std::runtime_error(
30 "Trying to initialize aggregation with more devices than the "
31 "maximum number of GPUs given at compiletime") +
32 " Agg pool name: " + std::string(kernelname));
34 number_devices = num_devices;
35 for (
size_t gpu_id = 0; gpu_id < number_devices; gpu_id++) {
37 std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
38 assert(instance()[gpu_id].aggregation_executor_pool.empty());
39 for (
int i = 0; i < number_of_executors; i++) {
40 instance()[gpu_id].aggregation_executor_pool.emplace_back(slices_per_executor,
43 instance()[gpu_id].slices_per_executor = slices_per_executor;
44 instance()[gpu_id].mode = mode;
46 is_initialized =
true;
51 if (!is_initialized) {
52 throw std::runtime_error(
53 std::string(
"ERROR: Trying to use cppuddle aggregation pool without first calling init!\n") +
54 " Agg poolname: " + std::string(kernelname));
58 std::lock_guard<aggregation_mutex_t> guard(instance()[gpu_id].pool_mutex);
59 assert(!instance()[gpu_id].aggregation_executor_pool.empty());
60 std::optional<hpx::lcos::future<
63 size_t local_id = (instance()[gpu_id].current_interface) %
64 instance()[gpu_id].aggregation_executor_pool.size();
65 ret = instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice();
67 if (ret.has_value()) {
71 size_t abort_counter = 0;
72 const size_t abort_number = instance()[gpu_id].aggregation_executor_pool.size() + 1;
74 local_id = (++(instance()[gpu_id].current_interface)) %
75 instance()[gpu_id].aggregation_executor_pool.size();
77 instance()[gpu_id].aggregation_executor_pool[local_id].request_executor_slice();
78 if (ret.has_value()) {
82 }
while (abort_counter <= abort_number);
85 if (instance()[gpu_id].growing_pool) {
86 instance()[gpu_id].aggregation_executor_pool.emplace_back(
87 instance()[gpu_id].slices_per_executor, instance()[gpu_id].mode, gpu_id);
88 instance()[gpu_id].current_interface =
89 instance()[gpu_id].aggregation_executor_pool.size() - 1;
90 assert(instance()[gpu_id].aggregation_executor_pool.size() < 20480);
91 ret = instance()[gpu_id]
92 .aggregation_executor_pool[instance()[gpu_id].current_interface]
93 .request_executor_slice();
94 assert(ret.has_value());
101 std::deque<aggregated_executor<Interface>> aggregation_executor_pool;
102 std::atomic<size_t> current_interface{0};
103 size_t slices_per_executor;
105 bool growing_pool{
true};
112 static std::unique_ptr<aggregation_pool[]>& instance(
void) {
113 static std::unique_ptr<aggregation_pool[]> pool_instances{
115 return pool_instances;
117 static inline size_t number_devices = 1;
118 static inline bool is_initialized =
false;
130 template <
typename aggregation_region_t>
132 const size_t max_slices) {
133 constexpr
size_t number_aggregation_executors = 128;
136 if (max_slices == 1) {
139 aggregation_region_t::init(
140 number_aggregation_executors, max_slices, executor_mode, number_gpus);
Slice class — meant as a scoped interface to the aggregated executor.
Definition: aggregation_executors_and_allocators.hpp:420
Definition: aggregation_executor_pools.hpp:16
aggregation_pool & operator=(aggregation_pool const &other)=delete
aggregation_pool(aggregation_pool &&other)=delete
aggregation_pool & operator=(aggregation_pool &&other)=delete
~aggregation_pool()=default
static decltype(auto) request_executor_slice(void)
Will always return a valid executor slice.
Definition: aggregation_executor_pools.hpp:50
aggregation_pool(aggregation_pool const &other)=delete
static void init(size_t number_of_executors, size_t slices_per_executor, aggregated_executor_modes mode, size_t num_devices=1)
interface
Definition: aggregation_executor_pools.hpp:20
void init_area_aggregation_pool(const size_t max_slices)
Definition: aggregation_executor_pools.hpp:131
hpx::mutex aggregation_mutex_t
Definition: aggregation_executors_and_allocators.hpp:70
aggregated_executor_modes
Definition: aggregation_executors_and_allocators.hpp:383
Definition: config.hpp:31
size_t get_device_id(const size_t number_gpus)
Uses HPX thread information to determine which GPU should be used.
Definition: config.hpp:59
constexpr size_t max_number_gpus
Definition: config.hpp:52