Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/workflows/compliance-engine-python-wheel.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Build Compliance Engine Python interface for Augmentation Engine

on:
pull_request:
paths:
- 'src/modules/complianceengine/src/lib/compliance_engine.py'
- 'src/modules/complianceengine/src/lib/pyproject.toml'
workflow_dispatch:

env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true

jobs:
build-wheel:
name: Build wheel
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4.2.2

- name: Set up Python
uses: actions/setup-python@v5.6.0
with:
python-version: '3.11'

- name: Install build
run: pip install build

- name: Build the package
working-directory: src/modules/complianceengine/src/lib
run: python -m build --wheel --outdir dist/

- name: Upload the package as an artifact
uses: actions/upload-artifact@v4.6.2
with:
name: compliance-engine-python-interface
path: src/modules/complianceengine/src/lib/dist/*.whl
if-no-files-found: error
retention-days: 90
2 changes: 1 addition & 1 deletion .github/workflows/formatting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ jobs:
- name: Install pre-commit
run: pip install pre-commit
- name: Run pre-commit
run: pre-commit run --all-files
run: pre-commit run --all-files --show-diff-on-failure --verbose
3 changes: 2 additions & 1 deletion src/adapters/mc/complianceengine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ add_custom_target(stage_create_compliance_engine_shell_zip
COMMAND ${CMAKE_COMMAND} -E copy_if_different "${CMAKE_CURRENT_SOURCE_DIR}/ComplianceEngineShell.mof" ${PROJECT_BINARY_DIR}/StagingComplianceEngineShell
COMMAND ${CMAKE_COMMAND} -E copy_if_different $<TARGET_FILE:OsConfigResourceComplianceEngine> ${PROJECT_BINARY_DIR}/StagingComplianceEngineShell/Modules/DscNativeResources/OsConfigResource/libOsConfigResource.so
COMMAND ${CMAKE_COMMAND} -E copy_if_different $<TARGET_PROPERTY:complianceengineschema,BINARY_DIR>/$<TARGET_PROPERTY:complianceengineschema,OUTPUT> ${PROJECT_BINARY_DIR}/StagingComplianceEngineShell/
COMMAND ${CMAKE_COMMAND} -E copy_if_different $<TARGET_FILE:compliance-engine-assessor> ${PROJECT_BINARY_DIR}/StagingComplianceEngineShell/compliance-engine-assessor
${STAGING_COMMANDS}
DEPENDS OsConfigResourceComplianceEngine ComplianceEngineShell.mof complianceengineschema $<$<BOOL:${BUILD_TELEMETRY}>:telemetry-external>)
DEPENDS OsConfigResourceComplianceEngine ComplianceEngineShell.mof complianceengineschema compliance-engine-assessor $<$<BOOL:${BUILD_TELEMETRY}>:telemetry-external>)

add_custom_target(create_compliance_engine_shell_zip ALL
BYPRODUCTS ${OsConfigRootBinaryDir}/ComplianceEngineShell.zip
Expand Down
323 changes: 323 additions & 0 deletions src/modules/complianceengine/src/lib/GenInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,326 @@ def generate_json_schema(model: Model, basedir: str):

generate_global_json_schema(model, basedir)

def _cpp_type_to_python(type_str: str, enums: dict) -> str:
"""Map a C++ parameter type string to a Python type annotation string."""
# Strip Optional<...>
inner = type_str
if inner.startswith("Optional<") and inner.endswith(">"):
inner = inner[len("Optional<"):-1]

# Strip Separated<T, delim> → keep only the element type
if inner.startswith("Separated<") and inner.endswith(">"):
inner = inner[len("Separated<"):-1]
inner = inner.split(",", 1)[0].strip()

mapping = {
"std::string": "str",
"int": "int",
"bool": "bool",
"mode_t": "str", # octal string, e.g. "0644"
"regex": "str",
"Pattern": "str",
}
if inner in mapping:
return mapping[inner]
if inner in enums:
return inner
return "str"


def _is_separated(type_str: str) -> tuple:
"""Return (True, delimiter_char) if type_str is Separated<T, delim>, else (False, None)."""
inner = type_str
if inner.startswith("Optional<") and inner.endswith(">"):
inner = inner[len("Optional<"):-1]
if inner.startswith("Separated<") and inner.endswith(">"):
content = inner[len("Separated<"):-1]
parts = content.split(",", 1)
if len(parts) == 2:
delim = parts[1].strip()
# delim is e.g. '\'' followed by the char and '\''
# It may be written as '|' or ',' or ':'
raw = delim.strip("'\"")
if len(raw) == 1:
return True, raw
# handle escaped chars like '\'' (single-quote)
return True, raw[0]
return False, None


def generate_python_bindings(model: Model, filename: str):
"""Generate a standalone Python module with marshmallow-dataclass bindings for all procedures."""
with open(filename, "w", encoding="utf-8") as f:
f.write("# This file is auto-generated. Do not edit manually.\n")
f.write("# Python bindings for ComplianceEngine built-in procedures.\n")
f.write("# Requires: marshmallow-dataclass, marshmallow\n")
f.write("\n")
f.write("from __future__ import annotations\n")
f.write("\n")
f.write("import dataclasses\n")
f.write("import re\n")
f.write("from enum import Enum\n")
f.write("from typing import List, Optional, Union\n")
f.write("\n")
f.write("import marshmallow\n")
f.write("import marshmallow.fields as mf\n")
f.write("import marshmallow_dataclass\n")
f.write("\n")
f.write("\n")

# ------------------------------------------------------------------ #
# Shared helper types
# ------------------------------------------------------------------ #
f.write("# ---------------------------------------------------------------------------\n")
f.write("# Shared helper types\n")
f.write("# ---------------------------------------------------------------------------\n")
f.write("\n")

# mode_t: octal string, validated by marshmallow field
f.write("class OctalString(mf.String):\n")
f.write(' """Marshmallow field for octal permission strings, e.g. \'0644\'."""\n')
f.write("\n")
f.write(" def _validate(self, value):\n")
f.write(" super()._validate(value)\n")
f.write(" if not re.fullmatch(r'[0-7]{3,4}', value):\n")
f.write(' raise marshmallow.ValidationError(\n')
f.write(' f"Expected an octal permission string (3-4 octal digits), got {value!r}"\n')
f.write(" )\n")
f.write("\n")
f.write("\n")

# Pattern: string validated as regex
f.write("class PatternString(mf.String):\n")
f.write(' """Marshmallow field for regex-pattern strings."""\n')
f.write("\n")
f.write(" def _validate(self, value):\n")
f.write(" super()._validate(value)\n")
f.write(" try:\n")
f.write(" re.compile(value)\n")
f.write(" except re.error as exc:\n")
f.write(' raise marshmallow.ValidationError(\n')
f.write(' f"Invalid regex pattern {value!r}: {exc}"\n')
f.write(" ) from exc\n")
f.write("\n")
f.write("\n")

# Separated<T, delim> as a typed list that serialises/deserialises from a delimited string
f.write("class SeparatedField(mf.Field):\n")
f.write(' """Marshmallow field for a delimiter-separated list of strings."""\n')
f.write("\n")
f.write(" def __init__(self, delimiter: str = '|', *args, **kwargs):\n")
f.write(" super().__init__(*args, **kwargs)\n")
f.write(" self.delimiter = delimiter\n")
f.write("\n")
f.write(" def _serialize(self, value, attr, obj, **kwargs):\n")
f.write(" if value is None:\n")
f.write(" return None\n")
f.write(" if isinstance(value, list):\n")
f.write(" return self.delimiter.join(str(v) for v in value)\n")
f.write(" return str(value)\n")
f.write("\n")
f.write(" def _deserialize(self, value, attr, data, **kwargs):\n")
f.write(" if value is None:\n")
f.write(" return None\n")
f.write(" if not isinstance(value, str):\n")
f.write(' raise marshmallow.ValidationError("Expected a string")\n')
f.write(" return value.split(self.delimiter)\n")
f.write("\n")
f.write("\n")

# ------------------------------------------------------------------ #
# Enum classes
# ------------------------------------------------------------------ #
if model.enums:
f.write("# ---------------------------------------------------------------------------\n")
f.write("# Enums\n")
f.write("# ---------------------------------------------------------------------------\n")
f.write("\n")
for enum_name, enum_obj in sorted(model.enums.items()):
f.write(f"class {enum_name}(str, Enum):\n")
if enum_obj.description:
f.write(f' """{enum_obj.description}"""\n')
for cpp_member, json_label in enum_obj.mapping.items():
# Use the label as both the enum member name (sanitised) and value
py_member = re.sub(r"[^a-zA-Z0-9_]", "_", json_label).upper()
f.write(f' {py_member} = "{json_label}"\n')
f.write("\n")
f.write("\n")

# ------------------------------------------------------------------ #
# Procedure parameter dataclasses
# One class per procedure declaration, named after the procedure.
# Fields are taken from the params struct referenced in the declaration.
# Audits take priority when an audit and a remediation share the same name.
# ------------------------------------------------------------------ #
f.write("# ---------------------------------------------------------------------------\n")
f.write("# Procedure parameter dataclasses\n")
f.write("# ---------------------------------------------------------------------------\n")
f.write("\n")

# Collect unique procedure name → Parameters object, audits first
all_procs: dict = {} # proc_name → Parameters | None
for name, procedure in sorted(model.audits.items()):
params = model.parameters[procedure.params_name] if procedure.params_name else None
all_procs[name] = params
for name, procedure in sorted(model.remediations.items()):
if name not in all_procs:
params = model.parameters[procedure.params_name] if procedure.params_name else None
all_procs[name] = params

for proc_name, parameters in sorted(all_procs.items()):
f.write(f"@marshmallow_dataclass.dataclass\n")
f.write(f"class {proc_name}:\n")
f.write(f' """Parameters for the {proc_name} procedure."""\n')
f.write("\n")

if parameters is None:
# No parameters: marshmallow-dataclass needs at least a pass
f.write(" pass\n")
f.write("\n")
f.write("\n")
continue

for param in parameters.params:
py_type = _cpp_type_to_python(param.type, model.enums)
is_sep, delim = _is_separated(param.type)

# Determine the marshmallow field class override when needed
field_override = None
inner = param.type
if inner.startswith("Optional<") and inner.endswith(">"):
inner = inner[len("Optional<"):-1]
allow_none_arg = ", allow_none=True" if param.optional else ""
if inner == "mode_t":
field_override = f"OctalString({allow_none_arg.strip(', ')})" if param.optional else "OctalString()"
elif inner == "Pattern":
field_override = f"PatternString({allow_none_arg.strip(', ')})" if param.optional else "PatternString()"
elif inner in model.enums:
allow_none = "True" if param.optional else "False"
field_override = f"mf.Enum({inner}, by_value=True, allow_none={allow_none})"

if is_sep:
field_override = f"SeparatedField(delimiter={repr(delim)}{allow_none_arg})"
if param.optional:
annotation = f"Optional[List[str]]"
default_part = " = dataclasses.field(default=None)"
else:
annotation = "List[str]"
default_part = ""
else:
if param.optional:
annotation = f"Optional[{py_type}]"
default_part = " = dataclasses.field(default=None)"
else:
annotation = py_type
default_part = ""

if field_override or param.description:
meta_items = []
if field_override:
meta_items.append(f'"marshmallow_field": {field_override}')
meta_dict = "{" + ", ".join(meta_items) + "}" if meta_items else None
default_val = "None" if param.optional else "marshmallow.missing"
if meta_dict:
field_call = f"dataclasses.field(default={default_val}, metadata={meta_dict})"
else:
field_call = f"dataclasses.field(default={default_val})"
default_part = f" = {field_call}"

if param.description:
f.write(f" # {param.description}\n")
if param.pattern:
f.write(f" # pattern: {param.pattern}\n")
f.write(f" {param.name}: {annotation}{default_part}\n")
f.write("\n")

f.write("\n")

# Registry: procedure name → class (trivial since class name == procedure name)
f.write("_PROCEDURE_REGISTRY: dict = {\n")
for proc_name in sorted(all_procs.keys()):
f.write(f' "{proc_name}": {proc_name},\n')
f.write("}\n")
f.write("\n")

# ------------------------------------------------------------------ #
# Rule expression combinators
# ------------------------------------------------------------------ #
f.write("# ---------------------------------------------------------------------------\n")
f.write("# Rule expression combinators\n")
f.write("# ---------------------------------------------------------------------------\n")
f.write("\n")
f.write("\n")
f.write("@dataclasses.dataclass\n")
f.write("class LuaExpression:\n")
f.write(' \"\"\"Inline Lua script expression.\"\"\"\n')
f.write("\n")
f.write(" script: str\n")
f.write("\n")
f.write("\n")
f.write("@dataclasses.dataclass\n")
f.write("class AllOf:\n")
f.write(' \"\"\"Logical AND: all conditions must be compliant (short-circuit).\"\"\"\n')
f.write("\n")
f.write(' conditions: List["RuleExpression"]\n')
f.write("\n")
f.write("\n")
f.write("@dataclasses.dataclass\n")
f.write("class AnyOf:\n")
f.write(' \"\"\"Logical OR: at least one condition must be compliant (short-circuit).\"\"\"\n')
f.write("\n")
f.write(' conditions: List["RuleExpression"]\n')
f.write("\n")
f.write("\n")
f.write("@dataclasses.dataclass\n")
f.write("class Not:\n")
f.write(' \"\"\"Logical NOT: inverts the inner expression (audit-only, no remediation).\"\"\"\n')
f.write("\n")
f.write(' expression: "RuleExpression"\n')
f.write("\n")
f.write("\n")
f.write("# Union of all possible nodes in a rule expression tree\n")
f.write("RuleExpression = Union[AllOf, AnyOf, Not, LuaExpression]\n")
f.write("\n")
f.write("\n")
f.write("def serialize_expression(expr) -> dict:\n")
f.write(' \"\"\"Serialise a rule expression tree to a JSON-compatible dict.\n')
f.write(' Typed procedure params classes are serialised by class name and Schema.dump().\"\"\"\n')
f.write(" if isinstance(expr, AllOf):\n")
f.write(" return {\"allOf\": [serialize_expression(e) for e in expr.conditions]}\n")
f.write(" if isinstance(expr, AnyOf):\n")
f.write(" return {\"anyOf\": [serialize_expression(e) for e in expr.conditions]}\n")
f.write(" if isinstance(expr, Not):\n")
f.write(" return {\"not\": serialize_expression(expr.expression)}\n")
f.write(" if isinstance(expr, LuaExpression):\n")
f.write(" return {\"Lua\": {\"script\": expr.script}}\n")
f.write(" # Typed params class instance: serialise via its marshmallow Schema\n")
f.write(" cls = type(expr)\n")
f.write(" if not hasattr(cls, 'Schema'):\n")
f.write(" raise TypeError(f\"Cannot serialise {cls.__name__}: not a marshmallow dataclass\")\n")
f.write(" params = {k: v for k, v in cls.Schema().dump(expr).items() if v is not None}\n")
f.write(" return {cls.__name__: params}\n")
f.write("\n")
f.write("\n")
f.write("def deserialize_expression(data: dict) -> RuleExpression:\n")
f.write(' \"\"\"Deserialise a JSON-compatible dict into a rule expression tree.\"\"\"\n')
f.write(" if not isinstance(data, dict) or len(data) != 1:\n")
f.write(" raise ValueError(f\"Expected a dict with exactly one key, got: {data!r}\")\n")
f.write(" key, value = next(iter(data.items()))\n")
f.write(" if key == \"allOf\":\n")
f.write(" return AllOf(conditions=[deserialize_expression(e) for e in value])\n")
f.write(" if key == \"anyOf\":\n")
f.write(" return AnyOf(conditions=[deserialize_expression(e) for e in value])\n")
f.write(" if key == \"not\":\n")
f.write(" return Not(expression=deserialize_expression(value))\n")
f.write(" if key == \"Lua\":\n")
f.write(" return LuaExpression(script=value[\"script\"])\n")
f.write(" cls = _PROCEDURE_REGISTRY.get(key)\n")
f.write(" if cls is None:\n")
f.write(" raise ValueError(f\"Unknown procedure name: {key!r}\")\n")
f.write(" return cls.Schema().load(value)\n")


def main():
"""Main function to process a directory and print results."""
basedir = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -559,5 +879,8 @@ def main():
# Generate JSON schema files
generate_json_schema(model, basedir)

# Generate Python bindings module
generate_python_bindings(model, f"{basedir}/compliance_engine.py")

if __name__ == "__main__":
main()
Loading
Loading