|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import asyncio |
| 4 | +import logging |
| 5 | +from collections.abc import Generator |
| 6 | +from functools import partial |
| 7 | +from pathlib import Path |
| 8 | +from typing import Callable |
| 9 | + |
| 10 | +_PATH = "/dev/bus/usb" |
| 11 | + |
| 12 | +_LOGGER = logging.getLogger(__name__) |
| 13 | + |
| 14 | + |
| 15 | +class InotifyNotAvailableError(Exception): |
| 16 | + """Raised when inotify is not available on the platform.""" |
| 17 | + |
| 18 | + |
| 19 | +def _get_directories_recursive_gen(path: Path) -> Generator[Path, None, None]: |
| 20 | + if path.is_dir(): |
| 21 | + yield path |
| 22 | + for child in path.iterdir(): |
| 23 | + yield from _get_directories_recursive_gen(child) |
| 24 | + |
| 25 | + |
| 26 | +def _get_directories_recursive(path: Path) -> list[Path]: |
| 27 | + return list(_get_directories_recursive_gen(path)) |
| 28 | + |
| 29 | + |
| 30 | +async def _async_get_directories_recursive( |
| 31 | + loop: asyncio.AbstractEventLoop, path: Path |
| 32 | +) -> list[Path]: |
| 33 | + return await loop.run_in_executor(None, _get_directories_recursive, path) |
| 34 | + |
| 35 | + |
| 36 | +class AIOUSBWatcher: |
| 37 | + """A watcher for USB devices that uses asyncio.""" |
| 38 | + |
| 39 | + def __init__(self) -> None: |
| 40 | + self._path = Path(_PATH) |
| 41 | + self._loop = asyncio.get_running_loop() |
| 42 | + self._task: asyncio.Task[None] | None = None |
| 43 | + self._callbacks: set[Callable[[], None]] = set() |
| 44 | + |
| 45 | + def async_start(self) -> Callable[[], None]: |
| 46 | + """Start the watcher.""" |
| 47 | + if self._task is not None: |
| 48 | + raise RuntimeError("Watcher already started") |
| 49 | + try: |
| 50 | + from asyncinotify import Inotify # noqa |
| 51 | + except Exception as ex: |
| 52 | + raise InotifyNotAvailableError( |
| 53 | + "Inotify not available on this platform" |
| 54 | + ) from ex |
| 55 | + self._task = self._loop.create_task(self._watcher()) |
| 56 | + return self._async_stop |
| 57 | + |
| 58 | + def async_register_callback( |
| 59 | + self, callback: Callable[[], None] |
| 60 | + ) -> Callable[[], None]: |
| 61 | + """Register callback that will be called when a USB device is added/removed.""" |
| 62 | + self._callbacks.add(callback) |
| 63 | + return partial(self._async_unregister_callback, callback) |
| 64 | + |
| 65 | + def _async_stop(self) -> None: |
| 66 | + """Stop the watcher.""" |
| 67 | + assert self._task is not None # noqa |
| 68 | + self._task.cancel() |
| 69 | + self._task = None |
| 70 | + |
| 71 | + async def _watcher(self) -> None: |
| 72 | + from asyncinotify import Inotify, Mask |
| 73 | + |
| 74 | + mask = ( |
| 75 | + Mask.CREATE |
| 76 | + | Mask.MOVED_FROM |
| 77 | + | Mask.MOVED_TO |
| 78 | + | Mask.CREATE |
| 79 | + | Mask.DELETE_SELF |
| 80 | + | Mask.IGNORED |
| 81 | + ) |
| 82 | + |
| 83 | + with Inotify() as inotify: |
| 84 | + for directory in await _async_get_directories_recursive( |
| 85 | + self._loop, self._path |
| 86 | + ): |
| 87 | + inotify.add_watch(directory, mask) |
| 88 | + |
| 89 | + async for event in inotify: |
| 90 | + # Add subdirectories to watch if a new directory is added. |
| 91 | + if Mask.CREATE in event.mask and event.path is not None: |
| 92 | + for directory in await _async_get_directories_recursive( |
| 93 | + self._loop, event.path |
| 94 | + ): |
| 95 | + inotify.add_watch(directory, mask) |
| 96 | + |
| 97 | + # If there is at least some overlap, assume the user wants this event. |
| 98 | + if event.mask & mask: |
| 99 | + self._async_call_callbacks() |
| 100 | + |
| 101 | + def _async_unregister_callback(self, callback: Callable[[], None]) -> None: |
| 102 | + self._callbacks.remove(callback) |
| 103 | + |
| 104 | + def _async_call_callbacks(self) -> None: |
| 105 | + for callback in self._callbacks: |
| 106 | + try: |
| 107 | + callback() |
| 108 | + except Exception as e: |
| 109 | + _LOGGER.exception("Error calling callback %s", callback, exc_info=e) |
0 commit comments