-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
93 lines (69 loc) · 2.75 KB
/
agent.py
File metadata and controls
93 lines (69 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Task executor agent - receives and executes tasks from other agents via A2A.
Listens for intents via SSE. Simulates data enrichment work
(geo lookup on records) and resumes with execution results.
AXME handles retries if this agent crashes mid-task.
Usage:
export AXME_API_KEY="<agent-key>"
python agent.py
"""
import os
import sys
import time
# Switch stdout to line buffering so output is flushed at every newline
# (presumably so progress lines show up promptly when output is piped or
# captured - confirm against how the demo is run).
sys.stdout.reconfigure(line_buffering=True)
from axme import AxmeClient, AxmeClientConfig
# Address this agent listens on; main() passes it to client.listen().
AGENT_ADDRESS = "a2a-executor-demo"
def handle_intent(client, intent_id, step_delay=1.0):
    """Execute a task dispatched from another agent and report the result.

    Fetches the intent through ``client``, reads the task description from
    its payload (using the nested ``parent_payload`` when that key is
    present), simulates the enrichment work with short pauses, and resumes
    the intent with a result dict.

    Missing or ``None`` values for ``intent``, ``payload``,
    ``parent_payload`` and ``input_data`` are treated as empty, so the
    task fields fall back to ``"unknown"`` / ``0`` instead of raising.

    Args:
        client: Object exposing ``get_intent(intent_id)`` and
            ``resume_intent(intent_id, result)``.
        intent_id: Identifier of the intent to process.
        step_delay: Base pause in seconds for the simulated work. The
            enrichment step pauses for twice this value. The default
            reproduces the original 1s / 2s / 1s pacing; pass ``0`` to
            skip the waits. Negative values are treated as ``0``.

    Returns:
        None. The result is delivered through ``client.resume_intent``.
    """
    intent_data = client.get_intent(intent_id)
    # The response may wrap the intent under an "intent" key or be the
    # intent itself; `or {}` guards against an explicit null value.
    intent = intent_data.get("intent", intent_data) or {}
    payload = intent.get("payload") or {}
    if "parent_payload" in payload:
        payload = payload["parent_payload"] or {}

    task_id = payload.get("task_id", "unknown")
    task_type = payload.get("task_type", "unknown")
    source_agent = payload.get("source_agent", "unknown")
    input_data = payload.get("input_data") or {}
    records = input_data.get("records", 0)
    enrichment = input_data.get("enrichment", "unknown")

    # time.sleep() rejects negative values, so clamp the configured delay.
    pause = max(step_delay, 0)

    print(f" Receiving task from dispatcher ({source_agent})...")
    print(f" Task: {task_id} ({task_type})")
    time.sleep(pause)
    print(f" Enriching {records} records with geo data...")
    time.sleep(2 * pause)
    print(" Enrichment complete.")
    time.sleep(pause)

    result = {
        "action": "complete",
        "task_id": task_id,
        "records_processed": records,
        "enrichment_applied": enrichment,
        "success_rate": 0.98,
        # UTC timestamp, formatted with a literal trailing "Z".
        "executed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    client.resume_intent(intent_id, result)
    print(f" Task {task_id} completed. {records} records enriched ({enrichment}).")
    print(" Result sent back to dispatcher via AXME.")
def main():
    """Start the executor agent and process incoming intents.

    Reads the API key from the ``AXME_API_KEY`` environment variable. When
    it is missing or empty, prints setup instructions and exits with
    status 1. Otherwise builds an ``AxmeClient``, listens on
    ``AGENT_ADDRESS`` and hands every delivery that has an intent id and
    an actionable status to ``handle_intent``. An exception raised while
    handling one intent is printed and the loop continues.
    """
    key = os.environ.get("AXME_API_KEY", "")
    if not key:
        for line in (
            "Error: AXME_API_KEY not set.",
            "Run the scenario first: axme scenarios apply scenario.json",
            "Then get the agent key from ~/.config/axme/scenario-agents.json",
        ):
            print(line)
        sys.exit(1)

    config = AxmeClientConfig(api_key=key)
    client = AxmeClient(config)
    print(f"Agent listening on {AGENT_ADDRESS}...")
    print("Waiting for intents (Ctrl+C to stop)\n")

    # Only deliveries in one of these states are handed to handle_intent.
    actionable = ("DELIVERED", "CREATED", "IN_PROGRESS")
    for event in client.listen(AGENT_ADDRESS):
        intent_id = event.get("intent_id", "")
        status = event.get("status", "")
        if not intent_id or status not in actionable:
            continue
        print(f"[{status}] Intent received: {intent_id}")
        try:
            handle_intent(client, intent_id)
        except Exception as exc:
            # Keep the listener alive: one failed intent must not stop the agent.
            print(f" Error processing intent: {exc}")
# Run the agent only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()