-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathagent.py
242 lines (205 loc) · 7.72 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
from __future__ import annotations
import asyncio
import logging
from dotenv import load_dotenv
import json
import os
from typing import Any
from livekit import rtc, api
from livekit.agents import (
AgentSession,
Agent,
JobContext,
function_tool,
RunContext,
get_job_context,
cli,
RoomInputOptions,
WorkerOptions,
)
from livekit.plugins import (
deepgram,
openai,
cartesia,
silero,
turn_detector,
noise_cancellation,
)
# Read variables from .env.local; this step is optional and only used for
# local development.
load_dotenv(dotenv_path=".env.local")

# ID of the SIP trunk used to place outbound calls (None when the variable
# is not set in the environment).
outbound_trunk_id = os.environ.get("SIP_OUTBOUND_TRUNK_ID")

logger = logging.getLogger("outbound-caller")
logger.setLevel(logging.INFO)
class OutboundCaller(Agent):
    """Voice agent that phones a patient to confirm an upcoming dental appointment.

    The agent is created before the callee has joined the room, so
    ``participant`` starts out as ``None`` and is filled in later through
    ``set_participant``. Every tool therefore has to cope with a missing
    participant instead of assuming one is present.

    NOTE(review): the docstrings on the ``@function_tool`` methods are
    presumably surfaced to the LLM as tool descriptions - keep their wording
    deliberate when editing them.
    """

    def __init__(
        self,
        *,
        name: str,
        appointment_time: str,
        dial_info: dict[str, Any],
    ):
        """Build the agent with its system prompt.

        Args:
            name: The patient's name, interpolated into the instructions.
            appointment_time: Human-readable appointment time for the prompt.
            dial_info: Dispatch metadata; ``transfer_to`` (optional) is the
                number used by ``transfer_call``.
        """
        super().__init__(
            instructions=f"""
You are a scheduling assistant for a dental practice. Your interface with user will be voice.
You will be on a call with a patient who has an upcoming appointment. Your goal is to confirm the appointment details.
As a customer service representative, you will be polite and professional at all times. Allow user to end the conversation.
When the user would like to be transferred to a human agent, first confirm with them. upon confirmation, use the transfer_call tool.
The customer's name is {name}. His appointment is on {appointment_time}.
"""
        )
        # keep reference to the participant for transfers
        self.participant: rtc.RemoteParticipant | None = None
        self.dial_info = dial_info

    def set_participant(self, participant: rtc.RemoteParticipant):
        """Record the remote participant once they have joined the room."""
        self.participant = participant

    def _participant_identity(self) -> str:
        """Return the callee's identity for log lines, or a placeholder if not joined."""
        if self.participant is None:
            return "<unknown>"
        return self.participant.identity

    async def hangup(self):
        """Helper function to hang up the call by deleting the room"""
        job_ctx = get_job_context()
        await job_ctx.api.room.delete_room(
            api.DeleteRoomRequest(
                room=job_ctx.room.name,
            )
        )

    @function_tool()
    async def transfer_call(self, ctx: RunContext):
        """Transfer the call to a human agent, called after confirming with the user"""
        # .get() so that a dispatch without a "transfer_to" key is handled the
        # same way as an empty number instead of raising KeyError.
        transfer_to = self.dial_info.get("transfer_to")
        if not transfer_to:
            return "cannot transfer call"

        # Without a participant there is nobody to transfer; bail out before
        # telling the user a transfer is about to happen.
        if self.participant is None:
            logger.error("cannot transfer call: participant has not joined yet")
            return "cannot transfer call"

        logger.info(f"transferring call to {transfer_to}")

        # let the message play fully before transferring
        await ctx.session.generate_reply(
            instructions="let the user know you'll be transferring them"
        )

        job_ctx = get_job_context()
        try:
            await job_ctx.api.sip.transfer_sip_participant(
                api.TransferSIPParticipantRequest(
                    room_name=job_ctx.room.name,
                    participant_identity=self.participant.identity,
                    transfer_to=f"tel:{transfer_to}",
                )
            )
            logger.info(f"transferred call to {transfer_to}")
        except Exception as e:
            # Boundary handler: any failure ends the call after informing the user.
            logger.error(f"error transferring call: {e}")
            await ctx.session.generate_reply(
                instructions="there was an error transferring the call."
            )
            await self.hangup()

    @function_tool()
    async def end_call(self, ctx: RunContext):
        """Called when the user wants to end the call"""
        logger.info(f"ending the call for {self._participant_identity()}")

        # let the agent finish speaking
        # NOTE(review): confirm that `done()` is awaitable in the installed
        # livekit-agents version; some releases expose `wait_for_playout()`
        # for this purpose instead.
        current_speech = ctx.session.current_speech
        if current_speech:
            await current_speech.done()

        await self.hangup()

    @function_tool()
    async def look_up_availability(
        self,
        ctx: RunContext,
        date: str,
    ):
        """Called when the user asks about alternative appointment availability

        Args:
            date: The date of the appointment to check availability for
        """
        logger.info(
            f"looking up availability for {self._participant_identity()} on {date}"
        )
        # Placeholder for a real lookup: waits, then returns fixed slots.
        await asyncio.sleep(3)
        return {
            "available_times": ["1pm", "2pm", "3pm"],
        }

    @function_tool()
    async def confirm_appointment(
        self,
        ctx: RunContext,
        date: str,
        time: str,
    ):
        """Called when the user confirms their appointment on a specific date.
        Use this tool only when they are certain about the date and time.

        Args:
            date: The date of the appointment
            time: The time of the appointment
        """
        logger.info(
            f"confirming appointment for {self._participant_identity()} on {date} at {time}"
        )
        return "reservation confirmed"

    @function_tool()
    async def detected_answering_machine(self, ctx: RunContext):
        """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
        logger.info(f"detected answering machine for {self._participant_identity()}")
        await self.hangup()
async def entrypoint(ctx: JobContext):
    """Connect to the room, start the agent session and dial the user.

    The job metadata must be a JSON object; ``phone_number`` is the number to
    dial and ``transfer_to`` is the number used when the caller asks for a
    human agent.

    Args:
        ctx: The job context supplied by the worker for this dispatch.
    """
    logger.info(f"connecting to room {ctx.room.name}")
    await ctx.connect()

    # when dispatching the agent, we'll pass it the appropriate info to dial the user
    # dial_info is a dict with the following keys:
    # - phone_number: the phone number to dial
    # - transfer_to: the phone number to transfer the call to when requested
    dial_info = json.loads(ctx.job.metadata)

    # identity given to the SIP participant; used both to create and to find them
    participant_identity = "phone_user"

    # look up the user's phone number and appointment details
    agent = OutboundCaller(
        name="Jayden",
        appointment_time="next Tuesday at 3pm",
        dial_info=dial_info,
    )

    # the following uses GPT-4o, Deepgram and Cartesia
    session = AgentSession(
        turn_detection=turn_detector.EOUModel(),
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        # you can also use OpenAI's TTS with openai.TTS()
        tts=cartesia.TTS(),
        llm=openai.LLM(model="gpt-4o"),
        # you can also use a speech-to-speech model like OpenAI's Realtime API
        # llm=openai.realtime.RealtimeModel()
    )

    # start the session first before dialing, to ensure that when the user picks up
    # the agent does not miss anything the user says
    # creating a task for this because session.start does not return until the participant is available
    # The task is kept in a variable: the event loop only holds weak references
    # to tasks, so an unreferenced task may be garbage-collected before it finishes.
    session_started = asyncio.create_task(
        session.start(
            agent=agent,
            room=ctx.room,
            room_input_options=RoomInputOptions(
                # enable Krisp background voice and noise removal
                noise_cancellation=noise_cancellation.BVC(),
            ),
        )
    )

    # `create_sip_participant` starts dialing the user
    try:
        await ctx.api.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                room_name=ctx.room.name,
                sip_trunk_id=outbound_trunk_id,
                sip_call_to=dial_info["phone_number"],
                participant_identity=participant_identity,
                # function blocks until user answers the call, or if the call fails
                wait_until_answered=True,
            )
        )

        # the call was answered, so the session start can complete; awaiting it
        # also surfaces any exception raised while starting the session
        await session_started

        # a participant phone user is now available
        participant = await ctx.wait_for_participant(identity=participant_identity)
        agent.set_participant(participant)
    except api.TwirpError as e:
        logger.error(
            f"error creating SIP participant: {e.message}, "
            f"SIP status: {e.metadata.get('sip_status_code')} "
            f"{e.metadata.get('sip_status')}"
        )
        ctx.shutdown()
if __name__ == "__main__":
    # Script entry: hand the worker configuration to the agents CLI.
    worker_options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        agent_name="outbound-caller",
    )
    cli.run_app(worker_options)