Skip to content

Commit

Permalink
fix av sync example (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
longcw authored Jan 3, 2025
1 parent 201b04d commit 244e2fc
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 28 deletions.
80 changes: 58 additions & 22 deletions examples/video-stream/video_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,38 @@ def __init__(self, media_file: Union[str, Path]) -> None:
def info(self) -> MediaInfo:
return self._info

async def stream_video(self) -> AsyncIterable[tuple[rtc.VideoFrame, float]]:
    """Yield ``(VideoFrame, timestamp)`` pairs decoded from the media file.

    The timestamp is the presentation time in seconds of each frame as
    reported by the decoder (``av_frame.time``). This generator makes a
    single pass over the container; callers restart playback by calling
    ``reset()`` and iterating again.
    """
    for av_frame in self._video_container.decode(video=0):
        # Decode to RGB, then expand to RGBA with a fully opaque alpha
        # channel (255). np.ones would set alpha to 1, i.e. almost
        # completely transparent.
        frame = av_frame.to_rgb().to_ndarray()
        frame_rgba = np.full(
            (frame.shape[0], frame.shape[1], 4), 255, dtype=np.uint8
        )
        frame_rgba[:, :, :3] = frame
        yield (
            rtc.VideoFrame(
                width=frame.shape[1],
                height=frame.shape[0],
                type=rtc.VideoBufferType.RGBA,
                data=frame_rgba.tobytes(),
            ),
            av_frame.time,
        )

async def stream_audio(self) -> AsyncIterable[tuple[rtc.AudioFrame, float]]:
    """Yield ``(AudioFrame, end_timestamp)`` pairs decoded from the media file.

    The timestamp is the END time of each audio frame in seconds
    (``av_frame.time`` plus the frame duration), which is what
    ``AVSynchronizer.push`` expects for audio. Single pass; call
    ``reset()`` to rewind.
    """
    for av_frame in self._audio_container.decode(audio=0):
        # Transpose to (samples, channels).
        # NOTE(review): assumes the decoder yields float samples in
        # [-1.0, 1.0] — confirm the container's audio format.
        samples = av_frame.to_ndarray().T
        # Scale to int16, clipping so a full-scale +1.0 sample does not
        # overflow (32768 does not fit in int16 and would wrap to -32768).
        samples = np.clip(samples * 32768, -32768, 32767).astype(np.int16)
        duration = len(samples) / self.info.audio_sample_rate
        yield (
            rtc.AudioFrame(
                data=samples.tobytes(),
                sample_rate=self.info.audio_sample_rate,
                num_channels=samples.shape[1],
                samples_per_channel=samples.shape[0],
            ),
            av_frame.time + duration,
        )

def reset(self):
Expand All @@ -102,6 +109,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
api.VideoGrants(
room_join=True,
room=room_name,
agent=True,
)
)
.to_jwt()
Expand All @@ -121,7 +129,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
media_info = streamer.info

# Create video and audio sources/tracks
queue_size_ms = 1000 # 1 second
queue_size_ms = 1000
video_source = rtc.VideoSource(
width=media_info.video_width,
height=media_info.video_height,
Expand Down Expand Up @@ -157,26 +165,54 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
)

async def _push_frames(
    stream: AsyncIterable[tuple[rtc.VideoFrame | rtc.AudioFrame, float]],
    av_sync: rtc.AVSynchronizer,
):
    """Forward every (frame, timestamp) pair from `stream` into the synchronizer."""
    async for item in stream:
        frame, ts = item
        await av_sync.push(frame, ts)
        # Yield control so the sibling audio/video task can interleave.
        await asyncio.sleep(0)

async def _log_fps(av_sync: rtc.AVSynchronizer):
    """Log the actual fps and the audio/video clock drift every two seconds."""
    loop = asyncio.get_running_loop()
    started = loop.time()
    while True:
        await asyncio.sleep(2)
        elapsed = loop.time() - started
        drift = av_sync.last_video_time - av_sync.last_audio_time
        logger.info(
            f"fps: {av_sync.actual_fps:.2f}, wall_time: {elapsed:.3f}s, "
            f"video_time: {av_sync.last_video_time:.3f}s, "
            f"audio_time: {av_sync.last_audio_time:.3f}s, diff: {drift:.3f}s"
        )

try:
while True:
streamer.reset()
video_task = asyncio.create_task(
_push_frames(streamer.stream_video(), av_sync)
)
audio_task = asyncio.create_task(
_push_frames(streamer.stream_audio(), av_sync)

video_stream = streamer.stream_video()
audio_stream = streamer.stream_audio()

# read the head frames and push them at the same time
first_video_frame, video_timestamp = await video_stream.__anext__()
first_audio_frame, audio_timestamp = await audio_stream.__anext__()
logger.info(
f"first video duration: {1/media_info.video_fps:.3f}s, "
f"first audio duration: {first_audio_frame.duration:.3f}s"
)
await av_sync.push(first_video_frame, video_timestamp)
await av_sync.push(first_audio_frame, audio_timestamp)

video_task = asyncio.create_task(_push_frames(video_stream, av_sync))
audio_task = asyncio.create_task(_push_frames(audio_stream, av_sync))

log_fps_task = asyncio.create_task(_log_fps(av_sync))

# wait for both tasks to complete
await asyncio.gather(video_task, audio_task)
await av_sync.wait_for_playout()

# clean up
av_sync.reset()
log_fps_task.cancel()
logger.info("playout finished")
finally:
await streamer.aclose()
Expand Down
49 changes: 43 additions & 6 deletions livekit-rtc/livekit/rtc/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .audio_source import AudioSource
from .video_source import VideoSource


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -43,6 +44,9 @@ def __init__(
self._max_delay_tolerance_ms = _max_delay_tolerance_ms

self._stopped = False
# the time of the last video/audio frame captured
self._last_video_time: float = 0
self._last_audio_time: float = 0

self._video_queue_max_size = int(
self._video_fps * self._video_queue_size_ms / 1000
Expand All @@ -51,7 +55,7 @@ def __init__(
# ensure queue is bounded if queue size is specified
self._video_queue_max_size = max(1, self._video_queue_max_size)

self._video_queue = asyncio.Queue[VideoFrame](
self._video_queue = asyncio.Queue[tuple[VideoFrame, Optional[float]]](
maxsize=self._video_queue_max_size
)
self._fps_controller = _FPSController(
Expand All @@ -60,28 +64,47 @@ def __init__(
)
self._capture_video_task = asyncio.create_task(self._capture_video())

async def push(
    self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
) -> None:
    """Push a frame to the synchronizer
    Args:
        frame: The video or audio frame to push.
        timestamp: (optional) The timestamp of the frame, for logging purposes for now.
            For AudioFrame, it should be the end time of the frame.
    """
    if not isinstance(frame, AudioFrame):
        # Video frames are queued; a background task paces them out at
        # the configured fps.
        await self._video_queue.put((frame, timestamp))
        return

    # Audio frames go straight to the audio source.
    await self._audio_source.capture_frame(frame)
    if timestamp is not None:
        self._last_audio_time = timestamp

async def clear_queue(self) -> None:
    """Discard all pending audio and video frames without playing them."""
    self._audio_source.clear_queue()
    # Drain the video queue, marking each entry done so join() can return.
    while True:
        try:
            self._video_queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        self._video_queue.task_done()

async def wait_for_playout(self) -> None:
    """Wait until all video and audio frames are played out."""
    audio_done = self._audio_source.wait_for_playout()
    video_done = self._video_queue.join()
    # Both streams must drain; run the waits concurrently.
    await asyncio.gather(audio_done, video_done)

def reset(self) -> None:
    """Reset frame pacing so a new playback run starts with clean fps state."""
    self._fps_controller.reset()

async def _capture_video(self) -> None:
    """Background task: pace queued video frames out at the target fps.

    Runs until ``self._stopped`` is set. Each queue entry is marked done
    even if ``capture_frame`` raises, so ``wait_for_playout``'s
    ``join()`` cannot hang on an errored frame.
    """
    while not self._stopped:
        frame, timestamp = await self._video_queue.get()
        try:
            async with self._fps_controller:
                self._video_source.capture_frame(frame)
                if timestamp is not None:
                    self._last_video_time = timestamp
        finally:
            self._video_queue.task_done()

async def aclose(self) -> None:
Expand All @@ -93,6 +116,16 @@ async def aclose(self) -> None:
def actual_fps(self) -> float:
return self._fps_controller.actual_fps

@property
def last_video_time(self) -> float:
    """The timestamp (seconds) of the last video frame captured."""
    return self._last_video_time

@property
def last_audio_time(self) -> float:
    """Estimated playout position (seconds) of the audio stream.

    Last pushed end-timestamp minus the audio still queued in the source.
    """
    return self._last_audio_time - self._audio_source.queued_duration


class _FPSController:
def __init__(
Expand Down Expand Up @@ -123,6 +156,10 @@ async def __aenter__(self) -> None:
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.after_process()

def reset(self) -> None:
    """Clear the scheduled next-frame time and the recorded send timestamps."""
    self._next_frame_time = None
    self._send_timestamps.clear()

async def wait_next_process(self) -> None:
"""Wait until it's time for the next frame.
Expand Down

0 comments on commit 244e2fc

Please sign in to comment.