# Source code for aiousbwatcher.impl

from __future__ import annotations

import asyncio
import logging
import warnings
from functools import partial
from pathlib import Path
from typing import Callable

# Holds the exception raised while importing asyncinotify, or None when the
# import succeeded. AIOUSBWatcher.async_start() checks this value and raises
# InotifyNotAvailableError chained from it, so a failed import is reported
# when the watcher is started rather than when this module is imported.
_INOTIFY_EXCEPTION: Exception | None = None
try:
    # Import the FFI layer first with UserWarning suppressed
    # (presumably it warns when inotify support is missing -- TODO confirm).
    with warnings.catch_warnings(action="ignore", category=UserWarning):
        from asyncinotify._ffi import libc  # noqa: F401
    from asyncinotify import Inotify, Mask
except Exception as ex:
    # Remember the failure and bind placeholders so the names still exist at
    # module level.
    _INOTIFY_EXCEPTION = ex
    Mask = Inotify = None


# Root directory that AIOUSBWatcher watches (together with its subdirectories).
_PATH = "/dev/bus/usb"

# Module-level logger; used to report exceptions raised by registered callbacks.
_LOGGER = logging.getLogger(__name__)


class InotifyNotAvailableError(Exception):
    """Raised when inotify is not available on the platform.

    ``AIOUSBWatcher.async_start`` raises this error when importing
    ``asyncinotify`` failed at module import time; the original import
    exception is chained as ``__cause__``.
    """
def _get_directories_recursive(path: Path) -> list[Path]: return [dirpath for dirpath, dirnames, filenames in path.walk()] async def _async_get_directories_recursive( loop: asyncio.AbstractEventLoop, path: Path ) -> list[Path]: return await loop.run_in_executor(None, _get_directories_recursive, path)
class AIOUSBWatcher:
    """A watcher for USB devices that uses asyncio.

    The watcher places inotify watches on ``_PATH`` and all of its
    subdirectories and invokes every registered callback whenever a watched
    entry is created, deleted or moved. Instances must be created while an
    event loop is running, because the constructor captures the running loop.
    """

    def __init__(self) -> None:
        """Bind the watcher to the currently running event loop.

        Raises:
            RuntimeError: If no event loop is running
                (raised by ``asyncio.get_running_loop()``).
        """
        self._path = Path(_PATH)
        self._loop = asyncio.get_running_loop()
        # Background task running ``_watcher``; None while stopped.
        self._task: asyncio.Task[None] | None = None
        self._callbacks: set[Callable[[], None]] = set()

    def async_start(self) -> Callable[[], None]:
        """Start the watcher.

        Returns:
            A callable that stops the watcher when invoked.

        Raises:
            RuntimeError: If the watcher has already been started.
            InotifyNotAvailableError: If ``asyncinotify`` could not be
                imported; the import error is chained as the cause.
        """
        if self._task is not None:
            raise RuntimeError("Watcher already started")
        if _INOTIFY_EXCEPTION is not None:
            raise InotifyNotAvailableError(
                "Inotify not available on this platform"
            ) from _INOTIFY_EXCEPTION
        self._task = self._loop.create_task(self._watcher())
        return self._async_stop

    def async_register_callback(
        self, callback: Callable[[], None]
    ) -> Callable[[], None]:
        """Register callback that will be called when a USB device is added/removed.

        Args:
            callback: Zero-argument callable invoked on every matching event.

        Returns:
            A callable that unregisters ``callback`` when invoked.
        """
        self._callbacks.add(callback)
        return partial(self._async_unregister_callback, callback)

    def _async_stop(self) -> None:
        """Stop the watcher.

        Cancels the background task if one is running. Calling this when the
        watcher is not running is a no-op, so the stop callable returned by
        ``async_start`` can safely be invoked more than once.
        """
        task = self._task
        if task is None:
            return
        self._task = None
        task.cancel()

    async def _watcher(self) -> None:
        """Watch the USB tree and dispatch callbacks until cancelled."""
        mask = (
            Mask.CREATE
            | Mask.MOVED_FROM
            | Mask.MOVED_TO
            | Mask.DELETE_SELF
            | Mask.DELETE
            | Mask.IGNORED
        )

        with Inotify() as inotify:
            # Watch the root and every directory that already exists below it.
            for directory in await _async_get_directories_recursive(
                self._loop, self._path
            ):
                inotify.add_watch(directory, mask)

            async for event in inotify:
                # Add subdirectories to watch if a new directory is added.
                if Mask.CREATE in event.mask and event.path is not None:
                    for directory in await _async_get_directories_recursive(
                        self._loop, event.path
                    ):
                        inotify.add_watch(directory, mask)

                # If there is at least some overlap, assume the user wants this event.
                if event.mask & mask:
                    self._async_call_callbacks()

    def _async_unregister_callback(self, callback: Callable[[], None]) -> None:
        """Remove ``callback``; unregistering it more than once is a no-op."""
        self._callbacks.discard(callback)

    def _async_call_callbacks(self) -> None:
        """Invoke every registered callback, logging any exception it raises.

        Iterates over a snapshot of the registered callbacks so that a
        callback may register or unregister callbacks (including itself)
        while the dispatch is in progress.
        """
        for callback in tuple(self._callbacks):
            try:
                callback()
            except Exception:
                # One failing callback must not prevent the others from running.
                _LOGGER.exception("Error calling callback %s", callback)