diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index e4a000e..865113b 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -72,6 +72,8 @@ if(BUILD_CUDA AND BUILD_TBB) target_link_libraries(alien_reco PRIVATE CoroutineTests CUDA::cudart TBB::tbb) add_executable(alien_delegate alien_delegate.cpp) target_link_libraries(alien_delegate PRIVATE CoroutineTests CUDA::cudart TBB::tbb) + add_executable(alien_event_poll alien_event_poll.cpp) + target_link_libraries(alien_event_poll PRIVATE CoroutineTests CUDA::cudart TBB::tbb) if(BUILD_STDEXEC) add_executable(exec_reco_stdexec exec_reco.cpp) target_link_libraries(exec_reco_stdexec PRIVATE CoroutineTests CUDA::cudart TBB::tbb STDEXEC::stdexec) @@ -83,5 +85,7 @@ if(BUILD_CUDA AND BUILD_TBB) target_link_libraries(capy_reco PRIVATE CoroutineTests CUDA::cudart TBB::tbb Boost::capy) add_executable(capy_delegate capy_delegate.cpp) target_link_libraries(capy_delegate PRIVATE CoroutineTests CUDA::cudart TBB::tbb Boost::capy) + add_executable(capy_event_poll capy_event_poll.cpp) + target_link_libraries(capy_event_poll PRIVATE CoroutineTests CUDA::cudart TBB::tbb Boost::capy) endif() endif() diff --git a/examples/README.md b/examples/README.md index 943bbe5..67b1f58 100644 --- a/examples/README.md +++ b/examples/README.md @@ -172,4 +172,10 @@ These examples show a setup and coroutine chain loosely inspired by track recons Link: [alien_delegate.cpp](alien_delegate.cpp), [exec_delegate.cpp](exec_delegate.cpp), [capy_delegate.cpp](capy_delegate.cpp) -These examples are a modification of [reconstuction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. This contention can be reduced by delegating the calls to a single thread. The examples demonstrate how this optimization can be implemented using coroutines schedulers/executors. +These examples are a modification of [reconstruction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. This contention can be reduced by delegating the calls to a single thread. The examples demonstrate how this optimization can be implemented using coroutines schedulers/executors. + +## Event poll + +Link: [alien_event_poll.cpp](alien_event_poll.cpp), [capy_event_poll.cpp](capy_event_poll.cpp) + +These examples are a variant of [delegate examples](#delegate) in which awaiting completion of CUDA operations is done by repeatedly querying the event state instead of using a callback as in earlier examples. All the queries are executed by a specific thread. diff --git a/examples/alien_event_poll.cpp b/examples/alien_event_poll.cpp new file mode 100644 index 0000000..1b49d7e --- /dev/null +++ b/examples/alien_event_poll.cpp @@ -0,0 +1,303 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "CoroutineTests/alien/counting_scope.hpp" +#include "CoroutineTests/alien/schedule_on.hpp" +#include "CoroutineTests/alien/subtool.hpp" +#include "CoroutineTests/alien/sync_wait.hpp" +#include "CoroutineTests/alien/tool.hpp" +#include "CoroutineTests/threadpool.hpp" +#include "logging_utils.hpp" // log, format_name + +#define ERROR_CHECK_CUDA(EXP) \ + do { \ + cudaError_t errorCode = EXP; \ + if (errorCode != cudaSuccess) { \ + throw std::runtime_error( \ + std::format("CUDA error at {}:{}: {}", __FILE__, __LINE__, \ + cudaGetErrorString(errorCode))); \ + } \ + } while (false) + +using namespace CoroutineTests::alien; + +template +struct DeviceBuffer { + T* ptr = nullptr; + std::size_t size = 0; +}; + +template +tool::Task delegate(F f) { + f(); + co_return; +} + +struct Retry { + bool await_ready() const noexcept { return false; } + template + void await_suspend(std::coroutine_handle handle) const { + handle.promise().reschedule(); + } + void await_resume() const noexcept {} +}; + +tool::Task poll(cudaEvent_t event) { + auto status = cudaSuccess; + log() << "Polling for event completion..." << std::endl; + while ((status = cudaEventQuery(event)) == cudaErrorNotReady) { + log() << "Event not ready, retrying..." << std::endl; + co_await Retry{}; + } + ERROR_CHECK_CUDA(status); +} + +subtool::Task> clusterization( + DeviceBuffer cells, cudaStream_t stream, cudaEvent_t event, + std::function)> delegation_scheduler, + std::string_view parent) { + + const auto self = format_name(parent, "clusterization"); + log(self) << "Starting clusterization" << std::endl; + + const auto nCells = static_cast(cells.size); + + // Copy cells back to host to count non-zero entries + auto h_cells = std::vector(nCells); + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated copy of cells from device to host" + << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_cells.data(), cells.ptr, + nCells * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + })); + + co_await schedule_on(delegation_scheduler, poll(event)); + + auto nClusters = 0; + for (auto v : h_cells) + if (v != 0) + ++nClusters; + + log(self) << "Found " << nClusters << " clusters" << std::endl; + + // Allocate clusters of appropriate size on device + int* d_clusters = nullptr; + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated allocation of clusters on device" + << std::endl; + ERROR_CHECK_CUDA( + cudaMallocAsync(reinterpret_cast(&d_clusters), + nClusters * sizeof(int), stream)); + + // Write some dummy data to the clusters buffer to simulate work + ERROR_CHECK_CUDA(cudaMemsetAsync(d_clusters, 0, + nClusters * sizeof(int), stream)); + ERROR_CHECK_CUDA(cudaMemsetAsync( + d_clusters, 1, nClusters / 2 * sizeof(int), stream)); + })); + + co_return DeviceBuffer{d_clusters, + static_cast(nClusters)}; +} + +subtool::Task> seeding( + DeviceBuffer clusters, cudaStream_t stream, cudaEvent_t event, + std::function)> delegation_scheduler, + std::string_view parent) { + + const auto self = format_name(parent, "seeding"); + log(self) << "Starting seeding" << std::endl; + + const auto nClusters = static_cast(clusters.size); + + // Copy clusters to host to count non-zero entries + auto h_clusters = std::vector(nClusters); + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated copy of clusters to host" << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr, + nClusters * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + })); + + co_await schedule_on(delegation_scheduler, poll(event)); + + int nSeeds = 0; + for (auto v : h_clusters) + if (v != 0) + ++nSeeds; + + log(self) << "Found " << nSeeds << " seeds" << std::endl; + + // Allocate seeds of appropriate size on device + int* d_seeds = nullptr; + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated allocation of seeds on device" << std::endl; + ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), + nSeeds * sizeof(int), stream)); + + // Write some dummy data to the seeds buffer to simulate work + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream)); + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), stream)); + })); + + co_return DeviceBuffer{d_seeds, static_cast(nSeeds)}; +} + +tool::Task reconstruct( + cudaStream_t stream, cudaEvent_t event, + std::function)> delegation_scheduler, + std::string_view parent) { + const auto self = format_name(parent, "reconstruction"); + log(self) << "Starting reconstruction" << std::endl; + + // Allocate some dummy input data on the device + auto cells = DeviceBuffer{nullptr, 1000}; + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated allocation of input data on device" + << std::endl; + ERROR_CHECK_CUDA( + cudaMallocAsync(reinterpret_cast(&cells.ptr), + cells.size * sizeof(int), stream)); + ERROR_CHECK_CUDA(cudaMemsetAsync(cells.ptr, 1, + cells.size * sizeof(int), stream)); + })); + + // Run the clusterization and seeding steps + auto clusters = co_await clusterization(cells, stream, event, + delegation_scheduler, self); + auto seeds = + co_await seeding(clusters, stream, event, delegation_scheduler, self); + + // Cleanup + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated cleanup of device memory" << std::endl; + ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + })); + + co_await schedule_on(delegation_scheduler, poll(event)); + + log(self) << "Finishing reconstruction" << std::endl; + co_return tool::StatusCode::SUCCESS; +} + +int main() { + int deviceCount = 0; + auto error_id = cudaGetDeviceCount(&deviceCount); + + if (error_id != cudaSuccess) { + std::cout << "cudaGetDeviceCount returned " + << static_cast(error_id) << "\n" + << cudaGetErrorString(error_id) << "\n"; + return EXIT_FAILURE; + } + + if (deviceCount == 0) { + std::cout << "No CUDA devices found.\n"; + return EXIT_FAILURE; + } + + log() << "main Starting" << std::endl; + + tbb::task_arena task_arena{2, 0}; + auto scheduler = [&task_arena](std::coroutine_handle<> handle) { + task_arena.enqueue([handle]() { handle.resume(); }); + }; + + CoroutineTests::Threadpool delegation_threadpool(1); + auto delegation_scheduler = + [&delegation_threadpool](std::coroutine_handle<> handle) { + delegation_threadpool.enqueue_task(handle); + }; + + { + cudaStream_t stream; + ERROR_CHECK_CUDA(cudaStreamCreate(&stream)); + cudaEvent_t event; + ERROR_CHECK_CUDA( + cudaEventCreateWithFlags(&event, cudaEventDisableTiming)); + std::cout << "--- Single event, synchronous wait for completion ---\n"; + log() << "main Launching algorithm..." << std::endl; + auto status = + sync_wait(scheduler, + reconstruct(stream, event, delegation_scheduler, "main")); + log() << "main Final status of algorithm " << status << "" << std::endl; + ERROR_CHECK_CUDA(cudaEventDestroy(event)); + ERROR_CHECK_CUDA(cudaStreamDestroy(stream)); + } + + { + std::cout << "--- Multiple events, wait for all to complete ---\n"; + + auto streams = std::vector(2); + auto events = std::vector(streams.size()); + auto status = std::vector(streams.size()); + for (std::size_t i = 0; i < streams.size(); ++i) { + ERROR_CHECK_CUDA(cudaStreamCreate(&streams.at(i))); + ERROR_CHECK_CUDA(cudaEventCreateWithFlags(&events.at(i), + cudaEventDisableTiming)); + } + + auto scope = counting_scope{}; + log() << "main Launching algorithms..." << std::endl; + + auto payload = [](std::vector streams, + std::vector events, + std::vector& statuses, + std::function)> + delegation_scheduler, + int i) -> tool::Task { + const auto name = std::format("event{}:main", i); + auto& stream = streams.at(i); + auto& event = events.at(i); + auto& status = statuses.at(i); + status = co_await reconstruct( + stream, event, std::move(delegation_scheduler), name); + co_return; + }; + + for (std::size_t i = 0; i < streams.size(); ++i) { + scope.spawn(scheduler, + payload(streams, events, status, delegation_scheduler, + static_cast(i))); + } + scope.join(); + + for (std::size_t i = 0; i < streams.size(); ++i) { + ERROR_CHECK_CUDA(cudaEventDestroy(events.at(i))); + ERROR_CHECK_CUDA(cudaStreamDestroy(streams.at(i))); + } + + log() << "main All algorithms completed. Final statuses: "; + for (auto s : status) { + std::cout << s << ' '; + } + std::cout << std::endl; + } + return 0; +} diff --git a/examples/capy_event_poll.cpp b/examples/capy_event_poll.cpp new file mode 100644 index 0000000..099101c --- /dev/null +++ b/examples/capy_event_poll.cpp @@ -0,0 +1,281 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "capy_task_arena_executor.hpp" // TaskArenaExecutor +#include "logging_utils.hpp" // log, format_name +#include "statuscode.hpp" // StatusCodeImpl + +namespace tools { +struct Tag { + static constexpr const char* name = "tools"; +}; +using StatusCode = StatusCodeImpl; +} // namespace tools + +#define ERROR_CHECK_CUDA(EXP) \ + do { \ + cudaError_t errorCode = EXP; \ + if (errorCode != cudaSuccess) { \ + throw std::runtime_error( \ + std::format("CUDA error at {}:{}: {}", __FILE__, __LINE__, \ + cudaGetErrorString(errorCode))); \ + } \ + } while (false) + +template +struct DeviceBuffer { + T* ptr = nullptr; + std::size_t size = 0; +}; + +template +boost::capy::task delegate(F&& f) { + std::forward(f)(); + co_return; +} + +struct Retry { + bool await_ready() const noexcept { return false; } + void await_suspend(std::coroutine_handle<> handle, + boost::capy::io_env const* env) const noexcept { + env->executor.post(handle); + } + void await_resume() const noexcept {} +}; + +boost::capy::task poll(cudaEvent_t event) { + auto status = cudaSuccess; + log() << "Polling for event completion..." << std::endl; + while ((status = cudaEventQuery(event)) == cudaErrorNotReady) { + log() << "Event not ready, retrying..." << std::endl; + co_await Retry{}; + } + ERROR_CHECK_CUDA(status); +} + +boost::capy::task> clusterization( + DeviceBuffer cells, cudaStream_t stream, cudaEvent_t event, + boost::capy::thread_pool& delegation_thread, std::string_view parent) { + + const auto self = format_name(parent, "clusterization"); + log(self) << "Starting clusterization" << std::endl; + + const auto nCells = static_cast(cells.size); + + // Copy cells back to host to count non-zero entries + auto h_cells = std::vector(nCells); + + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated copy of cells from device to host" << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_cells.data(), cells.ptr, + nCells * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + })); + + co_await boost::capy::run(delegation_thread.get_executor())(poll(event)); + + auto nClusters = 0; + for (auto v : h_cells) + if (v != 0) + ++nClusters; + + log(self) << "Found " << nClusters << " clusters" << std::endl; + + // Allocate clusters of appropriate size on device + int* d_clusters = nullptr; + + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated allocation of clusters on device" << std::endl; + ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_clusters), + nClusters * sizeof(int), stream)); + + // Write some dummy data to the clusters buffer to simulate work + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_clusters, 0, nClusters * sizeof(int), stream)); + ERROR_CHECK_CUDA(cudaMemsetAsync(d_clusters, 1, + nClusters / 2 * sizeof(int), stream)); + })); + + co_return DeviceBuffer{d_clusters, + static_cast(nClusters)}; +} + +boost::capy::task> seeding( + DeviceBuffer clusters, cudaStream_t stream, cudaEvent_t event, + boost::capy::thread_pool& delegation_thread, std::string_view parent) { + + const auto self = format_name(parent, "seeding"); + log(self) << "Starting seeding" << std::endl; + + const auto nClusters = static_cast(clusters.size); + + // Copy clusters to host to count non-zero entries + auto h_clusters = std::vector(nClusters); + + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated copy of clusters to host" << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr, + nClusters * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + })); + + co_await boost::capy::run(delegation_thread.get_executor())(poll(event)); + + int nSeeds = 0; + for (auto v : h_clusters) + if (v != 0) + ++nSeeds; + + log(self) << "Found " << nSeeds << " seeds" << std::endl; + + // Allocate seeds of appropriate size on device + int* d_seeds = nullptr; + + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated allocation of seeds on device" << std::endl; + ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), + nSeeds * sizeof(int), stream)); + + // Write some dummy data to the seeds buffer to simulate work + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream)); + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), stream)); + })); + + co_return DeviceBuffer{d_seeds, static_cast(nSeeds)}; +} + +boost::capy::task reconstruct( + cudaStream_t stream, cudaEvent_t event, + boost::capy::thread_pool& delegation_thread, std::string parent) { + const auto self = format_name(parent, "reconstruction"); + log(self) << "Starting reconstruction" << std::endl; + + // Allocate some dummy input data on the device + auto cells = DeviceBuffer{nullptr, 1000}; + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated allocation of input data on device" + << std::endl; + ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&cells.ptr), + cells.size * sizeof(int), stream)); + ERROR_CHECK_CUDA( + cudaMemsetAsync(cells.ptr, 1, cells.size * sizeof(int), stream)); + })); + + // Run the clusterization and seeding steps + auto clusters = + co_await clusterization(cells, stream, event, delegation_thread, self); + auto seeds = + co_await seeding(clusters, stream, event, delegation_thread, self); + + // Cleanup + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated cleanup of device memory" << std::endl; + ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + })); + + co_await boost::capy::run(delegation_thread.get_executor())(poll(event)); + + log(self) << "Finishing reconstruction" << std::endl; + co_return tools::StatusCode::SUCCESS; +} + +int main() { + int deviceCount = 0; + auto error_id = cudaGetDeviceCount(&deviceCount); + + if (error_id != cudaSuccess) { + std::cout << "cudaGetDeviceCount returned " + << static_cast(error_id) << "\n" + << cudaGetErrorString(error_id) << "\n"; + return EXIT_FAILURE; + } + + if (deviceCount == 0) { + std::cout << "No CUDA devices found.\n"; + return EXIT_FAILURE; + } + + log() << "main Starting" << std::endl; + + auto task_arena = tbb::task_arena{2, 0}; + auto context = TaskArenaContext(task_arena); + auto executor = TaskArenaExecutor(context); + auto delegation_thread = boost::capy::thread_pool(1); + + { + std::cout << "--- Single event, synchronous wait for completion ---\n"; + cudaStream_t stream; + ERROR_CHECK_CUDA(cudaStreamCreate(&stream)); + cudaEvent_t event; + ERROR_CHECK_CUDA( + cudaEventCreateWithFlags(&event, cudaEventDisableTiming)); + auto final_result = tools::StatusCode{}; + auto done = std::latch{1}; + auto result_handler = [&done, &final_result](tools::StatusCode code) { + final_result = code; + done.count_down(); + }; + + log() << "main Launching algorithm..." << std::endl; + boost::capy::run_async(executor, result_handler)( + reconstruct(stream, event, delegation_thread, "main")); + done.wait(); + log() << "main Final status of algorithm " << final_result << "" + << std::endl; + ERROR_CHECK_CUDA(cudaEventDestroy(event)); + ERROR_CHECK_CUDA(cudaStreamDestroy(stream)); + } + + { + std::cout << "--- Multiple events, wait for all to complete ---\n "; + + auto streams = std::vector(2); + auto events = std::vector(streams.size()); + auto status = std::vector(streams.size()); + for (std::size_t i = 0; i < streams.size(); ++i) { + ERROR_CHECK_CUDA(cudaStreamCreate(&streams.at(i))); + ERROR_CHECK_CUDA(cudaEventCreateWithFlags(&events.at(i), + cudaEventDisableTiming)); + } + + auto done = std::latch{static_cast(streams.size())}; + log() << "main Launching algorithms..." << std::endl; + + for (std::size_t i = 0; i < streams.size(); ++i) { + auto result_handler = [&done, &status, i](tools::StatusCode code) { + status.at(i) = code; + done.count_down(); + }; + boost::capy::run_async(executor, result_handler)( + reconstruct(streams.at(i), events.at(i), delegation_thread, + "event" + std::to_string(i))); + } + done.wait(); + for (std::size_t i = 0; i < streams.size(); ++i) { + ERROR_CHECK_CUDA(cudaEventDestroy(events.at(i))); + ERROR_CHECK_CUDA(cudaStreamDestroy(streams.at(i))); + } + + log() << "main All algorithms completed. Final statuses: "; + for (auto s : status) { + std::cout << s << ' '; + } + std::cout << std::endl; + } + return 0; +}