Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion backend/secuscan/plugin_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,49 @@ def _check_validation_block(self, data: dict, result: ValidationResult) -> None:
result.add("validation", "Must be an object if present")
return

field_ids = {
field.get("id")
for field in data.get("fields", [])
if isinstance(field, dict)
}

for key, rule in validation.items():
prefix = f"validation.{key}"

if not isinstance(rule, dict):
result.add(prefix, "Each validation rule must be an object")
continue

if "required" in rule and not isinstance(rule["required"], bool):
result.add(f"{prefix}.required", "'required' must be a boolean")
result.add(
f"{prefix}.required",
"'required' must be a boolean",
)

mutually_exclusive = rule.get("mutually_exclusive")

if mutually_exclusive is None:
continue

if not isinstance(mutually_exclusive, list):
result.add(
f"{prefix}.mutually_exclusive",
"'mutually_exclusive' must be a list",
)
continue

if len(mutually_exclusive) < 2:
result.add(
f"{prefix}.mutually_exclusive",
"Must contain at least two field ids",
)

for field_id in mutually_exclusive:
if field_id not in field_ids:
result.add(
f"{prefix}.mutually_exclusive",
f"Unknown field '{field_id}'",
)

def _check_checksum(self, data: dict, result: ValidationResult) -> None:
checksum = data.get("checksum")
Expand Down
24 changes: 23 additions & 1 deletion plugins/ssh_runner/metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@
"required": true,
"default": "uptime",
"help": "Command to execute on the remote host."
},
{
"id": "password",
"label": "Password",
"type": "string",
"required": false,
"help": "Password-based SSH authentication."
},
{
"id": "private_key",
"label": "Private Key",
"type": "string",
"required": false,
"help": "Path to SSH private key."
}
],
"presets": {
Expand All @@ -60,6 +74,14 @@
"command": "cat /var/log/auth.log | tail -n 20"
}
},
"validation": {
"authentication": {
"mutually_exclusive": [
"password",
"private_key"
]
}
},
"output": {
"format": "text",
"parser": "custom"
Expand Down Expand Up @@ -88,5 +110,5 @@
]
},
"docker_image": "alpine:latest",
"checksum": "6d4495f58635dedb184e763e0e4467adf17dc3754a6b1ed7c2dca0d6b30a88ec"
"checksum": "76f62464b0e9fb8e0322b52461dae8029053769f60da9b4ce8f73604b7048aaa"
}
69 changes: 69 additions & 0 deletions testing/backend/unit/test_plugin_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,72 @@ def test_missing_category_is_not_flagged(self, tmp_path):
cat_errors = [e for e in result.errors if e.path == "category"]
assert len(cat_errors) == 1
assert "Required" in cat_errors[0].message

def test_mutually_exclusive_fields_must_reference_existing_fields(self, tmp_path):
data = _minimal_valid()
data["fields"] = [
{
"id": "password",
"label": "Password",
"type": "text",
"help": "Password authentication",
},
{
"id": "private_key",
"label": "Private Key",
"type": "text",
"help": "SSH private key",
},
]
data["validation"] = {
"authentication": {
"mutually_exclusive": [
"password",
"private_key",
]
}
}

plugin_dir = _write_metadata(tmp_path, data)
result = validate_one_plugin(plugin_dir)

mutually_exclusive_errors = [
e
for e in result.errors
if "mutually_exclusive" in e.path
]

assert mutually_exclusive_errors == []

def test_mutually_exclusive_fields_unknown_field_is_rejected(self, tmp_path):
data = _minimal_valid()
data["fields"] = [
{
"id": "password",
"label": "Password",
"type": "text",
"help": "Password authentication",
},
]
data["validation"] = {
"authentication": {
"mutually_exclusive": [
"password",
"private_key",
]
}
}

plugin_dir = _write_metadata(tmp_path, data)
result = validate_one_plugin(plugin_dir)

assert not result.valid

mutually_exclusive_errors = [
e
for e in result.errors
if "mutually_exclusive" in e.path
]

assert len(mutually_exclusive_errors) == 1
assert "private_key" in mutually_exclusive_errors[0].message
Loading