Skip to content

Commit

Permalink
Updated coroutine.h: cco_runtime deprecated, renamed => cco_fiber. Ad…
Browse files Browse the repository at this point in the history
…ded cco_spawn(), cco_await_join() to spawn / join a task in a new fiber.
  • Loading branch information
tylov committed Feb 10, 2025
1 parent 45f264d commit 008749d
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 228 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fast:
all: $(PROGRAMS)
@echo

$(PROGRAMS): $(LIB_PATH)
$(PROGRAMS): $(LIB_PATH) $(MAKEFILE)

clean:
@$(RM_F) $(LIB_OBJS) $(TEST_OBJS) $(EX_OBJS) $(LIB_DEPS) $(EX_DEPS) $(LIB_PATH) $(EX_EXES) $(TEST_EXE)
Expand All @@ -78,17 +78,17 @@ $(LIB_PATH): $(LIB_OBJS)

$(OBJ_DIR)/%.o: %.c
@$(MKDIR_P) $(@D)
@printf "\r\e[2K%s" "$(CC) $< -c -o $@"
@printf "\r\e[2K%s" "$(CC) $(<F) -o $@"
@$(CC) $< -c -o $@ $(CFLAGS)

$(OBJ_DIR)/%.o: %.cpp
@$(MKDIR_P) $(@D)
@printf "\r\e[2K%s" "$(CXX) $< -c -o $@"
@printf "\r\e[2K%s" "$(CXX) $(<F) -o $@"
@$(CXX) $< -c -o $@ $(CXXFLAGS)

$(OBJ_DIR)/%$(DOTEXE): %.c $(LIB_PATH)
@$(MKDIR_P) $(@D)
@printf "\r\e[2K%s" "$(CC) -o $(@F) $< -s $(LDFLAGS) -L$(BUILDDIR) -l$(LIB_NAME)"
@printf "\r\e[2K%s" "$(CC) $(<F) -o $@"
@$(CC) -o $@ $(CFLAGS) -s $< $(LDFLAGS) -L$(BUILDDIR) -l$(LIB_NAME)

$(TEST_EXE): $(TEST_OBJS)
Expand Down
118 changes: 60 additions & 58 deletions docs/coroutine_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ void cco_reset(Coroutine* co); // Reset sta
void cco_stop(Coroutine* co); // Next resume of coroutine enters `cco_finally:`.
cco_run_coroutine(coroutine(co)) {}; // Run blocking until coroutine is finished.
```
#### Task specific (coroutine function-objects)
#### Tasks (coroutine function-objects) and fibers (thread-like entity within a thread)
```c++
cco_task_struct(name) { <name>_state cco; ... }; // A custom coroutine task struct; extends cco_task struct.
void cco_await_task(cco_task* task, cco_runtime* rt); // Await for task to return CCO_DONE (asymmetric call).
void cco_await_task(cco_task* task, cco_runtime* rt, int awaitbits); // Await until task's suspend/return value
// to be in (awaitbits | CCO_DONE).
void cco_yield_to(cco_task* task, cco_runtime* rt); // Yield to task (symmetric control transfer).
void cco_throw_error(uint16_t error, cco_runtime* rt); // Throw an error and unwind call stack at the cco_finally point.
// Error accessible as `rt->error` and `rt->error_line`.
void cco_recover_error(cco_runtime* rt); // Reset error, and jump to original resume point in current task.
void cco_resume_task(cco_task* task, cco_runtime* rt); // Resume suspended task, return value in `rt->result`.
void cco_await_task(cco_task* task, cco_fiber* fb); // Await for task to return CCO_DONE (asymmetric call).
void cco_await_task(cco_task* task, cco_fiber* fb, int awaitbits); // Await until task's suspend/return value
// to be in (awaitbits | CCO_DONE).
void cco_yield_to(cco_task* task, cco_fiber* fb); // Yield to task (symmetric control transfer).
void cco_throw_error(int error, cco_fiber* fb); // Throw an error and unwind call stack at the cco_finally point.
// Error accessible as `fb->error` and `fb->error_line`.
void cco_recover_error(cco_fiber* fb); // Reset error, and jump to original resume point in current task.
void cco_resume_task(cco_task* task, cco_fiber* fb); // Resume suspended task, return value in `fb->result`.
cco_fiber* cco_spawn(cco_task* task, cco_fiber* fb); // Spawn a task in a new returned fiber.
void cco_await_joined(cco_fiber* fb); // Await for all spawned parallel fibers to be joined.
cco_run_task(cco_task* task) {} // Run blocking until task is finished.
cco_run_task(cco_task* task, <Environment> *env) {} // Run blocking with env data
```
Expand Down Expand Up @@ -96,7 +98,7 @@ cco_semaphore cco_make_semaphore(long value); // Create se
|`cco_timer` | Timer type | |
|`cco_semaphore` | Semaphore type | |
|`cco_taskrunner` | Coroutine | Executor coroutine which handles asymmetric and<br> symmetric coroutine control flows, |
|`cco_runtime` | Struct type | Runtime object to manage cco_taskrunner states |
|`cco_fiber` | Struct type | Represent a thread-like entity within a thread |
## Rules
1. Avoid declaring local variables within a `cco_routine` scope. They are only alive until next `cco_yield..` or `cco_await..`
Expand Down Expand Up @@ -345,31 +347,31 @@ typedef struct {
struct TaskC C;
} Subtasks;
int taskC(struct TaskC* self, cco_runtime* rt) {
int TaskC(struct TaskC* self, cco_fiber* fb) {
cco_routine (self) {
printf("TaskC start: {%g, %g}\n", self->x, self->y);
// assume there is an error...
cco_throw_error(99, rt);
cco_throw_error(99, fb);
puts("TaskC work");
cco_yield;
puts("TaskC more work");
cco_finally:
if (rt->error) {
if (fb->error) {
puts("TaskC has error, ignored");
}
puts("TaskC done");
}
return 0;
}
int taskB(struct TaskB* self, cco_runtime* rt) {
int TaskB(struct TaskB* self, cco_fiber* fb) {
cco_routine (self) {
printf("TaskB start: %g\n", self->d);
Subtasks* sub = rt->env;
cco_await_task(&sub->C, rt);
Subtasks* sub = fb->env;
cco_await_task(&sub->C, fb);
puts("TaskB work");
cco_finally:
Expand All @@ -378,29 +380,29 @@ int taskB(struct TaskB* self, cco_runtime* rt) {
return 0;
}
int taskA(struct TaskA* self, cco_runtime* rt) {
int TaskA(struct TaskA* self, cco_fiber* fb) {
cco_routine (self) {
printf("TaskA start: %d\n", self->a);
Subtasks* sub = rt->env;
cco_await_task(&sub->B, rt);
Subtasks* sub = fb->env;
cco_await_task(&sub->B, fb);
puts("TaskA work");
cco_finally:
if (rt->error == 99) {
if (fb->error == 99) {
// if error not handled, will cause 'unhandled error'...
printf("TaskA recovered error '99' thrown on line %d\n", rt->error_line);
cco_recover_error(rt);
printf("TaskA recovered error '99' thrown on line %d\n", fb->error_line);
cco_recover_error(fb);
}
puts("TaskA done");
}
return 0;
}
int start(cco_task* self, cco_runtime* rt) {
int start(cco_task* self, cco_fiber* fb) {
cco_routine (self) {
puts("start");
Subtasks* sub = rt->env;
cco_await_task(&sub->A, rt);
Subtasks* sub = fb->env;
cco_await_task(&sub->A, fb);
cco_finally:
puts("done");
Expand All @@ -411,9 +413,9 @@ int start(cco_task* self, cco_runtime* rt) {
int main(void)
{
Subtasks env = {
{{taskA}, 42},
{{taskB}, 3.1415},
{{taskC}, 1.2f, 3.4f},
{{TaskA}, 42},
{{TaskB}, 3.1415},
{{TaskC}, 1.2f, 3.4f},
};
cco_task task = {{start}};
Expand All @@ -430,7 +432,7 @@ int main(void)
Sometimes the call-tree is dynamic or more complex, then we can dynamically allocate the coroutine frames before
they are awaited. This is somewhat more general and simpler, but requires heap allocation. Note that the coroutine
frames are now freed at the end of the coroutine functions (after any cleanup at cco_finally). Example is based on
the previous, but also shows how to use the env field in `cco_runtime` to return a value from the coroutine
the previous, but also shows how to use the env field in `cco_fiber` to return a value from the coroutine
call/await:

<details>
Expand All @@ -448,21 +450,21 @@ cco_task_struct (TaskC) { TaskC_state cco; float x, y; };

typedef struct { double value; int error; } Result;

int taskC(struct TaskC* self, cco_runtime* rt) {
int TaskC(struct TaskC* self, cco_fiber* fb) {
cco_routine (self) {
printf("TaskC start: {%g, %g}\n", self->x, self->y);

// assume there is an error...
cco_throw_error(99, rt);
cco_throw_error(99, fb);

puts("TaskC work");
cco_yield;
puts("TaskC more work");
// initial return value
((Result *)rt->env)->value = self->x * self->y;
((Result *)fb->env)->value = self->x * self->y;

cco_finally:
if (rt->error) {
if (fb->error) {
puts("TaskC has error, ignored");
}
puts("TaskC done");
Expand All @@ -471,12 +473,12 @@ int taskC(struct TaskC* self, cco_runtime* rt) {
return 0;
}

int taskB(struct TaskB* self, cco_runtime* rt) {
int TaskB(struct TaskB* self, cco_fiber* fb) {
cco_routine (self) {
printf("TaskB start: %g\n", self->d);
cco_await_task(c_new(struct TaskC, {{taskC}, 1.2f, 3.4f}), rt);
cco_await_task(cco_new_task(TaskC, 1.2f, 3.4f), fb);
puts("TaskB work");
((Result *)rt->env)->value += self->d;
((Result *)fb->env)->value += self->d;

cco_finally:
puts("TaskB done");
Expand All @@ -485,30 +487,30 @@ int taskB(struct TaskB* self, cco_runtime* rt) {
return 0;
}

int taskA(struct TaskA* self, cco_runtime* rt) {
int TaskA(struct TaskA* self, cco_fiber* fb) {
cco_routine (self) {
printf("TaskA start: %d\n", self->a);
cco_await_task(c_new(struct TaskB, {{taskB}, 3.1415}), rt);
cco_await_task(cco_new_task(TaskB, 3.1415), fb);
puts("TaskA work");
((Result *)rt->env)->value += self->a; // final return value;
((Result *)fb->env)->value += self->a; // final return value;

cco_finally:
if (rt->error == 99) {
if (fb->error == 99) {
// if error not handled, will cause 'unhandled error'...
printf("TaskA recovered error '99' thrown on line %d\n", rt->error_line);
((Result *)rt->env)->error = rt->error; // set error in output
cco_recover_error(rt); // reset error to 0 and jump to after the await call.
printf("TaskA recovered error '99' thrown on line %d\n", fb->error_line);
((Result *)fb->env)->error = fb->error; // set error in output
cco_recover_error(fb); // reset error to 0 and jump to after the await call.
}
puts("TaskA done");
}
free(self);
return 0;
}

int start(cco_task* self, cco_runtime* rt) {
int start(cco_task* self, cco_fiber* fb) {
cco_routine (self) {
puts("start");
cco_await_task(c_new(struct TaskA, {{taskA}, 42}), rt);
cco_await_task(cco_new_task(TaskA, 42), fb);

cco_finally:
puts("done");
Expand Down Expand Up @@ -570,7 +572,7 @@ cco_task_struct (consume) {
};
int produce(struct produce* self, cco_runtime* rt) {
int produce(struct produce* self, cco_fiber* fb) {
cco_routine (self) {
while (1) {
if (self->serial > self->total) {
Expand All @@ -589,18 +591,18 @@ int produce(struct produce* self, cco_runtime* rt) {
puts("");
}
cco_yield_to(self->consumer, rt); // symmetric transfer
cco_yield_to(self->consumer, fb); // symmetric transfer
}
cco_finally:
cco_cancel_task(self->consumer, rt);
cco_cancel_task(self->consumer, fb);
Inventory_drop(&self->inventory);
puts("cleanup producer");
}
return 0;
}
int consume(struct consume* self, cco_runtime* rt) {
int consume(struct consume* self, cco_fiber* fb) {
cco_routine (self) {
int n, sz;
while (1) {
Expand All @@ -612,7 +614,7 @@ int consume(struct consume* self, cco_runtime* rt) {
Inventory_pop(&self->producer->inventory);
printf("consumed %d items\n", n);
cco_yield_to(self->producer, rt); // symmetric transfer
cco_yield_to(self->producer, fb); // symmetric transfer
}
cco_finally:
Expand Down Expand Up @@ -685,14 +687,14 @@ cco_task_struct (TaskX) {
char id;
};

int scheduler(struct Scheduler* self, cco_runtime* rt) {
int scheduler(struct Scheduler* self, cco_fiber* fb) {
cco_routine (self) {
while (!Tasks_is_empty(&self->tasks)) {
self->_pulled = Tasks_pull(&self->tasks);

cco_await_task(self->_pulled, rt, CCO_YIELD | CCO_DONE);
cco_await_task(self->_pulled, fb, CCO_YIELD | CCO_DONE);

if (rt->result == CCO_YIELD) {
if (fb->result == CCO_YIELD) {
Tasks_push(&self->tasks, self->_pulled);
} else { // CCO_DONE
Tasks_value_drop(&self->_pulled);
Expand All @@ -706,8 +708,8 @@ int scheduler(struct Scheduler* self, cco_runtime* rt) {
return 0;
}

static int taskX(struct TaskX* self, cco_runtime* rt) {
(void)rt;
static int taskX(struct TaskX* self, cco_fiber* fb) {
(void)fb;
cco_routine (self) {
printf("Hello from task %c\n", self->id);
cco_yield;
Expand All @@ -722,14 +724,14 @@ static int taskX(struct TaskX* self, cco_runtime* rt) {
return 0;
}

static int taskA(struct TaskA* self, cco_runtime* rt) {
static int TaskA(struct TaskA* self, cco_fiber* fb) {
cco_routine (self) {
puts("Hello from task A");
cco_yield;
puts("A is back doing work");
cco_yield;
puts("A adds task C");
Tasks *_tasks = rt->env; // local var only alive until cco_yield.
Tasks *_tasks = fb->env; // local var only alive until cco_yield.
Tasks_push(_tasks, cco_cast_task(c_new(struct TaskX, {.cco={taskX}, .id='C'})));
cco_yield;
puts("A is back doing more work");
Expand All @@ -746,7 +748,7 @@ int main(void) {
struct Scheduler* schedule = c_new(struct Scheduler, {
.cco={scheduler},
.tasks = c_make(Tasks, {
cco_cast_task(c_new(struct TaskA, {.cco={taskA}})),
cco_cast_task(c_new(struct TaskA, {.cco={TaskA}})),
cco_cast_task(c_new(struct TaskX, {.cco={taskX}, .id='B'})),
})
});
Expand Down
Loading

0 comments on commit 008749d

Please sign in to comment.