Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ endif()
if(BUILD_CUDA AND BUILD_TBB)
add_executable(alien_reco alien_reco.cpp)
target_link_libraries(alien_reco PRIVATE CoroutineTests CUDA::cudart TBB::tbb)
add_executable(alien_delegate alien_delegate.cpp)
target_link_libraries(alien_delegate PRIVATE CoroutineTests CUDA::cudart TBB::tbb)
if(BUILD_STDEXEC)
add_executable(exec_reco_stdexec exec_reco.cpp)
target_link_libraries(exec_reco_stdexec PRIVATE CoroutineTests CUDA::cudart TBB::tbb STDEXEC::stdexec)
Expand Down
6 changes: 3 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ Link: [alien_reco.cpp](alien_reco.cpp), [exec_reco.cpp](exec_reco.cpp), [capy_re

These examples show a setup and coroutine chain loosely inspired by track reconstruction on GPU in high energy physics experiments. The `reconstruct` coroutine prepares a mockup input data on a CUDA device, then `co_await`s the `clustering` coroutine, then receives output from it and `co_await`s the `seeding` coroutine. Both `clustering` and `seeding` coroutines receive a device buffer, copy it asynchronously back to host, suspend until the copy is done, then count non-zero elements and allocate new buffer of that size for their results. In `main` the `reconstruct` coroutines are executed in a TBB task arena either synchronously waiting for the result from the submitting or dynamically starting a few `reconstruct` coroutines and waiting until all the work is finished. In the `alien_reco` the application is implemented with "alien" coroutines (as in [alien examples](#alien)), in `exec_reco_stdexec` the application is implemented with C++26 execution, in `capy_reco` the implementation uses Boost.Capy and IoAwaitables protocol ([p4003](https://www.open-std.org/JTC1/SC22/WG21/docs/papers/2026/p4003r0.pdf)).

## Delegation
## Delegate

Link: [exec_delegation.cpp](exec_delegation.cpp), [capy_delegation.cpp](capy_delegation.cpp)
Link: [alien_delegate.cpp](alien_delegate.cpp), [exec_delegate.cpp](exec_delegate.cpp), [capy_delegate.cpp](capy_delegate.cpp)

These examples are a modification of [reconstuction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. By delegating these calls to a single thread, this contention can be reduced. The examples demonstrate how this optimization can be implemented using coroutines schedulers/executors.
These examples are a modification of [reconstuction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. This contention can be reduced by delegating the calls to a single thread. The examples demonstrate how this optimization can be implemented using coroutines schedulers/executors.
269 changes: 269 additions & 0 deletions examples/alien_delegate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
#include <cuda_runtime_api.h>
#include <tbb/task_arena.h>

#include <cstddef>
#include <format>
#include <iostream>
#include <stdexcept>
#include <string_view>
#include <vector>

#include "CoroutineTests/alien/counting_scope.hpp"
#include "CoroutineTests/alien/schedule_on.hpp"
#include "CoroutineTests/alien/subtool.hpp"
#include "CoroutineTests/alien/sync_wait.hpp"
#include "CoroutineTests/alien/tool.hpp"
#include "CoroutineTests/threadpool.hpp"
#include "alien_stream_await.hpp" // StreamAwaitable
#include "logging_utils.hpp" // log, format_name

#define ERROR_CHECK_CUDA(EXP) \
do { \
cudaError_t errorCode = EXP; \
if (errorCode != cudaSuccess) { \
throw std::runtime_error( \
std::format("CUDA error at {}:{}: {}", __FILE__, __LINE__, \
cudaGetErrorString(errorCode))); \
} \
} while (false)

using namespace CoroutineTests::alien;

template <typename T>
struct DeviceBuffer {
T* ptr = nullptr;
std::size_t size = 0;
};

template <typename F>
tool::Task<void> delegate(F f) {
f();
co_return;
}

subtool::Task<DeviceBuffer<int>> clusterization(
DeviceBuffer<int> cells, cudaStream_t stream,
std::function<void(std::coroutine_handle<>)> delegation_scheduler,
std::string_view parent) {

const auto self = format_name(parent, "clusterization");
log(self) << "Starting clusterization" << std::endl;

const auto nCells = static_cast<int>(cells.size);

// Copy cells back to host to count non-zero entries
auto h_cells = std::vector<int>(nCells);

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated copy of cells from device to host"
<< std::endl;
ERROR_CHECK_CUDA(cudaMemcpyAsync(h_cells.data(), cells.ptr,
nCells * sizeof(int),
cudaMemcpyDeviceToHost, stream));
}));

ERROR_CHECK_CUDA(co_await StreamAwaitable{stream});

auto nClusters = 0;
for (auto v : h_cells)
if (v != 0)
++nClusters;

log(self) << "Found " << nClusters << " clusters" << std::endl;

// Allocate clusters of appropriate size on device
int* d_clusters = nullptr;

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated allocation of clusters on device"
<< std::endl;
ERROR_CHECK_CUDA(
cudaMallocAsync(reinterpret_cast<void**>(&d_clusters),
nClusters * sizeof(int), stream));

// Write some dummy data to the clusters buffer to simulate work
ERROR_CHECK_CUDA(cudaMemsetAsync(d_clusters, 0,
nClusters * sizeof(int), stream));
ERROR_CHECK_CUDA(cudaMemsetAsync(
d_clusters, 1, nClusters / 2 * sizeof(int), stream));
}));

co_return DeviceBuffer<int>{d_clusters,
static_cast<std::size_t>(nClusters)};
}

subtool::Task<DeviceBuffer<int>> seeding(
DeviceBuffer<int> clusters, cudaStream_t stream,
std::function<void(std::coroutine_handle<>)> delegation_scheduler,
std::string_view parent) {

const auto self = format_name(parent, "seeding");
log(self) << "Starting seeding" << std::endl;

const auto nClusters = static_cast<int>(clusters.size);

// Copy clusters to host to count non-zero entries
auto h_clusters = std::vector<int>(nClusters);

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated copy of clusters to host" << std::endl;
ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr,
nClusters * sizeof(int),
cudaMemcpyDeviceToHost, stream));
}));

ERROR_CHECK_CUDA(co_await StreamAwaitable{stream});

int nSeeds = 0;
for (auto v : h_clusters)
if (v != 0)
++nSeeds;

log(self) << "Found " << nSeeds << " seeds" << std::endl;

// Allocate seeds of appropriate size on device
int* d_seeds = nullptr;

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated allocation of seeds on device" << std::endl;
ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast<void**>(&d_seeds),
nSeeds * sizeof(int), stream));

// Write some dummy data to the seeds buffer to simulate work
ERROR_CHECK_CUDA(
cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream));
ERROR_CHECK_CUDA(
cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), stream));
}));

co_return DeviceBuffer<int>{d_seeds, static_cast<std::size_t>(nSeeds)};
}

tool::Task<tool::StatusCode> reconstruct(
cudaStream_t stream,
std::function<void(std::coroutine_handle<>)> delegation_scheduler,
std::string_view parent) {
const auto self = format_name(parent, "reconstruction");
log(self) << "Starting reconstruction" << std::endl;

// Allocate some dummy input data on the device
auto cells = DeviceBuffer<int>{nullptr, 1000};
co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated allocation of input data on device"
<< std::endl;
ERROR_CHECK_CUDA(
cudaMallocAsync(reinterpret_cast<void**>(&cells.ptr),
cells.size * sizeof(int), stream));
ERROR_CHECK_CUDA(cudaMemsetAsync(cells.ptr, 1,
cells.size * sizeof(int), stream));
}));

// Run the clusterization and seeding steps
auto clusters =
co_await clusterization(cells, stream, delegation_scheduler, self);
auto seeds = co_await seeding(clusters, stream, delegation_scheduler, self);

// Cleanup
co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated cleanup of device memory" << std::endl;
ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream));
ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream));
ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream));
}));

ERROR_CHECK_CUDA(co_await StreamAwaitable{stream});

log(self) << "Finishing reconstruction" << std::endl;
co_return tool::StatusCode::SUCCESS;
}

int main() {
int deviceCount = 0;
auto error_id = cudaGetDeviceCount(&deviceCount);

if (error_id != cudaSuccess) {
std::cout << "cudaGetDeviceCount returned "
<< static_cast<int>(error_id) << "\n"
<< cudaGetErrorString(error_id) << "\n";
return EXIT_FAILURE;
}

if (deviceCount == 0) {
std::cout << "No CUDA devices found.\n";
return EXIT_FAILURE;
}

log() << "main Starting" << std::endl;

tbb::task_arena task_arena{2, 0};
auto scheduler = [&task_arena](std::coroutine_handle<> handle) {
task_arena.enqueue([handle]() { handle.resume(); });
};

CoroutineTests::Threadpool delegation_threadpool(1);
auto delegation_scheduler =
[&delegation_threadpool](std::coroutine_handle<> handle) {
delegation_threadpool.enqueue_task(handle);
};

{
cudaStream_t stream;
ERROR_CHECK_CUDA(cudaStreamCreate(&stream));
std::cout << "--- Single event, synchronous wait for completion ---\n";
log() << "main Launching algorithm..." << std::endl;
auto status = sync_wait(
scheduler, reconstruct(stream, delegation_scheduler, "main"));
log() << "main Final status of algorithm " << status << "" << std::endl;
ERROR_CHECK_CUDA(cudaStreamDestroy(stream));
}

{
std::cout << "--- Multiple events, wait for all to complete ---\n";

auto streams = std::vector<cudaStream_t>(2);
auto status = std::vector<tool::StatusCode>(streams.size());
for (auto& stream : streams) {
ERROR_CHECK_CUDA(cudaStreamCreate(&stream));
}

auto scope = counting_scope{};
log() << "main Launching algorithms..." << std::endl;

auto payload = [](std::vector<cudaStream_t> streams,
std::vector<tool::StatusCode>& statuses,
std::function<void(std::coroutine_handle<>)>
delegation_scheduler,
int i) -> tool::Task<void> {
const auto name = std::format("event{}:main", i);
auto& stream = streams.at(i);
auto& status = statuses.at(i);
status = co_await reconstruct(
stream, std::move(delegation_scheduler), name);
co_return;
};

for (std::size_t i = 0; i < streams.size(); ++i) {
scope.spawn(scheduler,
payload(streams, status, delegation_scheduler,
static_cast<int>(i)));
}
scope.join();

for (auto& stream : streams) {
ERROR_CHECK_CUDA(cudaStreamDestroy(stream));
}

log() << "main All algorithms completed. Final statuses: ";
for (auto s : status) {
std::cout << s << ' ';
}
std::cout << std::endl;
}
return 0;
}
38 changes: 4 additions & 34 deletions examples/alien_reco.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#include "CoroutineTests/alien/subtool.hpp"
#include "CoroutineTests/alien/sync_wait.hpp"
#include "CoroutineTests/alien/tool.hpp"
#include "logging_utils.hpp" // log, format_name
#include "alien_stream_await.hpp" // StreamAwaitable
#include "logging_utils.hpp" // log, format_name

#define ERROR_CHECK_CUDA(EXP) \
do { \
Expand All @@ -22,37 +23,6 @@

using namespace CoroutineTests::alien;

/// Awaitable that resumes a coroutine when a CUDA stream reaches a certain
/// point. Internally cudaLaunchHostFunc is used to set up a resumption callback
/// on the stream.
class StreamAwaitable {
public:
StreamAwaitable(cudaStream_t stream) : m_stream(stream) {}

bool await_ready() const noexcept { return false; }
template <typename Promise>
void await_suspend(std::coroutine_handle<Promise> handle) {
m_error = cudaLaunchHostFunc(m_stream, resumption_callback<Promise>,
handle.address());
// If the callback couldn't be registered, we need to reschedule the
// coroutine immediately to avoid deadlock.
if (m_error != cudaSuccess) {
handle.promise().reschedule();
}
}
cudaError_t await_resume() const noexcept { return m_error; }

private:
cudaStream_t m_stream;
cudaError_t m_error = cudaSuccess;

template <typename Promise>
static void resumption_callback(void* userData) {
auto handle = std::coroutine_handle<Promise>::from_address(userData);
handle.promise().reschedule();
}
};

template <typename T>
struct DeviceBuffer {
T* ptr = nullptr;
Expand Down Expand Up @@ -84,7 +54,7 @@ subtool::Task<DeviceBuffer<int>> clusterization(DeviceBuffer<int> cells,

log(self) << "Found " << nClusters << " clusters" << std::endl;

// Allocate clusters of appropiate size on device
// Allocate clusters of appropriate size on device
int* d_clusters = nullptr;
ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast<void**>(&d_clusters),
nClusters * sizeof(int), stream));
Expand Down Expand Up @@ -124,7 +94,7 @@ subtool::Task<DeviceBuffer<int>> seeding(DeviceBuffer<int> clusters,

log(self) << "Found " << nSeeds << " seeds" << std::endl;

// Allocate seeds of appropiate size on device
// Allocate seeds of appropriate size on device
int* d_seeds = nullptr;
ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast<void**>(&d_seeds),
nSeeds * sizeof(int), stream));
Expand Down
35 changes: 35 additions & 0 deletions examples/alien_stream_await.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once
#include <cuda_runtime_api.h>

#include <coroutine>

/// Awaitable that resumes a coroutine when a CUDA stream reaches a certain
/// point. Internally cudaLaunchHostFunc is used to set up a resumption callback
/// on the stream.
class StreamAwaitable {
public:
explicit StreamAwaitable(cudaStream_t stream) : m_stream(stream) {}

bool await_ready() const noexcept { return false; }
template <typename Promise>
void await_suspend(std::coroutine_handle<Promise> handle) {
m_error = cudaLaunchHostFunc(m_stream, resumption_callback<Promise>,
handle.address());
// If the callback couldn't be registered, we need to reschedule the
// coroutine immediately to avoid deadlock.
if (m_error != cudaSuccess) {
handle.promise().reschedule();
}
}
cudaError_t await_resume() const noexcept { return m_error; }

private:
cudaStream_t m_stream;
cudaError_t m_error = cudaSuccess;

template <typename Promise>
static void resumption_callback(void* userData) {
auto handle = std::coroutine_handle<Promise>::from_address(userData);
handle.promise().reschedule();
}
};
Loading