-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstash.diff
More file actions
388 lines (369 loc) · 15 KB
/
stash.diff
File metadata and controls
388 lines (369 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
diff --git a/app/agents/resolution/agent.py b/app/agents/resolution/agent.py
index b225ac9..a445ed5 100644
--- a/app/agents/resolution/agent.py
+++ b/app/agents/resolution/agent.py
@@ -59,6 +59,22 @@ class ResolutionAgent:
) -> AgentTurnResult:
return self._invoke(borrower_id, message or "", borrower_case, chat_history or [], instruction)
+ def analyze_completed_voice_call(
+ self,
+ borrower_id: str,
+ borrower_case: BorrowerCase,
+ transcript: str,
+ chat_history: list[ChatMessage] | None = None,
+ ) -> AgentTurnResult:
+ instruction = (
+ "Analyze this completed Retell voice call transcript for the resolution stage. "
+ "The live call is over, so do not continue the conversation. "
+ "Return DEAL_AGREED only if the borrower clearly accepted a specific payment or settlement commitment. "
+ "Otherwise return NO_DEAL. Do not return CONTINUE. "
+ "If you return NO_DEAL, latest_handoff_summary must let Agent 3 continue naturally without restarting."
+ )
+ return self._invoke(borrower_id, transcript, borrower_case, chat_history or [], instruction)
+
def _invoke(
self,
borrower_id: str,
diff --git a/app/domain/borrower_case.py b/app/domain/borrower_case.py
index 2d86935..fbd8f83 100644
--- a/app/domain/borrower_case.py
+++ b/app/domain/borrower_case.py
@@ -84,6 +84,8 @@ class BorrowerCase(BaseModel):
assessment_notes: Optional[str] = None
resolution_notes: Optional[str] = None
final_notice_notes: Optional[str] = None
+ resolution_call_id: Optional[str] = None
+ resolution_call_status: Optional[str] = None
latest_handoff_summary: Optional[str] = None
latest_handoff_stage: Optional[Stage] = None
final_disposition: Optional[str] = None
diff --git a/app/main.py b/app/main.py
index c227b0e..f8db9ac 100644
--- a/app/main.py
+++ b/app/main.py
@@ -6,6 +6,8 @@ from app.api.borrower_profile import router as borrower_profile_router
from app.api.lender_policy import router as lender_policy_router
from app.api.lender_profile import router as lender_profile_router
from app.api.loan import router as loan_router
+from app.api.retell_functions import router as retell_functions_router
+from app.api.retell_webhooks import router as retell_webhooks_router
from app.api.workflows import router as workflows_router
load_env_file()
@@ -16,4 +18,6 @@ app.include_router(borrower_profile_router)
app.include_router(lender_policy_router)
app.include_router(lender_profile_router)
app.include_router(loan_router)
+app.include_router(retell_functions_router)
+app.include_router(retell_webhooks_router)
app.include_router(workflows_router)
diff --git a/app/orchestrator/activities.py b/app/orchestrator/activities.py
index ccd6d83..6a5ea94 100644
--- a/app/orchestrator/activities.py
+++ b/app/orchestrator/activities.py
@@ -1,23 +1,31 @@
from __future__ import annotations
+from typing import Any
+
from temporalio import activity
from app.agents.assessment.agent import AssessmentAgent
from app.agents.final_notice.agent import FinalNoticeAgent
from app.agents.resolution.agent import ResolutionAgent
-from app.domain.borrower_case import BorrowerCase, ContactChannel, Stage
+from app.domain.borrower_case import AgentStageOutcome, AgentTurnResult, BorrowerCase, ContactChannel, Stage
from app.services.borrower_case import FileBorrowerCaseService
from app.services.borrower_case_state import BorrowerCaseStateService
+from app.services.borrower_profile import FileBorrowerProfileService
from app.services.chat_message import get_chat_message_service
+from app.services.retell import RetellService
from app.orchestrator.models import (
AgentTurnActivityInput,
AgentTurnActivityResult,
+ ResolutionCallActivityInput,
+ StartResolutionCallResult,
)
borrower_case_service = FileBorrowerCaseService()
borrower_case_state_service = BorrowerCaseStateService()
+borrower_profile_service = FileBorrowerProfileService()
chat_message_service = get_chat_message_service()
+retell_service = RetellService()
def _load_case(borrower_id: str) -> BorrowerCase:
@@ -52,6 +60,62 @@ def _ensure_handoff_message(borrower_case: BorrowerCase, stage: Stage) -> None:
)
+def _normalize_turn_text(turn: dict[str, Any]) -> str:
+ return str(
+ turn.get("content")
+ or turn.get("message")
+ or turn.get("transcript")
+ or turn.get("utterance")
+ or ""
+ ).strip()
+
+
+def _normalize_turn_role(turn: dict[str, Any]) -> str:
+ return str(
+ turn.get("role")
+ or turn.get("speaker")
+ or turn.get("name")
+ or turn.get("participant")
+ or ""
+ ).strip().lower()
+
+
+def _extract_transcript_turns(call: dict[str, Any]) -> list[tuple[str, str]]:
+ raw_turns = call.get("transcript_object")
+ if not isinstance(raw_turns, list) or not raw_turns:
+ raw_turns = call.get("transcript_with_tool_calls")
+ if not isinstance(raw_turns, list):
+ raw_turns = []
+
+ turns: list[tuple[str, str]] = []
+ for item in raw_turns:
+ if not isinstance(item, dict):
+ continue
+ text = _normalize_turn_text(item)
+ if not text:
+ continue
+ role = _normalize_turn_role(item)
+ if role in {"agent", "assistant", "ai"}:
+ sender_type = "agent"
+ elif role in {"user", "caller", "borrower", "customer"}:
+ sender_type = "borrower"
+ else:
+ sender_type = "agent" if len(turns) % 2 else "borrower"
+ turns.append((sender_type, text))
+
+ if turns:
+ return turns
+
+ transcript = str(call.get("transcript") or "").strip()
+ if transcript:
+ return [("borrower", transcript)]
+ return []
+
+
+def _transcript_as_text(turns: list[tuple[str, str]]) -> str:
+ return "\n".join(f"{sender_type.upper()}: {text}" for sender_type, text in turns)
+
+
@activity.defn
def load_borrower_case(borrower_id: str) -> BorrowerCase:
return _load_case(borrower_id)
@@ -143,3 +207,69 @@ def run_final_notice_turn(input: AgentTurnActivityInput) -> AgentTurnActivityRes
borrower_case=updated_case,
stage_result=result,
)
+
+
+@activity.defn
+def start_resolution_call(borrower_case: BorrowerCase) -> StartResolutionCallResult:
+ _ensure_handoff_message(borrower_case, Stage.RESOLUTION)
+ borrower_profile = borrower_profile_service.get_borrower_profile(borrower_case.borrower_id)
+ if borrower_profile is None:
+ raise ValueError(f"Borrower profile not found for {borrower_case.borrower_id}")
+
+ call = retell_service.place_phone_call(
+ borrower_case=borrower_case,
+ borrower_profile=borrower_profile,
+ handoff_summary=borrower_case.latest_handoff_summary,
+ )
+ call_id = str(call.get("call_id") or "")
+ if not call_id:
+ raise ValueError("Retell did not return call_id")
+
+ return StartResolutionCallResult(
+ call_id=call_id,
+ call_status=str(call.get("call_status") or ""),
+ )
+
+
+@activity.defn
+def finalize_resolution_call(input: ResolutionCallActivityInput) -> AgentTurnActivityResult:
+ borrower_case = input.borrower_case
+ call = input.call
+ chat_history = _list_stage_messages(borrower_case.borrower_id, Stage.RESOLUTION)
+ transcript_turns = _extract_transcript_turns(call)
+ transcript_text = _transcript_as_text(transcript_turns)
+
+ if transcript_text:
+ agent = ResolutionAgent(lender_id=borrower_case.lender_id)
+ result = agent.analyze_completed_voice_call(
+ borrower_id=borrower_case.borrower_id,
+ borrower_case=borrower_case,
+ transcript=transcript_text,
+ chat_history=chat_history,
+ )
+ else:
+ result = AgentTurnResult(
+ reply="",
+ stage_outcome=AgentStageOutcome.NO_DEAL,
+ case_delta={},
+ latest_handoff_summary="Agent 2 attempted a voice call but no usable conversation transcript was captured. Continue by chat without restarting known account context.",
+ )
+
+ updated_case = borrower_case_state_service.apply_delta(
+ borrower_case=borrower_case,
+ case_delta=result.case_delta,
+ stage=Stage.RESOLUTION,
+ latest_handoff_summary=result.latest_handoff_summary,
+ )
+ updated_case.stage = Stage.RESOLUTION
+ updated_case.last_contact_channel = ContactChannel.VOICE
+ updated_case.resolution_call_id = str(call.get("call_id") or borrower_case.resolution_call_id or "")
+ updated_case.resolution_call_status = str(call.get("call_status") or "ended")
+
+ for sender_type, message in transcript_turns:
+ _append_message(borrower_case.borrower_id, Stage.RESOLUTION, sender_type, message)
+
+ return AgentTurnActivityResult(
+ borrower_case=updated_case,
+ stage_result=result,
+ )
diff --git a/app/orchestrator/models.py b/app/orchestrator/models.py
index b9fa25e..a9a45c1 100644
--- a/app/orchestrator/models.py
+++ b/app/orchestrator/models.py
@@ -1,5 +1,7 @@
from __future__ import annotations
+from typing import Any
+
from pydantic import BaseModel
from app.domain.borrower_case import AgentTurnResult, BorrowerCase, Stage
@@ -29,3 +31,17 @@ class AgentTurnActivityInput(BaseModel):
class AgentTurnActivityResult(BaseModel):
borrower_case: BorrowerCase
stage_result: AgentTurnResult
+
+
+class StartResolutionCallResult(BaseModel):
+ call_id: str
+ call_status: str | None = None
+
+
+class ResolutionCallActivityInput(BaseModel):
+ borrower_case: BorrowerCase
+ call: dict[str, Any]
+
+
+class ResolutionVoiceCallCompletedInput(BaseModel):
+ call: dict[str, Any]
diff --git a/app/orchestrator/workflows.py b/app/orchestrator/workflows.py
index 759c990..452a79e 100644
--- a/app/orchestrator/workflows.py
+++ b/app/orchestrator/workflows.py
@@ -8,16 +8,19 @@ from temporalio.common import RetryPolicy
with workflow.unsafe.imports_passed_through():
from app.domain.borrower_case import AgentStageOutcome, CaseStatus, Stage
from app.orchestrator.activities import (
+ finalize_resolution_call,
load_borrower_case,
run_assessment_turn,
run_final_notice_turn,
- run_resolution_turn,
save_borrower_case,
+ start_resolution_call,
)
from app.orchestrator.models import (
AgentTurnActivityInput,
CollectionsWorkflowInput,
CollectionsWorkflowState,
+ ResolutionCallActivityInput,
+ ResolutionVoiceCallCompletedInput,
)
@@ -60,24 +63,13 @@ class BorrowerCollectionsWorkflow:
if turn_result.stage_result.stage_outcome == AgentStageOutcome.ASSESSMENT_COMPLETE:
self.state.borrower_case.stage = Stage.RESOLUTION
self.state.borrower_case = await self._activity(save_borrower_case, self.state.borrower_case)
+ call_result = await self._activity(start_resolution_call, self.state.borrower_case)
+ self.state.borrower_case.resolution_call_id = call_result.call_id
+ self.state.borrower_case.resolution_call_status = call_result.call_status or "registered"
+ self.state.borrower_case = await self._activity(save_borrower_case, self.state.borrower_case)
return self.state
if self.state.borrower_case.stage == Stage.RESOLUTION:
- turn_result = await self._activity(
- run_resolution_turn,
- AgentTurnActivityInput(
- borrower_case=self.state.borrower_case,
- message=message,
- ),
- )
- self.state.borrower_case = turn_result.borrower_case
- self.state.last_agent_reply = turn_result.stage_result.reply
- self.state.borrower_case = await self._activity(save_borrower_case, self.state.borrower_case)
- if turn_result.stage_result.stage_outcome == AgentStageOutcome.DEAL_AGREED:
- await self._complete_workflow("AGREEMENT_LOGGED", CaseStatus.RESOLVED)
- elif turn_result.stage_result.stage_outcome == AgentStageOutcome.NO_DEAL:
- self.state.borrower_case.stage = Stage.FINAL_NOTICE
- self.state.borrower_case = await self._activity(save_borrower_case, self.state.borrower_case)
return self.state
turn_result = await self._activity(
@@ -100,6 +92,35 @@ class BorrowerCollectionsWorkflow:
def get_state(self) -> CollectionsWorkflowState | None:
return self.state
+ @workflow.update
+ async def handle_resolution_call_completed(
+ self,
+ input: ResolutionVoiceCallCompletedInput,
+ ) -> CollectionsWorkflowState:
+ if self.state is None:
+ await self._ensure_state(self.input)
+
+ assert self.state is not None
+ if self.state.borrower_case.stage != Stage.RESOLUTION:
+ return self.state
+
+ turn_result = await self._activity(
+ finalize_resolution_call,
+ ResolutionCallActivityInput(
+ borrower_case=self.state.borrower_case,
+ call=input.call,
+ ),
+ )
+ self.state.borrower_case = turn_result.borrower_case
+ self.state.last_agent_reply = turn_result.stage_result.reply
+ self.state.borrower_case = await self._activity(save_borrower_case, self.state.borrower_case)
+ if turn_result.stage_result.stage_outcome == AgentStageOutcome.DEAL_AGREED:
+ await self._complete_workflow("AGREEMENT_LOGGED", CaseStatus.RESOLVED)
+ elif turn_result.stage_result.stage_outcome == AgentStageOutcome.NO_DEAL:
+ self.state.borrower_case.stage = Stage.FINAL_NOTICE
+ self.state.borrower_case = await self._activity(save_borrower_case, self.state.borrower_case)
+ return self.state
+
async def _ensure_state(self, input: CollectionsWorkflowInput) -> None:
if self.state is not None:
return
diff --git a/requirements.txt b/requirements.txt
index 8b29e48..7e0c210 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,5 +2,6 @@ fastapi==0.115.12
langchain==0.2.17
langchain-openai==0.1.25
pydantic==2.11.3
+retell-sdk==4.55.0
temporalio==1.9.0
uvicorn==0.34.1
diff --git a/scripts/run_temporal_worker.py b/scripts/run_temporal_worker.py
index 33a256b..a9aa17f 100644
--- a/scripts/run_temporal_worker.py
+++ b/scripts/run_temporal_worker.py
@@ -15,11 +15,13 @@ if str(ROOT_DIR) not in sys.path:
from env_loader import load_env_file
from app.orchestrator.activities import (
+ finalize_resolution_call,
load_borrower_case,
run_assessment_turn,
run_final_notice_turn,
run_resolution_turn,
save_borrower_case,
+ start_resolution_call,
)
from app.orchestrator.workflows import BorrowerCollectionsWorkflow
@@ -43,6 +45,8 @@ async def main() -> None:
run_assessment_turn,
run_resolution_turn,
run_final_notice_turn,
+ start_resolution_call,
+ finalize_resolution_call,
],
activity_executor=activity_executor,
)