Source code for automation_file.core.progress

"""Transfer progress + cancellation primitives.

Long-running transfers (HTTP downloads, S3 uploads/downloads, …) accept a
named handle from the shared :data:`progress_registry`. The registry keeps a
:class:`ProgressReporter` (bytes transferred, optional total) and a
:class:`CancellationToken` per name so the GUI or a JSON action can observe
progress or cancel mid-flight.

Instrumentation is opt-in: callers pass ``progress_name="<label>"`` to enable
tracking. When omitted, transfers run exactly as before with zero overhead
beyond one attribute lookup.
"""

from __future__ import annotations

import threading
import time
from dataclasses import dataclass, field
from typing import Any

from automation_file.exceptions import FileAutomationException


class CancelledException(FileAutomationException):
    """Signal that a cancellable operation was told to stop before completing.

    Raised by :meth:`CancellationToken.raise_if_cancelled` once the token has
    been cancelled.
    """
class CancellationToken:
    """One-way cancellation flag that worker threads can poll.

    The flag is backed by a :class:`threading.Event`; it starts unset and
    this class offers no way to clear it once set.
    """

    def __init__(self) -> None:
        self._event = threading.Event()

    def cancel(self) -> None:
        """Mark the token as cancelled. Calling it again has no further effect."""
        self._event.set()

    @property
    def is_cancelled(self) -> bool:
        """``True`` once :meth:`cancel` has been called."""
        return self._event.is_set()

    def raise_if_cancelled(self) -> None:
        """Raise :class:`CancelledException` if the token has been cancelled.

        Returns ``None`` without side effects while the token is still live.
        """
        if not self._event.is_set():
            return
        raise CancelledException("operation cancelled")
@dataclass
class ProgressReporter:
    """Byte counter and status record for a single named operation.

    ``started_at`` and ``finished_at`` hold :func:`time.monotonic` readings,
    so they are only meaningful relative to each other, not as wall-clock
    timestamps.
    """

    name: str
    total: int | None = None  # expected size, when known
    transferred: int = 0
    status: str = "running"
    started_at: float = field(default_factory=time.monotonic)
    finished_at: float | None = None  # stays None until finish() is called
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def update(self, delta: int) -> None:
        """Add ``delta`` to ``transferred``; zero or negative deltas are ignored."""
        if delta > 0:
            with self._lock:
                self.transferred += delta

    def finish(self, status: str = "done") -> None:
        """Record the final ``status`` and stamp ``finished_at`` with the current time."""
        with self._lock:
            self.status, self.finished_at = status, time.monotonic()

    @property
    def is_finished(self) -> bool:
        """``True`` once ``finished_at`` has been set."""
        return self.finished_at is not None

    def snapshot(self) -> dict[str, Any]:
        """Return a dict copy of the public fields, read under the reporter's lock."""
        exported = (
            "name",
            "total",
            "transferred",
            "status",
            "started_at",
            "finished_at",
        )
        with self._lock:
            return {key: getattr(self, key) for key in exported}
class ProgressRegistry:
    """Name-keyed store of ``(reporter, token)`` pairs for ongoing transfers.

    JSON actions and the GUI address a transfer by its name. Every method
    except ``__contains__`` holds the registry lock while touching the
    underlying dict.
    """

    def __init__(self) -> None:
        self._entries: dict[str, tuple[ProgressReporter, CancellationToken]] = {}
        self._lock = threading.Lock()

    def create(
        self, name: str, total: int | None = None
    ) -> tuple[ProgressReporter, CancellationToken]:
        """Register a fresh reporter/token pair under ``name`` and return it.

        An entry already stored under ``name`` is replaced.
        """
        token = CancellationToken()
        reporter = ProgressReporter(name=name, total=total)
        with self._lock:
            self._entries[name] = (reporter, token)
        return reporter, token

    def lookup(self, name: str) -> tuple[ProgressReporter, CancellationToken] | None:
        """Return the pair stored under ``name``, or ``None`` if there is none."""
        with self._lock:
            found = self._entries.get(name)
        return found

    def cancel(self, name: str) -> bool:
        """Cancel the token registered under ``name``.

        Returns ``True`` when a token was found and cancelled, ``False`` when
        ``name`` is not registered.
        """
        with self._lock:
            entry = self._entries.get(name)
            if entry is not None:
                _reporter, token = entry
                token.cancel()
                return True
            return False

    def forget(self, name: str) -> bool:
        """Remove ``name`` from the registry; ``True`` if an entry was removed."""
        with self._lock:
            removed = self._entries.pop(name, None)
        return removed is not None

    def clear_finished(self) -> int:
        """Drop every entry whose reporter is finished and return how many were dropped."""
        dropped = 0
        with self._lock:
            # Walk a copy of the items so entries can be deleted during the loop.
            for name, (reporter, _token) in tuple(self._entries.items()):
                if reporter.is_finished:
                    del self._entries[name]
                    dropped += 1
        return dropped

    def list(self) -> list[dict[str, Any]]:
        """Return one snapshot dict per registered reporter, in insertion order."""
        snapshots = []
        with self._lock:
            for reporter, _token in self._entries.values():
                snapshots.append(reporter.snapshot())
        return snapshots

    def __contains__(self, name: object) -> bool:
        """Support ``name in registry``; non-string operands are never contained."""
        if not isinstance(name, str):
            return False
        # Membership is read without taking the registry lock.
        return name in self._entries
# Module-level registry instance; the progress_* helper functions in this
# module all operate on this one object.
progress_registry: ProgressRegistry = ProgressRegistry()
def progress_list() -> list[dict[str, Any]]:
    """Return one snapshot dict for each transfer held by ``progress_registry``."""
    snapshots = progress_registry.list()
    return snapshots
def progress_cancel(name: str) -> bool:
    """Cancel the transfer registered as ``name`` in ``progress_registry``.

    Returns ``True`` if the handle existed and was cancelled, ``False`` if no
    handle with that name is registered.
    """
    was_cancelled = progress_registry.cancel(name)
    return was_cancelled
def progress_clear() -> int:
    """Remove all finished transfers from ``progress_registry``.

    Returns the number of entries that were removed.
    """
    removed_count = progress_registry.clear_finished()
    return removed_count
def register_progress_ops(registry: Any) -> None:
    """Register the ``FA_progress_*`` actions on an :class:`ActionRegistry`.

    ``registry`` only needs to expose ``register_many(mapping)``; it is called
    once with a dict mapping each action name to its handler function.
    """
    progress_actions = {
        "FA_progress_list": progress_list,
        "FA_progress_cancel": progress_cancel,
        "FA_progress_clear": progress_clear,
    }
    registry.register_many(progress_actions)