diff --git a/.changeset/twelve-hotels-march.md b/.changeset/twelve-hotels-march.md
new file mode 100644
index 000000000..bec7983ed
--- /dev/null
+++ b/.changeset/twelve-hotels-march.md
@@ -0,0 +1,5 @@
+---
+"livekit-agents": patch
+---
+
+Add memory to load calc. The default load calc now returns the maximum of the memory usage and CPU usage.
diff --git a/livekit-agents/livekit/agents/utils/hw/__init__.py b/livekit-agents/livekit/agents/utils/hw/__init__.py
index 0610f3748..3a1fa968a 100644
--- a/livekit-agents/livekit/agents/utils/hw/__init__.py
+++ b/livekit-agents/livekit/agents/utils/hw/__init__.py
@@ -1,3 +1,18 @@
 from .cpu import CGroupV2CPUMonitor, CPUMonitor, DefaultCPUMonitor, get_cpu_monitor
+from .memory import (
+    CGroupV2MemoryMonitor,
+    DefaultMemoryMonitor,
+    MemoryMonitor,
+    get_memory_monitor,
+)
 
-__all__ = ["get_cpu_monitor", "CPUMonitor", "CGroupV2CPUMonitor", "DefaultCPUMonitor"]
+__all__ = [
+    "get_cpu_monitor",
+    "CPUMonitor",
+    "CGroupV2CPUMonitor",
+    "DefaultCPUMonitor",
+    "get_memory_monitor",
+    "MemoryMonitor",
+    "CGroupV2MemoryMonitor",
+    "DefaultMemoryMonitor",
+]
diff --git a/livekit-agents/livekit/agents/utils/hw/memory.py b/livekit-agents/livekit/agents/utils/hw/memory.py
new file mode 100644
index 000000000..4edb9607f
--- /dev/null
+++ b/livekit-agents/livekit/agents/utils/hw/memory.py
@@ -0,0 +1,78 @@
+"""Memory usage monitors backed by psutil and by cgroup v2 files."""
+
+import os
+from abc import ABC, abstractmethod
+
+import psutil
+
+
+class MemoryMonitor(ABC):
+    """Interface for reading memory capacity and usage."""
+
+    @abstractmethod
+    def memory_total(self) -> int:
+        """Total memory available in bytes."""
+        pass
+
+    @abstractmethod
+    def memory_used(self) -> int:
+        """Memory currently in use in bytes."""
+        pass
+
+    @abstractmethod
+    def memory_percent(self) -> float:
+        """Memory usage as a fraction between 0 and 1."""
+        pass
+
+
+class DefaultMemoryMonitor(MemoryMonitor):
+    """Reports the figures returned by psutil.virtual_memory()."""
+
+    def memory_total(self) -> int:
+        return psutil.virtual_memory().total
+
+    def memory_used(self) -> int:
+        return psutil.virtual_memory().used
+
+    def memory_percent(self) -> float:
+        # Scale psutil's percentage down to the 0-1 range of the interface.
+        return psutil.virtual_memory().percent / 100.0
+
+
+class CGroupV2MemoryMonitor(MemoryMonitor):
+    """Reports memory figures from the cgroup v2 files under /sys/fs/cgroup."""
+
+    def memory_total(self) -> int:
+        try:
+            with open("/sys/fs/cgroup/memory.max", "r") as f:
+                max_memory = f.read().strip()
+        except FileNotFoundError:
+            return psutil.virtual_memory().total
+        # "max" means no limit is configured, so use the psutil total instead.
+        if max_memory == "max":
+            return psutil.virtual_memory().total
+        return int(max_memory)
+
+    def memory_used(self) -> int:
+        with open("/sys/fs/cgroup/memory.current", "r") as f:
+            return int(f.read().strip())
+
+    def memory_percent(self) -> float:
+        used = self.memory_used()
+        total = self.memory_total()
+        if total <= 0:
+            # A zero limit gives no ratio to compute; report saturation when
+            # anything is in use instead of raising ZeroDivisionError.
+            return 1.0 if used > 0 else 0.0
+        return min(used / total, 1.0)
+
+
+def get_memory_monitor() -> MemoryMonitor:
+    """Return the cgroup v2 monitor when its files exist, else the default."""
+    if _is_cgroup_v2():
+        return CGroupV2MemoryMonitor()
+    return DefaultMemoryMonitor()
+
+
+def _is_cgroup_v2() -> bool:
+    return os.path.exists("/sys/fs/cgroup/memory.current")
diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py
index 4708a34d3..85f4396c7 100644
--- a/livekit-agents/livekit/agents/worker.py
+++ b/livekit-agents/livekit/agents/worker.py
@@ -53,7 +53,7 @@
     RunningJobInfo,
 )
 from .log import DEV_LEVEL, logger
-from .utils.hw import get_cpu_monitor
+from .utils.hw import get_cpu_monitor, get_memory_monitor
 from .version import __version__
 
 ASSIGNMENT_TIMEOUT = 7.5
@@ -77,8 +77,11 @@ class _DefaultLoadCalc:
     _instance = None
 
     def __init__(self) -> None:
-        self._m_avg = utils.MovingAverage(5)  # avg over 2.5
+        # Take average reading over 2.5s of CPU and memory
+        self._m_avg_cpu = utils.MovingAverage(5)
+        self._m_avg_mem = utils.MovingAverage(5)
         self._cpu_monitor = get_cpu_monitor()
+        self._mem_monitor = get_memory_monitor()
         self._thread = threading.Thread(
             target=self._calc_load, daemon=True, name="worker_cpu_load_monitor"
         )
@@ -88,19 +91,22 @@ def __init__(self) -> None:
     def _calc_load(self) -> None:
         while True:
             cpu_p = self._cpu_monitor.cpu_percent(interval=0.5)
+            mem_p = self._mem_monitor.memory_percent()
             with self._lock:
-                self._m_avg.add_sample(cpu_p)
+                self._m_avg_cpu.add_sample(cpu_p)
+                self._m_avg_mem.add_sample(mem_p)
 
-    def _get_avg(self) -> float:
+    def _get_avgs(self) -> tuple[float, float]:
+        """Return the (CPU, memory) moving averages, read under the lock."""
         with self._lock:
-            return self._m_avg.get_avg()
+            return self._m_avg_cpu.get_avg(), self._m_avg_mem.get_avg()
 
     @classmethod
     def get_load(cls, worker: Worker) -> float:
         if cls._instance is None:
             cls._instance = _DefaultLoadCalc()
 
-        return cls._instance._m_avg.get_avg()
+        return max(*cls._instance._get_avgs())
 
 
 @dataclass