Skip to content

Commit 3b561a6

Browse files
committed
example: dag
1 parent f594aa9 commit 3b561a6

File tree

3 files changed

+128
-0
lines changed

3 files changed

+128
-0
lines changed

example/BUILD

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ create_example_target("ex016-wait_group")
4343
create_example_target("ex017-backup_request")
4444
create_example_target("ex018-local_cache")
4545
create_example_target("ex019-task_and_series")
46+
create_example_target("ex020-dag")
4647

4748
# virtual target to build all examples
4849
cc_library(
@@ -67,5 +68,6 @@ cc_library(
6768
":backup_request",
6869
":local_cache",
6970
":task_and_series",
71+
":dag",
7072
],
7173
)

example/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ set(ALL_EXAMPLES
2020
ex017-backup_request
2121
ex018-local_cache
2222
ex019-task_and_series
23+
ex020-dag
2324
)
2425

2526
include (../cmake/find-workflow.cmake)

example/ex020-dag.cpp

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include <atomic>
2+
#include <iostream>
3+
#include <mutex>
4+
5+
#include "coke/dag.h"
6+
#include "coke/future.h"
7+
#include "coke/sleep.h"
8+
#include "coke/wait.h"
9+
10+
/**
11+
* This example shows how to use dag. For tasks with complex dependencies, using
12+
* dag can simplify the way of executing tasks.
13+
*/
14+
15+
struct MyGraphContext {
16+
char cancel_in_node{0};
17+
std::atomic<bool> canceled{false};
18+
};
19+
20+
using MyDagPtr = std::shared_ptr<coke::DagGraph<MyGraphContext>>;
21+
22+
auto create_node(char x) {
23+
return [x](MyGraphContext &ctx) -> coke::Task<> {
24+
// Coke's dag does not have a cancellation mechanism, but it can be
25+
// achieved by setting a flag in the Context.
26+
if (ctx.canceled.load()) {
27+
std::cout << x << ": the graph is canceled\n";
28+
co_return;
29+
}
30+
31+
std::cout << x << ": start\n";
32+
co_await coke::sleep(0.1);
33+
std::cout << x << ": finish\n";
34+
35+
if (ctx.cancel_in_node == x)
36+
ctx.canceled.store(true);
37+
};
38+
}
39+
40+
MyDagPtr create_dag() {
41+
coke::DagBuilder<MyGraphContext> builder;
42+
43+
auto root = builder.root();
44+
auto A = builder.node(create_node('A'), "This is the node name");
45+
auto B = builder.node(create_node('B'), "B");
46+
auto C = builder.node(create_node('C'), "C");
47+
auto D = builder.node(create_node('D'), "D");
48+
auto E = builder.node(create_node('E'), "E");
49+
auto F = builder.node(create_node('F'), "F");
50+
51+
/**
52+
* Create DAG
53+
* /-> B --\
54+
* root --> A --> C --> E --> F
55+
* \-> D --------/
56+
*/
57+
58+
using Group = coke::DagNodeGroup<MyGraphContext>;
59+
60+
// Connect root to A, short for builder.connect(root, A);
61+
// Each node must be reachable from root.
62+
root > A;
63+
64+
// Connect A to B, C, D, short for A > B; A > C; A > D;
65+
A > Group{B, C, D};
66+
67+
// Connect B, C to E, short for B > E; C > E;
68+
// And then connect E to F.
69+
Group{B, C} > E > F;
70+
71+
// Connect D to F
72+
D > F;
73+
74+
// It can also be simplified to
75+
// root > A > Group{B, C} > E > F;
76+
// A > D > F;
77+
78+
return builder.build();
79+
}
80+
81+
coke::Task<> use_dag() {
82+
auto dag = create_dag();
83+
84+
std::cout << "Is this DAG valid? " << (dag->valid() ? "yes!" : "no!") << std::endl;
85+
std::cout << "The DAG in dot format:\n";
86+
dag->dump(std::cout);
87+
88+
std::cout << std::string(64, '-') << std::endl;
89+
90+
// 1. Use it directly.
91+
{
92+
MyGraphContext ctx;
93+
co_await dag->run(ctx);
94+
}
95+
96+
std::cout << std::string(64, '-') << std::endl;
97+
98+
// 2. Cancel in node.
99+
{
100+
MyGraphContext ctx;
101+
ctx.cancel_in_node = 'C';
102+
co_await dag->run(ctx);
103+
}
104+
105+
std::cout << std::string(64, '-') << std::endl;
106+
107+
// 3. Cancel outside.
108+
{
109+
MyGraphContext ctx;
110+
111+
// Coke's dag don't support timed wait, use coke::Future.
112+
coke::Future<void> fut = coke::create_future(dag->run(ctx));
113+
int fut_ret = co_await fut.wait_for(std::chrono::milliseconds(150));
114+
if (fut_ret != coke::FUTURE_STATE_READY)
115+
ctx.canceled.store(true);
116+
117+
// Wait dag->run finish, before ctx is destroyed.
118+
co_await fut.wait();
119+
}
120+
}
121+
122+
int main() {
123+
coke::sync_wait(use_dag());
124+
return 0;
125+
}

0 commit comments

Comments
 (0)