Source code for automation_file.core.secrets

"""Secret provider abstraction.

A secret is a (name -> value) lookup. Three built-in providers compose via
:class:`ChainedSecretProvider`:

    - :class:`EnvSecretProvider` resolves from ``os.environ``
    - :class:`FileSecretProvider` resolves from a directory of per-secret files
      (Docker / K8s secrets layout)
    - :class:`ChainedSecretProvider` tries a list in order

The purpose is to keep secrets out of the config file itself. Callers
write references like ``${env:SLACK_WEBHOOK_URL}`` inside
``automation_file.toml``; :func:`resolve_secret_refs` walks the document
and substitutes. Missing secrets raise :class:`SecretNotFoundException`
so a typo in a reference never silently becomes an empty string.
"""

from __future__ import annotations

import os
import re
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any

from automation_file.exceptions import FileAutomationException
from automation_file.logging_config import file_automation_logger

_SECRET_PATTERN = re.compile(r"\$\{(env|file):([^}]+)\}")


[docs] class SecretException(FileAutomationException): """Base for secret-provider failures."""
[docs] class SecretNotFoundException(SecretException): """Raised when a ``${provider:name}`` reference cannot be resolved."""
[docs] class SecretProvider(ABC): """Contract for a read-only secret lookup.""" scheme: str
[docs] @abstractmethod def get(self, name: str) -> str | None: """Return the secret's value, or ``None`` if this provider has no entry."""
[docs] class EnvSecretProvider(SecretProvider): """Read secrets from process environment variables.""" scheme = "env"
[docs] def get(self, name: str) -> str | None: return os.environ.get(name)
[docs] class FileSecretProvider(SecretProvider): """Read secrets from ``<root>/<name>`` (e.g., ``/run/secrets/<name>``).""" scheme = "file" def __init__(self, root: str | os.PathLike[str]) -> None: self._root = Path(root)
[docs] def get(self, name: str) -> str | None: candidate = self._root / name if not candidate.is_file(): return None try: return candidate.read_text(encoding="utf-8").rstrip("\r\n") except OSError as err: file_automation_logger.warning("FileSecretProvider: read %s failed: %r", candidate, err) return None
[docs] class ChainedSecretProvider(SecretProvider): """Try each child provider in order; return the first non-``None`` value. Providers are grouped by ``scheme`` — ``${env:X}`` only consults :class:`EnvSecretProvider` children, ``${file:X}`` only consults :class:`FileSecretProvider` children. Unknown schemes raise :class:`SecretNotFoundException` at resolution time. """ scheme = "*" def __init__(self, providers: list[SecretProvider]) -> None: self._providers = list(providers)
[docs] def get(self, name: str) -> str | None: # Default: consult every child regardless of scheme. Callers who # want scheme-scoped lookup should go through resolve_one(). for provider in self._providers: value = provider.get(name) if value is not None: return value return None
[docs] def resolve_one(self, scheme: str, name: str) -> str: for provider in self._providers: if provider.scheme not in (scheme, "*"): continue value = provider.get(name) if value is not None: return value raise SecretNotFoundException(f"no secret for ${{{scheme}:{name}}}")
[docs] def resolve_secret_refs(value: Any, provider: ChainedSecretProvider) -> Any: """Walk ``value`` (dict/list/str) and substitute every ``${scheme:name}``. Strings without references are returned unchanged; non-string scalars pass through untouched. Any unresolved reference raises :class:`SecretNotFoundException` — callers should treat this as a hard configuration error rather than continuing with a hole in the config. """ if isinstance(value, str): return _resolve_string(value, provider) if isinstance(value, dict): return {key: resolve_secret_refs(item, provider) for key, item in value.items()} if isinstance(value, list): return [resolve_secret_refs(item, provider) for item in value] return value
def _resolve_string(text: str, provider: ChainedSecretProvider) -> str: def substitute(match: re.Match[str]) -> str: scheme, name = match.group(1), match.group(2).strip() return provider.resolve_one(scheme, name) return _SECRET_PATTERN.sub(substitute, text)
[docs] def default_provider(file_root: str | os.PathLike[str] | None = None) -> ChainedSecretProvider: """Return an env-first chain, optionally augmented with a file provider.""" providers: list[SecretProvider] = [EnvSecretProvider()] if file_root is not None: providers.append(FileSecretProvider(file_root)) return ChainedSecretProvider(providers)