Skip to content

Commit

Permalink
Include job count in WorkerStatus and load_fnc (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
keepingitneil authored Nov 14, 2024
1 parent 0c9eac9 commit 2e3616a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changeset/moody-poets-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

Include job count in WorkerStatus and pass in worker for load_fnc
38 changes: 29 additions & 9 deletions livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import asyncio
import contextlib
import datetime
import inspect
import math
import multiprocessing as mp
import os
Expand All @@ -25,7 +26,14 @@
from dataclasses import dataclass, field
from enum import Enum
from functools import reduce
from typing import Any, Awaitable, Callable, Generic, Literal, TypeVar
from typing import (
Any,
Awaitable,
Callable,
Generic,
Literal,
TypeVar,
)
from urllib.parse import urljoin, urlparse

import aiohttp
Expand Down Expand Up @@ -87,7 +95,7 @@ def _get_avg(self) -> float:
return self._m_avg.get_avg()

@classmethod
def get_load(cls) -> float:
def get_load(cls, worker: Worker) -> float:
if cls._instance is None:
cls._instance = _DefaultLoadCalc()

Expand Down Expand Up @@ -137,7 +145,9 @@ class WorkerOptions:
When left empty, all jobs are accepted."""
prewarm_fnc: Callable[[JobProcess], Any] = _default_initialize_process_fnc
"""A function to perform any necessary initialization before the job starts."""
load_fnc: Callable[[], float] = _DefaultLoadCalc.get_load
load_fnc: Callable[[Worker], float] | Callable[[], float] = (
_DefaultLoadCalc.get_load
)
"""Called to determine the current load of the worker. Should return a value between 0 and 1."""
job_executor_type: JobExecutorType = _default_job_executor_type
"""Which executor to use to run jobs. (currently thread or process are supported)"""
Expand Down Expand Up @@ -690,15 +700,24 @@ def _on_process_job_launched(self, proc: ipc.job_executor.JobExecutor) -> None:
self._update_job_status_sync(proc)

async def _update_worker_status(self):
job_cnt = len(self.active_jobs)
if self._draining:
update = agent.UpdateWorkerStatus(status=agent.WorkerStatus.WS_FULL)
update = agent.UpdateWorkerStatus(
status=agent.WorkerStatus.WS_FULL, job_count=job_cnt
)
msg = agent.WorkerMessage(update_worker=update)
await self._queue_msg(msg)
return

current_load = await asyncio.get_event_loop().run_in_executor(
None, self._opts.load_fnc
)
def load_fnc():
signature = inspect.signature(self._opts.load_fnc)
parameters = list(signature.parameters.values())
if len(parameters) == 0:
return self._opts.load_fnc() # type: ignore

return self._opts.load_fnc(self) # type: ignore

current_load = await asyncio.get_event_loop().run_in_executor(None, load_fnc)

is_full = current_load >= _WorkerEnvOption.getvalue(
self._opts.load_threshold, self._devmode
Expand All @@ -711,7 +730,9 @@ async def _update_worker_status(self):
else agent.WorkerStatus.WS_FULL
)

update = agent.UpdateWorkerStatus(load=current_load, status=status)
update = agent.UpdateWorkerStatus(
load=current_load, status=status, job_count=job_cnt
)

# only log if status has changed
if self._previous_status != status and not self._draining:
Expand Down Expand Up @@ -743,7 +764,6 @@ def _update_job_status_sync(self, proc: ipc.job_executor.JobExecutor) -> None:
async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None:
job_info = proc.running_job
if not job_info:
logger.error("job_info not found for process")
return
status: agent.JobStatus = agent.JobStatus.JS_RUNNING
if proc.run_status == ipc.job_executor.RunStatus.FINISHED_FAILED:
Expand Down

0 comments on commit 2e3616a

Please sign in to comment.