"""Registry of named callables (Registry + Command pattern).
The registry decouples "what to run" (a string name inside a JSON action list)
from "how to run it" (a Python callable). Executors delegate name resolution
to an :class:`ActionRegistry`, which keeps look-up O(1) and lets plugins add
commands at runtime without touching the executor class.
"""
from __future__ import annotations
from collections.abc import Callable, Iterable, Iterator, Mapping
from typing import Any
from automation_file.exceptions import AddCommandException
from automation_file.logging_config import file_automation_logger
Command = Callable[..., Any]
[docs]
class ActionRegistry:
"""Mapping of action name -> callable."""
def __init__(self, initial: Mapping[str, Command] | None = None) -> None:
self._commands: dict[str, Command] = {}
if initial:
for name, command in initial.items():
self.register(name, command)
[docs]
def register(self, name: str, command: Command) -> None:
"""Add or overwrite a command. Raises if ``command`` is not callable."""
if not callable(command):
raise AddCommandException(f"{name!r} is not callable")
self._commands[name] = command
[docs]
def register_many(self, mapping: Mapping[str, Command]) -> None:
"""Register every ``name -> command`` pair in ``mapping``."""
for name, command in mapping.items():
self.register(name, command)
[docs]
def update(self, mapping: Mapping[str, Command]) -> None:
"""Alias for :meth:`register_many` (dict-compatible)."""
self.register_many(mapping)
[docs]
def unregister(self, name: str) -> None:
self._commands.pop(name, None)
[docs]
def resolve(self, name: str) -> Command | None:
return self._commands.get(name)
def __contains__(self, name: object) -> bool:
return isinstance(name, str) and name in self._commands
def __len__(self) -> int:
return len(self._commands)
def __iter__(self) -> Iterator[str]:
return iter(self._commands)
[docs]
def names(self) -> Iterable[str]:
return self._commands.keys()
@property
def event_dict(self) -> dict[str, Command]:
"""Backwards-compatible view used by older ``package_manager`` style code."""
return self._commands
def _local_commands() -> dict[str, Command]:
from automation_file.local import (
conditional,
data_ops,
diff_ops,
dir_ops,
file_ops,
json_edit,
shell_ops,
sync_ops,
tar_ops,
text_ops,
zip_ops,
)
return {
# Files
"FA_create_file": file_ops.create_file,
"FA_copy_file": file_ops.copy_file,
"FA_rename_file": file_ops.rename_file,
"FA_remove_file": file_ops.remove_file,
"FA_copy_all_file_to_dir": file_ops.copy_all_file_to_dir,
"FA_copy_specify_extension_file": file_ops.copy_specify_extension_file,
# Directories
"FA_copy_dir": dir_ops.copy_dir,
"FA_create_dir": dir_ops.create_dir,
"FA_remove_dir_tree": dir_ops.remove_dir_tree,
"FA_rename_dir": dir_ops.rename_dir,
"FA_sync_dir": sync_ops.sync_dir,
# Shell
"FA_run_shell": shell_ops.run_shell,
# JSON edit
"FA_json_get": json_edit.json_get,
"FA_json_set": json_edit.json_set,
"FA_json_delete": json_edit.json_delete,
# Tar
"FA_create_tar": tar_ops.create_tar,
"FA_extract_tar": tar_ops.extract_tar,
# Zip
"FA_zip_dir": zip_ops.zip_dir,
"FA_zip_file": zip_ops.zip_file,
"FA_zip_info": zip_ops.zip_info,
"FA_zip_file_info": zip_ops.zip_file_info,
"FA_set_zip_password": zip_ops.set_zip_password,
"FA_unzip_file": zip_ops.unzip_file,
"FA_read_zip_file": zip_ops.read_zip_file,
"FA_unzip_all": zip_ops.unzip_all,
# Conditional dispatch
"FA_if_exists": conditional.if_exists,
"FA_if_newer": conditional.if_newer,
"FA_if_size_gt": conditional.if_size_gt,
# Text / binary
"FA_file_split": text_ops.file_split,
"FA_file_merge": text_ops.file_merge,
"FA_encoding_convert": text_ops.encoding_convert,
"FA_line_count": text_ops.line_count,
"FA_sed_replace": text_ops.sed_replace,
# Diff / patch
"FA_diff_files": diff_ops.diff_text_files,
"FA_diff_dirs": diff_ops.diff_dirs_summary,
"FA_apply_patch": diff_ops.apply_text_patch,
# Structured data (CSV / JSONL)
"FA_csv_filter": data_ops.csv_filter,
"FA_csv_to_jsonl": data_ops.csv_to_jsonl,
"FA_jsonl_iter": data_ops.jsonl_iter,
"FA_jsonl_append": data_ops.jsonl_append,
# Structured data (YAML)
"FA_yaml_get": data_ops.yaml_get,
"FA_yaml_set": data_ops.yaml_set,
"FA_yaml_delete": data_ops.yaml_delete,
# Structured data (Parquet)
"FA_parquet_read": data_ops.parquet_read,
"FA_parquet_write": data_ops.parquet_write,
"FA_csv_to_parquet": data_ops.csv_to_parquet,
}
def _drive_commands() -> dict[str, Command]:
from automation_file.remote.google_drive import (
client,
delete_ops,
download_ops,
folder_ops,
search_ops,
share_ops,
upload_ops,
)
return {
"FA_drive_later_init": client.driver_instance.later_init,
"FA_drive_search_all_file": search_ops.drive_search_all_file,
"FA_drive_search_field": search_ops.drive_search_field,
"FA_drive_search_file_mimetype": search_ops.drive_search_file_mimetype,
"FA_drive_upload_dir_to_folder": upload_ops.drive_upload_dir_to_folder,
"FA_drive_upload_to_folder": upload_ops.drive_upload_to_folder,
"FA_drive_upload_dir_to_drive": upload_ops.drive_upload_dir_to_drive,
"FA_drive_upload_to_drive": upload_ops.drive_upload_to_drive,
"FA_drive_add_folder": folder_ops.drive_add_folder,
"FA_drive_share_file_to_anyone": share_ops.drive_share_file_to_anyone,
"FA_drive_share_file_to_domain": share_ops.drive_share_file_to_domain,
"FA_drive_share_file_to_user": share_ops.drive_share_file_to_user,
"FA_drive_delete_file": delete_ops.drive_delete_file,
"FA_drive_download_file": download_ops.drive_download_file,
"FA_drive_download_file_from_folder": download_ops.drive_download_file_from_folder,
}
def _http_commands() -> dict[str, Command]:
from automation_file.remote import http_download
return {"FA_download_file": http_download.download_file}
def _utils_commands() -> dict[str, Command]:
from automation_file.core import checksum, crypto, manifest, tracing
from automation_file.remote import cross_backend
from automation_file.utils import deduplicate, fast_find, grep, rotate
return {
"FA_fast_find": fast_find.fast_find,
"FA_file_checksum": checksum.file_checksum,
"FA_verify_checksum": checksum.verify_checksum,
"FA_find_duplicates": deduplicate.find_duplicates,
"FA_execute_action_dag": _lazy_execute_action_dag,
"FA_write_manifest": manifest.write_manifest,
"FA_verify_manifest": manifest.verify_manifest,
"FA_grep": grep.grep_files,
"FA_rotate_backups": rotate.rotate_backups,
"FA_copy_between": cross_backend.copy_between,
"FA_encrypt_file": crypto.encrypt_file,
"FA_decrypt_file": crypto.decrypt_file,
"FA_tracing_init": tracing.init_tracing,
}
def _lazy_execute_action_dag(
nodes: list,
max_workers: int = 4,
fail_fast: bool = True,
) -> dict[str, Any]:
"""Deferred import shim so the registry module doesn't depend on the DAG executor."""
from automation_file.core.dag_executor import execute_action_dag
return execute_action_dag(nodes, max_workers=max_workers, fail_fast=fail_fast)
def _register_cloud_backends(registry: ActionRegistry) -> None:
from automation_file.remote.azure_blob import register_azure_blob_ops
from automation_file.remote.box import register_box_ops
from automation_file.remote.dropbox_api import register_dropbox_ops
from automation_file.remote.ftp import register_ftp_ops
from automation_file.remote.onedrive import register_onedrive_ops
from automation_file.remote.s3 import register_s3_ops
from automation_file.remote.sftp import register_sftp_ops
register_s3_ops(registry)
register_azure_blob_ops(registry)
register_dropbox_ops(registry)
register_sftp_ops(registry)
register_ftp_ops(registry)
register_onedrive_ops(registry)
register_box_ops(registry)
def _register_trigger_ops(registry: ActionRegistry) -> None:
from automation_file.trigger import register_trigger_ops
register_trigger_ops(registry)
def _register_scheduler_ops(registry: ActionRegistry) -> None:
from automation_file.scheduler import register_scheduler_ops
register_scheduler_ops(registry)
def _register_progress_ops(registry: ActionRegistry) -> None:
from automation_file.core.progress import register_progress_ops
register_progress_ops(registry)
def _register_notify_ops(registry: ActionRegistry) -> None:
from automation_file.notify import register_notify_ops
register_notify_ops(registry)
[docs]
def build_default_registry() -> ActionRegistry:
"""Return a registry pre-populated with every built-in ``FA_*`` action.
After the built-ins are registered, any third-party package advertising
an ``automation_file.actions`` entry point is loaded so its commands
land in the same registry. Plugins may override built-in names.
"""
registry = ActionRegistry()
registry.register_many(_local_commands())
registry.register_many(_http_commands())
registry.register_many(_utils_commands())
registry.register_many(_drive_commands())
_register_cloud_backends(registry)
_register_trigger_ops(registry)
_register_scheduler_ops(registry)
_register_progress_ops(registry)
_register_notify_ops(registry)
_load_plugins(registry)
file_automation_logger.info(
"action_registry: built default registry with %d commands", len(registry)
)
return registry
def _load_plugins(registry: ActionRegistry) -> None:
from automation_file.core.plugins import load_entry_point_plugins
load_entry_point_plugins(registry.register_many)