From d4ffc678fdd8bdf53317bd9ac4e449034c4537ad Mon Sep 17 00:00:00 2001 From: Mateusz Jakub Fila Date: Thu, 26 Mar 2026 21:58:25 +0100 Subject: [PATCH 1/4] add delegation example with alien coroutine --- examples/CMakeLists.txt | 2 + examples/alien_delegate.cpp | 299 ++++++++++++++++++++++++++++++++++++ 2 files changed, 301 insertions(+) create mode 100644 examples/alien_delegate.cpp diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 131473b..e4a000e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -70,6 +70,8 @@ endif() if(BUILD_CUDA AND BUILD_TBB) add_executable(alien_reco alien_reco.cpp) target_link_libraries(alien_reco PRIVATE CoroutineTests CUDA::cudart TBB::tbb) + add_executable(alien_delegate alien_delegate.cpp) + target_link_libraries(alien_delegate PRIVATE CoroutineTests CUDA::cudart TBB::tbb) if(BUILD_STDEXEC) add_executable(exec_reco_stdexec exec_reco.cpp) target_link_libraries(exec_reco_stdexec PRIVATE CoroutineTests CUDA::cudart TBB::tbb STDEXEC::stdexec) diff --git a/examples/alien_delegate.cpp b/examples/alien_delegate.cpp new file mode 100644 index 0000000..bd3aae1 --- /dev/null +++ b/examples/alien_delegate.cpp @@ -0,0 +1,299 @@ +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "CoroutineTests/alien/counting_scope.hpp" +#include "CoroutineTests/alien/schedule_on.hpp" +#include "CoroutineTests/alien/subtool.hpp" +#include "CoroutineTests/alien/sync_wait.hpp" +#include "CoroutineTests/alien/tool.hpp" +#include "CoroutineTests/threadpool.hpp" +#include "logging_utils.hpp" // log, format_name + +#define ERROR_CHECK_CUDA(EXP) \ + do { \ + cudaError_t errorCode = EXP; \ + if (errorCode != cudaSuccess) { \ + throw std::runtime_error( \ + std::format("CUDA error at {}:{}: {}", __FILE__, __LINE__, \ + cudaGetErrorString(errorCode))); \ + } \ + } while (false) + +using namespace CoroutineTests::alien; + +/// Awaitable that resumes a coroutine when a CUDA stream reaches a 
certain +/// point. Internally cudaLaunchHostFunc is used to set up a resumption callback +/// on the stream. +class StreamAwaitable { + public: + explicit StreamAwaitable(cudaStream_t stream) : m_stream(stream) {} + + bool await_ready() const noexcept { return false; } + template + void await_suspend(std::coroutine_handle handle) { + m_error = cudaLaunchHostFunc(m_stream, resumption_callback, + handle.address()); + // If the callback couldn't be registered, we need to reschedule the + // coroutine immediately to avoid deadlock. + if (m_error != cudaSuccess) { + handle.promise().reschedule(); + } + } + cudaError_t await_resume() const noexcept { return m_error; } + + private: + cudaStream_t m_stream; + cudaError_t m_error = cudaSuccess; + + template + static void resumption_callback(void* userData) { + auto handle = std::coroutine_handle::from_address(userData); + handle.promise().reschedule(); + } +}; + +template +struct DeviceBuffer { + T* ptr = nullptr; + std::size_t size = 0; +}; + +template +tool::Task delegate(F f) { + f(); + co_return; +} + +subtool::Task> clusterization( + DeviceBuffer cells, cudaStream_t stream, + std::function)> delegation_scheduler, + std::string_view parent) { + + const auto self = format_name(parent, "clusterization"); + log(self) << "Starting clusterization" << std::endl; + + const auto nCells = static_cast(cells.size); + + // Copy cells back to host to count non-zero entries + auto h_cells = std::vector(nCells); + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated copy of cells from device to host" + << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_cells.data(), cells.ptr, + nCells * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + })); + + ERROR_CHECK_CUDA(co_await StreamAwaitable{stream}); + + auto nClusters = 0; + for (auto v : h_cells) + if (v != 0) + ++nClusters; + + log(self) << "Found " << nClusters << " clusters" << std::endl; + + // Allocate clusters of appropriate size on device + 
int* d_clusters = nullptr; + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated allocation of clusters on device" + << std::endl; + ERROR_CHECK_CUDA( + cudaMallocAsync(reinterpret_cast(&d_clusters), + nClusters * sizeof(int), stream)); + + // Write some dummy data to the clusters buffer to simulate work + ERROR_CHECK_CUDA(cudaMemsetAsync(d_clusters, 0, + nClusters * sizeof(int), stream)); + ERROR_CHECK_CUDA(cudaMemsetAsync( + d_clusters, 1, nClusters / 2 * sizeof(int), stream)); + })); + + co_return DeviceBuffer{d_clusters, + static_cast(nClusters)}; +} + +subtool::Task> seeding( + DeviceBuffer clusters, cudaStream_t stream, + std::function)> delegation_scheduler, + std::string_view parent) { + + const auto self = format_name(parent, "seeding"); + log(self) << "Starting seeding" << std::endl; + + const auto nClusters = static_cast(clusters.size); + + // Copy clusters to host to count non-zero entries + auto h_clusters = std::vector(nClusters); + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated copy of clusters to host" << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr, + nClusters * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + })); + + ERROR_CHECK_CUDA(co_await StreamAwaitable{stream}); + + int nSeeds = 0; + for (auto v : h_clusters) + if (v != 0) + ++nSeeds; + + log(self) << "Found " << nSeeds << " seeds" << std::endl; + + // Allocate seeds of appropriate size on device + int* d_seeds = nullptr; + + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated allocation of seeds on device" << std::endl; + ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), + nSeeds * sizeof(int), stream)); + + // Write some dummy data to the seeds buffer to simulate work + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream)); + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), 
stream)); + })); + + co_return DeviceBuffer{d_seeds, static_cast(nSeeds)}; +} + +tool::Task reconstruct( + cudaStream_t stream, + std::function)> delegation_scheduler, + std::string_view parent) { + const auto self = format_name(parent, "reconstruction"); + log(self) << "Starting reconstruction" << std::endl; + + // Allocate some dummy input data on the device + auto cells = DeviceBuffer{nullptr, 1000}; + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated allocation of input data on device" + << std::endl; + ERROR_CHECK_CUDA( + cudaMallocAsync(reinterpret_cast(&cells.ptr), + cells.size * sizeof(int), stream)); + ERROR_CHECK_CUDA(cudaMemsetAsync(cells.ptr, 1, + cells.size * sizeof(int), stream)); + })); + + // Run the clusterization and seeding steps + auto clusters = + co_await clusterization(cells, stream, delegation_scheduler, self); + auto seeds = co_await seeding(clusters, stream, delegation_scheduler, self); + + // Cleanup + co_await schedule_on( + delegation_scheduler, delegate([&]() { + log(self) << "Delegated cleanup of device memory" << std::endl; + ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream)); + })); + + ERROR_CHECK_CUDA(co_await StreamAwaitable{stream}); + + log(self) << "Finishing reconstruction" << std::endl; + co_return tool::StatusCode::SUCCESS; +} + +int main() { + int deviceCount = 0; + auto error_id = cudaGetDeviceCount(&deviceCount); + + if (error_id != cudaSuccess) { + std::cout << "cudaGetDeviceCount returned " + << static_cast(error_id) << "\n" + << cudaGetErrorString(error_id) << "\n"; + return EXIT_FAILURE; + } + + if (deviceCount == 0) { + std::cout << "No CUDA devices found.\n"; + return EXIT_FAILURE; + } + + log() << "main Starting" << std::endl; + + tbb::task_arena task_arena{2, 0}; + auto scheduler = [&task_arena](std::coroutine_handle<> handle) { + task_arena.enqueue([handle]() { 
handle.resume(); }); + }; + + CoroutineTests::Threadpool delegation_threadpool(1); + auto delegation_scheduler = + [&delegation_threadpool](std::coroutine_handle<> handle) { + delegation_threadpool.enqueue_task(handle); + }; + + { + cudaStream_t stream; + ERROR_CHECK_CUDA(cudaStreamCreate(&stream)); + std::cout << "--- Single event, synchronous wait for completion ---\n"; + log() << "main Launching algorithm..." << std::endl; + auto status = sync_wait( + scheduler, reconstruct(stream, delegation_scheduler, "main")); + log() << "main Final status of algorithm " << status << "" << std::endl; + ERROR_CHECK_CUDA(cudaStreamDestroy(stream)); + } + + { + std::cout << "--- Multiple events, wait for all to complete ---\n"; + + auto streams = std::vector(2); + auto status = std::vector(streams.size()); + for (auto& stream : streams) { + ERROR_CHECK_CUDA(cudaStreamCreate(&stream)); + } + + auto scope = counting_scope{}; + log() << "main Launching algorithms..." << std::endl; + + auto payload = [](std::vector streams, + std::vector& statuses, + std::function)> + delegation_scheduler, + int i) -> tool::Task { + const auto name = std::format("event{}:main", i); + auto& stream = streams.at(i); + auto& status = statuses.at(i); + status = co_await reconstruct( + stream, std::move(delegation_scheduler), name); + co_return; + }; + + for (std::size_t i = 0; i < streams.size(); ++i) { + scope.spawn(scheduler, + payload(streams, status, delegation_scheduler, + static_cast(i))); + } + scope.join(); + + for (auto& stream : streams) { + ERROR_CHECK_CUDA(cudaStreamDestroy(stream)); + } + + log() << "main All algorithms completed. 
Final statuses: "; + for (auto s : status) { + std::cout << s << ' '; + } + std::cout << std::endl; + } + return 0; +} From 82d62a18b0b14fe32e95daf4f18091f3acd953fe Mon Sep 17 00:00:00 2001 From: Mateusz Jakub Fila Date: Thu, 26 Mar 2026 22:08:05 +0100 Subject: [PATCH 2/4] fix typos, rename helper --- examples/alien_reco.cpp | 4 +-- examples/capy_delegate.cpp | 62 +++++++++++++++++--------------------- examples/capy_reco.cpp | 4 +-- examples/exec_delegate.cpp | 4 +-- examples/exec_reco.cpp | 4 +-- 5 files changed, 36 insertions(+), 42 deletions(-) diff --git a/examples/alien_reco.cpp b/examples/alien_reco.cpp index 77893ee..db5d862 100644 --- a/examples/alien_reco.cpp +++ b/examples/alien_reco.cpp @@ -84,7 +84,7 @@ subtool::Task> clusterization(DeviceBuffer cells, log(self) << "Found " << nClusters << " clusters" << std::endl; - // Allocate clusters of appropiate size on device + // Allocate clusters of appropriate size on device int* d_clusters = nullptr; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_clusters), nClusters * sizeof(int), stream)); @@ -124,7 +124,7 @@ subtool::Task> seeding(DeviceBuffer clusters, log(self) << "Found " << nSeeds << " seeds" << std::endl; - // Allocate seeds of appropiate size on device + // Allocate seeds of appropriate size on device int* d_seeds = nullptr; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), nSeeds * sizeof(int), stream)); diff --git a/examples/capy_delegate.cpp b/examples/capy_delegate.cpp index b8236ca..923b23a 100644 --- a/examples/capy_delegate.cpp +++ b/examples/capy_delegate.cpp @@ -38,7 +38,7 @@ struct DeviceBuffer { }; template -boost::capy::task coro_wrapper(F&& f) { +boost::capy::task delegate(F&& f) { std::forward(f)(); co_return; } @@ -55,8 +55,7 @@ boost::capy::task> clusterization( // Copy cells back to host to count non-zero entries auto h_cells = std::vector(nCells); - co_await boost::capy::run( - delegation_thread.get_executor())(coro_wrapper([&]() { + co_await 
boost::capy::run(delegation_thread.get_executor())(delegate([&]() { log(self) << "Delegated copy of cells from device to host" << std::endl; ERROR_CHECK_CUDA(cudaMemcpyAsync(h_cells.data(), cells.ptr, nCells * sizeof(int), @@ -72,11 +71,10 @@ boost::capy::task> clusterization( log(self) << "Found " << nClusters << " clusters" << std::endl; - // Allocate clusters of appropiate size on device + // Allocate clusters of appropriate size on device int* d_clusters = nullptr; - co_await boost::capy::run( - delegation_thread.get_executor())(coro_wrapper([&]() { + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { log(self) << "Delegated allocation of clusters on device" << std::endl; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_clusters), nClusters * sizeof(int), stream)); @@ -104,13 +102,12 @@ boost::capy::task> seeding( // Copy clusters to host to count non-zero entries auto h_clusters = std::vector(nClusters); - co_await boost::capy::run(delegation_thread.get_executor())( - coro_wrapper([&]() { - log(self) << "Delegated copy of clusters to host" << std::endl; - ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr, - nClusters * sizeof(int), - cudaMemcpyDeviceToHost, stream)); - })); + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated copy of clusters to host" << std::endl; + ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr, + nClusters * sizeof(int), + cudaMemcpyDeviceToHost, stream)); + })); ERROR_CHECK_CUDA(co_await StreamIoAwaitable{stream}); @@ -121,21 +118,20 @@ boost::capy::task> seeding( log(self) << "Found " << nSeeds << " seeds" << std::endl; - // Allocate seeds of appropiate size on device + // Allocate seeds of appropriate size on device int* d_seeds = nullptr; - co_await boost::capy::run(delegation_thread.get_executor())( - coro_wrapper([&]() { - log(self) << "Delegated allocation of seeds on device" << std::endl; - 
ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), - nSeeds * sizeof(int), stream)); + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated allocation of seeds on device" << std::endl; + ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), + nSeeds * sizeof(int), stream)); - // Write some dummy data to the seeds buffer to simulate work - ERROR_CHECK_CUDA( - cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream)); - ERROR_CHECK_CUDA( - cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), stream)); - })); + // Write some dummy data to the seeds buffer to simulate work + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream)); + ERROR_CHECK_CUDA( + cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), stream)); + })); co_return DeviceBuffer{d_seeds, static_cast(nSeeds)}; } @@ -148,8 +144,7 @@ boost::capy::task reconstruct( // Allocate some dummy input data on the device auto cells = DeviceBuffer{nullptr, 1000}; - co_await boost::capy::run( - delegation_thread.get_executor())(coro_wrapper([&]() { + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { log(self) << "Delegated allocation of input data on device" << std::endl; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&cells.ptr), @@ -164,13 +159,12 @@ boost::capy::task reconstruct( auto seeds = co_await seeding(clusters, stream, delegation_thread, self); // Cleanup - co_await boost::capy::run(delegation_thread.get_executor())( - coro_wrapper([&]() { - log(self) << "Delegated cleanup of device memory" << std::endl; - ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream)); - ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream)); - ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream)); - })); + co_await boost::capy::run(delegation_thread.get_executor())(delegate([&]() { + log(self) << "Delegated cleanup of device memory" << std::endl; + ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream)); + 
ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream)); + ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream)); + })); ERROR_CHECK_CUDA(co_await StreamIoAwaitable{stream}); diff --git a/examples/capy_reco.cpp b/examples/capy_reco.cpp index 4fc92e8..ae094a7 100644 --- a/examples/capy_reco.cpp +++ b/examples/capy_reco.cpp @@ -60,7 +60,7 @@ boost::capy::task> clusterization(DeviceBuffer cells, log(self) << "Found " << nClusters << " clusters" << std::endl; - // Allocate clusters of appropiate size on device + // Allocate clusters of appropriate size on device int* d_clusters = nullptr; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_clusters), nClusters * sizeof(int), stream)); @@ -100,7 +100,7 @@ boost::capy::task> seeding(DeviceBuffer clusters, log(self) << "Found " << nSeeds << " seeds" << std::endl; - // Allocate seeds of appropiate size on device + // Allocate seeds of appropriate size on device int* d_seeds = nullptr; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), nSeeds * sizeof(int), stream)); diff --git a/examples/exec_delegate.cpp b/examples/exec_delegate.cpp index 8c5a40a..cdcbedc 100644 --- a/examples/exec_delegate.cpp +++ b/examples/exec_delegate.cpp @@ -67,7 +67,7 @@ exec::task> clusterization( log(self) << "Found " << nClusters << " clusters" << std::endl; - // Allocate clusters of appropiate size on device + // Allocate clusters of appropriate size on device int* d_clusters = nullptr; stdexec::sender auto allocate_clusters = @@ -123,7 +123,7 @@ exec::task> seeding( log(self) << "Found " << nSeeds << " seeds" << std::endl; - // Allocate seeds of appropiate size on device + // Allocate seeds of appropriate size on device int* d_seeds = nullptr; stdexec::sender auto allocate_seeds = diff --git a/examples/exec_reco.cpp b/examples/exec_reco.cpp index bd4eb9a..dbc07d3 100644 --- a/examples/exec_reco.cpp +++ b/examples/exec_reco.cpp @@ -60,7 +60,7 @@ exec::task> clusterization(DeviceBuffer cells, log(self) << "Found " << nClusters << " 
clusters" << std::endl; - // Allocate clusters of appropiate size on device + // Allocate clusters of appropriate size on device int* d_clusters = nullptr; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_clusters), nClusters * sizeof(int), stream)); @@ -100,7 +100,7 @@ exec::task> seeding(DeviceBuffer clusters, log(self) << "Found " << nSeeds << " seeds" << std::endl; - // Allocate seeds of appropiate size on device + // Allocate seeds of appropriate size on device int* d_seeds = nullptr; ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast(&d_seeds), nSeeds * sizeof(int), stream)); From 2ee278db035ee9a50f3d5839d4ca567375ac37bd Mon Sep 17 00:00:00 2001 From: Mateusz Jakub Fila Date: Thu, 26 Mar 2026 22:08:47 +0100 Subject: [PATCH 3/4] move alien stream awaitable to separate header --- examples/alien_delegate.cpp | 34 ++------------------------------ examples/alien_reco.cpp | 34 ++------------------------------ examples/alien_stream_await.hpp | 35 +++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 64 deletions(-) create mode 100644 examples/alien_stream_await.hpp diff --git a/examples/alien_delegate.cpp b/examples/alien_delegate.cpp index bd3aae1..e67d9af 100644 --- a/examples/alien_delegate.cpp +++ b/examples/alien_delegate.cpp @@ -14,7 +14,8 @@ #include "CoroutineTests/alien/sync_wait.hpp" #include "CoroutineTests/alien/tool.hpp" #include "CoroutineTests/threadpool.hpp" -#include "logging_utils.hpp" // log, format_name +#include "alien_stream_await.hpp" // StreamAwaitable +#include "logging_utils.hpp" // log, format_name #define ERROR_CHECK_CUDA(EXP) \ do { \ @@ -28,37 +29,6 @@ using namespace CoroutineTests::alien; -/// Awaitable that resumes a coroutine when a CUDA stream reaches a certain -/// point. Internally cudaLaunchHostFunc is used to set up a resumption callback -/// on the stream. 
-class StreamAwaitable { - public: - explicit StreamAwaitable(cudaStream_t stream) : m_stream(stream) {} - - bool await_ready() const noexcept { return false; } - template - void await_suspend(std::coroutine_handle handle) { - m_error = cudaLaunchHostFunc(m_stream, resumption_callback, - handle.address()); - // If the callback couldn't be registered, we need to reschedule the - // coroutine immediately to avoid deadlock. - if (m_error != cudaSuccess) { - handle.promise().reschedule(); - } - } - cudaError_t await_resume() const noexcept { return m_error; } - - private: - cudaStream_t m_stream; - cudaError_t m_error = cudaSuccess; - - template - static void resumption_callback(void* userData) { - auto handle = std::coroutine_handle::from_address(userData); - handle.promise().reschedule(); - } -}; - template struct DeviceBuffer { T* ptr = nullptr; diff --git a/examples/alien_reco.cpp b/examples/alien_reco.cpp index db5d862..51ba323 100644 --- a/examples/alien_reco.cpp +++ b/examples/alien_reco.cpp @@ -8,7 +8,8 @@ #include "CoroutineTests/alien/subtool.hpp" #include "CoroutineTests/alien/sync_wait.hpp" #include "CoroutineTests/alien/tool.hpp" -#include "logging_utils.hpp" // log, format_name +#include "alien_stream_await.hpp" // StreamAwaitable +#include "logging_utils.hpp" // log, format_name #define ERROR_CHECK_CUDA(EXP) \ do { \ @@ -22,37 +23,6 @@ using namespace CoroutineTests::alien; -/// Awaitable that resumes a coroutine when a CUDA stream reaches a certain -/// point. Internally cudaLaunchHostFunc is used to set up a resumption callback -/// on the stream. 
-class StreamAwaitable { - public: - StreamAwaitable(cudaStream_t stream) : m_stream(stream) {} - - bool await_ready() const noexcept { return false; } - template - void await_suspend(std::coroutine_handle handle) { - m_error = cudaLaunchHostFunc(m_stream, resumption_callback, - handle.address()); - // If the callback couldn't be registered, we need to reschedule the - // coroutine immediately to avoid deadlock. - if (m_error != cudaSuccess) { - handle.promise().reschedule(); - } - } - cudaError_t await_resume() const noexcept { return m_error; } - - private: - cudaStream_t m_stream; - cudaError_t m_error = cudaSuccess; - - template - static void resumption_callback(void* userData) { - auto handle = std::coroutine_handle::from_address(userData); - handle.promise().reschedule(); - } -}; - template struct DeviceBuffer { T* ptr = nullptr; diff --git a/examples/alien_stream_await.hpp b/examples/alien_stream_await.hpp new file mode 100644 index 0000000..863b212 --- /dev/null +++ b/examples/alien_stream_await.hpp @@ -0,0 +1,35 @@ +#pragma once +#include + +#include + +/// Awaitable that resumes a coroutine when a CUDA stream reaches a certain +/// point. Internally cudaLaunchHostFunc is used to set up a resumption callback +/// on the stream. +class StreamAwaitable { + public: + explicit StreamAwaitable(cudaStream_t stream) : m_stream(stream) {} + + bool await_ready() const noexcept { return false; } + template + void await_suspend(std::coroutine_handle handle) { + m_error = cudaLaunchHostFunc(m_stream, resumption_callback, + handle.address()); + // If the callback couldn't be registered, we need to reschedule the + // coroutine immediately to avoid deadlock. 
+ if (m_error != cudaSuccess) { + handle.promise().reschedule(); + } + } + cudaError_t await_resume() const noexcept { return m_error; } + + private: + cudaStream_t m_stream; + cudaError_t m_error = cudaSuccess; + + template + static void resumption_callback(void* userData) { + auto handle = std::coroutine_handle::from_address(userData); + handle.promise().reschedule(); + } +}; From 8c200210c3554539d0d05440887ff569ebf1d12f Mon Sep 17 00:00:00 2001 From: Mateusz Jakub Fila Date: Thu, 26 Mar 2026 22:19:25 +0100 Subject: [PATCH 4/4] update readme, fix links --- examples/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/README.md b/examples/README.md index e81e40a..943bbe5 100644 --- a/examples/README.md +++ b/examples/README.md @@ -168,8 +168,8 @@ Link: [alien_reco.cpp](alien_reco.cpp), [exec_reco.cpp](exec_reco.cpp), [capy_re These examples show a setup and coroutine chain loosely inspired by track reconstruction on GPU in high energy physics experiments. The `reconstruct` coroutine prepares a mockup input data on a CUDA device, then `co_await`s the `clustering` coroutine, then receives output from it and `co_await`s the `seeding` coroutine. Both `clustering` and `seeding` coroutines receive a device buffer, copy it asynchronously back to host, suspend until the copy is done, then count non-zero elements and allocate new buffer of that size for their results. In `main` the `reconstruct` coroutines are executed in a TBB task arena either synchronously waiting for the result from the submitting or dynamically starting a few `reconstruct` coroutines and waiting until all the work is finished. In the `alien_reco` the application is implemented with "alien" coroutines (as in [alien examples](#alien)), in `exec_reco_stdexec` the application is implemented with C++26 execution, in `capy_reco` the implementation uses Boost.Capy and IoAwaitables protocol ([p4003](https://www.open-std.org/JTC1/SC22/WG21/docs/papers/2026/p4003r0.pdf)). 
-## Delegation +## Delegate -Link: [exec_delegation.cpp](exec_delegation.cpp), [capy_delegation.cpp](capy_delegation.cpp) +Link: [alien_delegate.cpp](alien_delegate.cpp), [exec_delegate.cpp](exec_delegate.cpp), [capy_delegate.cpp](capy_delegate.cpp) -These examples are a modification of [reconstuction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. By delegating these calls to a single thread, this contention can be reduced. The examples demonstrate how this optimization can be implemented using coroutines schedulers/executors. +These examples are a modification of [reconstruction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. This contention can be reduced by delegating the calls to a single thread. The examples demonstrate how this optimization can be implemented using coroutine schedulers/executors.