Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ if(BUILD_CUDA AND BUILD_TBB)
target_link_libraries(alien_reco PRIVATE CoroutineTests CUDA::cudart TBB::tbb)
add_executable(alien_delegate alien_delegate.cpp)
target_link_libraries(alien_delegate PRIVATE CoroutineTests CUDA::cudart TBB::tbb)
add_executable(alien_event_poll alien_event_poll.cpp)
target_link_libraries(alien_event_poll PRIVATE CoroutineTests CUDA::cudart TBB::tbb)
if(BUILD_STDEXEC)
add_executable(exec_reco_stdexec exec_reco.cpp)
target_link_libraries(exec_reco_stdexec PRIVATE CoroutineTests CUDA::cudart TBB::tbb STDEXEC::stdexec)
Expand All @@ -83,5 +85,7 @@ if(BUILD_CUDA AND BUILD_TBB)
target_link_libraries(capy_reco PRIVATE CoroutineTests CUDA::cudart TBB::tbb Boost::capy)
add_executable(capy_delegate capy_delegate.cpp)
target_link_libraries(capy_delegate PRIVATE CoroutineTests CUDA::cudart TBB::tbb Boost::capy)
add_executable(capy_event_poll capy_event_poll.cpp)
target_link_libraries(capy_event_poll PRIVATE CoroutineTests CUDA::cudart TBB::tbb Boost::capy)
endif()
endif()
8 changes: 7 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,10 @@ These examples show a setup and coroutine chain loosely inspired by track recons

Link: [alien_delegate.cpp](alien_delegate.cpp), [exec_delegate.cpp](exec_delegate.cpp), [capy_delegate.cpp](capy_delegate.cpp)

These examples are a modification of [reconstuction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. This contention can be reduced by delegating the calls to a single thread. The examples demonstrate how this optimization can be implemented using coroutines schedulers/executors.
These examples are a modification of [reconstruction examples](#reconstruction) to delegate all the calls to CUDA API that are happening inside the task to execute on a designated thread by changing scheduler/executor. This represents a specific optimization: invoking CUDA APIs from multiple threads can incur performance penalties due to contention on internal CUDA locks. This contention can be reduced by delegating the calls to a single thread. The examples demonstrate how this optimization can be implemented using coroutines schedulers/executors.

## Event poll

Link: [alien_event_poll.cpp](alien_event_poll.cpp), [capy_event_poll.cpp](capy_event_poll.cpp)

These examples are a variant of [delegate examples](#delegate) in which awaiting completion of CUDA operations is done by repeatedly querying the event state instead of using a callback as in earlier examples. All the queries are executed by a specific thread.
303 changes: 303 additions & 0 deletions examples/alien_event_poll.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
#include <cuda_runtime_api.h>
#include <driver_types.h>
#include <tbb/task_arena.h>

#include <cstddef>
#include <format>
#include <iostream>
#include <stdexcept>
#include <string_view>
#include <vector>

#include "CoroutineTests/alien/counting_scope.hpp"
#include "CoroutineTests/alien/schedule_on.hpp"
#include "CoroutineTests/alien/subtool.hpp"
#include "CoroutineTests/alien/sync_wait.hpp"
#include "CoroutineTests/alien/tool.hpp"
#include "CoroutineTests/threadpool.hpp"
#include "logging_utils.hpp" // log, format_name

#define ERROR_CHECK_CUDA(EXP) \
do { \
cudaError_t errorCode = EXP; \
if (errorCode != cudaSuccess) { \
throw std::runtime_error( \
std::format("CUDA error at {}:{}: {}", __FILE__, __LINE__, \
cudaGetErrorString(errorCode))); \
} \
} while (false)

using namespace CoroutineTests::alien;

template <typename T>
struct DeviceBuffer {
T* ptr = nullptr;
std::size_t size = 0;
};

template <typename F>
tool::Task<void> delegate(F f) {
f();
co_return;
}

struct Retry {
bool await_ready() const noexcept { return false; }
template <typename Promise>
void await_suspend(std::coroutine_handle<Promise> handle) const {
handle.promise().reschedule();
}
void await_resume() const noexcept {}
};

tool::Task<void> poll(cudaEvent_t event) {
auto status = cudaSuccess;
log() << "Polling for event completion..." << std::endl;
while ((status = cudaEventQuery(event)) == cudaErrorNotReady) {
log() << "Event not ready, retrying..." << std::endl;
co_await Retry{};
}
ERROR_CHECK_CUDA(status);
}

subtool::Task<DeviceBuffer<int>> clusterization(
DeviceBuffer<int> cells, cudaStream_t stream, cudaEvent_t event,
std::function<void(std::coroutine_handle<>)> delegation_scheduler,
std::string_view parent) {

const auto self = format_name(parent, "clusterization");
log(self) << "Starting clusterization" << std::endl;

const auto nCells = static_cast<int>(cells.size);

// Copy cells back to host to count non-zero entries
auto h_cells = std::vector<int>(nCells);

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated copy of cells from device to host"
<< std::endl;
ERROR_CHECK_CUDA(cudaMemcpyAsync(h_cells.data(), cells.ptr,
nCells * sizeof(int),
cudaMemcpyDeviceToHost, stream));
ERROR_CHECK_CUDA(cudaEventRecord(event, stream));
}));

co_await schedule_on(delegation_scheduler, poll(event));

auto nClusters = 0;
for (auto v : h_cells)
if (v != 0)
++nClusters;

log(self) << "Found " << nClusters << " clusters" << std::endl;

// Allocate clusters of appropriate size on device
int* d_clusters = nullptr;

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated allocation of clusters on device"
<< std::endl;
ERROR_CHECK_CUDA(
cudaMallocAsync(reinterpret_cast<void**>(&d_clusters),
nClusters * sizeof(int), stream));

// Write some dummy data to the clusters buffer to simulate work
ERROR_CHECK_CUDA(cudaMemsetAsync(d_clusters, 0,
nClusters * sizeof(int), stream));
ERROR_CHECK_CUDA(cudaMemsetAsync(
d_clusters, 1, nClusters / 2 * sizeof(int), stream));
}));

co_return DeviceBuffer<int>{d_clusters,
static_cast<std::size_t>(nClusters)};
}

subtool::Task<DeviceBuffer<int>> seeding(
DeviceBuffer<int> clusters, cudaStream_t stream, cudaEvent_t event,
std::function<void(std::coroutine_handle<>)> delegation_scheduler,
std::string_view parent) {

const auto self = format_name(parent, "seeding");
log(self) << "Starting seeding" << std::endl;

const auto nClusters = static_cast<int>(clusters.size);

// Copy clusters to host to count non-zero entries
auto h_clusters = std::vector<int>(nClusters);

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated copy of clusters to host" << std::endl;
ERROR_CHECK_CUDA(cudaMemcpyAsync(h_clusters.data(), clusters.ptr,
nClusters * sizeof(int),
cudaMemcpyDeviceToHost, stream));
ERROR_CHECK_CUDA(cudaEventRecord(event, stream));
}));

co_await schedule_on(delegation_scheduler, poll(event));

int nSeeds = 0;
for (auto v : h_clusters)
if (v != 0)
++nSeeds;

log(self) << "Found " << nSeeds << " seeds" << std::endl;

// Allocate seeds of appropriate size on device
int* d_seeds = nullptr;

co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated allocation of seeds on device" << std::endl;
ERROR_CHECK_CUDA(cudaMallocAsync(reinterpret_cast<void**>(&d_seeds),
nSeeds * sizeof(int), stream));

// Write some dummy data to the seeds buffer to simulate work
ERROR_CHECK_CUDA(
cudaMemsetAsync(d_seeds, 0, nSeeds * sizeof(int), stream));
ERROR_CHECK_CUDA(
cudaMemsetAsync(d_seeds, 1, nSeeds / 2 * sizeof(int), stream));
}));

co_return DeviceBuffer<int>{d_seeds, static_cast<std::size_t>(nSeeds)};
}

tool::Task<tool::StatusCode> reconstruct(
cudaStream_t stream, cudaEvent_t event,
std::function<void(std::coroutine_handle<>)> delegation_scheduler,
std::string_view parent) {
const auto self = format_name(parent, "reconstruction");
log(self) << "Starting reconstruction" << std::endl;

// Allocate some dummy input data on the device
auto cells = DeviceBuffer<int>{nullptr, 1000};
co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated allocation of input data on device"
<< std::endl;
ERROR_CHECK_CUDA(
cudaMallocAsync(reinterpret_cast<void**>(&cells.ptr),
cells.size * sizeof(int), stream));
ERROR_CHECK_CUDA(cudaMemsetAsync(cells.ptr, 1,
cells.size * sizeof(int), stream));
}));

// Run the clusterization and seeding steps
auto clusters = co_await clusterization(cells, stream, event,
delegation_scheduler, self);
auto seeds =
co_await seeding(clusters, stream, event, delegation_scheduler, self);

// Cleanup
co_await schedule_on(
delegation_scheduler, delegate([&]() {
log(self) << "Delegated cleanup of device memory" << std::endl;
ERROR_CHECK_CUDA(cudaFreeAsync(cells.ptr, stream));
ERROR_CHECK_CUDA(cudaFreeAsync(clusters.ptr, stream));
ERROR_CHECK_CUDA(cudaFreeAsync(seeds.ptr, stream));
ERROR_CHECK_CUDA(cudaEventRecord(event, stream));
}));

co_await schedule_on(delegation_scheduler, poll(event));

log(self) << "Finishing reconstruction" << std::endl;
co_return tool::StatusCode::SUCCESS;
}

int main() {
int deviceCount = 0;
auto error_id = cudaGetDeviceCount(&deviceCount);

if (error_id != cudaSuccess) {
std::cout << "cudaGetDeviceCount returned "
<< static_cast<int>(error_id) << "\n"
<< cudaGetErrorString(error_id) << "\n";
return EXIT_FAILURE;
}

if (deviceCount == 0) {
std::cout << "No CUDA devices found.\n";
return EXIT_FAILURE;
}

log() << "main Starting" << std::endl;

tbb::task_arena task_arena{2, 0};
auto scheduler = [&task_arena](std::coroutine_handle<> handle) {
task_arena.enqueue([handle]() { handle.resume(); });
};

CoroutineTests::Threadpool delegation_threadpool(1);
auto delegation_scheduler =
[&delegation_threadpool](std::coroutine_handle<> handle) {
delegation_threadpool.enqueue_task(handle);
};

{
cudaStream_t stream;
ERROR_CHECK_CUDA(cudaStreamCreate(&stream));
cudaEvent_t event;
ERROR_CHECK_CUDA(
cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
std::cout << "--- Single event, synchronous wait for completion ---\n";
log() << "main Launching algorithm..." << std::endl;
auto status =
sync_wait(scheduler,
reconstruct(stream, event, delegation_scheduler, "main"));
log() << "main Final status of algorithm " << status << "" << std::endl;
ERROR_CHECK_CUDA(cudaEventDestroy(event));
ERROR_CHECK_CUDA(cudaStreamDestroy(stream));
}

{
std::cout << "--- Multiple events, wait for all to complete ---\n";

auto streams = std::vector<cudaStream_t>(2);
auto events = std::vector<cudaEvent_t>(streams.size());
auto status = std::vector<tool::StatusCode>(streams.size());
for (std::size_t i = 0; i < streams.size(); ++i) {
ERROR_CHECK_CUDA(cudaStreamCreate(&streams.at(i)));
ERROR_CHECK_CUDA(cudaEventCreateWithFlags(&events.at(i),
cudaEventDisableTiming));
}

auto scope = counting_scope{};
log() << "main Launching algorithms..." << std::endl;

auto payload = [](std::vector<cudaStream_t> streams,
std::vector<cudaEvent_t> events,
std::vector<tool::StatusCode>& statuses,
std::function<void(std::coroutine_handle<>)>
delegation_scheduler,
int i) -> tool::Task<void> {
const auto name = std::format("event{}:main", i);
auto& stream = streams.at(i);
auto& event = events.at(i);
auto& status = statuses.at(i);
status = co_await reconstruct(
stream, event, std::move(delegation_scheduler), name);
co_return;
};

for (std::size_t i = 0; i < streams.size(); ++i) {
scope.spawn(scheduler,
payload(streams, events, status, delegation_scheduler,
static_cast<int>(i)));
}
scope.join();

for (std::size_t i = 0; i < streams.size(); ++i) {
ERROR_CHECK_CUDA(cudaEventDestroy(events.at(i)));
ERROR_CHECK_CUDA(cudaStreamDestroy(streams.at(i)));
}

log() << "main All algorithms completed. Final statuses: ";
for (auto s : status) {
std::cout << s << ' ';
}
std::cout << std::endl;
}
return 0;
}
Loading