Skip to content

Commit 61eb7fd

Browse files
committed
add write lock to file streams
1 parent e311a53 commit 61eb7fd

File tree

1 file changed

+16
-13
lines changed

1 file changed

+16
-13
lines changed

livekit-rtc/livekit/rtc/data_stream.py

+16-13
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import asyncio
1718
import uuid
1819
import datetime
1920
from dataclasses import dataclass
@@ -303,21 +304,23 @@ def __init__(
303304
extensions=dict(self._header.extensions),
304305
file_name=self._header.file_header.file_name,
305306
)
307+
self._write_lock = asyncio.Lock()
306308

307309
async def write(self, data: bytes):
308-
chunked_data = [
309-
data[i : i + STREAM_CHUNK_SIZE]
310-
for i in range(0, len(data), STREAM_CHUNK_SIZE)
311-
]
312-
313-
for chunk in chunked_data:
314-
self._next_chunk_index += 1
315-
chunk_msg = proto_DataStream.Chunk(
316-
stream_id=self._header.stream_id,
317-
chunk_index=self._next_chunk_index,
318-
content=chunk,
319-
)
320-
await self._send_chunk(chunk_msg)
310+
async with self._write_lock:
311+
chunked_data = [
312+
data[i : i + STREAM_CHUNK_SIZE]
313+
for i in range(0, len(data), STREAM_CHUNK_SIZE)
314+
]
315+
316+
for chunk in chunked_data:
317+
self._next_chunk_index += 1
318+
chunk_msg = proto_DataStream.Chunk(
319+
stream_id=self._header.stream_id,
320+
chunk_index=self._next_chunk_index,
321+
content=chunk,
322+
)
323+
await self._send_chunk(chunk_msg)
321324

322325
@property
323326
def info(self) -> FileStreamInfo:

0 commit comments

Comments
 (0)