diff --git a/core/engine.py b/core/engine.py index c47512d..5dfad9b 100644 --- a/core/engine.py +++ b/core/engine.py @@ -19,7 +19,7 @@ from datetime import datetime, timezone from .atomics import get_all_techniques, get_technique, get_test -from .executor import execute, execute_cleanup, substitute_variables, DEFAULT_TIMEOUT +from .executor import execute, execute_cleanup, substitute_variables_safe, DEFAULT_TIMEOUT from .event_collector import collect_events, normalize_via_lognorm from .validator import validate_detection, validate_events_only from .storage import RunStorage @@ -147,12 +147,12 @@ def run_test( } # Substitute variables in command - command = substitute_variables( - test["command"], input_args, test.get("input_arguments", {}) + command = substitute_variables_safe( + test["command"], input_args, test.get("input_arguments", {}), test["executor_type"] ) cleanup_cmd_raw = test.get("cleanup_command") - cleanup_command = substitute_variables( - cleanup_cmd_raw, input_args, test.get("input_arguments", {}) + cleanup_command = substitute_variables_safe( + cleanup_cmd_raw, input_args, test.get("input_arguments", {}), test["executor_type"] ) if cleanup_cmd_raw else None executed_at = datetime.utcnow().isoformat() + "Z" diff --git a/core/executor.py b/core/executor.py index 083717f..56dc5d4 100644 --- a/core/executor.py +++ b/core/executor.py @@ -15,6 +15,7 @@ import os import platform import re +import shlex import subprocess import time from dataclasses import dataclass, field @@ -48,6 +49,50 @@ def substitute_variables(command: str, input_args: dict, test_input_defs: dict) return result +def _escape_for_executor(value: object, executor_type: str) -> str: + """Escape user-controlled values for the target shell/interpreter.""" + s = str(value) + et = (executor_type or "").lower().strip() + + if et in {"bash", "sh"}: + return shlex.quote(s) + + if et == "powershell": + # PowerShell single-quoted string escaping: ' -> '' + return "'" + s.replace("'", "''") + "'" + + if et == "cmd": + # Conservative quoting for cmd.exe arguments. + return '"' + s.replace('"', '""') + '"' + + # Fallback: safe POSIX-style quoting. + return shlex.quote(s) + + +def substitute_variables_safe( + command: str, + input_args: dict, + test_input_defs: dict, + executor_type: str, +) -> str: + """Replace #{variable} placeholders with safely escaped values for executor_type.""" + result = command + for arg_name, arg_def in test_input_defs.items(): + value = input_args.get(arg_name, arg_def.get("default", "")) + result = result.replace(f"#{{{arg_name}}}", _escape_for_executor(value, executor_type)) + + # Replace any remaining #{...} placeholders with escaped value or marker. + result = re.sub( + r"#\{([^}]+)\}", + lambda m: _escape_for_executor( + input_args.get(m.group(1), f"MISSING_{m.group(1)}"), + executor_type, + ), + result, + ) + return result + + def execute( command: str, executor_type: str,