Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ add_executable(alien_sync_wait alien_sync_wait.cpp)
target_link_libraries(alien_sync_wait PRIVATE CoroutineTests)
add_executable(alien_counting_scope alien_counting_scope.cpp)
target_link_libraries(alien_counting_scope PRIVATE CoroutineTests)

add_executable(alien_schedule_on alien_schedule_on.cpp)
target_link_libraries(alien_schedule_on PRIVATE CoroutineTests)

if(BUILD_STDEXEC)
add_executable(exec_task_stdexec exec_task.cpp)
Expand Down
6 changes: 6 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ Link: [alien_counting_scope.cpp](alien_counting_scope.cpp)

This example demonstrates dynamic work submitting with `counting_scope` compatible with coroutine semantic as in ["Alien" example](#alien). `spawn` schedules execution of a coroutine that returns `void`. `join()` blocks the current thread until all submitted coroutines are finished (and rethrows the first exception captured from submitted work, if any).

## Alien schedule_on

Link: [alien_schedule_on.cpp](alien_schedule_on.cpp)

This example demonstrates changing scheduler used by part of a chain of coroutines following the semantics from ["Alien" example](#alien). The `schedule_on` algorithm can be used to adapt a coroutine to use a different scheduler than its parent. Starting the coroutine suspends its parent and reschedule work on the new scheduler, finishing the coroutine reschedules continuation of its parent using parent's scheduler.

## Capy task

Link: [capy_task.cpp](capy_task.cpp)
Expand Down
74 changes: 74 additions & 0 deletions examples/alien_schedule_on.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#include <coroutine>
#include <functional>
#include <string_view>

#include "CoroutineTests/alien/schedule_on.hpp"
#include "CoroutineTests/alien/sync_wait.hpp"
#include "CoroutineTests/alien/tool.hpp"
#include "CoroutineTests/threadpool.hpp"
#include "alien_timer.hpp" // AsyncTimer
#include "logging_utils.hpp" // log, format_name

using namespace CoroutineTests::alien;

tool::Task<void> inner_most(std::string_view parent) {
const auto self = format_name(parent, "inner_most");
log(self) << "Starting inner_most" << std::endl;
log(self) << "Calling async API in inner_most" << std::endl;
auto status = co_await AsyncTimer{std::chrono::milliseconds(75),
StatusCode::SUCCESS, self};
log(self) << "Result from async API in inner_most: " << status << std::endl;
log(self) << "Finishing inner_most" << std::endl;
co_return;
}

tool::Task<tool::StatusCode> inner(std::string_view parent) {
const auto self = format_name(parent, "inner");
log(self) << "Starting inner" << std::endl;
co_await inner_most(self);
log(self) << "Finishing inner" << std::endl;
co_return tool::StatusCode::SUCCESS;
}

tool::Task<void> outer(
std::function<void(std::coroutine_handle<>)> inner_scheduler,
std::string_view parent) {
const auto self = format_name(parent, "outer");
log(self) << "Starting outer" << std::endl;
auto status = co_await schedule_on(inner_scheduler, inner(self));
log(self) << "Received status " << status << " from inner" << std::endl;
log(self) << "Finishing outer" << std::endl;
co_return;
}

tool::Task<tool::StatusCode> outer_most(
std::function<void(std::coroutine_handle<>)> inner_scheduler,
std::string_view parent) {
const auto self = format_name(parent, "outer_most");
log(self) << "Starting outer_most" << std::endl;
co_await outer(inner_scheduler, self);
log(self) << "Finishing outer_most" << std::endl;
co_return tool::StatusCode::SUCCESS;
}

int main() {
log() << "main Starting" << std::endl;
CoroutineTests::Threadpool threadpool(1);
auto scheduler = [&threadpool](std::coroutine_handle<> handle) {
log() << "scheduler called, enqueuing work" << std::endl;
threadpool.enqueue_task(handle);
};

CoroutineTests::Threadpool inner_threadpool(1);
auto inner_scheduler = [&inner_threadpool](std::coroutine_handle<> handle) {
log() << "inner_scheduler called, enqueuing work" << std::endl;
inner_threadpool.enqueue_task(handle);
};

log() << "main Launching outer_most and waiting for completion..."
<< std::endl;
auto status = CoroutineTests::alien::sync_wait(
scheduler, outer_most(inner_scheduler, "main"));
log() << "main Final status of outer_most " << status << "" << std::endl;
return 0;
}
179 changes: 179 additions & 0 deletions include/CoroutineTests/alien/schedule_on.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#pragma once

#include <concepts>
#include <coroutine>
#include <exception>
#include <functional>
#include <optional>

namespace CoroutineTests::alien {

namespace detail::schedule_on {
namespace concepts {
template <typename T>
concept HasScheduler = requires(T t) {
{
t.get_scheduler()
} -> std::convertible_to<std::function<void(std::coroutine_handle<>)>>;
};
} // namespace concepts

// Helper coroutine type allowing for having different scheduler than parent
// coroutine.
template <typename ResultType>
class [[nodiscard]] Task {

static_assert(std::movable<ResultType> || std::same_as<ResultType, void>,
"Task<ResultType> requires ResultType to be movable or void");

public:
using result_type = ResultType;

struct promise_type; // typedef required by coroutines
using handle_type =
std::coroutine_handle<promise_type>; // not required but useful

// Constructor from coroutine handle
Task(handle_type coroutine_handle) : m_coroutine(coroutine_handle) {}
~Task() {
if (m_coroutine) {
m_coroutine.destroy();
}
}
Task() = default;
Task(const Task&) = delete;
Task& operator=(const Task&) = delete;
Task(Task&& other) noexcept : m_coroutine{other.m_coroutine} {
other.m_coroutine = {};
}
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (m_coroutine) {
m_coroutine.destroy();
}
m_coroutine = other.m_coroutine;
other.m_coroutine = {};
}
return *this;
}

// Awaitable interface: always suspend to allow async execution
bool await_ready() const noexcept { return false; }
// Awaitable interface: setup parent relationship, suspend parent and
// schedule this coroutine on its scheduler
template <detail::schedule_on::concepts::HasScheduler T>
inline void await_suspend(std::coroutine_handle<T> handle) noexcept;
// Awaitable interface: return result or rethrow exception on resume
result_type await_resume() const;

private:
handle_type m_coroutine = nullptr;
};

// Helper for handling co_return in promise_type, default implementation for
// non-void ResultType
template <typename ResultType>
struct ReturnHelper {
// Storage for the co_return result value
std::optional<ResultType> m_value;

// Required by coroutines, mutually exclusive with return_void
// Store the co_return value
template <typename T>
requires std::constructible_from<ResultType, T&&>
void return_value(T&& value) {
m_value.emplace(std::forward<T>(value));
}
// Overload to resolve ambiguity
void return_value(ResultType value) { m_value.emplace(std::move(value)); }
};

// Specialization for void return type
template <>
struct ReturnHelper<void> {
// Required by coroutines, mutually exclusive with return_value
// Handle co_return without value
void return_void() {}
};

template <typename ResultType>
struct Task<ResultType>::promise_type
: public ReturnHelper<typename Task<ResultType>::result_type> {
// Storage for exceptions thrown in the coroutine body
std::exception_ptr m_exception;
// Handle to the parent coroutine that co_awaited this task
std::coroutine_handle<> m_parent;
// Handle to scheduler to resume this coroutine and propagate to children
std::function<void(std::coroutine_handle<>)> m_scheduler;
// Handle to the scheduler to resume parent coroutine
std::function<void(std::coroutine_handle<>)> m_parent_scheduler;

// Non-default constructor to pass the scheduler. The constructor will be
// used if coroutine function has the same signature. The unused parameters
// are here only to match the signature.
template <typename Coro>
promise_type(std::function<void(std::coroutine_handle<>)> scheduler,
Coro&&) noexcept
: m_scheduler(scheduler) {}

// Accessor for scheduler used by child coroutines
const auto& get_scheduler() const { return m_scheduler; }
// Schedule resumption of this task
void reschedule() { m_scheduler(handle_type::from_promise(*this)); }

// Required by coroutines: create the object
Task get_return_object() { return {handle_type::from_promise(*this)}; }
// Required by coroutines: suspend immediately on start (lazy execution)
std::suspend_always initial_suspend() const { return {}; }
// Required by coroutines: handle completion and resume parent
auto final_suspend() const noexcept {
struct final_awaiter {
// Don't skip final suspension
bool await_ready() const noexcept { return false; }
// Resume parent coroutine on its own scheduler
void await_suspend(handle_type handle) noexcept {
auto parent = handle.promise().m_parent;
auto parent_scheduler = handle.promise().m_parent_scheduler;
if (parent && parent_scheduler) {
parent_scheduler(parent);
}
}
// No action needed on resume
void await_resume() const noexcept {}
};
return final_awaiter{};
}
// Required by coroutines: capture exceptions for later rethrowing
void unhandled_exception() { m_exception = std::current_exception(); }
};

template <typename ResultType>
template <detail::schedule_on::concepts::HasScheduler T>
inline void Task<ResultType>::await_suspend(
std::coroutine_handle<T> handle) noexcept {
m_coroutine.promise().m_parent = handle;
m_coroutine.promise().m_parent_scheduler = handle.promise().get_scheduler();
m_coroutine.promise().reschedule();
}

template <typename ResultType>
inline typename Task<ResultType>::result_type Task<ResultType>::await_resume()
const {
if (m_coroutine.promise().m_exception) {
std::rethrow_exception(m_coroutine.promise().m_exception);
}
if constexpr (std::same_as<result_type, void>) {
return;
} else {
return std::move(m_coroutine.promise().m_value).value();
}
}
} // namespace detail::schedule_on

template <typename Coro>
auto schedule_on(std::function<void(std::coroutine_handle<>)>, Coro coro)
-> detail::schedule_on::Task<typename Coro::result_type> {
co_return co_await coro;
}

} // namespace CoroutineTests::alien