Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions core/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ def substitute_variables_safe(
return result


def _is_allowed_atomic_command(command: str, executor_type: str) -> bool:
"""Return True if command exactly matches an embedded atomic test/cleanup command."""
try:
from .atomics import ATOMICS # local import to avoid circular import at module load
except Exception:
return False

et = (executor_type or "").lower().strip()
for technique in ATOMICS.values():
for test in technique.get("tests", []):
if str(test.get("executor_type", "")).lower().strip() != et:
continue
test_cmd = test.get("command")
cleanup_cmd = test.get("cleanup_command")
if command == test_cmd or (cleanup_cmd is not None and command == cleanup_cmd):
return True
return False


def execute(
command: str,
executor_type: str,
Expand All @@ -117,6 +136,13 @@ def execute(
"""
executor_type = executor_type.lower().strip()

if not _is_allowed_atomic_command(command, executor_type):
logger.warning("Rejected non-allowlisted command for executor=%s", executor_type)
return ExecutionResult(
command=command,
error="Command is not in the embedded atomic allowlist.",
)

if dry_run:
logger.info("[DRY RUN] executor=%s command=%s", executor_type, command[:80])
return ExecutionResult(
Expand Down
Loading