Skip to content

Commit

Permalink
handle process initialization failure & increase concurrent proc init (
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Feb 27, 2025
1 parent d877c6b commit b283301
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/popular-dragons-arrive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

handle process initialization failure
6 changes: 5 additions & 1 deletion livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ def proc_main(args: ProcStartArgs) -> None:

pid = current_process().pid
logger.info("initializing job process", extra={"pid": pid})
client.initialize()
try:
client.initialize()
except Exception:
return # initialization failed, exit

logger.info("job process initialized", extra={"pid": pid})

client.run()
Expand Down
9 changes: 7 additions & 2 deletions livekit-agents/livekit/agents/ipc/proc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ def initialize(self) -> None:
)

self._init_req = first_req
self._initialize_fnc(self._init_req, self)
send_message(cch, InitializeResponse())
try:
self._initialize_fnc(self._init_req, self)
send_message(cch, InitializeResponse())
except Exception as e:
send_message(cch, InitializeResponse(error=str(e)))
raise

self._initialized = True
cch.detach()
except aio.duplex_unix.DuplexClosed as e:
Expand Down
2 changes: 1 addition & 1 deletion livekit-agents/livekit/agents/ipc/proc_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"process_job_launched",
]

MAX_CONCURRENT_INITIALIZATIONS = 1
MAX_CONCURRENT_INITIALIZATIONS = 5


class ProcPool(utils.EventEmitter[EventTypes]):
Expand Down
7 changes: 7 additions & 0 deletions livekit-agents/livekit/agents/ipc/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class InitializeResponse:
"""mark the process as initialized"""

MSG_ID: ClassVar[int] = 1
error: str = ""

def write(self, b: io.BytesIO) -> None:
channel.write_string(b, self.error)

def read(self, b: io.BytesIO) -> None:
self.error = channel.read_string(b)


@dataclass
Expand Down
17 changes: 14 additions & 3 deletions livekit-agents/livekit/agents/ipc/supervised_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _add_proc_ctx_log(record: logging.LogRecord) -> None:
log_listener.start()

self._proc = self._create_process(mp_cch, mp_log_cch)
self._proc.start()
await self._loop.run_in_executor(None, self._proc.start)
mp_log_cch.close()
mp_cch.close()

Expand Down Expand Up @@ -168,6 +168,19 @@ async def initialize(self) -> None:
assert isinstance(init_res, proto.InitializeResponse), (
"first message must be InitializeResponse"
)

if init_res.error:
self._initialize_fut.set_exception(
RuntimeError(f"process initialization failed: {init_res.error}")
)
logger.error(
f"process initialization failed: {init_res.error}",
extra=self.logging_extra(),
)
raise RuntimeError(f"process initialization failed: {init_res.error}")
else:
self._initialize_fut.set_result(None)

except asyncio.TimeoutError:
self._initialize_fut.set_exception(
asyncio.TimeoutError("process initialization timed out")
Expand All @@ -180,8 +193,6 @@ async def initialize(self) -> None:
except Exception as e: # should be channel.ChannelClosed most of the time
self._initialize_fut.set_exception(e)
raise
else:
self._initialize_fut.set_result(None)

async def aclose(self) -> None:
"""attempt to gracefully close the supervised process"""
Expand Down

0 comments on commit b283301

Please sign in to comment.