-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathtasks.py
More file actions
77 lines (70 loc) · 2.84 KB
/
tasks.py
File metadata and controls
77 lines (70 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from __future__ import annotations
from agentex import AsyncAgentex
from agentex.types.task import Task
from agentex.types.shared import DeleteResponse
from agentex.lib.utils.logging import make_logger
from agentex.lib.utils.temporal import heartbeat_if_in_workflow
from agentex.lib.core.tracing.tracer import AsyncTracer
logger = make_logger(__name__)
class TasksService:
def __init__(
self,
agentex_client: AsyncAgentex,
tracer: AsyncTracer,
):
self._agentex_client = agentex_client
self._tracer = tracer
async def get_task(
self,
task_id: str | None = None,
task_name: str | None = None,
trace_id: str | None = None,
parent_span_id: str | None = None,
) -> Task:
trace = self._tracer.trace(trace_id)
async with trace.span(
parent_id=parent_span_id,
name="get_task",
input={"task_id": task_id, "task_name": task_name},
) as span:
heartbeat_if_in_workflow("get task")
if not task_id and not task_name:
raise ValueError("Either task_id or task_name must be provided.")
if task_id:
task_model = await self._agentex_client.tasks.retrieve(task_id=task_id)
elif task_name:
task_model = await self._agentex_client.tasks.retrieve_by_name(task_name=task_name)
else:
raise ValueError("Either task_id or task_name must be provided.")
if span:
span.output = task_model.model_dump()
return task_model
async def delete_task(
self,
task_id: str | None = None,
task_name: str | None = None,
trace_id: str | None = None,
parent_span_id: str | None = None,
) -> Task | DeleteResponse:
trace = self._tracer.trace(trace_id) if self._tracer else None
if trace is None:
# Handle case without tracing
response = await self._agentex_client.tasks.delete(task_id)
return Task(**response.model_dump())
async with trace.span(
parent_id=parent_span_id,
name="delete_task",
input={"task_id": task_id, "task_name": task_name},
) as span:
heartbeat_if_in_workflow("delete task")
if not task_id and not task_name:
raise ValueError("Either task_id or task_name must be provided.")
if task_id:
task_model = await self._agentex_client.tasks.delete(task_id=task_id)
elif task_name:
task_model = await self._agentex_client.tasks.delete_by_name(task_name=task_name)
else:
raise ValueError("Either task_id or task_name must be provided.")
if span:
span.output = task_model.model_dump()
return task_model