diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 865113b..5876924 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -79,6 +79,8 @@ if(BUILD_CUDA AND BUILD_TBB) target_link_libraries(exec_reco_stdexec PRIVATE CoroutineTests CUDA::cudart TBB::tbb STDEXEC::stdexec) add_executable(exec_delegate_stdexec exec_delegate.cpp) target_link_libraries(exec_delegate_stdexec PRIVATE CoroutineTests CUDA::cudart TBB::tbb STDEXEC::stdexec) + add_executable(exec_event_poll_stdexec exec_event_poll.cpp) + target_link_libraries(exec_event_poll_stdexec PRIVATE CoroutineTests CUDA::cudart TBB::tbb STDEXEC::stdexec) endif() if(BUILD_CAPY) add_executable(capy_reco capy_reco.cpp) diff --git a/examples/README.md b/examples/README.md index 67b1f58..037e363 100644 --- a/examples/README.md +++ b/examples/README.md @@ -176,6 +176,6 @@ These examples are a modification of [reconstruction examples](#reconstruction) ## Event poll -Link: [alien_event_poll.cpp](alien_event_poll.cpp), [capy_event_poll.cpp](capy_event_poll.cpp) +Link: [alien_event_poll.cpp](alien_event_poll.cpp), [exec_event_poll.cpp](exec_event_poll.cpp), [capy_event_poll.cpp](capy_event_poll.cpp) These examples are a variant of [delegate examples](#delegate) in which awaiting completion of CUDA operations is done by repeatedly querying the event state instead of using a callback as in earlier examples. All the queries are executed by a specific thread. diff --git a/examples/alien_event_poll.cpp b/examples/alien_event_poll.cpp index 1b49d7e..19c3042 100644 --- a/examples/alien_event_poll.cpp +++ b/examples/alien_event_poll.cpp @@ -50,14 +50,15 @@ struct Retry { void await_resume() const noexcept {} }; -tool::Task poll(cudaEvent_t event) { +tool::Task poll(cudaEvent_t event, std::string_view parent) { auto status = cudaSuccess; - log() << "Polling for event completion..." << std::endl; + log(parent) << "Polling for event completion..." << std::endl; while ((status = cudaEventQuery(event)) == cudaErrorNotReady) { - log() << "Event not ready, retrying..." << std::endl; + log(parent) << "Event not ready, retrying..." << std::endl; co_await Retry{}; } ERROR_CHECK_CUDA(status); + log(parent) << "Event completed successfully" << std::endl; } subtool::Task> clusterization( @@ -83,7 +84,7 @@ subtool::Task> clusterization( ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); })); - co_await schedule_on(delegation_scheduler, poll(event)); + co_await schedule_on(delegation_scheduler, poll(event, self)); auto nClusters = 0; for (auto v : h_cells) @@ -136,7 +137,7 @@ subtool::Task> seeding( ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); })); - co_await schedule_on(delegation_scheduler, poll(event)); + co_await schedule_on(delegation_scheduler, poll(event, self)); int nSeeds = 0; for (auto v : h_clusters) @@ -200,7 +201,7 @@ tool::Task reconstruct( ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); })); - co_await schedule_on(delegation_scheduler, poll(event)); + co_await schedule_on(delegation_scheduler, poll(event, self)); log(self) << "Finishing reconstruction" << std::endl; co_return tool::StatusCode::SUCCESS; diff --git a/examples/capy_event_poll.cpp b/examples/capy_event_poll.cpp index 099101c..3f48831 100644 --- a/examples/capy_event_poll.cpp +++ b/examples/capy_event_poll.cpp @@ -51,14 +51,15 @@ struct Retry { void await_resume() const noexcept {} }; -boost::capy::task poll(cudaEvent_t event) { +boost::capy::task poll(cudaEvent_t event, std::string_view parent) { auto status = cudaSuccess; - log() << "Polling for event completion..." << std::endl; + log(parent) << "Polling for event completion..." << std::endl; while ((status = cudaEventQuery(event)) == cudaErrorNotReady) { - log() << "Event not ready, retrying..." << std::endl; + log(parent) << "Event not ready, retrying..." << std::endl; co_await Retry{}; } ERROR_CHECK_CUDA(status); + log(parent) << "Event completed successfully" << std::endl; } boost::capy::task> clusterization( @@ -81,7 +82,8 @@ boost::capy::task> clusterization( ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); })); - co_await boost::capy::run(delegation_thread.get_executor())(poll(event)); + co_await boost::capy::run(delegation_thread.get_executor())( + poll(event, self)); auto nClusters = 0; for (auto v : h_cells) @@ -129,7 +131,8 @@ boost::capy::task> seeding( ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); })); - co_await boost::capy::run(delegation_thread.get_executor())(poll(event)); + co_await boost::capy::run(delegation_thread.get_executor())( + poll(event, self)); int nSeeds = 0; for (auto v : h_clusters) @@ -188,7 +191,8 @@ boost::capy::task reconstruct( ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); })); - co_await boost::capy::run(delegation_thread.get_executor())(poll(event)); + co_await boost::capy::run(delegation_thread.get_executor())( + poll(event, self)); log(self) << "Finishing reconstruction" << std::endl; co_return tools::StatusCode::SUCCESS; diff --git a/examples/exec_event_poll.cpp b/examples/exec_event_poll.cpp new file mode 100644 index 0000000..3b36596 --- /dev/null +++ b/examples/exec_event_poll.cpp @@ -0,0 +1,302 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "exec_task_arena_scheduler.hpp" // TaskArenaScheduler +#include "logging_utils.hpp" // log, format_name +#include "statuscode.hpp" // StatusCodeImpl + +namespace tools { +struct Tag { + static constexpr const char* name = "tools"; +}; +using StatusCode = StatusCodeImpl; +} // namespace tools + +#define ERROR_CHECK_CUDA(EXP) \ + do { \ + cudaError_t errorCode = EXP; \ + if (errorCode != cudaSuccess) { \ + throw std::runtime_error( \ + std::format("CUDA error at {}:{}: {}", __FILE__, __LINE__, \ + cudaGetErrorString(errorCode))); \ + } \ + } while (false) + +template +struct DeviceBuffer { + T* ptr = nullptr; + std::size_t size = 0; +}; + +// Poll a CUDA event for completion by repeatedly querying it. +// Each query attempt is scheduled onto the delegation scheduler. +static auto poll_event(cudaEvent_t event, + exec::single_thread_context& delegation_ctx, + std::string_view parent) { + log(parent) << "Polling for event completion..." << std::endl; + auto query_once = + execution::just() | execution::then([event, parent]() -> bool { + auto status = cudaEventQuery(event); + if (status == cudaErrorNotReady) { + log(parent) << "Event not ready, retrying..." << std::endl; + return false; + } + ERROR_CHECK_CUDA(status); + log(parent) << "Event completed successfully" << std::endl; + return true; + }); + + return exec::repeat_effect_until(execution::starts_on( + delegation_ctx.get_scheduler(), std::move(query_once))); +} + +exec::task> clusterization( + DeviceBuffer cells, cudaStream_t stream, cudaEvent_t event, + exec::single_thread_context& delegation_ctx, std::string_view parent) { + + const auto self = format_name(parent, "clusterization"); + log(self) << "Starting clusterization" << std::endl; + + const auto nCells = static_cast(cells.size); + + // Copy cells back to host to count non-zero entries + auto h_cells = std::vector(nCells); + + stdexec::sender auto copy_cells = + stdexec::just() | stdexec::then([&]() { + log(self) << "Delegated copy of cells from device to host" + << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_cells.data(), cells.ptr, + nCells * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + }); + co_await stdexec::on(delegation_ctx.get_scheduler(), std::move(copy_cells)); + + co_await poll_event(event, delegation_ctx, self); + + auto nClusters = 0; + for (auto v : h_cells) + if (v != 0) + ++nClusters; + + log(self) << "Found " << nClusters << " clusters" << std::endl; + + // Allocate clusters of appropriate size on device + int* d_clusters = nullptr; + + stdexec::sender auto allocate_clusters = + stdexec::just() | stdexec::then([&]() { + log(self) << "Delegated allocation of clusters on device" + << std::endl; + ERROR_CHECK_CUDA( + cudaMallocAsync(reinterpret_cast(&d_clusters), + nClusters * sizeof(int), stream)); + + // Write some dummy data to the clusters buffer to simulate work + ERROR_CHECK_CUDA(cudaMemsetAsync(d_clusters, 0, + nClusters * sizeof(int), stream)); + ERROR_CHECK_CUDA(cudaMemsetAsync( + d_clusters, 1, nClusters / 2 * sizeof(int), stream)); + }); + co_await stdexec::on(delegation_ctx.get_scheduler(), + std::move(allocate_clusters)); + + co_return DeviceBuffer{d_clusters, + static_cast(nClusters)}; +} + +exec::task> seeding( + DeviceBuffer clusters, cudaStream_t stream, cudaEvent_t event, + exec::single_thread_context& delegation_ctx, std::string_view parent) { + + const auto self = format_name(parent, "seeding"); + log(self) << "Starting seeding" << std::endl; + + const auto nClusters = static_cast(clusters.size); + + // Copy clusters to host to count non-zero entries + auto h_clusters = std::vector(nClusters); + + stdexec::sender auto copy_clusters = + stdexec::just() | stdexec::then([&]() { + log(self) << "Delegated copy of clusters from device to host" + << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr, + nClusters * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + }); + co_await stdexec::on(delegation_ctx.get_scheduler(), + std::move(copy_clusters)); + + co_await poll_event(event, delegation_ctx, self); + + int nSeeds = 0; + for (auto v : h_clusters) + if (v != 0) + ++nSeeds; + + log(self) << "Found " << nSeeds << " seeds" << std::endl; + + // Allocate seeds of appropriate size on device + int* d_seeds = nullptr; + + stdexec::sender auto allocate_seeds = + stdexec::just() | stdexec::then([&]() { + log(self) << "Delegated allocation of seeds on device" << std::endl; + ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), + nSeeds * sizeof(int), stream)); + + // Write some dummy data to the clusters buffer to simulate work + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream)); + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), stream)); + }); + co_await stdexec::on(delegation_ctx.get_scheduler(), + std::move(allocate_seeds)); + + co_return DeviceBuffer{d_seeds, static_cast(nSeeds)}; +} + +exec::task reconstruct( + cudaStream_t stream, cudaEvent_t event, + exec::single_thread_context& delegation_ctx, std::string_view parent) { + const auto self = format_name(parent, "reconstruction"); + log(self) << "Starting reconstruction" << std::endl; + + // Allocate some dummy input data on the device + auto cells = DeviceBuffer{nullptr, 1000}; + stdexec::sender auto allocate_cells = + stdexec::just() | stdexec::then([&]() { + log(self) << "Delegated allocation of input data on device" + << std::endl; + ERROR_CHECK_CUDA( + cudaMallocAsync(reinterpret_cast(&cells.ptr), + cells.size * sizeof(int), stream)); + ERROR_CHECK_CUDA(cudaMemsetAsync(cells.ptr, 1, + cells.size * sizeof(int), stream)); + }); + co_await stdexec::on(delegation_ctx.get_scheduler(), + std::move(allocate_cells)); + + // Run the clusterization and seeding steps + auto clusters = + co_await clusterization(cells, stream, event, delegation_ctx, self); + auto seeds = + co_await seeding(clusters, stream, event, delegation_ctx, self); + + // Cleanup + stdexec::sender auto cleanup = + stdexec::just() | stdexec::then([&]() { + log(self) << "Delegated cleanup of device memory" << std::endl; + ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream)); + ERROR_CHECK_CUDA(cudaEventRecord(event, stream)); + }); + co_await stdexec::on(delegation_ctx.get_scheduler(), std::move(cleanup)); + + co_await poll_event(event, delegation_ctx, self); + + log(self) << "Finishing reconstruction" << std::endl; + co_return tools::StatusCode::SUCCESS; +} + +int main() { + int deviceCount = 0; + auto error_id = cudaGetDeviceCount(&deviceCount); + + if (error_id != cudaSuccess) { + std::cout << "cudaGetDeviceCount returned " + << static_cast(error_id) << "\n" + << cudaGetErrorString(error_id) << "\n"; + return EXIT_FAILURE; + } + + if (deviceCount == 0) { + std::cout << "No CUDA devices found.\n"; + return EXIT_FAILURE; + } + + log() << "main Starting" << std::endl; + + tbb::task_arena task_arena{2, 0}; + execution::scheduler auto scheduler = get_scheduler(task_arena, false); + exec::single_thread_context delegation_context; + + { + cudaStream_t stream; + ERROR_CHECK_CUDA(cudaStreamCreate(&stream)); + cudaEvent_t event; + ERROR_CHECK_CUDA( + cudaEventCreateWithFlags(&event, cudaEventDisableTiming)); + std::cout << "--- Single event, synchronous wait for completion ---\n"; + log() << "main Launching algorithm..." << std::endl; + auto [status] = + stdexec::sync_wait( + stdexec::starts_on( + scheduler, + reconstruct(stream, event, delegation_context, "main"))) + .value(); + log() << "main Final status of algorithm " << status << "" << std::endl; + ERROR_CHECK_CUDA(cudaEventDestroy(event)); + ERROR_CHECK_CUDA(cudaStreamDestroy(stream)); + } + + { + std::cout << "--- Multiple events, wait for all to complete ---\n"; + + auto streams = std::vector(2); + auto events = std::vector(streams.size()); + auto status = std::vector(streams.size()); + for (std::size_t i = 0; i < streams.size(); ++i) { + ERROR_CHECK_CUDA(cudaStreamCreate(&streams.at(i))); + ERROR_CHECK_CUDA(cudaEventCreateWithFlags(&events.at(i), + cudaEventDisableTiming)); + } + + auto scope = exec::async_scope{}; + log() << "main Launching algorithms..." << std::endl; + + auto payload = [](std::vector streams, + std::vector events, + std::vector& statuses, int i, + exec::single_thread_context& delegation_ctx) + -> exec::task { + const auto name = std::format("event{}:", i); + auto& stream = streams.at(i); + auto& event = events.at(i); + auto& status = statuses.at(i); + status = co_await reconstruct(stream, event, delegation_ctx, name); + }; + for (std::size_t i = 0; i < streams.size(); ++i) { + scope.spawn(stdexec::starts_on( + scheduler, + payload(streams, events, status, i, delegation_context))); + } + stdexec::sync_wait(scope.on_empty()); + + for (std::size_t i = 0; i < streams.size(); ++i) { + ERROR_CHECK_CUDA(cudaEventDestroy(events.at(i))); + ERROR_CHECK_CUDA(cudaStreamDestroy(streams.at(i))); + } + + log() << "main All algorithms completed. Final statuses: "; + for (auto s : status) { + std::cout << s << ' '; + } + std::cout << std::endl; + } + return 0; +}