Source code for automation_file.core.content_store

"""SHA-256 content-addressable store.

:class:`ContentStore` ingests files or byte blobs and keys them by the hex
digest of their contents. A two-character fanout directory keeps any single
directory small: ``<root>/ab/abcdef…``. Identical inputs map to the same blob —
callers get deduplication for free.
"""

from __future__ import annotations

import hashlib
import os
import shutil
from collections.abc import Callable, Iterator
from pathlib import Path
from typing import IO

from automation_file.exceptions import CASException

_HASH = "sha256"
_FANOUT = 2
_CHUNK = 1 << 20


[docs] class ContentStore: """Filesystem-backed CAS under ``root``.""" def __init__(self, root: str | os.PathLike[str]) -> None: self._root = Path(root) self._root.mkdir(parents=True, exist_ok=True) @property def root(self) -> Path: return self._root
[docs] def path_for(self, digest: str) -> Path: if len(digest) < _FANOUT + 1 or not all(c in "0123456789abcdef" for c in digest): raise CASException(f"invalid digest {digest!r}") return self._root / digest[:_FANOUT] / digest
[docs] def exists(self, digest: str) -> bool: return self.path_for(digest).is_file()
[docs] def put(self, source: str | os.PathLike[str]) -> str: """Ingest the file at ``source`` and return its hex digest.""" src = Path(source) if not src.is_file(): raise CASException(f"source is not a file: {src}") digest = self._hash_file(src) target = self.path_for(digest) if not target.exists(): self._write_atomic(target, lambda tmp: _copyfile(src, tmp)) return digest
[docs] def put_bytes(self, data: bytes) -> str: """Ingest raw bytes and return the hex digest.""" digest = hashlib.new(_HASH, data).hexdigest() target = self.path_for(digest) if not target.exists(): self._write_atomic(target, lambda tmp: _write_bytes(tmp, data)) return digest
def _write_atomic(self, target: Path, writer: Callable[[Path], None]) -> None: target.parent.mkdir(parents=True, exist_ok=True) tmp = target.with_suffix(target.suffix + ".tmp") try: writer(tmp) os.replace(tmp, target) except OSError as error: if tmp.exists(): tmp.unlink(missing_ok=True) raise CASException(f"failed to store blob: {error}") from error
[docs] def open(self, digest: str) -> IO[bytes]: """Open the stored blob for binary read.""" path = self.path_for(digest) if not path.is_file(): raise CASException(f"missing blob {digest}") return open(path, "rb")
[docs] def copy_to(self, digest: str, destination: str | os.PathLike[str]) -> Path: """Copy the blob at ``digest`` into ``destination``. Returns the path.""" src = self.path_for(digest) if not src.is_file(): raise CASException(f"missing blob {digest}") dest = Path(destination) dest.parent.mkdir(parents=True, exist_ok=True) shutil.copyfile(src, dest) return dest
[docs] def delete(self, digest: str) -> bool: """Remove a blob. Returns True when the blob existed.""" path = self.path_for(digest) if not path.is_file(): return False path.unlink() return True
[docs] def iter_digests(self) -> Iterator[str]: """Yield the digest of every stored blob.""" if not self._root.exists(): return for bucket in self._root.iterdir(): if not bucket.is_dir() or len(bucket.name) != _FANOUT: continue for blob in bucket.iterdir(): if blob.is_file() and blob.name.startswith(bucket.name): yield blob.name
[docs] def size(self) -> int: """Return the total number of stored blobs.""" return sum(1 for _ in self.iter_digests())
def _hash_file(self, path: Path) -> str: hasher = hashlib.new(_HASH) with open(path, "rb") as fh: for chunk in iter(lambda: fh.read(_CHUNK), b""): hasher.update(chunk) return hasher.hexdigest()
def _write_bytes(target: Path, data: bytes) -> None: with open(target, "wb") as fh: fh.write(data) def _copyfile(src: Path, dst: Path) -> None: shutil.copyfile(src, dst)