From 244e2fce86d5e6767e5dc710ab861b39486d97c0 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Fri, 3 Jan 2025 09:06:02 +0800 Subject: [PATCH] fix av sync example (#338) --- examples/video-stream/video_play.py | 80 ++++++++++++++++++------- livekit-rtc/livekit/rtc/synchronizer.py | 49 +++++++++++++-- 2 files changed, 101 insertions(+), 28 deletions(-) diff --git a/examples/video-stream/video_play.py b/examples/video-stream/video_play.py index b15e1ccc..4fdbd21b 100644 --- a/examples/video-stream/video_play.py +++ b/examples/video-stream/video_play.py @@ -56,31 +56,38 @@ def __init__(self, media_file: Union[str, Path]) -> None: def info(self) -> MediaInfo: return self._info - async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]: + async def stream_video(self) -> AsyncIterable[tuple[rtc.VideoFrame, float]]: """Streams video frames from the media file in an endless loop.""" - for av_frame in self._video_container.decode(video=0): + for i, av_frame in enumerate(self._video_container.decode(video=0)): # Convert video frame to RGBA frame = av_frame.to_rgb().to_ndarray() frame_rgba = np.ones((frame.shape[0], frame.shape[1], 4), dtype=np.uint8) frame_rgba[:, :, :3] = frame - yield rtc.VideoFrame( - width=frame.shape[1], - height=frame.shape[0], - type=rtc.VideoBufferType.RGBA, - data=frame_rgba.tobytes(), + yield ( + rtc.VideoFrame( + width=frame.shape[1], + height=frame.shape[0], + type=rtc.VideoBufferType.RGBA, + data=frame_rgba.tobytes(), + ), + av_frame.time, ) - async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]: + async def stream_audio(self) -> AsyncIterable[tuple[rtc.AudioFrame, float]]: """Streams audio frames from the media file in an endless loop.""" for av_frame in self._audio_container.decode(audio=0): # Convert audio frame to raw int16 samples frame = av_frame.to_ndarray().T # Transpose to (samples, channels) frame = (frame * 32768).astype(np.int16) - yield rtc.AudioFrame( - data=frame.tobytes(), - sample_rate=self.info.audio_sample_rate, - num_channels=frame.shape[1], - samples_per_channel=frame.shape[0], + duration = len(frame) / self.info.audio_sample_rate + yield ( + rtc.AudioFrame( + data=frame.tobytes(), + sample_rate=self.info.audio_sample_rate, + num_channels=frame.shape[1], + samples_per_channel=frame.shape[0], + ), + av_frame.time + duration, ) def reset(self): @@ -102,6 +109,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str): api.VideoGrants( room_join=True, room=room_name, + agent=True, ) ) .to_jwt() @@ -121,7 +129,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str): media_info = streamer.info # Create video and audio sources/tracks - queue_size_ms = 1000 # 1 second + queue_size_ms = 1000 video_source = rtc.VideoSource( width=media_info.video_width, height=media_info.video_height, @@ -157,26 +165,54 @@ async def main(room: rtc.Room, room_name: str, media_path: str): ) async def _push_frames( - stream: AsyncIterable[rtc.VideoFrame | rtc.AudioFrame], + stream: AsyncIterable[tuple[rtc.VideoFrame | rtc.AudioFrame, float]], av_sync: rtc.AVSynchronizer, ): - async for frame in stream: - await av_sync.push(frame) + async for frame, timestamp in stream: + await av_sync.push(frame, timestamp) await asyncio.sleep(0) + async def _log_fps(av_sync: rtc.AVSynchronizer): + start_time = asyncio.get_running_loop().time() + while True: + await asyncio.sleep(2) + wall_time = asyncio.get_running_loop().time() - start_time + diff = av_sync.last_video_time - av_sync.last_audio_time + logger.info( + f"fps: {av_sync.actual_fps:.2f}, wall_time: {wall_time:.3f}s, " + f"video_time: {av_sync.last_video_time:.3f}s, " + f"audio_time: {av_sync.last_audio_time:.3f}s, diff: {diff:.3f}s" + ) + try: while True: streamer.reset() - video_task = asyncio.create_task( - _push_frames(streamer.stream_video(), av_sync) - ) - audio_task = asyncio.create_task( - _push_frames(streamer.stream_audio(), av_sync) + + video_stream = streamer.stream_video() + audio_stream = streamer.stream_audio() + + # read the head frames and push them at the same time + first_video_frame, video_timestamp = await video_stream.__anext__() + first_audio_frame, audio_timestamp = await audio_stream.__anext__() + logger.info( + f"first video duration: {1/media_info.video_fps:.3f}s, " + f"first audio duration: {first_audio_frame.duration:.3f}s" ) + await av_sync.push(first_video_frame, video_timestamp) + await av_sync.push(first_audio_frame, audio_timestamp) + + video_task = asyncio.create_task(_push_frames(video_stream, av_sync)) + audio_task = asyncio.create_task(_push_frames(audio_stream, av_sync)) + + log_fps_task = asyncio.create_task(_log_fps(av_sync)) # wait for both tasks to complete await asyncio.gather(video_task, audio_task) await av_sync.wait_for_playout() + + # clean up + av_sync.reset() + log_fps_task.cancel() logger.info("playout finished") finally: await streamer.aclose() diff --git a/livekit-rtc/livekit/rtc/synchronizer.py b/livekit-rtc/livekit/rtc/synchronizer.py index 09d442cd..14df3b6c 100644 --- a/livekit-rtc/livekit/rtc/synchronizer.py +++ b/livekit-rtc/livekit/rtc/synchronizer.py @@ -9,6 +9,7 @@ from .audio_source import AudioSource from .video_source import VideoSource + logger = logging.getLogger(__name__) @@ -43,6 +44,9 @@ def __init__( self._max_delay_tolerance_ms = _max_delay_tolerance_ms self._stopped = False + # the time of the last video/audio frame captured + self._last_video_time: float = 0 + self._last_audio_time: float = 0 self._video_queue_max_size = int( self._video_fps * self._video_queue_size_ms / 1000 @@ -51,7 +55,7 @@ def __init__( # ensure queue is bounded if queue size is specified self._video_queue_max_size = max(1, self._video_queue_max_size) - self._video_queue = asyncio.Queue[VideoFrame]( + self._video_queue = asyncio.Queue[tuple[VideoFrame, Optional[float]]]( maxsize=self._video_queue_max_size ) self._fps_controller = _FPSController( @@ -60,28 +64,47 @@ def __init__( ) self._capture_video_task = asyncio.create_task(self._capture_video()) - async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None: + async def push( + self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None + ) -> None: + """Push a frame to the synchronizer + + Args: + frame: The video or audio frame to push. + timestamp: (optional) The timestamp of the frame, for logging purposes for now. + For AudioFrame, it should be the end time of the frame. + """ if isinstance(frame, AudioFrame): await self._audio_source.capture_frame(frame) + if timestamp is not None: + self._last_audio_time = timestamp return - await self._video_queue.put(frame) + await self._video_queue.put((frame, timestamp)) async def clear_queue(self) -> None: self._audio_source.clear_queue() while not self._video_queue.empty(): await self._video_queue.get() + self._video_queue.task_done() async def wait_for_playout(self) -> None: """Wait until all video and audio frames are played out.""" - await self._audio_source.wait_for_playout() - await self._video_queue.join() + await asyncio.gather( + self._audio_source.wait_for_playout(), + self._video_queue.join(), + ) + + def reset(self) -> None: + self._fps_controller.reset() async def _capture_video(self) -> None: while not self._stopped: - frame = await self._video_queue.get() + frame, timestamp = await self._video_queue.get() async with self._fps_controller: self._video_source.capture_frame(frame) + if timestamp is not None: + self._last_video_time = timestamp self._video_queue.task_done() async def aclose(self) -> None: @@ -93,6 +116,16 @@ async def aclose(self) -> None: def actual_fps(self) -> float: return self._fps_controller.actual_fps + @property + def last_video_time(self) -> float: + """The time of the last video frame captured""" + return self._last_video_time + + @property + def last_audio_time(self) -> float: + """The time of the last audio frame played out""" + return self._last_audio_time - self._audio_source.queued_duration + class _FPSController: def __init__( @@ -123,6 +156,10 @@ async def __aenter__(self) -> None: async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: self.after_process() + def reset(self) -> None: + self._next_frame_time = None + self._send_timestamps.clear() + async def wait_next_process(self) -> None: """Wait until it's time for the next frame.