CPPuddle
aggregation_executors_and_allocators.hpp
1 // Copyright (c) 2022-2024 Gregor Daiß
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #ifndef AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
7 #define AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
8 
9 #ifndef CPPUDDLE_HAVE_HPX
10 #error "Work aggregation allocators/executors require CPPUDDLE_WITH_HPX=ON"
11 #endif
12 
13 #include <stdexcept>
14 // When defined, CPPuddle will run more checks
15 // about the order of aggregated method calls.
16 // Best defined before including this header when needed
17 // (hence commented out here)
18 //#define DEBUG_AGGREGATION_CALLS 1
19 
20 #include <stdio.h>
21 
22 #include <any>
23 #include <atomic>
24 #include <chrono>
25 #include <cstdio>
26 #include <iostream>
27 #include <memory>
28 #include <mutex>
29 #include <optional>
30 #include <ostream>
31 #include <string>
32 #include <tuple>
33 #include <type_traits>
34 #include <typeinfo>
35 #include <utility>
36 #include <unordered_map>
37 
38 #include <hpx/futures/future.hpp>
39 #include <hpx/hpx_init.hpp>
40 #include <hpx/include/async.hpp>
41 #include <hpx/include/iostreams.hpp>
42 #include <hpx/include/lcos.hpp>
43 #include <hpx/lcos/promise.hpp>
44 #include <hpx/mutex.hpp>
45 
46 #if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
47 // required for defining the type traits below that use the cuda executor as
48 // the underlying executor for work aggregation
49 #include <hpx/async_cuda/cuda_executor.hpp>
50 #endif
51 
52 #include <boost/core/demangle.hpp>
53 #include <boost/format.hpp>
54 
56 // get direct access to the buffer management
58 // get normal access to the executor pools
60 
61 #ifndef CPPUDDLE_HAVE_HPX_MUTEX
62 #pragma message \
63  "Work aggregation will use hpx::mutex internally, despite CPPUDDLE_WITH_HPX_MUTEX=OFF"
64 #pragma message \
65  "Consider using CPPUDDLE_WITH_HPX_MUTEX=ON, to make the rest of CPPuddle also use hpx::mutex"
66 #endif
67 namespace cppuddle {
68 namespace kernel_aggregation {
69 namespace detail {
70  using aggregation_mutex_t = hpx::mutex;
71 
72 //===============================================================================
73 //===============================================================================
74 // Helper functions/classes
75 
78 template <typename... Ts>
79 std::tuple<Ts...> make_tuple_supporting_references(Ts &&...ts) {
80  return std::tuple<Ts...>{std::forward<Ts>(ts)...};
81 }
82 
85 template <typename T> std::string print_if_possible(T val) {
86  if constexpr (std::is_convertible_v<T, std::string>) {
87  return val;
88  } else if constexpr (std::is_integral_v<T> || std::is_floating_point_v<T>) {
89  return std::to_string(val);
90  } else if constexpr (std::is_pointer_v<T>) {
91  // Pretty-printing a pointer only really works well with %p
92  // TODO Try using std::format as soon as we can move to C++20
93  std::unique_ptr<char[]> debug_string(new char[128]());
94  snprintf(debug_string.get(), 128, "%p", val);
95  return std::string(debug_string.get());
96  } else {
97  return std::string("cannot print value");
98  }
99 }
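// --- Editor-added illustration (not part of the original header) ---
// A minimal sketch of what print_if_possible yields for a few argument types.
// It assumes this header is on the include path and is meant as user code
// outside the cppuddle namespaces; `opaque` and the demo function are
// hypothetical names introduced only for this example.
#include <iostream>
#include <string>

struct opaque {};  // a type with no printable representation

inline void print_if_possible_demo() {
  using cppuddle::kernel_aggregation::detail::print_if_possible;
  int i = 42;
  std::cout << print_if_possible(std::string("abc")) << '\n';  // prints "abc"
  std::cout << print_if_possible(3) << '\n';                   // prints "3"
  std::cout << print_if_possible(&i) << '\n';                  // prints the pointer address (via %p)
  std::cout << print_if_possible(opaque{}) << '\n';            // prints "cannot print value"
}
// --- end of editor-added illustration ---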
100 
103 template <class TupType, size_t... I>
104 void print_tuple(const TupType &_tup, std::index_sequence<I...>) {
105  (..., (hpx::cout << (I == 0 ? "" : ", ")
106  << print_if_possible(std::get<I + 1>(_tup))));
107 }
108 
111 template <class... T> void print_tuple(const std::tuple<T...> &_tup) {
112  // Use a raw char buffer and snprintf as boost::format insisted on casting
113  // the pointer address to 1...
114  // TODO Try using std::format as soon as we can move to C++20
115  std::unique_ptr<char[]> debug_string(new char[128]());
116  snprintf(debug_string.get(), 128, "Function address: %p -- Arguments: (",
117  std::get<0>(_tup));
118  hpx::cout << debug_string.get();
119  print_tuple(_tup, std::make_index_sequence<sizeof...(T) - 1>());
120  hpx::cout << ")";
121 }
122 
123 //===============================================================================
124 //===============================================================================
125 template <typename Executor, typename F, typename... Ts>
126 void exec_post_wrapper(Executor & exec, F &&f, Ts &&...ts) {
127  hpx::apply(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
128 }
129 
130 template <typename Executor, typename F, typename... Ts>
131 hpx::lcos::future<void> exec_async_wrapper(Executor & exec, F &&f, Ts &&...ts) {
132  return hpx::async(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
133 }
134 
137 /** Launch conditions: All slice executors must have called the same function
138  * (tracked by future all_slices_ready)
139  * AND
140  * Previous aggregated_function_call on the same Executor must have been
141  * launched (tracked by future stream_future)
142  * All function calls received from the slice executors are checked against
143  * the first one for matching types and values (throws an exception otherwise)
144  */
145 
146 template <typename Executor> class aggregated_function_call {
147 private:
148  std::atomic<size_t> slice_counter = 0;
149 
151  /* hpx::lcos::local::promise<void> slices_ready_promise; */
153  /* hpx::lcos::future<void> all_slices_ready = slices_ready_promise.get_future(); */
155  const size_t number_slices;
156  const bool async_mode;
157 
158  Executor &underlying_executor;
159 
160 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
161 #pragma message \
162  "Building slow work aggegator build with additional runtime checks! Build with NDEBUG defined for fast build..."
165  std::any function_tuple;
167  std::string debug_type_information;
168  aggregation_mutex_t debug_mut;
169 #endif
170 
171  std::vector<hpx::lcos::local::promise<void>> potential_async_promises{};
172 
173 public:
174  aggregated_function_call(const size_t number_slices, bool async_mode, Executor &exec)
175  : number_slices(number_slices), async_mode(async_mode), underlying_executor(exec) {
176  if (async_mode)
177  potential_async_promises.resize(number_slices);
178  }
179  ~aggregated_function_call(void) {
180  // All slices should have done this call
181  assert(slice_counter == number_slices);
182  // assert(!all_slices_ready.valid());
183  }
185  bool sync_aggregation_slices(hpx::lcos::future<void> &stream_future) {
186  assert(!async_mode);
187  assert(potential_async_promises.empty());
188  const size_t local_counter = slice_counter++;
189  if (local_counter == number_slices - 1) {
190  return true;
191  }
192  else return false;
193  }
194  template <typename F, typename... Ts>
195  void post_when(hpx::lcos::future<void> &stream_future, F &&f, Ts &&...ts) {
196 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
197  // needed for concurrent access to function_tuple and debug_type_information
198  // Not required for normal use
199  std::lock_guard<aggregation_mutex_t> guard(debug_mut);
200 #endif
201  assert(!async_mode);
202  assert(potential_async_promises.empty());
203  const size_t local_counter = slice_counter++;
204 
205  if (local_counter == 0) {
206 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
207  auto tmp_tuple =
208  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
209  function_tuple = tmp_tuple;
210  debug_type_information = typeid(decltype(tmp_tuple)).name();
211 #endif
212 
213  } else {
214  //
215  // This scope checks if both the type and the values of the current call
216  // match the original call. Only used in debug builds...
217  //
218 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
219  auto comparison_tuple =
220  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
221  try {
222  auto orig_call_tuple =
223  std::any_cast<decltype(comparison_tuple)>(function_tuple);
224  if (comparison_tuple != orig_call_tuple) {
225  throw std::runtime_error(
226  "Values of post function arguments (or function "
227  "itself) do not match ");
228  }
229  } catch (const std::bad_any_cast &e) {
230  hpx::cout
231  << "\nMismatched types error in aggregated post call of executor "
232  << ": " << e.what() << "\n";
233  hpx::cout << "Expected types:\t\t "
234  << boost::core::demangle(debug_type_information.c_str());
235  hpx::cout << "\nGot types:\t\t "
236  << boost::core::demangle(
237  typeid(decltype(comparison_tuple)).name())
238  << "\n"
239  << std::endl;
240  // throw;
241  } catch (const std::runtime_error &e) {
242  hpx::cout
243  << "\nMismatched values error in aggregated post call of executor "
244  << ": " << e.what() << std::endl;
245  hpx::cout << "Types (matched):\t "
246  << boost::core::demangle(debug_type_information.c_str());
247  auto orig_call_tuple =
248  std::any_cast<decltype(comparison_tuple)>(function_tuple);
249  hpx::cout << "\nExpected values:\t ";
250  print_tuple(orig_call_tuple);
251  hpx::cout << "\nGot values:\t\t ";
252  print_tuple(comparison_tuple);
253  hpx::cout << std::endl << std::endl;
254  // throw;
255  }
256 #endif
257  }
258  assert(local_counter < number_slices);
259  assert(slice_counter < number_slices + 1);
260  // Check exit criteria: Launch function call continuation by setting the
261  // slices promise
262  if (local_counter == number_slices - 1) {
263  exec_post_wrapper<Executor, F, Ts...>(underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
264  //slices_ready_promise.set_value();
265  }
266  }
267  template <typename F, typename... Ts>
268  hpx::lcos::future<void> async_when(hpx::lcos::future<void> &stream_future,
269  F &&f, Ts &&...ts) {
270 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
271  // needed for concurrent access to function_tuple and debug_type_information
272  // Not required for normal use
273  std::lock_guard<aggregation_mutex_t> guard(debug_mut);
274 #endif
275  assert(async_mode);
276  assert(!potential_async_promises.empty());
277  const size_t local_counter = slice_counter++;
278  if (local_counter == 0) {
279 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
280  auto tmp_tuple =
281  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
282  function_tuple = tmp_tuple;
283  debug_type_information = typeid(decltype(tmp_tuple)).name();
284 #endif
285  } else {
286  //
287  // This scope checks if both the type and the values of the current call
288  // match the original call. Only used in debug builds...
289  //
290 #if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
291  auto comparison_tuple =
292  make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
293  try {
294  auto orig_call_tuple =
295  std::any_cast<decltype(comparison_tuple)>(function_tuple);
296  if (comparison_tuple != orig_call_tuple) {
297  throw std::runtime_error(
298  "Values of async function arguments (or function "
299  "itself) do not match ");
300  }
301  } catch (const std::bad_any_cast &e) {
302  hpx::cout
303  << "\nMismatched types error in aggregated async call of executor "
304  << ": " << e.what() << "\n";
305  hpx::cout << "Expected types:\t\t "
306  << boost::core::demangle(debug_type_information.c_str());
307  hpx::cout << "\nGot types:\t\t "
308  << boost::core::demangle(
309  typeid(decltype(comparison_tuple)).name())
310  << "\n"
311  << std::endl;
312  // throw;
313  } catch (const std::runtime_error &e) {
314  hpx::cout
315  << "\nMismatched values error in aggregated async call of executor "
316  << ": " << e.what() << std::endl;
317  hpx::cout << "Types (matched):\t "
318  << boost::core::demangle(debug_type_information.c_str());
319  auto orig_call_tuple =
320  std::any_cast<decltype(comparison_tuple)>(function_tuple);
321  hpx::cout << "\nExpected values:\t ";
322  print_tuple(orig_call_tuple);
323  hpx::cout << "\nGot values:\t\t ";
324  print_tuple(comparison_tuple);
325  hpx::cout << std::endl << std::endl;
326  // throw;
327  }
328 #endif
329  }
330  assert(local_counter < number_slices);
331  assert(slice_counter < number_slices + 1);
332  assert(potential_async_promises.size() == number_slices);
333  hpx::lcos::future<void> ret_fut =
334  potential_async_promises[local_counter].get_future();
335  if (local_counter == number_slices - 1) {
336  /* slices_ready_promise.set_value(); */
337  auto fut = exec_async_wrapper<Executor, F, Ts...>(
338  underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
339  fut.then([this](auto &&fut) {
340  for (auto &promise : potential_async_promises) {
341  promise.set_value();
342  }
343  });
344  }
345  // Check exit criteria: Launch function call continuation by setting the
346  // slices promise
347  return ret_fut;
348  }
349  template <typename F, typename... Ts>
350  hpx::lcos::shared_future<void> wrap_async(hpx::lcos::future<void> &stream_future,
351  F &&f, Ts &&...ts) {
352  assert(async_mode);
353  assert(!potential_async_promises.empty());
354  const size_t local_counter = slice_counter++;
355  assert(local_counter < number_slices);
356  assert(slice_counter < number_slices + 1);
357  assert(potential_async_promises.size() == number_slices);
358  hpx::lcos::shared_future<void> ret_fut =
359  potential_async_promises[local_counter].get_shared_future();
360  if (local_counter == number_slices - 1) {
361  auto fut = f(std::forward<Ts>(ts)...);
362  fut.then([this](auto &&fut) {
363  // TODO just use one promise
364  for (auto &promise : potential_async_promises) {
365  promise.set_value();
366  }
367  });
368  }
369  return ret_fut;
370  }
371  // We need to be copyable or nothrow-movable for use in std::vector...
372  aggregated_function_call(const aggregated_function_call &other) = default;
373  aggregated_function_call &
374  operator=(const aggregated_function_call &other) = default;
375  aggregated_function_call(aggregated_function_call &&other) = default;
376  aggregated_function_call &
377  operator=(aggregated_function_call &&other) = default;
378 };
379 
380 //===============================================================================
381 //===============================================================================
382 
383 enum class aggregated_executor_modes { EAGER = 1, STRICT, ENDLESS };
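// --- Editor-added illustration (not part of the original header) ---
// The three modes only differ in how request_executor_slice() (further below)
// triggers the launch continuation and whether slices ever run out: EAGER
// launches once the underlying executor/stream is ready (even if fewer than
// max_slices slices were requested), STRICT launches only once all max_slices
// slices have been handed out, and ENDLESS keeps handing out slices without
// ever marking the executor as exhausted. A minimal construction sketch for
// user code, assuming a CUDA-enabled HPX build and a running HPX runtime
// (the hypothetical demo function would be called from an HPX thread):
inline void aggregated_executor_modes_demo() {
  using executor_t = hpx::cuda::experimental::cuda_executor;  // assumption: HPX_HAVE_CUDA
  using cppuddle::kernel_aggregation::detail::aggregated_executor;
  using cppuddle::kernel_aggregation::detail::aggregated_executor_modes;
  // Aggregate up to 8 slices per launch on GPU 0 (the default gpu_id):
  aggregated_executor<executor_t> agg_eager{8, aggregated_executor_modes::EAGER};
  aggregated_executor<executor_t> agg_strict{8, aggregated_executor_modes::STRICT};
}
// --- end of editor-added illustration ---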
385 template <typename T, typename Host_Allocator, typename Executor>
386 class allocator_slice;
387 
389 /** Executor is not meant to be used directly. Instead it yields multiple
390  * executor_slice objects. These serve as interfaces. Slices from the same
391  * aggregated_executor are meant to execute the same function calls but on
392  * different data (i.e. different tasks)
393  */
394 template <typename Executor> class aggregated_executor {
395 private:
396  //===============================================================================
397  // Misc private variables:
398  //
399  std::atomic<bool> slices_exhausted;
400 
401  std::atomic<bool> executor_slices_alive;
402  std::atomic<bool> buffers_in_use;
403  std::atomic<size_t> dealloc_counter;
404 
405  const aggregated_executor_modes mode;
406  const size_t max_slices;
407  std::atomic<size_t> current_slices;
411  std::unique_ptr<cppuddle::executor_recycling::executor_interface<
412  Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>>
413  executor_wrapper;
414 
415 public:
416  size_t gpu_id;
417  // Subclasses
418 
420  class executor_slice {
421  public:
422  aggregated_executor<Executor> &parent;
423  private:
427  size_t launch_counter{0};
428  size_t buffer_counter{0};
429  bool notify_parent_about_destruction{true};
430 
431  public:
434  const size_t number_slices;
435  size_t max_slices;
436  size_t id;
437  using executor_t = Executor;
438  executor_slice(aggregated_executor &parent, const size_t slice_id,
439  const size_t number_slices, const size_t max_number_slices)
440  : parent(parent), notify_parent_about_destruction(true),
441  number_slices(number_slices), id(slice_id), max_slices(max_number_slices) {
442  assert(parent.max_slices == max_slices);
443  assert(number_slices >= 1);
444  assert(number_slices <= max_slices);
445  }
446  ~executor_slice(void) {
447  // Don't notify parent if we moved away from this executor_slice
448  if (notify_parent_about_destruction) {
449  // Executor should be done by the time of destruction
450  // -> check here before notifying parent
451 
452  assert(parent.max_slices == max_slices);
453  assert(number_slices >= 1);
454  assert(number_slices <= max_slices);
455  // parent still in execution mode?
456  assert(parent.slices_exhausted == true);
457  // all kernel launches done?
458  assert(launch_counter == parent.function_calls.size());
459  // Notify parent that this aggregation slice is done
460  parent.reduce_usage_counter();
461  }
462  }
463  executor_slice(const executor_slice &other) = delete;
464  executor_slice &operator=(const executor_slice &other) = delete;
465  executor_slice(executor_slice &&other)
466  : parent(other.parent), launch_counter(std::move(other.launch_counter)),
467  buffer_counter(std::move(other.buffer_counter)),
468  number_slices(std::move(other.number_slices)),
469  id(std::move(other.id)), max_slices(std::move(other.max_slices)) {
470  other.notify_parent_about_destruction = false;
471  }
472  executor_slice &operator=(executor_slice &&other) {
473  parent = other.parent;
474  launch_counter = std::move(other.launch_counter);
475  buffer_counter = std::move(other.buffer_counter);
476  number_slices = std::move(other.number_slices);
477  id = std::move(other.id);
478  max_slices = std::move(other.max_slices);
479  other.notify_parent_about_destruction = false;
  return *this;
480  }
481  template <typename T, typename Host_Allocator>
482  allocator_slice<T, Host_Allocator, Executor> make_allocator() {
483  return allocator_slice<T, Host_Allocator, Executor>(*this);
484  }
485  bool sync_aggregation_slices() {
486  assert(parent.slices_exhausted == true);
487  auto ret = parent.sync_aggregation_slices(launch_counter);
488  launch_counter++;
489  return ret;
490  }
491  template <typename F, typename... Ts> void post(F &&f, Ts &&...ts) {
492  // we should only execute function calls once all slices
493  // have been given away (-> Executor Slices start)
494  assert(parent.slices_exhausted == true);
495  parent.post(launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
496  launch_counter++;
497  }
498  template <typename F, typename... Ts>
499  hpx::lcos::future<void> async(F &&f, Ts &&...ts) {
500  // we should only execute function calls once all slices
501  // have been given away (-> Executor Slices start)
502  assert(parent.slices_exhausted == true);
503  hpx::lcos::future<void> ret_fut = parent.async(
504  launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
505  launch_counter++;
506  return ret_fut;
507  }
508 
509  // OneWay Execution
510  template <typename F, typename... Ts>
511  friend decltype(auto) tag_invoke(hpx::parallel::execution::post_t,
512  executor_slice& exec, F&& f, Ts&&... ts)
513  {
514  return exec.post(std::forward<F>(f), std::forward<Ts>(ts)...);
515  }
516 
517  // TwoWay Execution
518  template <typename F, typename... Ts>
519  friend decltype(auto) tag_invoke(
520  hpx::parallel::execution::async_execute_t, executor_slice& exec,
521  F&& f, Ts&&... ts)
522  {
523  return exec.async(
524  std::forward<F>(f), std::forward<Ts>(ts)...);
525  }
526 
527  template <typename F, typename... Ts>
528  hpx::lcos::shared_future<void> wrap_async(F &&f, Ts &&...ts) {
529  // we should only execute function calls once all slices
530  // have been given away (-> Executor Slices start)
531  assert(parent.slices_exhausted == true);
532  hpx::lcos::shared_future<void> ret_fut = parent.wrap_async(
533  launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
534  launch_counter++;
535  return ret_fut;
536  }
537 
540  template <typename T, typename Host_Allocator> T *get(const size_t size) {
541  assert(parent.slices_exhausted == true);
542  T *aggregated_buffer =
543  parent.get<T, Host_Allocator>(size, buffer_counter);
544  buffer_counter++;
545  assert(buffer_counter > 0);
546  return aggregated_buffer;
547  }
548 
549  Executor& get_underlying_executor(void) {
550  assert(parent.executor_wrapper);
551  return *(parent.executor_wrapper);
552  }
553  };
554 
555  // deprecated name...
556  using Executor_Slice [[deprecated("Renamed: Use executor_slice instead")]] = executor_slice;
557 
558  //===============================================================================
559 
560  hpx::lcos::local::promise<void> slices_full_promise;
563  std::vector<hpx::lcos::local::promise<executor_slice>> executor_slices;
566  std::deque<aggregated_function_call<Executor>> function_calls;
568  aggregation_mutex_t mut;
569 
572  using buffer_entry_t =
573  std::tuple<void*, const size_t, std::atomic<size_t>, bool, const size_t, size_t>;
575  std::deque<buffer_entry_t> buffer_allocations;
577  std::unordered_map<void*,size_t> buffer_allocations_map;
579  aggregation_mutex_t buffer_mut;
580  std::atomic<size_t> buffer_counter = 0;
581 
583  template <typename T, typename Host_Allocator>
584  T *get(const size_t size, const size_t slice_alloc_counter) {
585  assert(slices_exhausted == true);
586  assert(executor_wrapper);
587  assert(executor_slices_alive == true);
588  // Add aggregated buffer entry in case it hasn't happened yet for this call
589  // First: Check if it already has happened
590  if (buffer_counter <= slice_alloc_counter) {
591  // we might be the first! Lock...
592  std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
593  // ... and recheck
594  if (buffer_counter <= slice_alloc_counter) {
595  constexpr bool manage_content_lifetime = false;
596  buffers_in_use = true;
597 
598  // Default location -- useful for GPU builds as we otherwise create way too
599  // many different buffers for different aggregation sizes on different GPUs
600  /* size_t location_id = gpu_id * instances_per_gpu; */
601  // Use integer division to only use 0, 16, 32, ... as buckets
602  size_t location_id = ((hpx::get_worker_thread_num() % cppuddle::number_instances) / 16) * 16;
603 #ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
604  if (max_slices == 1) {
605  // get preferred location: aka the current hpx thread's location
606  // Usually handy for CPU builds where we want to use the buffers
607  // close to the current CPU core
608  /* location_id = (hpx::get_worker_thread_num() / instances_per_gpu) * instances_per_gpu; */
609  /* location_id = (gpu_id) * instances_per_gpu; */
610  // division makes sure that we always use the same instance to store our gpu buffers.
611  }
612 #endif
613  // Get shiny and new buffer that will be shared between all slices
614  // Buffer might be recycled from previous allocations by the
615  // buffer_interface...
616  T *aggregated_buffer =
617  cppuddle::memory_recycling::detail::buffer_interface::get<
618  T, Host_Allocator>(size, manage_content_lifetime, location_id,
619  gpu_id);
620  // Create buffer entry for this buffer
621  buffer_allocations.emplace_back(static_cast<void *>(aggregated_buffer),
622  size, 1, true, location_id, gpu_id);
623 
624 #ifndef NDEBUG
625  // if previously used, the buffer should no longer be in use
626  const auto exists = buffer_allocations_map.count(
627  static_cast<void *>(aggregated_buffer));
628  if (exists > 0) {
629  const auto previous_usage_id =
630  buffer_allocations_map[static_cast<void *>(aggregated_buffer)];
631  const auto &valid =
632  std::get<3>(buffer_allocations[previous_usage_id]);
633  assert(!valid);
634  }
635 #endif
636  buffer_allocations_map.insert_or_assign(static_cast<void *>(aggregated_buffer),
637  buffer_counter);
638 
639  assert (buffer_counter == slice_alloc_counter);
640  buffer_counter = buffer_allocations.size();
641 
642  // Return buffer
643  return aggregated_buffer;
644  }
645  }
646  assert(buffers_in_use == true);
647  assert(std::get<3>(buffer_allocations[slice_alloc_counter])); // valid
648  assert(std::get<2>(buffer_allocations[slice_alloc_counter]) >= 1);
649 
650  // Buffer entry should already exist:
651  T *aggregated_buffer = static_cast<T *>(
652  std::get<0>(buffer_allocations[slice_alloc_counter]));
653  // Error handling: Size is wrong?
654  assert(size == std::get<1>(buffer_allocations[slice_alloc_counter]));
655  // Notify that one more slice has visited this buffer allocation
656  std::get<2>(buffer_allocations[slice_alloc_counter])++;
657  return aggregated_buffer;
658  }
659 
661  template <typename T, typename Host_Allocator>
662  void mark_unused(T *p, const size_t size) {
663  assert(slices_exhausted == true);
664  assert(executor_wrapper);
665 
666  void *ptr_key = static_cast<void*>(p);
667  size_t slice_alloc_counter = buffer_allocations_map[ptr_key];
668 
669  assert(slice_alloc_counter < buffer_allocations.size());
670  /*auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter, valid] =
671  buffer_allocations[slice_alloc_counter];*/
672  auto buffer_pointer_void = std::get<0>(buffer_allocations[slice_alloc_counter]);
673  const auto buffer_size = std::get<1>(buffer_allocations[slice_alloc_counter]);
674  auto &buffer_allocation_counter = std::get<2>(buffer_allocations[slice_alloc_counter]);
675  auto &valid = std::get<3>(buffer_allocations[slice_alloc_counter]);
676  const auto &location_id = std::get<4>(buffer_allocations[slice_alloc_counter]);
677  const auto &gpu_id = std::get<5>(buffer_allocations[slice_alloc_counter]);
678  assert(valid);
679  T *buffer_pointer = static_cast<T *>(buffer_pointer_void);
680 
681  assert(buffer_size == size);
682  assert(p == buffer_pointer);
683  // assert(buffer_pointer == p || buffer_pointer == nullptr);
684  // Slice is done with this buffer
685  buffer_allocation_counter--;
686  // Check if all slices are done with this buffer?
687  if (buffer_allocation_counter == 0) {
688  // Yes! "Deallocate" by telling the recycler the buffer is ready for reuse
689  std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
690  // Only mark unused if another slice has not done so already (and marked
691  // it as invalid)
692  if (valid) {
693  assert(buffers_in_use == true);
694  cppuddle::memory_recycling::detail::buffer_interface::mark_unused<
695  T, Host_Allocator>(buffer_pointer, buffer_size, location_id,
696  gpu_id);
697  // mark buffer as invalid to prevent any other slice from marking the
698  // buffer as unused
699  valid = false;
700 
701  const size_t current_deallocs = ++dealloc_counter;
702  if (current_deallocs == buffer_counter) {
703  std::lock_guard<aggregation_mutex_t> guard(mut);
704  buffers_in_use = false;
705  if (!executor_slices_alive && !buffers_in_use) {
706  slices_exhausted = false;
707  // Release executor
708  executor_wrapper.reset(nullptr);
709  }
710  }
711  }
712  }
713  }
714 
715  //===============================================================================
716  // Public Interface
717 public:
718  hpx::lcos::future<void> current_continuation;
719  hpx::lcos::future<void> last_stream_launch_done;
720  std::atomic<size_t> overall_launch_counter = 0;
721 
723  bool sync_aggregation_slices(const size_t slice_launch_counter) {
724  std::lock_guard<aggregation_mutex_t> guard(mut);
725  assert(slices_exhausted == true);
726  assert(executor_wrapper);
727  // Add function call object in case it hasn't happened for this launch yet
728  if (overall_launch_counter <= slice_launch_counter) {
729  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
730  if (overall_launch_counter <= slice_launch_counter) {
731  function_calls.emplace_back(current_slices, false, *executor_wrapper);
732  overall_launch_counter = function_calls.size();
733  return function_calls[slice_launch_counter].sync_aggregation_slices(
734  last_stream_launch_done);
735  }
736  }
737 
738  return function_calls[slice_launch_counter].sync_aggregation_slices(
739  last_stream_launch_done);
740  }
741 
743  template <typename F, typename... Ts>
744  void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) {
745  std::lock_guard<aggregation_mutex_t> guard(mut);
746  assert(slices_exhausted == true);
747  assert(executor_wrapper);
748  // Add function call object in case it hasn't happened for this launch yet
749  if (overall_launch_counter <= slice_launch_counter) {
750  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
751  if (overall_launch_counter <= slice_launch_counter) {
752  function_calls.emplace_back(current_slices, false, *executor_wrapper);
753  overall_launch_counter = function_calls.size();
754  function_calls[slice_launch_counter].post_when(
755  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
756  return;
757  }
758  }
759 
760  function_calls[slice_launch_counter].post_when(
761  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
762  return;
763  }
764 
766  template <typename F, typename... Ts>
767  hpx::lcos::future<void> async(const size_t slice_launch_counter, F &&f,
768  Ts &&...ts) {
769  std::lock_guard<aggregation_mutex_t> guard(mut);
770  assert(slices_exhausted == true);
771  assert(executor_wrapper);
772  // Add function call object in case it hasn't happened for this launch yet
773  if (overall_launch_counter <= slice_launch_counter) {
774  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
775  if (overall_launch_counter <= slice_launch_counter) {
776  function_calls.emplace_back(current_slices, true, *executor_wrapper);
777  overall_launch_counter = function_calls.size();
778  return function_calls[slice_launch_counter].async_when(
779  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
780  }
781  }
782 
783  return function_calls[slice_launch_counter].async_when(
784  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
785  }
787  template <typename F, typename... Ts>
788  hpx::lcos::shared_future<void> wrap_async(const size_t slice_launch_counter, F &&f,
789  Ts &&...ts) {
790  std::lock_guard<aggregation_mutex_t> guard(mut);
791  assert(slices_exhausted == true);
792  assert(executor_wrapper);
793  // Add function call object in case it hasn't happened for this launch yet
794  if (overall_launch_counter <= slice_launch_counter) {
795  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
796  if (overall_launch_counter <= slice_launch_counter) {
797  function_calls.emplace_back(current_slices, true, *executor_wrapper);
798  overall_launch_counter = function_calls.size();
799  return function_calls[slice_launch_counter].wrap_async(
800  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
801  }
802  }
803 
804  return function_calls[slice_launch_counter].wrap_async(
805  last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
806  }
807 
808  bool slice_available(void) {
809  std::lock_guard<aggregation_mutex_t> guard(mut);
810  return !slices_exhausted;
811  }
812 
813  std::optional<hpx::lcos::future<executor_slice>> request_executor_slice() {
814  std::lock_guard<aggregation_mutex_t> guard(mut);
815  if (!slices_exhausted) {
816  const size_t local_slice_id = ++current_slices;
817  if (local_slice_id == 1) {
818  // Cleanup leftovers from last run if any
819  // TODO still required? Should be clean here already
820  function_calls.clear();
821  overall_launch_counter = 0;
822  std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
823 #ifndef NDEBUG
824  for (const auto &buffer_entry : buffer_allocations) {
825  const auto &[buffer_pointer_any, buffer_size,
826  buffer_allocation_counter, valid, location_id, device_id] =
827  buffer_entry;
828  assert(!valid);
829  }
830 #endif
831  buffer_allocations.clear();
832  buffer_allocations_map.clear();
833  buffer_counter = 0;
834 
835  assert(executor_slices_alive == false);
836  assert(buffers_in_use == false);
837  executor_slices_alive = true;
838  buffers_in_use = false;
839  dealloc_counter = 0;
840 
841  if (mode == aggregated_executor_modes::STRICT ) {
842  slices_full_promise = hpx::lcos::local::promise<void>{};
843  }
844  }
845 
846  // Create Executor Slice future -- that will be returned later
847  hpx::lcos::future<executor_slice> ret_fut;
848  if (local_slice_id < max_slices) {
849  executor_slices.emplace_back(hpx::lcos::local::promise<executor_slice>{});
850  ret_fut =
851  executor_slices[local_slice_id - 1].get_future();
852  } else {
853  launched_slices = current_slices;
854  ret_fut = hpx::make_ready_future(executor_slice{*this,
855  executor_slices.size(), launched_slices, max_slices});
856  }
857 
858  // Are we the first slice? If yes, add a continuation that sets the
859  // executor_slice futures to ready
860  // once the launch conditions are met
861  if (local_slice_id == 1) {
862  // Redraw executor
863  assert(!executor_wrapper);
864  cppuddle::executor_recycling::executor_pool::select_device<
865  Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
866  gpu_id);
867  executor_wrapper.reset(
868  new cppuddle::executor_recycling::executor_interface<
869  Executor,
870  cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
871  gpu_id));
872  // Renew the promise that all slices will be ready, which serves as the
873  // primary launch criterion...
874  hpx::lcos::shared_future<void> fut;
875  if (mode == aggregated_executor_modes::EAGER ||
876  mode == aggregated_executor_modes::ENDLESS) {
877  // Fallback launch condition: Launch as soon as the underlying stream
878  // is ready
879  /* auto slices_full_fut = slices_full_promise.get_future(); */
880  cppuddle::executor_recycling::executor_pool::select_device<
881  Executor,
882  cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(gpu_id);
883  auto exec_fut = (*executor_wrapper).get_future();
884  /* auto fut = hpx::when_any(exec_fut, slices_full_fut); */
885  fut = std::move(exec_fut);
886  } else {
887  auto slices_full_fut = slices_full_promise.get_shared_future();
888  // Just use the slices launch condition
889  fut = std::move(slices_full_fut);
890  }
891  // Launch all executor slices within this continuation
892  current_continuation = fut.then([this](auto &&fut) {
893  std::lock_guard<aggregation_mutex_t> guard(mut);
894  slices_exhausted = true;
895  launched_slices = current_slices;
896  size_t id = 0;
897  for (auto &slice_promise : executor_slices) {
898  slice_promise.set_value(
899  executor_slice{*this, id, launched_slices, max_slices});
900  id++;
901  }
902  executor_slices.clear();
903  });
904  }
905  if (local_slice_id >= max_slices &&
906  mode != aggregated_executor_modes::ENDLESS) {
907  slices_exhausted = true; // prevents any more threads from entering
908  // before the continuation is launched
909  /* launched_slices = current_slices; */
910  /* size_t id = 0; */
911  /* for (auto &slice_promise : executor_slices) { */
912  /* slice_promise.set_value( */
913  /* executor_slice{*this, id, launched_slices}); */
914  /* id++; */
915  /* } */
916  /* executor_slices.clear(); */
917  if (mode == aggregated_executor_modes::STRICT ) {
918  slices_full_promise.set_value(); // Trigger slices launch condition continuation
919  }
920  // that continuation will set all executor slices so far handed out to ready
921  }
922  return ret_fut;
923  } else {
924  // Return empty optional as failure
925  return std::optional<hpx::lcos::future<executor_slice>>{};
926  }
927  }
928  size_t launched_slices;
929  void reduce_usage_counter(void) {
930  /* std::lock_guard<aggregation_mutex_t> guard(mut); */
931  assert(slices_exhausted == true);
932  assert(executor_wrapper);
933  assert(executor_slices_alive == true);
934  assert(launched_slices >= 1);
935  assert(current_slices >= 0 && current_slices <= launched_slices);
936  const size_t local_slice_id = --current_slices;
937  // Last slice goes out of scope?
938  if (local_slice_id == 0) {
939  // Mark executor as ready for reuse
940  std::lock_guard<aggregation_mutex_t> guard(mut);
941  executor_slices_alive = false;
942  if (!executor_slices_alive && !buffers_in_use) {
943  // Release executor
944  slices_exhausted = false;
945  executor_wrapper.reset(nullptr);
946  }
947  }
948  }
949  ~aggregated_executor(void) {
950 
951  assert(current_slices == 0);
952  assert(executor_slices_alive == false);
953  assert(buffers_in_use == false);
954 
955  if (mode != aggregated_executor_modes::STRICT ) {
956  slices_full_promise.set_value(); // Trigger slices launch condition continuation
957  }
958 
959  // Cleanup leftovers from last run if any
960  function_calls.clear();
961  overall_launch_counter = 0;
962 #ifndef NDEBUG
963  for (const auto &buffer_entry : buffer_allocations) {
964  const auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter,
965  valid, location_id, device_id] = buffer_entry;
966  assert(!valid);
967  }
968 #endif
969  buffer_allocations.clear();
970  buffer_allocations_map.clear();
971  buffer_counter = 0;
972 
973  assert(buffer_allocations.empty());
974  assert(buffer_allocations_map.empty());
975  }
976 
977  aggregated_executor(const size_t number_slices,
978  aggregated_executor_modes mode, const size_t gpu_id = 0)
979  : max_slices(number_slices), current_slices(0), slices_exhausted(false),
980  dealloc_counter(0), mode(mode), executor_slices_alive(false),
981  buffers_in_use(false), gpu_id(gpu_id),
982  executor_wrapper(nullptr),
983  current_continuation(hpx::make_ready_future()),
984  last_stream_launch_done(hpx::make_ready_future()) {}
985  // Not meant to be copied or moved
986  aggregated_executor(const aggregated_executor &other) = delete;
987  aggregated_executor &operator=(const aggregated_executor &other) = delete;
988  aggregated_executor(aggregated_executor &&other) = delete;
989  aggregated_executor &operator=(aggregated_executor &&other) = delete;
990 };
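// --- Editor-added usage sketch (not part of the original header) ---
// How an aggregated_executor is typically driven from user code. Everything
// below is illustrative: `Executor` is any executor type usable with the
// CPPuddle executor pool, `kernel_launcher` is assumed to be a callable that
// the underlying executor can launch via hpx::async, std::allocator stands in
// for the host allocator, n_tasks is assumed to match the max_slices the
// executor was built with (or the executor uses EAGER mode), and error
// handling for an empty optional is omitted. Call from HPX threads only.
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

template <typename Executor, typename KernelLauncher>
void run_aggregated(
    cppuddle::kernel_aggregation::detail::aggregated_executor<Executor> &agg_exec,
    KernelLauncher &&kernel_launcher, const std::size_t n_tasks,
    const std::size_t n_per_task) {
  std::vector<hpx::lcos::future<void>> task_futs;
  for (std::size_t task = 0; task < n_tasks; task++) {
    // Each task asks for its own slice of the aggregated executor...
    auto slice_fut = agg_exec.request_executor_slice();
    assert(slice_fut.has_value());  // real code should retry / fall back here
    task_futs.push_back(slice_fut->then([&, n_per_task](auto &&fut) {
      auto slice = fut.get();
      // ...allocates from the shared aggregated buffer (all slices of the same
      // parent receive the SAME pointer) and works on its own sub-range...
      auto alloc = slice.template make_allocator<float, std::allocator<float>>();
      std::vector<float, decltype(alloc)> data(n_per_task * slice.max_slices,
                                               float{}, alloc);
      float *my_part = data.data() + slice.id * n_per_task;
      (void)my_part;  // ...fill this slice's sub-range here...
      // ...and issues the identical aggregated call. Only the last arriving
      // slice actually launches kernel_launcher on the underlying executor;
      // every slice's future becomes ready once that launch has completed.
      slice.async(kernel_launcher, data.data(), data.size()).get();
    }));
  }
  hpx::wait_all(task_futs);
}
// --- end of editor-added usage sketch ---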
991 
992 template <typename T, typename Host_Allocator, typename Executor>
993 class allocator_slice {
994 private:
995  typename aggregated_executor<Executor>::executor_slice &executor_reference;
996  aggregated_executor<Executor> &executor_parent;
997 
998 public:
999  using value_type = T;
1000  allocator_slice(
1001  typename aggregated_executor<Executor>::executor_slice &executor)
1002  : executor_reference(executor), executor_parent(executor.parent) {}
1003  template <typename U>
1004  explicit allocator_slice(
1005  allocator_slice<U, Host_Allocator, Executor> const &) noexcept {}
1006  T *allocate(std::size_t n) {
1007  T *data = executor_reference.template get<T, Host_Allocator>(n);
1008  return data;
1009  }
1010  void deallocate(T *p, std::size_t n) {
1011  /* executor_reference.template mark_unused<T, Host_Allocator>(p, n); */
1012  executor_parent.template mark_unused<T, Host_Allocator>(p, n);
1013  }
1014  template <typename... Args>
1015  inline void construct(T *p, Args... args) noexcept {
1016  // Do nothing here - we reuse the content of the last owner
1017  }
1018  void destroy(T *p) {
1019  // Do nothing here - Contents will be destroyed when the buffer manager is
1020  // destroyed, not before
1021  }
1022 };
1023 template <typename T, typename U, typename Host_Allocator, typename Executor>
1024 constexpr bool
1025 operator==(allocator_slice<T, Host_Allocator, Executor> const &,
1026  allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
1027  return false;
1028 }
1029 template <typename T, typename U, typename Host_Allocator, typename Executor>
1030 constexpr bool
1031 operator!=(allocator_slice<T, Host_Allocator, Executor> const &,
1032  allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
1033  return true;
1034 }
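// --- Editor-added illustration (not part of the original header) ---
// The allocator contract in a nutshell, given an executor_slice `slice`
// obtained via request_executor_slice(). This is hypothetical user code;
// std::allocator stands in for the host allocator purely for illustration.
#include <memory>
#include <vector>

template <typename Executor>
void allocator_slice_demo(
    typename cppuddle::kernel_aggregation::detail::aggregated_executor<
        Executor>::executor_slice &slice) {
  auto alloc = slice.template make_allocator<double, std::allocator<double>>();
  // All slices of the same parent receive the same underlying aggregated buffer.
  std::vector<double, decltype(alloc)> buf(1024, double{}, alloc);
  // construct() is a no-op: buf's elements still hold whatever the previous
  // (recycled) user left behind -- initialize before reading!
  // On destruction, deallocate() only hands the aggregated buffer back to the
  // recycler once every slice that shares it has released its copy, and
  // destroy() leaves the contents alone until the buffer manager is torn down.
}
// --- end of editor-added illustration ---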
1035 
1036 } // namespace detail
1037 } // namespace kernel_aggregation
1038 } // namespace cppuddle
1039 
1040 
1041 
1042 namespace hpx { namespace parallel { namespace execution {
1043  // TODO Unfortunately does not work that way! Create trait that works for Executor Slices with
1044  // compatible underlying executor types
1045  /* template<typename E> */
1046  /* struct is_one_way_executor<typename aggregated_executor<E>::executor_slice> */
1047  /* : std::true_type */
1048  /* {}; */
1049  /* template<typename E> */
1050  /* struct is_two_way_executor<typename aggregated_executor<E>::executor_slice> */
1051  /* : std::true_type */
1052  /* {}; */
1053 
1054 #if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
1055  // Workaround for the time being: manually create traits for compatible types:
1056 template <>
1057 struct is_one_way_executor<
1058  typename cppuddle::kernel_aggregation::detail::aggregated_executor<
1059  hpx::cuda::experimental::cuda_executor>::executor_slice>
1060  : std::true_type {};
1061 template <>
1062 struct is_two_way_executor<
1063  typename cppuddle::kernel_aggregation::detail::aggregated_executor<
1064  hpx::cuda::experimental::cuda_executor>::executor_slice>
1065  : std::true_type {};
1066 #endif
1067 }}}
1068 
1069 #endif