Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions sentinel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,33 @@ Integration with AgenTrust

Sentinel fills the documented gap: "no dedicated behavioral anomaly detection or agent quarantine tooling."

## Security

Sentinel fails closed. Two controls are configured by environment variable:

- **Trace verification gate.** Incoming traces are scored and enforced only
after their Ed25519 signature is verified against a trusted key supplied in
`TRACE_TRUSTED_JWK` (an OKP/Ed25519 public JWK as JSON). Unsigned traces,
bad signatures, or a missing trusted key are rejected. To run against
unsigned demo data, set `SENTINEL_ALLOW_UNVERIFIED=1` — this bypasses
verification and logs a loud warning on every use, and must not be used in
production.
- **Incident report signatures.** Exported incident reports are signed with
HMAC-SHA256 using the secret in `SENTINEL_SIGNING_KEY`. If the key is unset
the report is marked `"signature_status": "unsigned"` and carries no
signature (it is never emitted with a value that merely looks signed). The
`/verify` endpoint checks the keyed HMAC in constant time and returns
`UNVERIFIABLE` when no key is configured.

```bash
# Example: run the demo against unsigned sample data
SENTINEL_ALLOW_UNVERIFIED=1 python -m src.cli sample_trace.json --output report.json

# Production: verify traces and sign incidents
export TRACE_TRUSTED_JWK='{"kty":"OKP","crv":"Ed25519","x":"<base64url public key>"}'
export SENTINEL_SIGNING_KEY='<high-entropy secret>'
```

License

MIT
Expand Down
7 changes: 5 additions & 2 deletions sentinel/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ spec:
usage:
command: |
pip install -r requirements.txt
python -m src.cli sample_trace.json --output report.json
# Sentinel fails closed: traces are verified before scoring. For signed
# production traces set TRACE_TRUSTED_JWK. To run the unsigned demo trace,
# set SENTINEL_ALLOW_UNVERIFIED=1 (logs a loud warning, dev/demo only).
SENTINEL_ALLOW_UNVERIFIED=1 python -m src.cli sample_trace.json --output report.json
# For fleet evaluation:
# python -m src.cli fleet_trace.json --fleet --output fleet_report.json
# SENTINEL_ALLOW_UNVERIFIED=1 python -m src.cli fleet_trace.json --fleet --output fleet_report.json
evidence:
- type: manual
description: |
Expand Down
3 changes: 2 additions & 1 deletion sentinel/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ httpx==0.28.1
jinja2==3.1.4
pandas==2.2.3
numpy==1.26.4
scikit-learn==1.5.2
scikit-learn==1.5.2
cryptography==44.0.0
7 changes: 7 additions & 0 deletions sentinel/src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from src.trace_ingester import ingest_trace
from src.risk_engine import RiskEngine
from src.models import SentinelInput
from src.trace_verification import verify_trace, TraceVerificationError

@click.command()
@click.argument('trace_path', type=click.Path(exists=True))
Expand All @@ -15,6 +16,12 @@ def main(trace_path, output, fleet):

if fleet or "agents" in data:
# Fleet mode
# Verification gate: refuse to score/enforce on unverified trace input.
try:
for agent_data in data.get("agents", []):
verify_trace(agent_data)
except TraceVerificationError as e:
raise click.ClickException(f"Trace verification failed: {e}")
engine = RiskEngine()
inputs = []
for agent_data in data.get("agents", []):
Expand Down
1 change: 1 addition & 0 deletions sentinel/src/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,6 @@ class IncidentReport(BaseModel):
evidence_export: Dict[str, Any]
receipt: Optional[Receipt] = None
signature: Optional[str] = None
signature_status: str = "unsigned" # "signed" or "unsigned" (fail closed)
claim_hash: Optional[str] = None
incident_hash: Optional[str] = None
73 changes: 58 additions & 15 deletions sentinel/src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
)
from src.risk_engine import RiskEngine
from src.replay_engine import ReplayEngine
from src.signing import sign_payload, verify_payload, is_signing_configured, SigningKeyMissing
from src.trace_verification import verify_trace, TraceVerificationError
import traceback
import uuid
import json
import hashlib
import base64
import hmac
from datetime import datetime

app = FastAPI(title="Agent Sentinel")
Expand Down Expand Up @@ -44,11 +46,6 @@ def log_enforcement(action: str, claim_id: str, result: dict, status: str = "SUC
print(f"Result: {result.get('message', result)}")
print(f"Status: {status}\n")

def sign_payload(payload: dict) -> str:
data = json.dumps(payload, sort_keys=True).encode('utf-8')
hash_digest = hashlib.sha256(data).digest()
return base64.b64encode(hash_digest + b"signed").decode('utf-8')

def hash_payload(payload: dict) -> str:
data = json.dumps(payload, sort_keys=True).encode('utf-8')
return hashlib.sha256(data).hexdigest()
Expand All @@ -66,6 +63,16 @@ async def evaluate(request: Request):
if not agents_list:
return JSONResponse(content={"error": "No agents provided"}, status_code=400)

# Verification gate: refuse to score/enforce on unverified trace input.
try:
for agent_data in agents_list:
verify_trace(agent_data)
except TraceVerificationError as e:
return JSONResponse(
content={"error": f"Trace verification failed: {str(e)}"},
status_code=403,
)

inputs = []
for agent_data in agents_list:
inp = SentinelInput(
Expand Down Expand Up @@ -120,6 +127,14 @@ async def evaluate(request: Request):
}
return JSONResponse(content=serializable)
else:
# Verification gate: refuse to score/enforce on unverified trace input.
try:
verify_trace(data)
except TraceVerificationError as e:
return JSONResponse(
content={"error": f"Trace verification failed: {str(e)}"},
status_code=403,
)
try:
inp = SentinelInput(**data)
result = engine.evaluate(inp)
Expand Down Expand Up @@ -286,12 +301,25 @@ async def export_incident(claim_id: str, request: Request):
if claim_id in receipt_store:
report.receipt = receipt_store[claim_id]

# Generate hashes and signature for the report (without the signature and hash fields)
report_dict = report.model_dump(mode='json', exclude={'signature', 'claim_hash', 'incident_hash'})
# Generate hashes and a keyed signature for the report (excluding the
# signature / hash fields and the signature_status marker).
report_dict = report.model_dump(
mode='json',
exclude={'signature', 'claim_hash', 'incident_hash', 'signature_status'},
)
claim_data = {"claim_id": claim_id, "agent_id": agent_id, "detection_type": detection_type, "risk_score": risk_score}
report.claim_hash = hash_payload(claim_data)
report.incident_hash = hash_payload(report_dict)
report.signature = sign_payload(report_dict)

# Fail closed: only emit a signature when a signing key is configured.
# Otherwise mark the report explicitly unsigned rather than emitting a
# value that merely looks signed.
try:
report.signature = sign_payload(report_dict)
report.signature_status = "signed"
except SigningKeyMissing:
report.signature = None
report.signature_status = "unsigned"

return JSONResponse(content=report.model_dump(mode='json'))
except Exception as e:
Expand All @@ -318,20 +346,35 @@ async def verify_incident(claim_id: str, request: Request):
if not report_data:
return JSONResponse(content={"error": "Missing report data"}, status_code=400)

# Recompute hashes and signature from the report (excluding signature and hash fields)
report_copy = {k: v for k, v in report_data.items() if k not in ["signature", "claim_hash", "incident_hash"]}
# Fail closed: a keyed signature is required to verify. Without a
# configured signing key there is nothing to verify against.
if not is_signing_configured():
return JSONResponse(content={
"claim_id": claim_id,
"status": "UNVERIFIABLE",
"details": {
"reason": "No signing key configured (SENTINEL_SIGNING_KEY). "
"Cannot verify incident signatures.",
}
})

# Recompute integrity hashes; verify the keyed signature with HMAC.
report_copy = {
k: v for k, v in report_data.items()
if k not in ["signature", "claim_hash", "incident_hash", "signature_status"]
}
recomputed_claim_hash = hash_payload({
"claim_id": claim_id,
"agent_id": report_data.get("agent_id"),
"detection_type": report_data.get("detection_type"),
"risk_score": report_data.get("risk_score")
})
recomputed_incident_hash = hash_payload(report_copy)
recomputed_signature = sign_payload(report_copy)

valid_claim_hash = recomputed_claim_hash == report_data.get("claim_hash")
valid_incident_hash = recomputed_incident_hash == report_data.get("incident_hash")
valid_signature = recomputed_signature == report_data.get("signature")
valid_claim_hash = hmac.compare_digest(recomputed_claim_hash, report_data.get("claim_hash") or "")
valid_incident_hash = hmac.compare_digest(recomputed_incident_hash, report_data.get("incident_hash") or "")
# Keyed HMAC verification with constant-time comparison.
valid_signature = verify_payload(report_copy, report_data.get("signature") or "")

status = "VERIFIED" if (valid_claim_hash and valid_incident_hash and valid_signature) else "TAMPERED"
return JSONResponse(content={
Expand Down
60 changes: 60 additions & 0 deletions sentinel/src/signing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Keyed signing for Sentinel incident reports.

Incident reports are signed with HMAC-SHA256 using a secret loaded from the
``SENTINEL_SIGNING_KEY`` environment variable. Signing and verification both
FAIL CLOSED when the key is unset: ``sign_payload`` raises and ``verify_payload``
returns ``False`` rather than emitting or accepting a value that merely looks
signed.
"""

import hashlib
import hmac
import json
import os
from typing import Any, Dict

SIGNING_KEY_ENV = "SENTINEL_SIGNING_KEY"


class SigningKeyMissing(RuntimeError):
"""Raised when an incident must be signed but no signing key is configured."""


def _signing_key() -> bytes:
key = os.environ.get(SIGNING_KEY_ENV)
if not key:
raise SigningKeyMissing(
f"{SIGNING_KEY_ENV} is not set. Refusing to emit an incident "
"signature without a secret key (fail closed). Set "
f"{SIGNING_KEY_ENV} to a high-entropy secret to enable signing."
)
return key.encode("utf-8")


def _canonical_bytes(payload: Dict[str, Any]) -> bytes:
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")


def is_signing_configured() -> bool:
"""Return True if a signing key is configured."""
return bool(os.environ.get(SIGNING_KEY_ENV))


def sign_payload(payload: Dict[str, Any]) -> str:
"""Return a hex HMAC-SHA256 signature over the canonical JSON of *payload*.

Raises ``SigningKeyMissing`` if no signing key is configured.
"""
return hmac.new(_signing_key(), _canonical_bytes(payload), hashlib.sha256).hexdigest()


def verify_payload(payload: Dict[str, Any], signature: str) -> bool:
"""Return True iff *signature* is a valid HMAC for *payload* under the key.

Fails closed: returns False if no key is configured or *signature* is empty.
Uses a constant-time comparison.
"""
if not signature or not is_signing_configured():
return False
expected = sign_payload(payload)
return hmac.compare_digest(expected, signature)
Loading
Loading