CPPuddle
aggregation_executors_and_allocators.hpp
1// Copyright (c) 2022-2024 Gregor Daiß
2//
3// Distributed under the Boost Software License, Version 1.0. (See accompanying
4// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6#ifndef AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
7#define AGGREGATION_EXECUTOR_AND_ALLOCATOR_HPP
8
9#ifndef CPPUDDLE_HAVE_HPX
10#error "Work aggregation allocators/executors require CPPUDDLE_WITH_HPX=ON"
11#endif
12
13#include <stdexcept>
14// When defined, CPPuddle will run more checks
15// about the order of aggregated method calls.
16// Best defined before including this header when needed
17// (hence commented out here)
18//#define DEBUG_AGGREGATION_CALLS 1
19
20#include <stdio.h>
21
22#include <any>
23#include <atomic>
24#include <chrono>
25#include <cstdio>
26#include <iostream>
27#include <memory>
28#include <mutex>
29#include <optional>
30#include <ostream>
31#include <string>
32#include <tuple>
33#include <type_traits>
34#include <typeinfo>
35#include <utility>
36#include <unordered_map>
37
38#include <hpx/futures/future.hpp>
39#include <hpx/hpx_init.hpp>
40#include <hpx/include/async.hpp>
41#include <hpx/include/iostreams.hpp>
42#include <hpx/include/lcos.hpp>
43#include <hpx/lcos/promise.hpp>
44#include <hpx/mutex.hpp>
45
46#if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
47// required for defining type traits that use the cuda executor as the
48// underlying executor for aggregation
49#include <hpx/async_cuda/cuda_executor.hpp>
50#endif
51
52#include <boost/core/demangle.hpp>
53#include <boost/format.hpp>
54
56// get direct access to the buffer management
58// get normal access to the executor pools
60
61#ifndef CPPUDDLE_HAVE_HPX_MUTEX
62#pragma message \
63 "Work aggregation will use hpx::mutex internally, despite CPPUDDLE_WITH_HPX_MUTEX=OFF"
64#pragma message \
65 "Consider using CPPUDDLE_WITH_HPX_MUTEX=ON, to make the rest of CPPuddle also use hpx::mutex"
66#endif
67namespace cppuddle {
68namespace kernel_aggregation {
69namespace detail {
70 using aggregation_mutex_t = hpx::mutex;
71
72//===============================================================================
73//===============================================================================
74// Helper functions/classes
75
78template <typename... Ts>
79std::tuple<Ts...> make_tuple_supporting_references(Ts &&...ts) {
80 return std::tuple<Ts...>{std::forward<Ts>(ts)...};
81}
82
85template <typename T> std::string print_if_possible(T val) {
86 if constexpr (std::is_convertible_v<T, std::string>) {
87 return val;
88 } else if constexpr (std::is_integral_v<T> || std::is_floating_point_v<T>) {
89 return std::to_string(val);
90 } else if constexpr (std::is_pointer_v<T>) {
91 // Pretty-printing a pointer only really works well with %p
92 // TODO Try using std::format as soon as we can move to C++20
93 std::unique_ptr<char[]> debug_string(new char[128]());
94 snprintf(debug_string.get(), 128, "%p", val);
95 return std::string(debug_string.get());
96 } else {
97 return std::string("cannot print value");
98 }
99}
100
103template <class TupType, size_t... I>
104void print_tuple(const TupType &_tup, std::index_sequence<I...>) {
105 (..., (hpx::cout << (I == 0 ? "" : ", ")
106 << print_if_possible(std::get<I + 1>(_tup))));
107}
108
111template <class... T> void print_tuple(const std::tuple<T...> &_tup) {
112 // Use a char buffer and snprintf, as boost::format refused to NOT cast the
113 // pointer address to 1...
114 // TODO Try using std::format as soon as we can move to C++20
115 std::unique_ptr<char[]> debug_string(new char[128]());
116 snprintf(debug_string.get(), 128, "Function address: %p -- Arguments: (",
117 std::get<0>(_tup));
118 hpx::cout << debug_string.get();
119 print_tuple(_tup, std::make_index_sequence<sizeof...(T) - 1>());
120 hpx::cout << ")";
121}
122
123//===============================================================================
124//===============================================================================
125template <typename Executor, typename F, typename... Ts>
126void exec_post_wrapper(Executor & exec, F &&f, Ts &&...ts) {
127 hpx::apply(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
128}
129
130template <typename Executor, typename F, typename... Ts>
131hpx::lcos::future<void> exec_async_wrapper(Executor & exec, F &&f, Ts &&...ts) {
132 return hpx::async(exec, std::forward<F>(f), std::forward<Ts>(ts)...);
133}
134
137/** Launch conditions: All slice executors must have called the same function
138 * (tracked by future all_slices_ready)
139 * AND
140 * Previous aggregated_function_call on the same Executor must have been
141 * launched (tracked by future stream_future)
142 * All function calls received from the slice executors are checked whether they
143 * match the first one in both type and value (an exception is thrown otherwise)
144 */
145
146template <typename Executor> class aggregated_function_call {
147private:
148 std::atomic<size_t> slice_counter = 0;
149
151 /* hpx::lcos::local::promise<void> slices_ready_promise; */
153 /* hpx::lcos::future<void> all_slices_ready = slices_ready_promise.get_future(); */
155 const size_t number_slices;
156 const bool async_mode;
157
158 Executor &underlying_executor;
159
160#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
161#pragma message \
162 "Building slow work aggegator build with additional runtime checks! Build with NDEBUG defined for fast build..."
165 std::any function_tuple;
167 std::string debug_type_information;
168 aggregation_mutex_t debug_mut;
169#endif
170
171 std::vector<hpx::lcos::local::promise<void>> potential_async_promises{};
172
173public:
174 aggregated_function_call(const size_t number_slices, bool async_mode, Executor &exec)
175 : number_slices(number_slices), async_mode(async_mode), underlying_executor(exec) {
176 if (async_mode)
177 potential_async_promises.resize(number_slices);
178 }
179 ~aggregated_function_call(void) {
180 // All slices should have done this call
181 assert(slice_counter == number_slices);
182 // assert(!all_slices_ready.valid());
183 }
185 bool sync_aggregation_slices(hpx::lcos::future<void> &stream_future) {
186 assert(!async_mode);
187 assert(potential_async_promises.empty());
188 const size_t local_counter = slice_counter++;
189 if (local_counter == number_slices - 1) {
190 return true;
191 }
192 else return false;
193 }
194 template <typename F, typename... Ts>
195 void post_when(hpx::lcos::future<void> &stream_future, F &&f, Ts &&...ts) {
196#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
197 // needed for concurrent access to function_tuple and debug_type_information
198 // Not required for normal use
199 std::lock_guard<aggregation_mutex_t> guard(debug_mut);
200#endif
201 assert(!async_mode);
202 assert(potential_async_promises.empty());
203 const size_t local_counter = slice_counter++;
204
205 if (local_counter == 0) {
206#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
207 auto tmp_tuple =
208 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
209 function_tuple = tmp_tuple;
210 debug_type_information = typeid(decltype(tmp_tuple)).name();
211#endif
212
213 } else {
214 //
215 // This scope checks if both the type and the values of the current call
216 // match the original call. To be used in debug builds...
217 //
218#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
219 auto comparison_tuple =
220 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
221 try {
222 auto orig_call_tuple =
223 std::any_cast<decltype(comparison_tuple)>(function_tuple);
224 if (comparison_tuple != orig_call_tuple) {
225 throw std::runtime_error(
226 "Values of post function arguments (or function "
227 "itself) do not match ");
228 }
229 } catch (const std::bad_any_cast &e) {
230 hpx::cout
231 << "\nMismatched types error in aggregated post call of executor "
232 << ": " << e.what() << "\n";
233 hpx::cout << "Expected types:\t\t "
234 << boost::core::demangle(debug_type_information.c_str());
235 hpx::cout << "\nGot types:\t\t "
236 << boost::core::demangle(
237 typeid(decltype(comparison_tuple)).name())
238 << "\n"
239 << std::endl;
240 // throw;
241 } catch (const std::runtime_error &e) {
242 hpx::cout
243 << "\nMismatched values error in aggregated post call of executor "
244 << ": " << e.what() << std::endl;
245 hpx::cout << "Types (matched):\t "
246 << boost::core::demangle(debug_type_information.c_str());
247 auto orig_call_tuple =
248 std::any_cast<decltype(comparison_tuple)>(function_tuple);
249 hpx::cout << "\nExpected values:\t ";
250 print_tuple(orig_call_tuple);
251 hpx::cout << "\nGot values:\t\t ";
252 print_tuple(comparison_tuple);
253 hpx::cout << std::endl << std::endl;
254 // throw;
255 }
256#endif
257 }
258 assert(local_counter < number_slices);
259 assert(slice_counter < number_slices + 1);
260 // Check exit criteria: Launch function call continuation by setting the
261 // slices promise
262 if (local_counter == number_slices - 1) {
263 exec_post_wrapper<Executor, F, Ts...>(underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
264 //slices_ready_promise.set_value();
265 }
266 }
267 template <typename F, typename... Ts>
268 hpx::lcos::future<void> async_when(hpx::lcos::future<void> &stream_future,
269 F &&f, Ts &&...ts) {
270#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
271 // needed for concurrent access to function_tuple and debug_type_information
272 // Not required for normal use
273 std::lock_guard<aggregation_mutex_t> guard(debug_mut);
274#endif
275 assert(async_mode);
276 assert(!potential_async_promises.empty());
277 const size_t local_counter = slice_counter++;
278 if (local_counter == 0) {
279#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
280 auto tmp_tuple =
281 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
282 function_tuple = tmp_tuple;
283 debug_type_information = typeid(decltype(tmp_tuple)).name();
284#endif
285 } else {
286 //
287 // This scope checks if both the type and the values of the current call
288 // match the original call. To be used in debug builds...
289 //
290#if !(defined(NDEBUG)) && defined(DEBUG_AGGREGATION_CALLS)
291 auto comparison_tuple =
292 make_tuple_supporting_references(f, std::forward<Ts>(ts)...);
293 try {
294 auto orig_call_tuple =
295 std::any_cast<decltype(comparison_tuple)>(function_tuple);
296 if (comparison_tuple != orig_call_tuple) {
297 throw std::runtime_error(
298 "Values of async function arguments (or function "
299 "itself) do not match ");
300 }
301 } catch (const std::bad_any_cast &e) {
302 hpx::cout
303 << "\nMismatched types error in aggregated async call of executor "
304 << ": " << e.what() << "\n";
305 hpx::cout << "Expected types:\t\t "
306 << boost::core::demangle(debug_type_information.c_str());
307 hpx::cout << "\nGot types:\t\t "
308 << boost::core::demangle(
309 typeid(decltype(comparison_tuple)).name())
310 << "\n"
311 << std::endl;
312 // throw;
313 } catch (const std::runtime_error &e) {
314 hpx::cout
315 << "\nMismatched values error in aggregated async call of executor "
316 << ": " << e.what() << std::endl;
317 hpx::cout << "Types (matched):\t "
318 << boost::core::demangle(debug_type_information.c_str());
319 auto orig_call_tuple =
320 std::any_cast<decltype(comparison_tuple)>(function_tuple);
321 hpx::cout << "\nExpected values:\t ";
322 print_tuple(orig_call_tuple);
323 hpx::cout << "\nGot values:\t\t ";
324 print_tuple(comparison_tuple);
325 hpx::cout << std::endl << std::endl;
326 // throw;
327 }
328#endif
329 }
330 assert(local_counter < number_slices);
331 assert(slice_counter < number_slices + 1);
332 assert(potential_async_promises.size() == number_slices);
333 hpx::lcos::future<void> ret_fut =
334 potential_async_promises[local_counter].get_future();
335 if (local_counter == number_slices - 1) {
336 /* slices_ready_promise.set_value(); */
337 auto fut = exec_async_wrapper<Executor, F, Ts...>(
338 underlying_executor, std::forward<F>(f), std::forward<Ts>(ts)...);
339 fut.then([this](auto &&fut) {
340 for (auto &promise : potential_async_promises) {
341 promise.set_value();
342 }
343 });
344 }
345 // Check exit criteria: Launch function call continuation by setting the
346 // slices promise
347 return ret_fut;
348 }
349 template <typename F, typename... Ts>
350 hpx::lcos::shared_future<void> wrap_async(hpx::lcos::future<void> &stream_future,
351 F &&f, Ts &&...ts) {
352 assert(async_mode);
353 assert(!potential_async_promises.empty());
354 const size_t local_counter = slice_counter++;
355 assert(local_counter < number_slices);
356 assert(slice_counter < number_slices + 1);
357 assert(potential_async_promises.size() == number_slices);
358 hpx::lcos::shared_future<void> ret_fut =
359 potential_async_promises[local_counter].get_shared_future();
360 if (local_counter == number_slices - 1) {
361 auto fut = f(std::forward<Ts>(ts)...);
362 fut.then([this](auto &&fut) {
363 // TODO just use one promise
364 for (auto &promise : potential_async_promises) {
365 promise.set_value();
366 }
367 });
368 }
369 return ret_fut;
370 }
371 // We need to be copyable or nothrow-movable for std::vector...
372 aggregated_function_call(const aggregated_function_call &other) = default;
373 aggregated_function_call &
374 operator=(const aggregated_function_call &other) = default;
375 aggregated_function_call(aggregated_function_call &&other) = default;
376 aggregated_function_call &
377 operator=(aggregated_function_call &&other) = default;
378};
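// A minimal sketch of the intended call pattern (illustrative only; `exec`,
// `stream_fut`, `dummy_kernel` and its arguments are assumptions, not part of
// this header): with number_slices == 2 and async_mode == false, only the
// second caller actually dispatches work to the underlying executor.
//
//   aggregated_function_call<decltype(exec)> agg_call(2, false, exec);
//   agg_call.post_when(stream_fut, dummy_kernel, device_ptr, size); // slice 0: only counted
//   agg_call.post_when(stream_fut, dummy_kernel, device_ptr, size); // slice 1: last call,
//                                                                   // hpx::apply on exec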
379
380//===============================================================================
381//===============================================================================
382
383enum class aggregated_executor_modes { EAGER = 1, STRICT, ENDLESS };
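// Mode behaviour as implemented by request_executor_slice below (summary):
// EAGER launches the aggregated calls as soon as the underlying executor/stream
// signals readiness, STRICT waits until all max_slices slices have been
// requested, and ENDLESS behaves like EAGER but does not mark the executor as
// exhausted once max_slices is reached, so further slices may still join until
// the launch continuation runs.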
385template <typename T, typename Host_Allocator, typename Executor>
386class allocator_slice;
387
389/** This executor is not meant to be used directly. Instead, it yields multiple
390 * executor_slice objects, which serve as its interface. Slices from the same
391 * aggregated_executor are meant to execute the same function calls, but on
392 * different data (i.e. different tasks)
393 */
394template <typename Executor> class aggregated_executor {
395private:
396 //===============================================================================
397 // Misc private variables:
398 //
399 std::atomic<bool> slices_exhausted;
400
401 std::atomic<bool> executor_slices_alive;
402 std::atomic<bool> buffers_in_use;
403 std::atomic<size_t> dealloc_counter;
404
405 const aggregated_executor_modes mode;
406 const size_t max_slices;
407 std::atomic<size_t> current_slices;
411 std::unique_ptr<cppuddle::executor_recycling::executor_interface<
412 Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>>
413 executor_wrapper;
414
415public:
416 size_t gpu_id;
417 // Subclasses
418
420 class executor_slice {
421 public:
422 aggregated_executor<Executor> &parent;
423 private:
427 size_t launch_counter{0};
428 size_t buffer_counter{0};
429 bool notify_parent_about_destruction{true};
430
431 public:
434 const size_t number_slices;
435 size_t max_slices;
436 size_t id;
437 using executor_t = Executor;
438 executor_slice(aggregated_executor &parent, const size_t slice_id,
439 const size_t number_slices, const size_t max_number_slices)
440 : parent(parent), notify_parent_about_destruction(true),
441 number_slices(number_slices), id(slice_id), max_slices(max_number_slices) {
442 assert(parent.max_slices == max_slices);
443 assert(number_slices >= 1);
444 assert(number_slices <= max_slices);
445 }
446 ~executor_slice(void) {
447 // Don't notify parent if we moved away from this executor_slice
448 if (notify_parent_about_destruction) {
449 // Executor should be done by the time of destruction
450 // -> check here before notifying parent
451
452 assert(parent.max_slices == max_slices);
453 assert(number_slices >= 1);
454 assert(number_slices <= max_slices);
455 // parent still in execution mode?
456 assert(parent.slices_exhausted == true);
457 // all kernel launches done?
458 assert(launch_counter == parent.function_calls.size());
459 // Notify parent that this aggregation slice is done
460 parent.reduce_usage_counter();
461 }
462 }
463 executor_slice(const executor_slice &other) = delete;
464 executor_slice &operator=(const executor_slice &other) = delete;
465 executor_slice(executor_slice &&other)
466 : parent(other.parent), launch_counter(std::move(other.launch_counter)),
467 buffer_counter(std::move(other.buffer_counter)),
468 number_slices(std::move(other.number_slices)),
469 id(std::move(other.id)), max_slices(std::move(other.max_slices)) {
470 other.notify_parent_about_destruction = false;
471 }
472 executor_slice &operator=(executor_slice &&other) {
473 parent = other.parent;
474 launch_counter = std::move(other.launch_counter);
475 buffer_counter = std::move(other.buffer_counter);
476 number_slices = std::move(other.number_slices);
477 id = std::move(other.id);
478 max_slices = std::move(other.max_slices);
479 other.notify_parent_about_destruction = false;
480 }
481 template <typename T, typename Host_Allocator>
482 allocator_slice<T, Host_Allocator, Executor> make_allocator() {
483 return allocator_slice<T, Host_Allocator, Executor>(*this);
484 }
485 bool sync_aggregation_slices() {
486 assert(parent.slices_exhausted == true);
487 auto ret = parent.sync_aggregation_slices(launch_counter);
488 launch_counter++;
489 return ret;
490 }
491 template <typename F, typename... Ts> void post(F &&f, Ts &&...ts) {
492 // we should only execute function calls once all slices
493 // have been given away (-> Executor Slices start)
494 assert(parent.slices_exhausted == true);
495 parent.post(launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
496 launch_counter++;
497 }
498 template <typename F, typename... Ts>
499 hpx::lcos::future<void> async(F &&f, Ts &&...ts) {
500 // we should only execute function calls once all slices
501 // have been given away (-> Executor Slices start)
502 assert(parent.slices_exhausted == true);
503 hpx::lcos::future<void> ret_fut = parent.async(
504 launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
505 launch_counter++;
506 return ret_fut;
507 }
508
509 // OneWay Execution
510 template <typename F, typename... Ts>
511 friend decltype(auto) tag_invoke(hpx::parallel::execution::post_t,
512 executor_slice& exec, F&& f, Ts&&... ts)
513 {
514 return exec.post(std::forward<F>(f), std::forward<Ts>(ts)...);
515 }
516
517 // TwoWay Execution
518 template <typename F, typename... Ts>
519 friend decltype(auto) tag_invoke(
520 hpx::parallel::execution::async_execute_t, executor_slice& exec,
521 F&& f, Ts&&... ts)
522 {
523 return exec.async(
524 std::forward<F>(f), std::forward<Ts>(ts)...);
525 }
526
527 template <typename F, typename... Ts>
528 hpx::lcos::shared_future<void> wrap_async(F &&f, Ts &&...ts) {
529 // we should only execute function calls once all slices
530 // have been given away (-> Executor Slices start)
531 assert(parent.slices_exhausted == true);
532 hpx::lcos::shared_future<void> ret_fut = parent.wrap_async(
533 launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
534 launch_counter++;
535 return ret_fut;
536 }
537
540 template <typename T, typename Host_Allocator> T *get(const size_t size) {
541 assert(parent.slices_exhausted == true);
542 T *aggregated_buffer =
543 parent.get<T, Host_Allocator>(size, buffer_counter);
544 buffer_counter++;
545 assert(buffer_counter > 0);
546 return aggregated_buffer;
547 }
548
549 Executor& get_underlying_executor(void) {
550 assert(parent.executor_wrapper);
551 return *(parent.executor_wrapper);
552 }
553 };
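// Usage sketch for an executor_slice obtained via request_executor_slice()
// (illustrative only; `slice`, `my_kernel` and its arguments are placeholders).
// All participating slices are expected to issue the same calls in the same
// order; per call, only the last slice triggers the actual launch:
//
//   slice.post(my_kernel, device_ptr, size);             // aggregated fire-and-forget
//   auto fut = slice.async(my_kernel, device_ptr, size); // aggregated two-way call
//   fut.get(); // ready once all slices issued this call and the launch finished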
554
555 // deprecated name...
556 using Executor_Slice [[deprecated("Renamed: Use executor_slice instead")]] = executor_slice;
557
558 //===============================================================================
559
560 hpx::lcos::local::promise<void> slices_full_promise;
563 std::vector<hpx::lcos::local::promise<executor_slice>> executor_slices;
566 std::deque<aggregated_function_call<Executor>> function_calls;
568 aggregation_mutex_t mut;
569
572 using buffer_entry_t =
573 std::tuple<void*, const size_t, std::atomic<size_t>, bool, const size_t, size_t>;
575 std::deque<buffer_entry_t> buffer_allocations;
577 std::unordered_map<void*,size_t> buffer_allocations_map;
579 aggregation_mutex_t buffer_mut;
580 std::atomic<size_t> buffer_counter = 0;
581
583 template <typename T, typename Host_Allocator>
584 T *get(const size_t size, const size_t slice_alloc_counter) {
585 assert(slices_exhausted == true);
586 assert(executor_wrapper);
587 assert(executor_slices_alive == true);
588 // Add an aggregated buffer entry in case it hasn't happened yet for this call
589 // First: Check if it already has happened
590 if (buffer_counter <= slice_alloc_counter) {
591 // we might be the first! Lock...
592 std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
593 // ... and recheck
594 if (buffer_counter <= slice_alloc_counter) {
595 constexpr bool manage_content_lifetime = false;
596 buffers_in_use = true;
597
598 // Default location -- useful for GPU builds as we otherwise create way too
599 // many different buffers for different aggregation sizes on different GPUs
600 /* size_t location_id = gpu_id * instances_per_gpu; */
601 // Use integer conversion to only use 0 16 32 ... as buckets
602 size_t location_id = ((hpx::get_worker_thread_num() % cppuddle::number_instances) / 16) * 16;
603#ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
604 if (max_slices == 1) {
605 // get preferred location: aka the current HPX thread's location
606 // Usually handy for CPU builds where we want to use the buffers
607 // close to the current CPU core
608 /* location_id = (hpx::get_worker_thread_num() / instances_per_gpu) * instances_per_gpu; */
609 /* location_id = (gpu_id) * instances_per_gpu; */
610 // division makes sure that we always use the same instance to store our gpu buffers.
611 }
612#endif
613 // Get a shiny new buffer that will be shared between all slices
614 // Buffer might be recycled from previous allocations by the
615 // buffer_interface...
616 T *aggregated_buffer =
617 cppuddle::memory_recycling::detail::buffer_interface::get<
618 T, Host_Allocator>(size, manage_content_lifetime, location_id,
619 gpu_id);
620 // Create buffer entry for this buffer
621 buffer_allocations.emplace_back(static_cast<void *>(aggregated_buffer),
622 size, 1, true, location_id, gpu_id);
623
624#ifndef NDEBUG
625 // if previously used, the buffer should not be in use anymore
626 const auto exists = buffer_allocations_map.count(
627 static_cast<void *>(aggregated_buffer));
628 if (exists > 0) {
629 const auto previous_usage_id =
630 buffer_allocations_map[static_cast<void *>(aggregated_buffer)];
631 const auto &valid =
632 std::get<3>(buffer_allocations[previous_usage_id]);
633 assert(!valid);
634 }
635#endif
636 buffer_allocations_map.insert_or_assign(static_cast<void *>(aggregated_buffer),
637 buffer_counter);
638
639 assert (buffer_counter == slice_alloc_counter);
640 buffer_counter = buffer_allocations.size();
641
642 // Return buffer
643 return aggregated_buffer;
644 }
645 }
646 assert(buffers_in_use == true);
647 assert(std::get<3>(buffer_allocations[slice_alloc_counter])); // valid
648 assert(std::get<2>(buffer_allocations[slice_alloc_counter]) >= 1);
649
650 // Buffer entry should already exist:
651 T *aggregated_buffer = static_cast<T *>(
652 std::get<0>(buffer_allocations[slice_alloc_counter]));
653 // Error handling: Size is wrong?
654 assert(size == std::get<1>(buffer_allocations[slice_alloc_counter]));
655 // Notify that one more slice has visited this buffer allocation
656 std::get<2>(buffer_allocations[slice_alloc_counter])++;
657 return aggregated_buffer;
658 }
659
661 template <typename T, typename Host_Allocator>
662 void mark_unused(T *p, const size_t size) {
663 assert(slices_exhausted == true);
664 assert(executor_wrapper);
665
666 void *ptr_key = static_cast<void*>(p);
667 size_t slice_alloc_counter = buffer_allocations_map[p];
668
669 assert(slice_alloc_counter < buffer_allocations.size());
670 /*auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter, valid] =
671 buffer_allocations[slice_alloc_counter];*/
672 auto buffer_pointer_void = std::get<0>(buffer_allocations[slice_alloc_counter]);
673 const auto buffer_size = std::get<1>(buffer_allocations[slice_alloc_counter]);
674 auto &buffer_allocation_counter = std::get<2>(buffer_allocations[slice_alloc_counter]);
675 auto &valid = std::get<3>(buffer_allocations[slice_alloc_counter]);
676 const auto &location_id = std::get<4>(buffer_allocations[slice_alloc_counter]);
677 const auto &gpu_id = std::get<5>(buffer_allocations[slice_alloc_counter]);
678 assert(valid);
679 T *buffer_pointer = static_cast<T *>(buffer_pointer_void);
680
681 assert(buffer_size == size);
682 assert(p == buffer_pointer);
683 // assert(buffer_pointer == p || buffer_pointer == nullptr);
684 // Slice is done with this buffer
685 buffer_allocation_counter--;
686 // Are all slices done with this buffer?
687 if (buffer_allocation_counter == 0) {
688 // Yes! "Deallocate" by telling the recycler the buffer is fit for reuse
689 std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
690 // Only mark unused if another slice has not done so already (and marked
691 // it as invalid)
692 if (valid) {
693 assert(buffers_in_use == true);
694 cppuddle::memory_recycling::detail::buffer_interface::mark_unused<
695 T, Host_Allocator>(buffer_pointer, buffer_size, location_id,
696 gpu_id);
697 // mark buffer as invalid to prevent any other slice from marking the
698 // buffer as unused
699 valid = false;
700
701 const size_t current_deallocs = ++dealloc_counter;
702 if (current_deallocs == buffer_counter) {
703 std::lock_guard<aggregation_mutex_t> guard(mut);
704 buffers_in_use = false;
705 if (!executor_slices_alive && !buffers_in_use) {
706 slices_exhausted = false;
707 // Release executor
708 executor_wrapper.reset(nullptr);
709 }
710 }
711 }
712 }
713 }
714
715 //===============================================================================
716 // Public Interface
717public:
718 hpx::lcos::future<void> current_continuation;
719 hpx::lcos::future<void> last_stream_launch_done;
720 std::atomic<size_t> overall_launch_counter = 0;
721
723 bool sync_aggregation_slices(const size_t slice_launch_counter) {
724 std::lock_guard<aggregation_mutex_t> guard(mut);
725 assert(slices_exhausted == true);
726 assert(executor_wrapper);
727 // Add function call object in case it hasn't happened for this launch yet
728 if (overall_launch_counter <= slice_launch_counter) {
729 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
730 if (overall_launch_counter <= slice_launch_counter) {
731 function_calls.emplace_back(current_slices, false, *executor_wrapper);
732 overall_launch_counter = function_calls.size();
733 return function_calls[slice_launch_counter].sync_aggregation_slices(
734 last_stream_launch_done);
735 }
736 }
737
738 return function_calls[slice_launch_counter].sync_aggregation_slices(
739 last_stream_launch_done);
740 }
741
743 template <typename F, typename... Ts>
744 void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) {
745 std::lock_guard<aggregation_mutex_t> guard(mut);
746 assert(slices_exhausted == true);
747 assert(executor_wrapper);
748 // Add function call object in case it hasn't happened for this launch yet
749 if (overall_launch_counter <= slice_launch_counter) {
750 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
751 if (overall_launch_counter <= slice_launch_counter) {
752 function_calls.emplace_back(current_slices, false, *executor_wrapper);
753 overall_launch_counter = function_calls.size();
754 function_calls[slice_launch_counter].post_when(
755 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
756 return;
757 }
758 }
759
760 function_calls[slice_launch_counter].post_when(
761 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
762 return;
763 }
764
766 template <typename F, typename... Ts>
767 hpx::lcos::future<void> async(const size_t slice_launch_counter, F &&f,
768 Ts &&...ts) {
769 std::lock_guard<aggregation_mutex_t> guard(mut);
770 assert(slices_exhausted == true);
771 assert(executor_wrapper);
772 // Add function call object in case it hasn't happened for this launch yet
773 if (overall_launch_counter <= slice_launch_counter) {
774 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
775 if (overall_launch_counter <= slice_launch_counter) {
776 function_calls.emplace_back(current_slices, true, *executor_wrapper);
777 overall_launch_counter = function_calls.size();
778 return function_calls[slice_launch_counter].async_when(
779 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
780 }
781 }
782
783 return function_calls[slice_launch_counter].async_when(
784 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
785 }
787 template <typename F, typename... Ts>
788 hpx::lcos::shared_future<void> wrap_async(const size_t slice_launch_counter, F &&f,
789 Ts &&...ts) {
790 std::lock_guard<aggregation_mutex_t> guard(mut);
791 assert(slices_exhausted == true);
792 assert(executor_wrapper);
793 // Add function call object in case it hasn't happened for this launch yet
794 if (overall_launch_counter <= slice_launch_counter) {
795 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
796 if (overall_launch_counter <= slice_launch_counter) {
797 function_calls.emplace_back(current_slices, true, *executor_wrapper);
798 overall_launch_counter = function_calls.size();
799 return function_calls[slice_launch_counter].wrap_async(
800 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
801 }
802 }
803
804 return function_calls[slice_launch_counter].wrap_async(
805 last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
806 }
807
808 bool slice_available(void) {
809 std::lock_guard<aggregation_mutex_t> guard(mut);
810 return !slices_exhausted;
811 }
812
813 std::optional<hpx::lcos::future<executor_slice>> request_executor_slice() {
814 std::lock_guard<aggregation_mutex_t> guard(mut);
815 if (!slices_exhausted) {
816 const size_t local_slice_id = ++current_slices;
817 if (local_slice_id == 1) {
818 // Cleanup leftovers from last run if any
819 // TODO still required? Should be clean here already
820 function_calls.clear();
821 overall_launch_counter = 0;
822 std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
823#ifndef NDEBUG
824 for (const auto &buffer_entry : buffer_allocations) {
825 const auto &[buffer_pointer_any, buffer_size,
826 buffer_allocation_counter, valid, location_id, device_id] =
827 buffer_entry;
828 assert(!valid);
829 }
830#endif
831 buffer_allocations.clear();
832 buffer_allocations_map.clear();
833 buffer_counter = 0;
834
835 assert(executor_slices_alive == false);
836 assert(buffers_in_use == false);
837 executor_slices_alive = true;
838 buffers_in_use = false;
839 dealloc_counter = 0;
840
841 if (mode == aggregated_executor_modes::STRICT ) {
842 slices_full_promise = hpx::lcos::local::promise<void>{};
843 }
844 }
845
846 // Create Executor Slice future -- that will be returned later
847 hpx::lcos::future<executor_slice> ret_fut;
848 if (local_slice_id < max_slices) {
849 executor_slices.emplace_back(hpx::lcos::local::promise<executor_slice>{});
850 ret_fut =
851 executor_slices[local_slice_id - 1].get_future();
852 } else {
853 launched_slices = current_slices;
854 ret_fut = hpx::make_ready_future(executor_slice{*this,
855 executor_slices.size(), launched_slices, max_slices});
856 }
857
858 // Are we the first slice? If yes, add a continuation that sets the
859 // executor_slice futures to ready
860 // once the launch conditions are met
861 if (local_slice_id == 1) {
862 // Redraw executor
863 assert(!executor_wrapper);
864 cppuddle::executor_recycling::executor_pool::select_device<
865 Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
866 gpu_id);
867 executor_wrapper.reset(
868 new cppuddle::executor_recycling::executor_interface<
869 Executor,
870 cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
871 gpu_id));
872 // Renew the promise that all slices will be ready, as the primary launch
873 // criterion...
874 hpx::lcos::shared_future<void> fut;
875 if (mode == aggregated_executor_modes::EAGER ||
876 mode == aggregated_executor_modes::ENDLESS) {
877 // Fallback launch condition: Launch as soon as the underlying stream
878 // is ready
879 /* auto slices_full_fut = slices_full_promise.get_future(); */
880 cppuddle::executor_recycling::executor_pool::select_device<
881 Executor,
882 cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(gpu_id);
883 auto exec_fut = (*executor_wrapper).get_future();
884 /* auto fut = hpx::when_any(exec_fut, slices_full_fut); */
885 fut = std::move(exec_fut);
886 } else {
887 auto slices_full_fut = slices_full_promise.get_shared_future();
888 // Just use the slices launch condition
889 fut = std::move(slices_full_fut);
890 }
891 // Launch all executor slices within this continuation
892 current_continuation = fut.then([this](auto &&fut) {
893 std::lock_guard<aggregation_mutex_t> guard(mut);
894 slices_exhausted = true;
895 launched_slices = current_slices;
896 size_t id = 0;
897 for (auto &slice_promise : executor_slices) {
898 slice_promise.set_value(
899 executor_slice{*this, id, launched_slices, max_slices});
900 id++;
901 }
902 executor_slices.clear();
903 });
904 }
905 if (local_slice_id >= max_slices &&
906 mode != aggregated_executor_modes::ENDLESS) {
907 slices_exhausted = true; // prevents any more threads from entering
908 // before the continuation is launched
909 /* launched_slices = current_slices; */
910 /* size_t id = 0; */
911 /* for (auto &slice_promise : executor_slices) { */
912 /* slice_promise.set_value( */
913 /* executor_slice{*this, id, launched_slices}); */
914 /* id++; */
915 /* } */
916 /* executor_slices.clear(); */
917 if (mode == aggregated_executor_modes::STRICT ) {
918 slices_full_promise.set_value(); // Trigger slices launch condition continuation
919 }
920 // That continuation will set all executor slices handed out so far to ready
921 }
922 return ret_fut;
923 } else {
924 // Return empty optional as failure
925 return std::optional<hpx::lcos::future<executor_slice>>{};
926 }
927 }
928 size_t launched_slices;
929 void reduce_usage_counter(void) {
930 /* std::lock_guard<aggregation_mutex_t> guard(mut); */
931 assert(slices_exhausted == true);
932 assert(executor_wrapper);
933 assert(executor_slices_alive == true);
934 assert(launched_slices >= 1);
935 assert(current_slices >= 0 && current_slices <= launched_slices);
936 const size_t local_slice_id = --current_slices;
937 // Did the last slice go out of scope?
938 if (local_slice_id == 0) {
939 // Mark executor fit for reuse
940 std::lock_guard<aggregation_mutex_t> guard(mut);
941 executor_slices_alive = false;
942 if (!executor_slices_alive && !buffers_in_use) {
943 // Release executor
944 slices_exhausted = false;
945 executor_wrapper.reset(nullptr);
946 }
947 }
948 }
949 ~aggregated_executor(void) {
950
951 assert(current_slices == 0);
952 assert(executor_slices_alive == false);
953 assert(buffers_in_use == false);
954
955 if (mode != aggregated_executor_modes::STRICT ) {
956 slices_full_promise.set_value(); // Trigger slices launch condition continuation
957 }
958
959 // Cleanup leftovers from last run if any
960 function_calls.clear();
961 overall_launch_counter = 0;
962#ifndef NDEBUG
963 for (const auto &buffer_entry : buffer_allocations) {
964 const auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter,
965 valid, location_id, device_id] = buffer_entry;
966 assert(!valid);
967 }
968#endif
969 buffer_allocations.clear();
970 buffer_allocations_map.clear();
971 buffer_counter = 0;
972
973 assert(buffer_allocations.empty());
974 assert(buffer_allocations_map.empty());
975 }
976
977 aggregated_executor(const size_t number_slices,
978 aggregated_executor_modes mode, const size_t gpu_id = 0)
979 : max_slices(number_slices), current_slices(0), slices_exhausted(false),
980 dealloc_counter(0), mode(mode), executor_slices_alive(false),
981 buffers_in_use(false), gpu_id(gpu_id),
982 executor_wrapper(nullptr),
983 current_continuation(hpx::make_ready_future()),
984 last_stream_launch_done(hpx::make_ready_future()) {}
985 // Not meant to be copied or moved
986 aggregated_executor(const aggregated_executor &other) = delete;
987 aggregated_executor &operator=(const aggregated_executor &other) = delete;
988 aggregated_executor(aggregated_executor &&other) = delete;
989 aggregated_executor &operator=(aggregated_executor &&other) = delete;
990};
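// Sketch of how an aggregated_executor is typically driven (illustrative only;
// `Executor` stands for a supported executor type such as
// hpx::cuda::experimental::cuda_executor, and the lambda body is a placeholder).
// Each task that wants to join an aggregation region requests a slice; an empty
// optional means this executor is currently exhausted and another one should be
// tried:
//
//   aggregated_executor<Executor> agg_exec{4, aggregated_executor_modes::EAGER};
//   auto slice_fut = agg_exec.request_executor_slice();
//   if (slice_fut) {
//     auto work_fut = slice_fut->then([](auto &&fut) {
//       auto slice = fut.get();
//       // ... aggregated calls via slice.post / slice.async / slice.make_allocator ...
//     });
//   }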
991
992template <typename T, typename Host_Allocator, typename Executor>
993class allocator_slice {
994private:
995 typename aggregated_executor<Executor>::executor_slice &executor_reference;
996 aggregated_executor<Executor> &executor_parent;
997
998public:
999 using value_type = T;
1000 allocator_slice(
1001 typename aggregated_executor<Executor>::executor_slice &executor)
1002 : executor_reference(executor), executor_parent(executor.parent) {}
1003 template <typename U>
1004 explicit allocator_slice(
1005 allocator_slice<U, Host_Allocator, Executor> const &) noexcept {}
1006 T *allocate(std::size_t n) {
1007 T *data = executor_reference.template get<T, Host_Allocator>(n);
1008 return data;
1009 }
1010 void deallocate(T *p, std::size_t n) {
1011 /* executor_reference.template mark_unused<T, Host_Allocator>(p, n); */
1012 executor_parent.template mark_unused<T, Host_Allocator>(p, n);
1013 }
1014 template <typename... Args>
1015 inline void construct(T *p, Args... args) noexcept {
1016 // Do nothing here - we reuse the content of the last owner
1017 }
1018 void destroy(T *p) {
1019 // Do nothing here - Contents will be destroyed when the buffer manager is
1020 // destroyed, not before
1021 }
1022};
1023template <typename T, typename U, typename Host_Allocator, typename Executor>
1024constexpr bool
1025operator==(allocator_slice<T, Host_Allocator, Executor> const &,
1026 allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
1027 return false;
1028}
1029template <typename T, typename U, typename Host_Allocator, typename Executor>
1030constexpr bool
1031operator!=(allocator_slice<T, Host_Allocator, Executor> const &,
1032 allocator_slice<U, Host_Allocator, Executor> const &) noexcept {
1033 return true;
1034}
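// Sketch of the intended use of allocator_slice: obtained from an executor_slice
// and plugged into a standard container, so that all slices of one aggregation
// region share a single recycled buffer (the element type and host allocator are
// illustrative choices):
//
//   auto alloc = slice.make_allocator<double, std::allocator<double>>();
//   std::vector<double, decltype(alloc)> buf(n * slice.number_slices, alloc);
//   // construct()/destroy() are no-ops, so the buffer contents are reused as-is;
//   // deallocate() recycles the buffer only after every slice has released it.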
1035
1036} // namespace detail
1037} // namespace kernel_aggregation
1038} // namespace cppuddle
1039
1040
1041
1042namespace CPPUDDLE_HPX_EXECUTOR_SPECIALIZATION_NS {
1043 // TODO Unfortunately does not work that way! Create trait that works for Executor Slices with
1044 // compatible underlying executor types
1045 /* template<typename E> */
1046 /* struct is_one_way_executor<typename aggregated_executor<E>::executor_slice> */
1047 /* : std::true_type */
1048 /* {}; */
1049 /* template<typename E> */
1050 /* struct is_two_way_executor<typename aggregated_executor<E>::executor_slice> */
1051 /* : std::true_type */
1052 /* {}; */
1053
1054#if defined(HPX_HAVE_CUDA) || defined(HPX_HAVE_HIP)
1055 // Workaround for the meantime: Manually create traits for compatible types:
1056template <>
1057struct is_one_way_executor<
1058 typename cppuddle::kernel_aggregation::detail::aggregated_executor<
1059 hpx::cuda::experimental::cuda_executor>::executor_slice>
1060 : std::true_type {};
1061template <>
1062struct is_two_way_executor<
1063 typename cppuddle::kernel_aggregation::detail::aggregated_executor<
1064 hpx::cuda::experimental::cuda_executor>::executor_slice>
1065 : std::true_type {};
1066#endif
1067}
1068
1069#endif