Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from datetime import datetime, timezone

from .atomics import get_all_techniques, get_technique, get_test
from .executor import execute, execute_cleanup, substitute_variables, DEFAULT_TIMEOUT
from .executor import execute, execute_cleanup, substitute_variables_safe, DEFAULT_TIMEOUT
from .event_collector import collect_events, normalize_via_lognorm
from .validator import validate_detection, validate_events_only
from .storage import RunStorage
Expand Down Expand Up @@ -147,12 +147,12 @@ def run_test(
}

# Substitute variables in command
command = substitute_variables(
test["command"], input_args, test.get("input_arguments", {})
command = substitute_variables_safe(
test["command"], input_args, test.get("input_arguments", {}), test["executor_type"]
)
cleanup_cmd_raw = test.get("cleanup_command")
cleanup_command = substitute_variables(
cleanup_cmd_raw, input_args, test.get("input_arguments", {})
cleanup_command = substitute_variables_safe(
cleanup_cmd_raw, input_args, test.get("input_arguments", {}), test["executor_type"]
) if cleanup_cmd_raw else None

executed_at = datetime.utcnow().isoformat() + "Z"
Expand Down
45 changes: 45 additions & 0 deletions core/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
import platform
import re
import shlex
import subprocess
import time
from dataclasses import dataclass, field
Expand Down Expand Up @@ -48,6 +49,50 @@ def substitute_variables(command: str, input_args: dict, test_input_defs: dict)
return result


def _escape_for_executor(value: object, executor_type: str) -> str:
"""Escape user-controlled values for the target shell/interpreter."""
s = str(value)
et = (executor_type or "").lower().strip()

if et in {"bash", "sh"}:
return shlex.quote(s)

if et == "powershell":
# PowerShell single-quoted string escaping: ' -> ''
return "'" + s.replace("'", "''") + "'"

if et == "cmd":
# Conservative quoting for cmd.exe arguments.
return '"' + s.replace('"', '""') + '"'

# Fallback: safe POSIX-style quoting.
return shlex.quote(s)


def substitute_variables_safe(
command: str,
input_args: dict,
test_input_defs: dict,
executor_type: str,
) -> str:
"""Replace #{variable} placeholders with safely escaped values for executor_type."""
result = command
for arg_name, arg_def in test_input_defs.items():
value = input_args.get(arg_name, arg_def.get("default", ""))
result = result.replace(f"#{{{arg_name}}}", _escape_for_executor(value, executor_type))

# Replace any remaining #{...} placeholders with escaped value or marker.
result = re.sub(
r"#\{([^}]+)\}",
lambda m: _escape_for_executor(
input_args.get(m.group(1), f"MISSING_{m.group(1)}"),
executor_type,
),
result,
)
return result


def execute(
command: str,
executor_type: str,
Expand Down
Loading