Skip to content

Commit a20f226

Browse files
tercelclaude and Claude committed
feat: implement top 5 improvements from AP+Flow evaluation
1. `task.create_tree` MCP tool — AI agents can create entire task trees in a single call (TaskCreateTreeModule in task_modules.py)
2. Distributed wired into CLI — `apflow serve --cluster` and `apflow worker` commands; DistributedRuntime initialized in create_app(cluster=True)
3. End-to-end example — examples/quickstart.py shows the full workflow: register executors → create tree → execute → get results
4. DAG support documented — README explains parent_id (tree) vs dependencies (fan-in DAG pattern) with examples
5. Plugin discovery via entry points — [project.entry-points."apflow.executors"] allows third-party packages to register executors without import ordering

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
1 parent 8ee5040 commit a20f226

File tree

8 files changed

+286
-2
lines changed

8 files changed

+286
-2
lines changed

README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ serve(app.registry, name="apflow")
6262
# Or from the command line
6363
apflow serve # A2A HTTP server
6464
apflow serve --explorer # With Explorer UI
65+
apflow serve --cluster # Distributed cluster mode
66+
apflow worker --db ... # Start worker node
6567
apflow mcp # MCP server (for Claude/Cursor)
6668
apflow info # Show registered modules
6769
```
@@ -70,14 +72,25 @@ apflow info # Show registered modules
7072

7173
### Task Orchestration
7274

73-
Dependency graph execution with priority scheduling, parallel execution, and result aggregation.
75+
Dependency graph execution with priority scheduling, parallel execution, and result aggregation. Supports both tree structure (`parent_id`) and DAG patterns (`dependencies` for fan-in).
7476

7577
```python
78+
# Tree: sequential pipeline
7679
tasks = [
7780
{"id": "fetch", "name": "Fetch Data", "priority": 1},
7881
{"id": "process", "name": "Process", "parent_id": "fetch", "priority": 2},
7982
{"id": "notify", "name": "Notify", "parent_id": "process", "priority": 3},
8083
]
84+
85+
# DAG: fan-in pattern (task depends on multiple predecessors)
86+
tasks = [
87+
{"id": "a", "name": "Step A", "priority": 1},
88+
{"id": "b", "name": "Step B", "priority": 1},
89+
{"id": "merge", "name": "Merge Results", "priority": 2,
90+
"parent_id": "a",
91+
"dependencies": [{"id": "a", "required": True}, {"id": "b", "required": True}]},
92+
]
93+
8194
tree = await task_creator.create_task_tree_from_array(tasks)
8295
await task_manager.distribute_task_tree(tree)
8396
```

examples/quickstart.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""
2+
apflow Quickstart — End-to-End Example
3+
4+
This example shows the complete workflow:
5+
1. Register custom executors
6+
2. Create a task tree
7+
3. Execute it
8+
4. Get results
9+
10+
Run: python examples/quickstart.py
11+
"""
12+
13+
import asyncio
14+
from apflow.adapters.function_executor import function_executor
15+
from apflow.app import create_app
16+
17+
18+
# Step 1: Register custom executors using @function_executor
@function_executor(
    id="fetch_price",
    description="Fetch the price of a product",
    input_schema={
        "type": "object",
        "properties": {"product": {"type": "string"}},
        "required": ["product"],
    },
)
async def fetch_price(inputs: dict) -> dict:
    """Return a simulated price lookup for the requested product.

    Unknown products resolve to a price of 0.0 rather than raising.
    """
    catalog = {"widget": 9.99, "gadget": 19.99, "doohickey": 4.99}
    requested = inputs["product"]
    return {"product": requested, "price": catalog.get(requested, 0.0)}
33+
34+
35+
@function_executor(
    id="calculate_total",
    description="Calculate total from fetched prices",
)
async def calculate_total(inputs: dict) -> dict:
    """Sum the `price` field of every dict-valued dependency result.

    `item_count` reports the total number of input entries, priced or not.
    """
    priced = (v for v in inputs.values() if isinstance(v, dict) and "price" in v)
    total = sum(entry.get("price", 0) for entry in priced)
    return {"total": round(total, 2), "item_count": len(inputs)}
45+
46+
47+
async def main():
    """Run the quickstart workflow: bootstrap, build a tree, execute, report."""
    # Step 2: Create the app (bootstraps session, TaskManager, Registry)
    app = create_app(connection_string="sqlite:///:memory:")

    print(f"Registered modules: {len(list(app.registry.list()))}")
    for module_name in sorted(app.registry.list()):
        print(f" {module_name}")

    # Step 3: Create a task tree
    # fetch_widget ──┐
    #                ├──→ calculate_total
    # fetch_gadget ──┘
    task_specs = [
        {
            "id": "fetch_widget",
            "name": "Fetch Widget Price",
            "priority": 1,
            "inputs": {"product": "widget"},
            "params": {"executor_id": "fetch_price"},
        },
        {
            "id": "fetch_gadget",
            "name": "Fetch Gadget Price",
            "priority": 1,
            "inputs": {"product": "gadget"},
            "params": {"executor_id": "fetch_price"},
        },
        {
            "id": "total",
            "name": "Calculate Total",
            "priority": 2,
            "parent_id": "fetch_widget",
            "dependencies": [
                {"id": "fetch_widget", "required": True},
                {"id": "fetch_gadget", "required": True},
            ],
            "params": {"executor_id": "calculate_total"},
        },
    ]

    tree = await app.task_creator.create_task_tree_from_array(task_specs)
    print(f"\nTask tree created: root={tree.task.id}")

    # Step 4: Execute
    await app.task_manager.distribute_task_tree(tree)

    # Step 5: Get results
    for task_id in ("fetch_widget", "fetch_gadget", "total"):
        task = await app.task_repository.get_task_by_id(task_id)
        print(f" {task.name}: status={task.status}, result={task.result}")


if __name__ == "__main__":
    asyncio.run(main())

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ dependencies = [
4343
[project.scripts]
4444
apflow = "apflow.cli:main"
4545

46+
[project.entry-points."apflow.executors"]
47+
# Third-party packages register executors here:
48+
# my_executor = "my_package.executors:register"
49+
4650
# Optional dependencies
4751
[project.optional-dependencies]
4852
# Documentation (MkDocs with Material theme)

src/apflow/app.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(
4545
def create_app(
4646
connection_string: Optional[str] = None,
4747
namespace: str = "apflow",
48+
cluster: bool = False,
4849
) -> ApflowApp:
4950
"""Create and initialize the full apflow application stack.
5051
@@ -114,6 +115,18 @@ def create_app(
114115
)
115116
logger.info(f"apcore Registry populated ({len(list(registry.list()))} modules)")
116117

118+
# Start distributed runtime if cluster mode
119+
if cluster:
120+
try:
121+
from apflow.core.distributed.config import DistributedConfig
122+
from apflow.core.distributed.runtime import DistributedRuntime
123+
124+
dist_config = DistributedConfig.from_env()
125+
DistributedRuntime(dist_config) # Starts background tasks
126+
logger.info(f"Distributed runtime enabled (node: {dist_config.node_id})")
127+
except Exception as e:
128+
logger.warning(f"Distributed runtime failed to initialize: {e}")
129+
117130
return ApflowApp(
118131
session=session,
119132
task_manager=task_manager,

src/apflow/bridge/registry_setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
TaskCreateModule,
2525
TaskDeleteModule,
2626
TaskExecuteModule,
27+
TaskCreateTreeModule,
2728
TaskGetModule,
2829
TaskListModule,
2930
)
@@ -76,6 +77,7 @@ def create_apflow_registry(
7677
# 2. Register task management modules
7778
task_modules = {
7879
"task.create": TaskCreateModule(task_creator, task_repository),
80+
"task.create_tree": TaskCreateTreeModule(task_creator, task_repository),
7981
"task.execute": TaskExecuteModule(task_manager),
8082
"task.list": TaskListModule(task_repository),
8183
"task.get": TaskGetModule(task_repository),

src/apflow/bridge/scanner_bridge.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,28 @@ def discover_executor_modules() -> list[ExecutableTaskModuleAdapter]:
4949
except ImportError:
5050
pass
5151

52+
# Discover executors from entry points (third-party packages)
53+
try:
54+
from importlib.metadata import entry_points
55+
56+
eps = entry_points(group="apflow.executors")
57+
seen_ids = {a.executor_id for a in adapters}
58+
for ep in eps:
59+
try:
60+
register_fn = ep.load()
61+
register_fn() # Expected to call @function_executor or @executor_register
62+
# Re-scan function executors after registration
63+
for eid, ecls in get_function_executor_classes().items():
64+
if eid not in seen_ids:
65+
adapter = _create_adapter_from_class(eid, ecls)
66+
if adapter is not None:
67+
adapters.append(adapter)
68+
seen_ids.add(eid)
69+
except Exception as e:
70+
logger.warning(f"Failed to load executor entry point '{ep.name}': {e}")
71+
except Exception:
72+
pass
73+
5274
logger.info(f"Discovered {len(adapters)} executor modules for apcore registration")
5375
return adapters
5476

src/apflow/bridge/task_modules.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,98 @@ async def execute(self, inputs: dict[str, Any], context: Any = None) -> dict[str
244244

245245
await self._repo.delete_task(task_id)
246246
return {"task_id": task_id, "deleted": True}
247+
248+
249+
# JSON Schema for TaskCreateTreeModule inputs: a non-empty array of task
# definitions. `parent_id` builds the tree shape; `dependencies` adds
# DAG (fan-in) ordering edges on top of it.
_TASK_CREATE_TREE_INPUT = {
    "type": "object",
    "properties": {
        "tasks": {
            "type": "array",
            "minItems": 1,
            "description": "Array of task definitions. Use parent_id to build tree, dependencies for DAG ordering.",
            "items": {
                "type": "object",
                "properties": {
                    # Caller may supply a stable ID to reference in parent_id/dependencies.
                    "id": {"type": "string", "description": "Optional task ID (auto-generated if omitted)"},
                    "name": {"type": "string", "minLength": 1, "description": "Task name (required)"},
                    "parent_id": {"type": "string", "description": "Parent task ID for tree structure"},
                    # Lower number = more urgent; defaults to normal.
                    "priority": {
                        "type": "integer",
                        "minimum": 0,
                        "maximum": 3,
                        "default": 2,
                        "description": "0=urgent, 1=high, 2=normal, 3=low",
                    },
                    "inputs": {"type": "object", "description": "Task input parameters"},
                    "params": {"type": "object", "description": "Executor init parameters"},
                    # Each entry references another task in this array by ID.
                    "dependencies": {
                        "type": "array",
                        "items": {"type": "object"},
                        "description": "Task dependencies for DAG ordering [{id: 'task_id', required: true}]",
                    },
                    "token_budget": {"type": "integer", "minimum": 0},
                    "cost_policy": {"type": "string"},
                    "max_attempts": {"type": "integer", "minimum": 1, "maximum": 100, "default": 3},
                },
                "required": ["name"],
            },
        },
    },
    "required": ["tasks"],
}
286+
287+
# JSON Schema for TaskCreateTreeModule output: the root task's ID, the
# total number of tasks created, and the full list of created task IDs.
_TASK_CREATE_TREE_OUTPUT = {
    "type": "object",
    "properties": {
        "root_task_id": {"type": "string", "description": "ID of the first root task"},
        "task_count": {"type": "integer", "description": "Total tasks created"},
        "task_ids": {
            "type": "array",
            "items": {"type": "string"},
            "description": "All created task IDs",
        },
    },
}
299+
300+
301+
class TaskCreateTreeModule:
    """Create a complete task tree from an array of task definitions in one call."""

    description = (
        "Create a multi-step task workflow from an array of task definitions. "
        "Use parent_id for tree structure and dependencies for execution ordering. "
        "Tasks without parent_id are root tasks. Multiple roots are allowed."
    )

    def __init__(self, task_creator: Any, task_repository: Any) -> None:
        self._creator = task_creator
        self._repo = task_repository
        self.input_schema = _make_schema(_TASK_CREATE_TREE_INPUT)
        self.output_schema = _make_schema(_TASK_CREATE_TREE_OUTPUT)

    async def execute(self, inputs: dict[str, Any], context: Any = None) -> dict[str, Any]:
        """Validate the definitions, build the tree, and report the created IDs.

        Raises ValueError when the array is empty or any definition lacks a name.
        """
        definitions = inputs.get("tasks", [])
        if not definitions:
            raise ValueError("tasks array must be non-empty")

        if any(not d.get("name") for d in definitions):
            raise ValueError("Each task must have a non-empty 'name'")

        tree = await self._creator.create_task_tree_from_array(definitions)

        # Walk the tree iteratively in pre-order (node before its children)
        # to gather every created task ID.
        task_ids: list[str] = []
        pending = [tree]
        while pending:
            node = pending.pop(0)
            task_ids.append(node.task.id)
            pending = list(node.children) + pending

        return {
            "root_task_id": tree.task.id,
            "task_count": len(task_ids),
            "task_ids": task_ids,
        }

src/apflow/cli.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ def cli() -> None:
3232
@click.option("--metrics", is_flag=True, help="Enable /metrics endpoint")
3333
@click.option("--cors", default=None, help="CORS origins (comma-separated)")
3434
@click.option("--db", default=None, help="Database connection string")
35+
@click.option(
36+
"--cluster", is_flag=True, help="Enable distributed cluster mode (requires PostgreSQL)"
37+
)
3538
@click.option("--log-level", default=None, help="Log level (DEBUG/INFO/WARNING/ERROR)")
3639
def serve(
3740
host: str,
@@ -41,12 +44,13 @@ def serve(
4144
metrics: bool,
4245
cors: Optional[str],
4346
db: Optional[str],
47+
cluster: bool,
4448
log_level: Optional[str],
4549
) -> None:
4650
"""Start A2A HTTP server (internal network service)."""
4751
from apflow.app import create_app
4852

49-
app = create_app(connection_string=db)
53+
app = create_app(connection_string=db, cluster=cluster)
5054

5155
cors_origins = [s.strip() for s in cors.split(",")] if cors else None
5256

@@ -148,6 +152,37 @@ def info() -> None:
148152
click.echo(f"Registry: error ({e})")
149153

150154

155+
@cli.command()
@click.option("--db", required=True, help="PostgreSQL connection string (required for cluster)")
@click.option("--node-id", default=None, help="Worker node ID (auto-generated if omitted)")
@click.option("--log-level", default=None, help="Log level")
def worker(db: str, node_id: Optional[str], log_level: Optional[str]) -> None:
    """Start a distributed worker node (requires PostgreSQL)."""
    # NOTE(review): --log-level is accepted but never applied in this
    # function — confirm whether logging setup was intended here.
    import asyncio

    click.echo(f"Starting worker node: {node_id or 'auto'}")

    from apflow.app import create_app

    # Bootstrap the full app stack for its side effects only; the returned
    # app object is deliberately discarded.
    create_app(connection_string=db, cluster=True)

    try:
        from apflow.core.distributed.config import DistributedConfig
        from apflow.core.distributed.worker import WorkerRuntime

        # Worker config is read from the environment; the --db value is
        # consumed by create_app above, not by DistributedConfig.
        # TODO confirm that --db is not also needed by the worker runtime.
        config = DistributedConfig.from_env()
        if node_id:
            config.node_id = node_id

        worker_rt = WorkerRuntime(config)
        click.echo(f"Worker {config.node_id} running (Ctrl+C to stop)")
        # Blocks until the worker loop exits or the user interrupts.
        asyncio.run(worker_rt.start())
    except ImportError:
        # Distributed extras not installed — report rather than traceback.
        click.echo("Error: distributed module not available", err=True)
    except KeyboardInterrupt:
        click.echo("Worker stopped")
185+
151186
def main() -> None:
152187
"""Entry point for apflow CLI."""
153188
cli()

0 commit comments

Comments
 (0)