diff --git a/backend/secuscan/plugin_validator.py b/backend/secuscan/plugin_validator.py index ad6aefa7e..0a9fe955b 100644 --- a/backend/secuscan/plugin_validator.py +++ b/backend/secuscan/plugin_validator.py @@ -285,13 +285,49 @@ def _check_validation_block(self, data: dict, result: ValidationResult) -> None: result.add("validation", "Must be an object if present") return + field_ids = { + field.get("id") + for field in data.get("fields", []) + if isinstance(field, dict) + } + for key, rule in validation.items(): prefix = f"validation.{key}" + if not isinstance(rule, dict): result.add(prefix, "Each validation rule must be an object") continue + if "required" in rule and not isinstance(rule["required"], bool): - result.add(f"{prefix}.required", "'required' must be a boolean") + result.add( + f"{prefix}.required", + "'required' must be a boolean", + ) + + mutually_exclusive = rule.get("mutually_exclusive") + + if mutually_exclusive is None: + continue + + if not isinstance(mutually_exclusive, list): + result.add( + f"{prefix}.mutually_exclusive", + "'mutually_exclusive' must be a list", + ) + continue + + if len(mutually_exclusive) < 2: + result.add( + f"{prefix}.mutually_exclusive", + "Must contain at least two field ids", + ) + + for field_id in mutually_exclusive: + if field_id not in field_ids: + result.add( + f"{prefix}.mutually_exclusive", + f"Unknown field '{field_id}'", + ) def _check_checksum(self, data: dict, result: ValidationResult) -> None: checksum = data.get("checksum") diff --git a/plugins/ssh_runner/metadata.json b/plugins/ssh_runner/metadata.json index 6673d75ad..a0af53824 100644 --- a/plugins/ssh_runner/metadata.json +++ b/plugins/ssh_runner/metadata.json @@ -50,6 +50,20 @@ "required": true, "default": "uptime", "help": "Command to execute on the remote host." + }, + { + "id": "password", + "label": "Password", + "type": "string", + "required": false, + "help": "Password-based SSH authentication." + }, + { + "id": "private_key", + "label": "Private Key", + "type": "string", + "required": false, + "help": "Path to SSH private key." } ], "presets": { @@ -60,6 +74,14 @@ "command": "cat /var/log/auth.log | tail -n 20" } }, + "validation": { + "authentication": { + "mutually_exclusive": [ + "password", + "private_key" + ] + } + }, "output": { "format": "text", "parser": "custom" @@ -88,5 +110,5 @@ ] }, "docker_image": "alpine:latest", - "checksum": "6d4495f58635dedb184e763e0e4467adf17dc3754a6b1ed7c2dca0d6b30a88ec" + "checksum": "76f62464b0e9fb8e0322b52461dae8029053769f60da9b4ce8f73604b7048aaa" } diff --git a/testing/backend/unit/test_plugin_validator.py b/testing/backend/unit/test_plugin_validator.py index 0cdc69c27..d390c2079 100644 --- a/testing/backend/unit/test_plugin_validator.py +++ b/testing/backend/unit/test_plugin_validator.py @@ -600,3 +600,72 @@ def test_missing_category_is_not_flagged(self, tmp_path): cat_errors = [e for e in result.errors if e.path == "category"] assert len(cat_errors) == 1 assert "Required" in cat_errors[0].message + + def test_mutually_exclusive_fields_must_reference_existing_fields(self, tmp_path): + data = _minimal_valid() + data["fields"] = [ + { + "id": "password", + "label": "Password", + "type": "text", + "help": "Password authentication", + }, + { + "id": "private_key", + "label": "Private Key", + "type": "text", + "help": "SSH private key", + }, + ] + data["validation"] = { + "authentication": { + "mutually_exclusive": [ + "password", + "private_key", + ] + } + } + + plugin_dir = _write_metadata(tmp_path, data) + result = validate_one_plugin(plugin_dir) + + mutually_exclusive_errors = [ + e + for e in result.errors + if "mutually_exclusive" in e.path + ] + + assert mutually_exclusive_errors == [] + + def test_mutually_exclusive_fields_unknown_field_is_rejected(self, tmp_path): + data = _minimal_valid() + data["fields"] = [ + { + "id": "password", + "label": "Password", + "type": "text", + "help": "Password authentication", + }, + ] + data["validation"] = { + "authentication": { + "mutually_exclusive": [ + "password", + "private_key", + ] + } + } + + plugin_dir = _write_metadata(tmp_path, data) + result = validate_one_plugin(plugin_dir) + + assert not result.valid + + mutually_exclusive_errors = [ + e + for e in result.errors + if "mutually_exclusive" in e.path + ] + + assert len(mutually_exclusive_errors) == 1 + assert "private_key" in mutually_exclusive_errors[0].message