6#ifndef EXECUTOR_POOLS_MANAGEMENT_HPP
7#define EXECUTOR_POOLS_MANAGEMENT_HPP
24#if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
25#include <hpx/config.hpp>
26#if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
27#include <hpx/async_cuda/cuda_executor.hpp>
33#if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
35#ifndef KOKKOS_ENABLE_SERIAL
36namespace hpx {
namespace kokkos {
37enum class execution_space_mode { global, independent };
43namespace executor_recycling {
/// Turns a std::array of mutexes into a scoped lock over all of them.
///
/// The array is unpacked via std::apply and every element is locked at
/// once through std::scoped_lock (which applies a deadlock-avoidance
/// algorithm when given multiple lockables). Returned by value; valid in
/// C++17 thanks to guaranteed copy elision.
template <typename mutex_array_t>
auto make_scoped_lock_from_array(mutex_array_t &mutexes) {
  auto lock_all = [](auto &...locks) { return std::scoped_lock{locks...}; };
  return std::apply(lock_all, mutexes);
}
/// Round-robin pool of executor interfaces.
///
/// Hands out executors in strict rotation and tracks, per executor, how many
/// callers currently hold it (ref_counters). Not thread-safe by itself --
/// the owning pool-implementation layer is expected to serialize access
/// (it locks a per-GPU mutex around every call into this class).
template <typename Interface> class round_robin_pool_impl {
private:
  std::deque<Interface> pool{};       // the executors themselves
  std::vector<size_t> ref_counters{}; // outstanding handles per executor
  size_t current_interface{0};        // next slot to hand out

public:
  /// Construct number_of_executors executors, forwarding executor_args to
  /// each Interface constructor.
  template <typename... Ts>
  round_robin_pool_impl(size_t number_of_executors, Ts... executor_args) {
    ref_counters.reserve(number_of_executors);
    // size_t index: avoids the signed/unsigned comparison the previous
    // `int i` caused against the size_t executor count.
    for (size_t i = 0; i < number_of_executors; i++) {
      pool.emplace_back(executor_args...);
      ref_counters.emplace_back(0);
    }
  }
  /// Get the next executor (round robin) together with its pool index.
  /// The index must later be passed to release_interface.
  std::tuple<Interface &, size_t> get_interface() {
    assert(!(pool.empty()));
    size_t last_interface = current_interface;
    current_interface = (current_interface + 1) % pool.size();
    ref_counters[last_interface]++;
    std::tuple<Interface &, size_t> ret(pool[last_interface], last_interface);
    return ret;
  }
  /// Return the executor at index to the pool (drops one reference).
  void release_interface(size_t index) {
    assert(index < ref_counters.size());
    ref_counters[index]--;
  }
  /// True if at least one executor has fewer than load_limit outstanding
  /// references.
  bool interface_available(size_t load_limit) {
    return *(std::min_element(std::begin(ref_counters),
                              std::end(ref_counters))) < load_limit;
  }
  /// Smallest outstanding reference count across all executors.
  size_t get_current_load() {
    return *(std::min_element(std::begin(ref_counters),
                              std::end(ref_counters)));
  }
};
/// Priority (least-loaded-first) pool of executor interfaces.
///
/// `priorities` is maintained as a heap keyed on the outstanding reference
/// count, so priorities[0] always names the executor with the fewest active
/// handles. Not thread-safe by itself -- callers must serialize access.
template <typename Interface> class priority_pool_impl {
private:
  std::deque<Interface> pool{};       // the executors themselves
  std::vector<size_t> ref_counters{}; // outstanding handles per executor
  std::vector<size_t> priorities{};   // index heap, least-loaded at front

  /// Re-establish the heap invariant after a reference count changed.
  /// (Was duplicated verbatim in get_interface and release_interface.)
  void rebuild_priority_heap() {
    std::make_heap(std::begin(priorities), std::end(priorities),
                   [this](const size_t &first, const size_t &second) -> bool {
                     // ">" turns std::make_heap's max-heap into a min-heap
                     // over reference counts.
                     return ref_counters[first] > ref_counters[second];
                   });
  }

public:
  /// Construct number_of_executors executors, forwarding executor_args to
  /// each Interface constructor.
  template <typename... Ts>
  priority_pool_impl(size_t number_of_executors, Ts... executor_args) {
    ref_counters.reserve(number_of_executors);
    priorities.reserve(number_of_executors);
    // size_t index: avoids the signed/unsigned comparison the previous
    // `auto i = 0` (int) caused against the size_t executor count.
    for (size_t i = 0; i < number_of_executors; i++) {
      pool.emplace_back(executor_args...);
      ref_counters.emplace_back(0);
      priorities.emplace_back(i);
    }
  }
  /// Get the least-loaded executor together with its pool index.
  /// The index must later be passed to release_interface.
  std::tuple<Interface &, size_t> get_interface() {
    auto &chosen = pool[priorities[0]];
    ref_counters[priorities[0]]++;
    std::tuple<Interface &, size_t> ret(chosen, priorities[0]);
    rebuild_priority_heap();
    return ret;
  }
  /// Return the executor at index to the pool (drops one reference).
  void release_interface(size_t index) {
    assert(index < ref_counters.size());
    ref_counters[index]--;
    rebuild_priority_heap();
  }
  /// True if the least-loaded executor is below load_limit references.
  bool interface_available(size_t load_limit) {
    return ref_counters[priorities[0]] < load_limit;
  }
  /// Reference count of the least-loaded executor.
  size_t get_current_load() {
    return ref_counters[priorities[0]];
  }
};
// NOTE(review): this region is a garbled extraction of the static
// executor_pool facade -- Doxygen source-line numbers are fused into the
// text and most signature/closing lines are missing; do not edit in place,
// restore from the upstream header. Every visible fragment forwards one
// static call to executor_pool_implementation<Interface, Pool>.
// init: create one pool with number_of_executors executors.
140 template <
typename Interface,
typename Pool,
typename... Ts>
141 static void init(
size_t number_of_executors, Ts ... executor_args) {
142 executor_pool_implementation<Interface, Pool>::init(number_of_executors,
// init_all_executor_pools: forwards the same arguments for every device.
145 template <
typename Interface,
typename Pool,
typename... Ts>
147 executor_pool_implementation<Interface, Pool>::init_all_executor_pools(number_of_executors,
// init_executor_pool: create the pool for one specific pool_id only.
150 template <
typename Interface,
typename Pool,
typename... Ts>
152 executor_pool_implementation<Interface, Pool>::init_executor_pool(pool_id, number_of_executors,
// cleanup: drops all pools held by the singleton implementation.
155 template <
typename Interface,
typename Pool>
static void cleanup() {
156 executor_pool_implementation<Interface, Pool>::cleanup();
// get_interface: acquire an executor (and its index) for gpu_id.
158 template <
typename Interface,
typename Pool>
160 return executor_pool_implementation<Interface, Pool>::get_interface(gpu_id);
// release_interface: give an executor index back for gpu_id.
162 template <
typename Interface,
typename Pool>
164 executor_pool_implementation<Interface, Pool>::release_interface(index,
// interface_available: query whether the load limit is not yet reached.
167 template <
typename Interface,
typename Pool>
169 return executor_pool_implementation<Interface, Pool>::interface_available(
// get_current_load: minimal outstanding-handle count for the device.
172 template <
typename Interface,
typename Pool>
174 return executor_pool_implementation<Interface, Pool>::get_current_load(
// Fragment of get_next_device_id(number_gpus) -- body lost in extraction;
// presumably picks a device round-robin / via HPX worker id. TODO confirm.
177 template <
typename Interface,
typename Pool>
// set_device_selector: install the callback used to switch GPUs.
183 template <
typename Interface,
typename Pool>
185 executor_pool_implementation<Interface, Pool>::set_device_selector(select_gpu_function);
// select_device: invoke the installed selector for gpu_id.
188 template <
typename Interface,
typename Pool>
190 executor_pool_implementation<Interface, Pool>::select_device(gpu_id);
// NOTE(review): garbled extraction of the singleton that actually owns the
// pools -- one Pool per GPU in `executorpools`, guarded by one mutex per
// GPU in `gpu_mutexes`. Fused line numbers and missing lines below; restore
// from the upstream header rather than editing in place.
197 template <
typename Interface,
typename Pool>
class executor_pool_implementation {
// init: emplace a single pool (args forwarded to the Pool constructor).
200 template <
typename... Ts>
201 static void init(
size_t number_of_executors, Ts ... executor_args) {
205 instance().executorpools.emplace_back(number_of_executors, executor_args...);
// init_all_executor_pools: selects each GPU, then emplaces its pool.
210 template <
typename... Ts>
211 static void init_all_executor_pools(
size_t number_of_executors, Ts ... executor_args) {
213 if (number_of_executors > 0) {
215 instance().select_gpu_function(gpu_id);
216 instance().executorpools.emplace_back(number_of_executors,
// init_executor_pool: same, but for one explicit gpu_id only.
225 template <
typename... Ts>
226 static void init_executor_pool(
size_t gpu_id,
size_t number_of_executors, Ts ... executor_args) {
228 if (number_of_executors > 0) {
229 instance().select_gpu_function(gpu_id);
230 instance().executorpools.emplace_back(number_of_executors,
// cleanup: destroys every pool (and thus every executor).
240 instance().executorpools.clear();
// Accessors below share the pattern: lock this GPU's mutex, assert the
// pool exists, then delegate to the per-GPU Pool object.
243 static std::tuple<Interface &, size_t> get_interface(
const size_t gpu_id = 0) {
244 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
245 assert(gpu_id < instance().executorpools.size());
246 return instance().executorpools[gpu_id].get_interface();
248 static void release_interface(
size_t index,
const size_t gpu_id = 0) {
249 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
250 assert(gpu_id < instance().executorpools.size());
251 instance().executorpools[gpu_id].release_interface(index);
253 static bool interface_available(
size_t load_limit,
const size_t gpu_id = 0) {
254 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
255 assert(gpu_id < instance().executorpools.size());
256 return instance().executorpools[gpu_id].interface_available(load_limit);
258 static size_t get_current_load(
const size_t gpu_id = 0) {
259 std::lock_guard<cppuddle::mutex_t> guard(instance().gpu_mutexes[gpu_id]);
260 assert(gpu_id < instance().executorpools.size());
261 return instance().executorpools[gpu_id].get_current_load();
// set_device_selector / select_device: store and invoke the GPU-switch
// callback. NOTE(review): the stored std::function is copied on set;
// no locking visible around it -- presumably set once at startup.
270 static void set_device_selector(std::function<
void(
size_t)> select_gpu_function) {
272 instance().select_gpu_function = select_gpu_function;
275 static void select_device(
size_t gpu_id) {
276 instance().select_gpu_function(gpu_id);
// Private state: default-constructed singleton, default no-op-looking
// selector lambda (body lost in extraction -- TODO confirm).
280 executor_pool_implementation() =
default;
282 std::function<void(
size_t)> select_gpu_function = [](
size_t gpu_id) {
288 std::deque<Pool> executorpools{};
289 std::array<cppuddle::mutex_t, cppuddle::max_number_gpus> gpu_mutexes;
// Meyers singleton: thread-safe static local instance.
291 static executor_pool_implementation& instance(
void) {
292 static executor_pool_implementation pool_instance{};
293 return pool_instance;
297 ~executor_pool_implementation() =
default;
// Copy and move are deleted: there must be exactly one instance.
299 executor_pool_implementation(executor_pool_implementation
const &other) =
301 executor_pool_implementation &
302 operator=(executor_pool_implementation
const &other) =
delete;
303 executor_pool_implementation(executor_pool_implementation &&other) =
delete;
304 executor_pool_implementation &
305 operator=(executor_pool_implementation &&other) =
delete;
317#if defined(CPPUDDLE_DEACTIVATE_EXECUTOR_RECYCLING)
321"Warning: Building without executor recycling! Use only for performance testing! \
322For better performance configure CPPuddle with CPPUDDLE_WITH_EXECUTOR_RECYCLING=ON!"
// NOTE(review): garbled fragments of the RAII executor_interface wrapper(s)
// (one variant for the recycling-deactivated build, one for the normal
// build). Fused line numbers, missing lines -- restore from upstream.
// SFINAE constructor pair: the cuda_executor specialization vs. the
// hpx::kokkos execution-space fallback (independent space per instance).
330 template <
typename Dummy = Interface>
332 std::enable_if_t<std::is_same<hpx::cuda::experimental::cuda_executor, Dummy>::value,
size_t> = 0)
334 template <
typename Dummy = Interface>
335 explicit executor_interface(std::enable_if_t<!std::is_same<hpx::cuda::experimental::cuda_executor, Dummy>::value,
size_t> = 0)
336 : gpu_id(gpu_id),
interface(hpx::kokkos::execution_space_mode::independent) {}
// Non-copyable, non-movable: the wrapper's lifetime pins the executor.
338 executor_interface(
const executor_interface &other) =
delete;
339 executor_interface &
operator=(
const executor_interface &other) =
delete;
340 executor_interface(executor_interface &&other) =
delete;
341 executor_interface &
operator=(executor_interface &&other) =
delete;
// Perfect-forwarding passthroughs to the wrapped executor.
345 template <
typename F,
typename... Ts>
346 inline decltype(
auto)
post(F &&f, Ts &&... ts) {
347 return interface.
post(std::forward<F>(f), std::forward<Ts>(ts)...);
350 template <
typename F,
typename... Ts>
352 return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
360 operator Interface &() {
// Pool-backed variant: constructor unpacks the (executor&, index) tuple
// obtained from the pool; destructor hands the index back.
378 interface(std::get<0>(t)), interface_index(std::get<1>(t)), gpu_id(gpu_id) {}
385 executor_pool::release_interface<Interface, Pool>(interface_index, gpu_id);
388 template <
typename F,
typename... Ts>
389 inline decltype(
auto)
post(F &&f, Ts &&... ts) {
390 return interface.post(std::forward<F>(f), std::forward<Ts>(ts)...);
393 template <
typename F,
typename... Ts>
395 return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
403 operator Interface &() {
// Members: the borrowed executor reference, its tuple, and its pool index.
408 std::tuple<Interface &, size_t> t;
409 size_t interface_index;
Definition executor_pools_management.hpp:374
decltype(auto) async_execute(F &&f, Ts &&... ts)
Definition executor_pools_management.hpp:394
executor_interface(size_t gpu_id)
Definition executor_pools_management.hpp:376
decltype(auto) get_future()
Definition executor_pools_management.hpp:398
Interface & interface
Definition executor_pools_management.hpp:413
executor_interface & operator=(const executor_interface &other)=delete
executor_interface & operator=(executor_interface &&other)=delete
executor_interface(executor_interface &&other)=delete
decltype(auto) post(F &&f, Ts &&... ts)
Definition executor_pools_management.hpp:389
~executor_interface()
Definition executor_pools_management.hpp:384
executor_interface(const executor_interface &other)=delete
Access/Concurrency Control for executor pool implementation.
Definition executor_pools_management.hpp:138
static void set_device_selector(std::function< void(size_t)> select_gpu_function)
Definition executor_pools_management.hpp:184
executor_pool(executor_pool const &other)=delete
static void init(size_t number_of_executors, Ts ... executor_args)
Definition executor_pools_management.hpp:141
static bool interface_available(size_t load_limit, const size_t gpu_id) noexcept
Definition executor_pools_management.hpp:168
static size_t get_next_device_id(const size_t number_gpus) noexcept
Definition executor_pools_management.hpp:178
static void select_device(size_t gpu_id)
Definition executor_pools_management.hpp:189
static void init_executor_pool(size_t pool_id, size_t number_of_executors, Ts ... executor_args)
Definition executor_pools_management.hpp:151
static void cleanup()
Definition executor_pools_management.hpp:155
executor_pool(executor_pool &&other)=delete
static size_t get_current_load(const size_t gpu_id=0) noexcept
Definition executor_pools_management.hpp:173
executor_pool & operator=(executor_pool const &other)=delete
static std::tuple< Interface &, size_t > get_interface(const size_t gpu_id)
Definition executor_pools_management.hpp:159
static void release_interface(size_t index, const size_t gpu_id) noexcept
Definition executor_pools_management.hpp:163
executor_pool & operator=(executor_pool &&other)=delete
static void init_all_executor_pools(size_t number_of_executors, Ts ... executor_args)
Definition executor_pools_management.hpp:146
Definition executor_pools_management.hpp:93
priority_pool_impl(size_t number_of_executors, Ts... executor_args)
Definition executor_pools_management.hpp:100
std::tuple< Interface &, size_t > get_interface()
Definition executor_pools_management.hpp:110
size_t get_current_load()
Definition executor_pools_management.hpp:130
bool interface_available(size_t load_limit)
Definition executor_pools_management.hpp:127
void release_interface(size_t index)
Definition executor_pools_management.hpp:120
Definition executor_pools_management.hpp:54
void release_interface(size_t index)
Definition executor_pools_management.hpp:78
std::tuple< Interface &, size_t > get_interface()
Definition executor_pools_management.hpp:70
size_t get_current_load()
Definition executor_pools_management.hpp:83
bool interface_available(size_t load_limit)
Definition executor_pools_management.hpp:79
round_robin_pool_impl(size_t number_of_executors, Ts... executor_args)
Definition executor_pools_management.hpp:62
auto make_scoped_lock_from_array(mutex_array_t &mutexes)
Turns a std::array of mutexes into a scoped lock.
Definition executor_pools_management.hpp:48
size_t get_device_id(const size_t number_gpus)
Uses HPX thread information to determine which GPU should be used.
Definition config.hpp:59
constexpr size_t max_number_gpus
Definition config.hpp:52
std::mutex mutex_t
Definition config.hpp:36
void cleanup()
Deletes all buffers currently marked as unused.
Definition buffer_manager.hpp:48