class executor_slice {
  aggregated_executor<Executor> &parent;
  size_t launch_counter{0};
  size_t buffer_counter{0};
  bool notify_parent_about_destruction{true};
  const size_t number_slices;
  using executor_t = Executor;
  executor_slice(aggregated_executor &parent, const size_t slice_id,
                 const size_t number_slices, const size_t max_number_slices)
      : parent(parent), notify_parent_about_destruction(true),
        number_slices(number_slices), id(slice_id), max_slices(max_number_slices) {
    assert(parent.max_slices == max_slices);
    assert(number_slices >= 1);
    assert(number_slices <= max_slices);
  ~executor_slice(void) {
    // Don't notify parent if we moved away from this executor_slice
    if (notify_parent_about_destruction) {
      // Executor should be done by the time of destruction
      // -> check here before notifying parent
      assert(parent.max_slices == max_slices);
      assert(number_slices >= 1);
      assert(number_slices <= max_slices);
      // Parent still in execution mode?
      assert(parent.slices_exhausted == true);
      // All kernel launches done?
      assert(launch_counter == parent.function_calls.size());
      // Notify parent that this aggregation slice is done
      parent.reduce_usage_counter();
  executor_slice(const executor_slice &other) = delete;
  executor_slice &operator=(const executor_slice &other) = delete;
  executor_slice(executor_slice &&other)
      : parent(other.parent), launch_counter(std::move(other.launch_counter)),
        buffer_counter(std::move(other.buffer_counter)),
        number_slices(std::move(other.number_slices)),
        id(std::move(other.id)), max_slices(std::move(other.max_slices)) {
    other.notify_parent_about_destruction = false;
  executor_slice &operator=(executor_slice &&other) {
    parent = other.parent;
    launch_counter = std::move(other.launch_counter);
    buffer_counter = std::move(other.buffer_counter);
    number_slices = std::move(other.number_slices);
    id = std::move(other.id);
    max_slices = std::move(other.max_slices);
    other.notify_parent_about_destruction = false;
  template <typename T, typename Host_Allocator>
  allocator_slice<T, Host_Allocator, Executor> make_allocator() {
    return allocator_slice<T, Host_Allocator, Executor>(*this);
  bool sync_aggregation_slices() {
    assert(parent.slices_exhausted == true);
    auto ret = parent.sync_aggregation_slices(launch_counter);
  template <typename F, typename... Ts> void post(F &&f, Ts &&...ts) {
    // we should only execute function calls once all slices
    // have been given away (-> Executor Slices start)
    assert(parent.slices_exhausted == true);
    parent.post(launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
  template <typename F, typename... Ts>
  hpx::lcos::future<void> async(F &&f, Ts &&...ts) {
    // we should only execute function calls once all slices
    // have been given away (-> Executor Slices start)
    assert(parent.slices_exhausted == true);
    hpx::lcos::future<void> ret_fut = parent.async(
        launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
  template <typename F, typename... Ts>
  friend decltype(auto) tag_invoke(hpx::parallel::execution::post_t,
                                   executor_slice& exec, F&& f, Ts&&... ts)
    return exec.post(std::forward<F>(f), std::forward<Ts>(ts)...);
  template <typename F, typename... Ts>
  friend decltype(auto) tag_invoke(
      hpx::parallel::execution::async_execute_t, executor_slice& exec,
        std::forward<F>(f), std::forward<Ts>(ts)...);
  template <typename F, typename... Ts>
  hpx::lcos::shared_future<void> wrap_async(F &&f, Ts &&...ts) {
    // we should only execute function calls once all slices
    // have been given away (-> Executor Slices start)
    assert(parent.slices_exhausted == true);
    hpx::lcos::shared_future<void> ret_fut = parent.wrap_async(
        launch_counter, std::forward<F>(f), std::forward<Ts>(ts)...);
  template <typename T, typename Host_Allocator> T *get(const size_t size) {
    assert(parent.slices_exhausted == true);
    T *aggregated_buffer =
        parent.get<T, Host_Allocator>(size, buffer_counter);
    assert(buffer_counter > 0);
    return aggregated_buffer;
  Executor& get_underlying_executor(void) {
    assert(parent.executor_wrapper);
    return *(parent.executor_wrapper);
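  // Usage sketch for an executor_slice (illustrative, not part of the
  // original source; `my_kernel` and the buffer size are hypothetical). Once
  // the slice future handed out by request_executor_slice() is ready, the
  // slice can allocate aggregated buffers and issue aggregated launches:
  //
  //   executor_slice slice = slice_fut.get();
  //   auto alloc = slice.make_allocator<float, std::allocator<float>>();
  //   std::vector<float, decltype(alloc)> buf(512 * slice.number_slices, 0.0f, alloc);
  //   // Calls with the same launch_counter are aggregated across all slices
  //   // and forwarded once to the underlying executor
  //   auto done = slice.async(my_kernel, buf.data(), slice.id, slice.number_slices);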
  T *get(const size_t size, const size_t slice_alloc_counter) {
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    assert(executor_slices_alive == true);
    // Add aggregated buffer entry in case it hasn't happened yet for this call
    // First: Check if it already has happened
    if (buffer_counter <= slice_alloc_counter) {
      // We might be the first! Lock...
      std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
      if (buffer_counter <= slice_alloc_counter) {
        constexpr bool manage_content_lifetime = false;
        buffers_in_use = true;
        // Default location -- useful for GPU builds as we otherwise create way too
        // many different buffers for different aggregation sizes on different GPUs
        /* size_t location_id = gpu_id * instances_per_gpu; */
        // Use integer division so that only 0, 16, 32, ... are used as buckets
        size_t location_id = ((hpx::get_worker_thread_num() % cppuddle::number_instances) / 16) * 16;
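        // Worked example (assuming cppuddle::number_instances == 128 for the
        // sake of the numbers): worker thread 23 yields (23 % 128) / 16 * 16
        // == 16, so worker threads 16..31 all share the bucket with
        // location_id 16.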
#ifdef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
        if (max_slices == 1) {
          // Get preferred location, i.e. the current HPX thread's location.
          // Usually handy for CPU builds where we want to use the buffers
          // close to the current CPU core
          /* location_id = (hpx::get_worker_thread_num() / instances_per_gpu) * instances_per_gpu; */
          /* location_id = (gpu_id) * instances_per_gpu; */
          // The division makes sure that we always use the same instance to store our GPU buffers.
        // Get shiny and new buffer that will be shared between all slices
        // Buffer might be recycled from previous allocations by the
        // buffer_interface...
        T *aggregated_buffer =
            cppuddle::memory_recycling::detail::buffer_interface::get<
                T, Host_Allocator>(size, manage_content_lifetime, location_id,
        // Create buffer entry for this buffer
        buffer_allocations.emplace_back(static_cast<void *>(aggregated_buffer),
                                        size, 1, true, location_id, gpu_id);
        // If previously used, the buffer should not be in use anymore
        const auto exists = buffer_allocations_map.count(
            static_cast<void *>(aggregated_buffer));
        const auto previous_usage_id =
            buffer_allocations_map[static_cast<void *>(aggregated_buffer)];
        std::get<3>(buffer_allocations[previous_usage_id]);
        buffer_allocations_map.insert_or_assign(static_cast<void *>(aggregated_buffer),
        assert(buffer_counter == slice_alloc_counter);
        buffer_counter = buffer_allocations.size();
        return aggregated_buffer;
    assert(buffers_in_use == true);
    assert(std::get<3>(buffer_allocations[slice_alloc_counter])); // valid
    assert(std::get<2>(buffer_allocations[slice_alloc_counter]) >= 1);
    // Buffer entry should already exist:
    T *aggregated_buffer = static_cast<T *>(
        std::get<0>(buffer_allocations[slice_alloc_counter]));
    // Error handling: Size is wrong?
    assert(size == std::get<1>(buffer_allocations[slice_alloc_counter]));
    // Notify that one more slice has visited this buffer allocation
    std::get<2>(buffer_allocations[slice_alloc_counter])++;
    return aggregated_buffer;
  void mark_unused(T *p, const size_t size) {
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    void *ptr_key = static_cast<void*>(p);
    size_t slice_alloc_counter = buffer_allocations_map[p];
    assert(slice_alloc_counter < buffer_allocations.size());
    /*auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter, valid] =
        buffer_allocations[slice_alloc_counter];*/
    auto buffer_pointer_void = std::get<0>(buffer_allocations[slice_alloc_counter]);
    const auto buffer_size = std::get<1>(buffer_allocations[slice_alloc_counter]);
    auto &buffer_allocation_counter = std::get<2>(buffer_allocations[slice_alloc_counter]);
    auto &valid = std::get<3>(buffer_allocations[slice_alloc_counter]);
    const auto &location_id = std::get<4>(buffer_allocations[slice_alloc_counter]);
    const auto &gpu_id = std::get<5>(buffer_allocations[slice_alloc_counter]);
    T *buffer_pointer = static_cast<T *>(buffer_pointer_void);
    assert(buffer_size == size);
    assert(p == buffer_pointer);
    // assert(buffer_pointer == p || buffer_pointer == nullptr);
    // Slice is done with this buffer
    buffer_allocation_counter--;
    // Check if all slices are done with this buffer?
    if (buffer_allocation_counter == 0) {
      // Yes! "Deallocate" by telling the recycler the buffer is fit for reuse
      std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
      // Only mark unused if another slice has not done so already (and marked
      assert(buffers_in_use == true);
      cppuddle::memory_recycling::detail::buffer_interface::mark_unused<
          T, Host_Allocator>(buffer_pointer, buffer_size, location_id,
      // mark buffer as invalid to prevent any other slice from marking the
      const size_t current_deallocs = ++dealloc_counter;
      if (current_deallocs == buffer_counter) {
        std::lock_guard<aggregation_mutex_t> guard(mut);
        buffers_in_use = false;
        if (!executor_slices_alive && !buffers_in_use) {
          slices_exhausted = false;
          executor_wrapper.reset(nullptr);
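    // Note: the executor is only recycled once *both* conditions hold -- all
    // executor slices have been destroyed (executor_slices_alive == false,
    // see reduce_usage_counter) and every aggregated buffer has been marked
    // unused here; whichever of the two happens last resets slices_exhausted
    // and releases the executor_wrapper.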
  bool sync_aggregation_slices(const size_t slice_launch_counter) {
    std::lock_guard<aggregation_mutex_t> guard(mut);
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    // Add function call object in case it hasn't happened for this launch yet
    if (overall_launch_counter <= slice_launch_counter) {
      /* std::lock_guard<aggregation_mutex_t> guard(mut); */
      if (overall_launch_counter <= slice_launch_counter) {
        function_calls.emplace_back(current_slices, false, *executor_wrapper);
        overall_launch_counter = function_calls.size();
        return function_calls[slice_launch_counter].sync_aggregation_slices(
            last_stream_launch_done);
    return function_calls[slice_launch_counter].sync_aggregation_slices(
        last_stream_launch_done);
  void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) {
    std::lock_guard<aggregation_mutex_t> guard(mut);
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    // Add function call object in case it hasn't happened for this launch yet
    if (overall_launch_counter <= slice_launch_counter) {
      /* std::lock_guard<aggregation_mutex_t> guard(mut); */
      if (overall_launch_counter <= slice_launch_counter) {
        function_calls.emplace_back(current_slices, false, *executor_wrapper);
        overall_launch_counter = function_calls.size();
        function_calls[slice_launch_counter].post_when(
            last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
    function_calls[slice_launch_counter].post_when(
        last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
  hpx::lcos::future<void> async(const size_t slice_launch_counter, F &&f,
    std::lock_guard<aggregation_mutex_t> guard(mut);
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    // Add function call object in case it hasn't happened for this launch yet
    if (overall_launch_counter <= slice_launch_counter) {
      /* std::lock_guard<aggregation_mutex_t> guard(mut); */
      if (overall_launch_counter <= slice_launch_counter) {
        function_calls.emplace_back(current_slices, true, *executor_wrapper);
        overall_launch_counter = function_calls.size();
        return function_calls[slice_launch_counter].async_when(
            last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
    return function_calls[slice_launch_counter].async_when(
        last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
  hpx::lcos::shared_future<void> wrap_async(const size_t slice_launch_counter, F &&f,
    std::lock_guard<aggregation_mutex_t> guard(mut);
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    // Add function call object in case it hasn't happened for this launch yet
    if (overall_launch_counter <= slice_launch_counter) {
      /* std::lock_guard<aggregation_mutex_t> guard(mut); */
      if (overall_launch_counter <= slice_launch_counter) {
        function_calls.emplace_back(current_slices, true, *executor_wrapper);
        overall_launch_counter = function_calls.size();
        return function_calls[slice_launch_counter].wrap_async(
            last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
    return function_calls[slice_launch_counter].wrap_async(
        last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
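  // Example of the aggregation pattern used by sync_aggregation_slices, post,
  // async and wrap_async above (illustrative): with 4 launched slices that
  // each call async() once, the first slice to arrive appends a new entry to
  // function_calls and bumps overall_launch_counter to 1; the remaining three
  // slices see overall_launch_counter > slice_launch_counter and merely
  // register their arguments with that existing entry via async_when(). The
  // aggregated kernel is thus launched once on the underlying executor
  // instead of four times.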
  std::optional<hpx::lcos::future<executor_slice>> request_executor_slice() {
    std::lock_guard<aggregation_mutex_t> guard(mut);
    if (!slices_exhausted) {
      const size_t local_slice_id = ++current_slices;
      if (local_slice_id == 1) {
        // Cleanup leftovers from last run if any
        // TODO still required? Should be clean here already
        function_calls.clear();
        overall_launch_counter = 0;
        std::lock_guard<aggregation_mutex_t> guard(buffer_mut);
        for (const auto &buffer_entry : buffer_allocations) {
          const auto &[buffer_pointer_any, buffer_size,
                       buffer_allocation_counter, valid, location_id, device_id] =
        buffer_allocations.clear();
        buffer_allocations_map.clear();
        assert(executor_slices_alive == false);
        assert(buffers_in_use == false);
        executor_slices_alive = true;
        buffers_in_use = false;
        if (mode == aggregated_executor_modes::STRICT) {
          slices_full_promise = hpx::lcos::local::promise<void>{};
      // Create Executor Slice future -- that will be returned later
      hpx::lcos::future<executor_slice> ret_fut;
      if (local_slice_id < max_slices) {
        executor_slices.emplace_back(hpx::lcos::local::promise<executor_slice>{});
        executor_slices[local_slice_id - 1].get_future();
        launched_slices = current_slices;
        ret_fut = hpx::make_ready_future(executor_slice{*this,
            executor_slices.size(), launched_slices, max_slices});
      // Are we the first slice? If yes, add a continuation that sets the
      // slice futures to ready once the launch conditions are met
      if (local_slice_id == 1) {
        assert(!executor_wrapper);
        cppuddle::executor_recycling::executor_pool::select_device<
            Executor, cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
        executor_wrapper.reset(
            new cppuddle::executor_recycling::executor_interface<
                cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(
        // Renew promise that all slices will be ready as the primary launch
        hpx::lcos::shared_future<void> fut;
        if (mode == aggregated_executor_modes::EAGER ||
            mode == aggregated_executor_modes::ENDLESS) {
          // Fallback launch condition: launch as soon as the underlying stream
          /* auto slices_full_fut = slices_full_promise.get_future(); */
          cppuddle::executor_recycling::executor_pool::select_device<
              cppuddle::executor_recycling::round_robin_pool_impl<Executor>>(gpu_id);
          auto exec_fut = (*executor_wrapper).get_future();
          /* auto fut = hpx::when_any(exec_fut, slices_full_fut); */
          fut = std::move(exec_fut);
          auto slices_full_fut = slices_full_promise.get_shared_future();
          // Just use the slices launch condition
          fut = std::move(slices_full_fut);
        // Launch all executor slices within this continuation
        current_continuation = fut.then([this](auto &&fut) {
          std::lock_guard<aggregation_mutex_t> guard(mut);
          slices_exhausted = true;
          launched_slices = current_slices;
          for (auto &slice_promise : executor_slices) {
            slice_promise.set_value(
                executor_slice{*this, id, launched_slices, max_slices});
          executor_slices.clear();
      if (local_slice_id >= max_slices &&
          mode != aggregated_executor_modes::ENDLESS) {
        slices_exhausted = true; // prevents any more threads from entering
                                 // before the continuation is launched
        /* launched_slices = current_slices; */
        /* for (auto &slice_promise : executor_slices) { */
        /*   slice_promise.set_value( */
        /*       executor_slice{*this, id, launched_slices}); */
        /* executor_slices.clear(); */
        if (mode == aggregated_executor_modes::STRICT) {
          slices_full_promise.set_value(); // Trigger slices launch condition continuation
          // that continuation will set all executor slices so far handed out to ready
    // Return empty optional as failure
    return std::optional<hpx::lcos::future<executor_slice>>{};
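  // Caller-side sketch (illustrative, not part of the original source): an
  // empty optional means the current aggregation round is already closed
  // (slices_exhausted == true); the caller then has to fall back to a
  // different aggregated_executor or retry later, e.g.:
  //
  //   auto maybe_slice_fut = agg_exec.request_executor_slice();
  //   if (!maybe_slice_fut) {
  //     // slices exhausted -- pick another aggregated_executor instead
  //   }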
  void reduce_usage_counter(void) {
    /* std::lock_guard<aggregation_mutex_t> guard(mut); */
    assert(slices_exhausted == true);
    assert(executor_wrapper);
    assert(executor_slices_alive == true);
    assert(launched_slices >= 1);
    assert(current_slices >= 0 && current_slices <= launched_slices);
    const size_t local_slice_id = --current_slices;
    // Last slice goes out of scope?
    if (local_slice_id == 0) {
      // Mark executor fit for reuse
      std::lock_guard<aggregation_mutex_t> guard(mut);
      executor_slices_alive = false;
      if (!executor_slices_alive && !buffers_in_use) {
        slices_exhausted = false;
        executor_wrapper.reset(nullptr);
  ~aggregated_executor(void) {
    assert(current_slices == 0);
    assert(executor_slices_alive == false);
    assert(buffers_in_use == false);
    if (mode != aggregated_executor_modes::STRICT) {
      slices_full_promise.set_value(); // Trigger slices launch condition continuation
    // Cleanup leftovers from last run if any
    function_calls.clear();
    overall_launch_counter = 0;
    for (const auto &buffer_entry : buffer_allocations) {
      const auto &[buffer_pointer_any, buffer_size, buffer_allocation_counter,
                   valid, location_id, device_id] = buffer_entry;
    buffer_allocations.clear();
    buffer_allocations_map.clear();
    assert(buffer_allocations.empty());
    assert(buffer_allocations_map.empty());
  aggregated_executor(const size_t number_slices,
                      aggregated_executor_modes mode, const size_t gpu_id = 0)
      : max_slices(number_slices), current_slices(0), slices_exhausted(false),
        dealloc_counter(0), mode(mode), executor_slices_alive(false),
        buffers_in_use(false), gpu_id(gpu_id),
        executor_wrapper(nullptr),
        current_continuation(hpx::make_ready_future()),
        last_stream_launch_done(hpx::make_ready_future()) {}
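  // End-to-end usage sketch (illustrative only; `Executor`, `my_kernel` and
  // the slice count are placeholders and error handling is omitted): create
  // an aggregated executor that fuses up to 4 slices, hand one slice to each
  // producer task and let every slice issue the same aggregated launch.
  //
  //   aggregated_executor<Executor> agg_exec{4, aggregated_executor_modes::EAGER, 0};
  //   for (int task = 0; task < 4; task++) {
  //     hpx::async([&agg_exec]() {
  //       auto slice_fut = agg_exec.request_executor_slice();
  //       if (!slice_fut)
  //         return; // round already closed -- a real caller would fall back
  //       auto slice = slice_fut->get();
  //       slice.post(my_kernel, slice.id, slice.number_slices);
  //       slice.sync_aggregation_slices();
  //     });
  //   }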