Commit 8afc395

Authored Dec 23, 2024
added ConversationPersistor() to document events/transcriptions in external file (#1209)
1 parent 12047cd commit 8afc395

File tree: 1 file changed (+213, -0)
examples/conversation_persistor.py
@@ -0,0 +1,213 @@
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Union

import aiofiles
from dotenv import load_dotenv
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    WorkerOptions,
    cli,
    multimodal,
    utils,
)
from livekit.agents.llm import ChatMessage
from livekit.agents.multimodal.multimodal_agent import EventTypes
from livekit.plugins import openai


@dataclass
class EventLog:
    eventname: str | None
    """name of recorded event"""
    # default_factory ensures the timestamp is captured when each log entry
    # is created, not once at class-definition time.
    time: str = field(
        default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    )
    """time the event is recorded"""


@dataclass
class TranscriptionLog:
    role: str | None
    """role of the speaker"""
    transcription: str | None
    """transcription of speech"""
    time: str = field(
        default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    )
    """time the event is recorded"""


class ConversationPersistor(utils.EventEmitter[EventTypes]):
    def __init__(
        self,
        *,
        model: multimodal.MultimodalAgent | None,
        log: str | None,
        transcriptions_only: bool = False,
    ):
        """
        Initializes a ConversationPersistor instance, which records the events and transcriptions of a MultimodalAgent.

        Args:
            model (multimodal.MultimodalAgent): an instance of a MultimodalAgent
            log (str): name of the external file to record events in
            transcriptions_only (bool): if True, only transcriptions are recorded; False by default

        Attributes:
            user_transcriptions (list): list of user transcriptions
            agent_transcriptions (list): list of agent transcriptions
            events (list): list of all events
            log_q (asyncio.Queue): a queue of EventLog and TranscriptionLog items
        """
        super().__init__()

        self._model = model
        self._log = log
        self._transcriptions_only = transcriptions_only

        self._user_transcriptions = []
        self._agent_transcriptions = []
        self._events = []

        self._log_q = asyncio.Queue[Union[EventLog, TranscriptionLog, None]]()

    @property
    def log(self) -> str | None:
        return self._log

    @property
    def model(self) -> multimodal.MultimodalAgent | None:
        return self._model

    @property
    def user_transcriptions(self) -> list:
        return self._user_transcriptions

    @property
    def agent_transcriptions(self) -> list:
        return self._agent_transcriptions

    @property
    def events(self) -> list:
        return self._events

    @log.setter
    def log(self, newlog: str | None) -> None:
        self._log = newlog

    async def _main_atask(self) -> None:
        # Writes queued logs to the file asynchronously; a None in the queue
        # is the sentinel that signals shutdown.
        while True:
            log = await self._log_q.get()

            if log is None:
                break

            async with aiofiles.open(self._log, "a") as file:
                if type(log) is EventLog and not self._transcriptions_only:
                    self._events.append(log)
                    await file.write("\n" + log.time + " " + log.eventname)

                if type(log) is TranscriptionLog:
                    if log.role == "user":
                        self._user_transcriptions.append(log)
                    else:
                        self._agent_transcriptions.append(log)

                    await file.write(
                        "\n" + log.time + " " + log.role + " " + log.transcription
                    )

    async def aclose(self) -> None:
        # Enqueues the sentinel, then waits for the writer task to drain and exit.
        self._log_q.put_nowait(None)
        await self._main_task

    def start(self) -> None:
        # Listens for emitted MultimodalAgent events
        self._main_task = asyncio.create_task(self._main_atask())

        @self._model.on("user_started_speaking")
        def _user_started_speaking():
            event = EventLog(eventname="user_started_speaking")
            self._log_q.put_nowait(event)

        @self._model.on("user_stopped_speaking")
        def _user_stopped_speaking():
            event = EventLog(eventname="user_stopped_speaking")
            self._log_q.put_nowait(event)

        @self._model.on("agent_started_speaking")
        def _agent_started_speaking():
            event = EventLog(eventname="agent_started_speaking")
            self._log_q.put_nowait(event)

        @self._model.on("agent_stopped_speaking")
        def _agent_stopped_speaking():
            transcription = TranscriptionLog(
                role="agent",
                transcription=(self._model._playing_handle._tr_fwd.played_text)[1:],
            )
            self._log_q.put_nowait(transcription)

            event = EventLog(eventname="agent_stopped_speaking")
            self._log_q.put_nowait(event)

        @self._model.on("user_speech_committed")
        def _user_speech_committed(user_msg: ChatMessage):
            transcription = TranscriptionLog(role="user", transcription=user_msg.content)
            self._log_q.put_nowait(transcription)

            event = EventLog(eventname="user_speech_committed")
            self._log_q.put_nowait(event)

        @self._model.on("agent_speech_committed")
        def _agent_speech_committed():
            event = EventLog(eventname="agent_speech_committed")
            self._log_q.put_nowait(event)

        @self._model.on("agent_speech_interrupted")
        def _agent_speech_interrupted():
            event = EventLog(eventname="agent_speech_interrupted")
            self._log_q.put_nowait(event)

        @self._model.on("function_calls_collected")
        def _function_calls_collected():
            event = EventLog(eventname="function_calls_collected")
            self._log_q.put_nowait(event)

        @self._model.on("function_calls_finished")
        def _function_calls_finished():
            event = EventLog(eventname="function_calls_finished")
            self._log_q.put_nowait(event)


load_dotenv()

logger = logging.getLogger("my-worker")
logger.setLevel(logging.INFO)


async def entrypoint(ctx: JobContext):
    agent = multimodal.MultimodalAgent(
        model=openai.realtime.RealtimeModel(
            voice="alloy",
            temperature=0.8,
            instructions="You are a helpful assistant.",
            turn_detection=openai.realtime.ServerVadOptions(
                threshold=0.6, prefix_padding_ms=200, silence_duration_ms=500
            ),
        ),
    )

    cp = ConversationPersistor(model=agent, log="log.txt")
    cp.start()

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    participant = await ctx.wait_for_participant()
    agent.start(ctx.room, participant)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
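A hypothetical excerpt of the resulting log.txt (timestamps and utterances invented here; the line format follows the file.write calls in _main_atask: timestamp, then either an event name or a role plus its transcription, one entry per line; exact event ordering depends on the agent):

2024-12-23 14:05:01.123 user_started_speaking
2024-12-23 14:05:02.860 user_stopped_speaking
2024-12-23 14:05:03.112 user Hello, can you hear me?
2024-12-23 14:05:03.113 user_speech_committed
2024-12-23 14:05:03.540 agent_started_speaking
2024-12-23 14:05:05.214 agent Yes, I can hear you. How can I help?
2024-12-23 14:05:05.215 agent_stopped_speaking

Like the other examples in this repo, it should run through the standard livekit-agents CLI (for example, python examples/conversation_persistor.py dev), assuming LiveKit and OpenAI credentials are available via the environment or a .env file.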
