Source code for automation_file.core.file_lock

"""Cross-platform advisory file lock.

Uses ``fcntl.flock`` on POSIX and ``msvcrt.locking`` on Windows, so two processes
can serialise on a well-known lock path. Locks are exclusive (writer-style);
shared locks are not supported because ``msvcrt`` cannot express them portably.
"""

from __future__ import annotations

import contextlib
import os
import sys
import threading
import time
from pathlib import Path
from types import TracebackType
from typing import IO

from automation_file.exceptions import LockTimeoutException

_POLL_INTERVAL = 0.05


class FileLock:
    """Advisory exclusive lock on a sidecar lock file.

    ``path`` is the lock file itself — typically ``<resource>.lock`` next to
    the protected resource. ``timeout`` is the maximum seconds to wait when
    acquiring; ``None`` waits indefinitely, ``0`` fails immediately.

    Instances are usable as context managers: entering acquires the lock and
    exiting releases it.
    """

    def __init__(self, path: str | os.PathLike[str], timeout: float | None = None) -> None:
        self._path = Path(path)
        self._timeout = timeout
        # Open handle on the lock file; set only while this instance holds the lock.
        self._fh: IO[bytes] | None = None
        # Serialises acquire()/release() calls made on this instance.
        self._thread_lock = threading.Lock()
        self._owned = False

    @property
    def path(self) -> Path:
        """Path of the lock file."""
        return self._path

    @property
    def is_held(self) -> bool:
        """Whether this instance currently holds the lock."""
        return self._owned

    def acquire(self) -> None:
        """Block until the lock is held.

        Creates the lock file's parent directories if needed.

        Raises:
            LockTimeoutException: on timeout, or if this instance already
                holds the lock.
        """
        # The per-instance thread lock stays held for the whole wait below.
        with self._thread_lock:
            if self._owned:
                raise LockTimeoutException(f"lock {self._path} already held by this instance")
            self._path.parent.mkdir(parents=True, exist_ok=True)
            # pylint: disable=consider-using-with
            fh = open(self._path, "a+b")  # noqa: SIM115 — held across acquire/release
            try:
                self._acquire_os_lock(fh)
            except BaseException:
                # Never leak the handle if we time out or are interrupted.
                fh.close()
                raise
            self._fh = fh
            self._owned = True

    def release(self) -> None:
        """Release the lock; idempotent."""
        with self._thread_lock:
            if not self._owned or self._fh is None:
                return
            fh = self._fh
            try:
                try:
                    self._release_os_lock(fh)
                finally:
                    fh.close()
            finally:
                # Reset state even if unlocking or closing raised, so the
                # instance is never left marked as held with a dead handle.
                self._fh = None
                self._owned = False

    def __enter__(self) -> FileLock:
        self.acquire()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        self.release()

    def _acquire_os_lock(self, fh: IO[bytes]) -> None:
        """Poll ``_try_lock`` until it succeeds or the timeout elapses.

        Raises:
            LockTimeoutException: if the deadline passes without the lock
                being obtained.
        """
        deadline = None if self._timeout is None else time.monotonic() + self._timeout
        while True:
            if _try_lock(fh):
                return
            if deadline is None:
                time.sleep(_POLL_INTERVAL)
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise LockTimeoutException(
                    f"timed out acquiring lock {self._path} after {self._timeout}s"
                )
            # Never sleep past the deadline: a timeout shorter than the poll
            # interval must not be stretched to a full interval.
            time.sleep(min(_POLL_INTERVAL, remaining))

    def _release_os_lock(self, fh: IO[bytes]) -> None:
        """Drop the OS-level lock held on ``fh``."""
        _unlock(fh)
if sys.platform == "win32": import msvcrt def _try_lock(fh: IO[bytes]) -> bool: try: msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1) return True except OSError: return False def _unlock(fh: IO[bytes]) -> None: with contextlib.suppress(OSError): fh.seek(0) msvcrt.locking(fh.fileno(), msvcrt.LK_UNLCK, 1) else: import fcntl def _try_lock(fh: IO[bytes]) -> bool: try: fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) return True except OSError: return False def _unlock(fh: IO[bytes]) -> None: with contextlib.suppress(OSError): fcntl.flock(fh.fileno(), fcntl.LOCK_UN)