diff --git a/ai_pipeline.sh b/ai_pipeline.sh old mode 100755 new mode 100644 index fadf044a..3329d7cd --- a/ai_pipeline.sh +++ b/ai_pipeline.sh @@ -15,6 +15,8 @@ # ./ai_pipeline.sh --mode deploy # Deploy to production # ./ai_pipeline.sh --dry-run # Show what would be done # ./ai_pipeline.sh --watch-gpu # Monitor GPU usage during training +# ./ai_pipeline.sh --timing-report # Print timing summary at the end +# ./ai_pipeline.sh --budget 30 # Mark stages over N seconds as OVER BUDGET # # Requirements: # - Python 3.8+ with torch, transformers, numpy @@ -52,6 +54,9 @@ NUM_EPOCHS="${NUM_EPOCHS:-100}" MODEL_NAME="${MODEL_NAME:-tent-neural-ensemble-v2}" VALIDATION_SPLIT="${VALIDATION_SPLIT:-0.2}" +# Budget threshold (seconds) – stages exceeding this are flagged as over budget +BUDGET_THRESHOLD="${BUDGET_THRESHOLD:-0}" + # Colors for output RED='\033[0;31m' GREEN='\033[0;32m' @@ -65,6 +70,11 @@ NC='\033[0m' # No Color TIMESTAMP=$(date +"%Y%m%d_%H%M%S") LOG_FILE="$PROJECT_ROOT/logs/ai_pipeline_${TIMESTAMP}.log" +# Timing storage – associative arrays for per-stage timing +declare -A PHASE_START +declare -A PHASE_ELAPSED +PHASE_ORDER=() + # --------------------------------------------------------------------------- # Utility Functions # --------------------------------------------------------------------------- @@ -81,6 +91,7 @@ log() { "STEP") color="${BLUE}" ;; "DONE") color="${GREEN}" ;; "GPU") color="${MAGENTA}" ;; + "BUDGET") color="${YELLOW}" ;; *) color="${NC}" ;; esac @@ -102,11 +113,141 @@ create_directories() { mkdir -p "$PROJECT_ROOT/metrics" } +# --------------------------------------------------------------------------- +# Timing Functions +# --------------------------------------------------------------------------- + +phase_start() { + local name="$1" + PHASE_START["$name"]=$(date +%s%N) + PHASE_ORDER+=("$name") +} + +phase_end() { + local name="$1" + local now + now=$(date +%s%N) + local start_ns="${PHASE_START[$name]:-0}" + if [ "$start_ns" -gt 0 ]; then + local elapsed_ns=$(( now - start_ns )) + local elapsed_sec + elapsed_sec=$(echo "scale=3; $elapsed_ns / 1000000000" | bc 2>/dev/null || echo "0") + PHASE_ELAPSED["$name"]="$elapsed_sec" + fi +} + +# --------------------------------------------------------------------------- +# Timing Report +# --------------------------------------------------------------------------- + +print_timing_summary_text() { + local budget="$1" + echo "" + echo -e "${CYAN}╔══════════════════════════════════════════════════════════════╗${NC}" + echo -e "${CYAN}║${NC} AI PIPELINE TIMING BUDGET SUMMARY ${CYAN}║${NC}" + echo -e "${CYAN}╚══════════════════════════════════════════════════════════════╝${NC}" + echo "" + + local total=0 + local slowest_name="" + local slowest_time=0 + local over_budget=false + + for phase_name in "${PHASE_ORDER[@]}"; do + local elapsed="${PHASE_ELAPSED[$phase_name]:-0}" + total=$(echo "scale=3; $total + $elapsed" | bc 2>/dev/null || echo "$total") + + local flag="" + if [ "$(echo "$elapsed > $slowest_time" | bc 2>/dev/null || echo "0")" = "1" ]; then + slowest_name="$phase_name" + slowest_time="$elapsed" + fi + + if [ "$budget" -gt 0 ] && [ "$(echo "$elapsed > $budget" | bc 2>/dev/null || echo "0")" = "1" ]; then + flag=" ${YELLOW}*** OVER BUDGET (${budget}s)${NC}" + over_budget=true + fi + + printf " %-40s %8.2fs%s\n" "$phase_name" "$elapsed" "$flag" + done + + echo "" + echo -e "${CYAN} ───────────────────────────────────────────────────────${NC}" + printf " %-40s %8.2fs\n" "TOTAL" "$total" + echo "" + echo " Slowest stage: $slowest_name (${slowest_time}s)" + + if [ "$budget" -gt 0 ]; then + if [ "$over_budget" = true ]; then + echo -e " ${YELLOW}⚠ Some stages exceeded the ${budget}s budget threshold${NC}" + else + echo -e " ${GREEN}✓ All stages within the ${budget}s budget threshold${NC}" + fi + fi + echo "" +} + +generate_timing_report_json() { + local budget="$1" + local json_file="$2" + local total=0 + local slowest_name="" + local slowest_time=0 + local stages_json="" + + local first=true + for phase_name in "${PHASE_ORDER[@]}"; do + local elapsed="${PHASE_ELAPSED[$phase_name]:-0}" + total=$(echo "scale=3; $total + $elapsed" | bc 2>/dev/null || echo "$total") + + if [ "$(echo "$elapsed > $slowest_time" | bc 2>/dev/null || echo "0")" = "1" ]; then + slowest_name="$phase_name" + slowest_time="$elapsed" + fi + + local over_budget_flag=false + if [ "$budget" -gt 0 ] && [ "$(echo "$elapsed > $budget" | bc 2>/dev/null || echo "0")" = "1" ]; then + over_budget_flag=true + fi + + if [ "$first" = true ]; then + first=false + else + stages_json+="," + fi + stages_json+=$(cat < "$json_file" </dev/null || true fi + # Print timing summary + if [ "$timing_report" = true ]; then + print_timing_summary_text "$BUDGET_THRESHOLD" + local json_report="$PROJECT_ROOT/metrics/timing_report_${TIMESTAMP}.json" + generate_timing_report_json "$BUDGET_THRESHOLD" "$json_report" + fi + echo "" log "DONE" "╔══════════════════════════════════════════════════════════════╗" log "DONE" "║ PIPELINE COMPLETE ║" @@ -421,6 +588,7 @@ main() { MODE="full" DRY_RUN=false WATCH_GPU=false +TIMING_REPORT=false while [[ $# -gt 0 ]]; do case "$1" in @@ -436,16 +604,24 @@ while [[ $# -gt 0 ]]; do WATCH_GPU=true shift ;; + --timing-report) + TIMING_REPORT=true + shift + ;; + --budget) + BUDGET_THRESHOLD="$2" + shift 2 + ;; --help|-h) head -50 "$0" | grep -E "^#" | sed 's/^# \?//' exit 0 ;; *) echo "Unknown option: $1" - echo "Usage: $0 [--mode full|train|evaluate|deploy] [--dry-run] [--watch-gpu]" + echo "Usage: $0 [--mode full|train|evaluate|deploy] [--dry-run] [--watch-gpu] [--timing-report] [--budget N]" exit 1 ;; esac done -main "$MODE" "$DRY_RUN" "$WATCH_GPU" +main "$MODE" "$DRY_RUN" "$WATCH_GPU" "$TIMING_REPORT" diff --git a/tools/health_check.py b/tools/health_check.py index 5cd0a613..5f95385b 100644 --- a/tools/health_check.py +++ b/tools/health_check.py @@ -19,6 +19,7 @@ 6. Certificate expiry (TLS certificate check) 7. Disk space (filesystem usage check) 8. Memory usage (process memory check) + 9. Prometheus stale metric detection (age-based staleness guard) Each check returns a status of OK, WARNING, or CRITICAL, along with a detail message and optional diagnostic data. @@ -28,17 +29,19 @@ python3 health_check.py --service backend # Check specific service python3 health_check.py --json # JSON output python3 health_check.py --watch # Continuous monitoring + python3 health_check.py --stale-metrics # Include stale-metric guard """ import argparse import json import os +import re import socket import ssl import subprocess import sys import time -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple # --------------------------------------------------------------------------- @@ -64,10 +67,205 @@ MEMORY_THRESHOLD_WARNING = 80 MEMORY_THRESHOLD_CRITICAL = 90 +# Stale-metric guard configuration +# Metrics with a timestamp older than STALE_METRIC_AGE_SECONDS are flagged as stale. +# This value can be overridden via the STALE_METRIC_AGE environment variable. +STALE_METRIC_AGE_SECONDS = int(os.environ.get("STALE_METRIC_AGE", "300")) +STALE_METRIC_CRITICAL_AGE_SECONDS = int(os.environ.get("STALE_METRIC_CRITICAL_AGE", "900")) + +# Patterns that look like secrets or prompts — these values are redacted +# from diagnostic output per the acceptance criteria. +SECRET_PATTERNS = [ + re.compile(r"(?i)(password|secret|api[_-]?key|token|auth|credential)\s*[:=]\s*['\"]?[^\s,'\"}]+"), + re.compile(r"(?i)\b(bearer|basic)\s+[a-z0-9+/=_-]{10,}"), + re.compile(r"(?i)-----BEGIN\s+(RSA\s+)?PRIVATE\s+KEY-----"), + re.compile(r"(?i)sk-[a-z0-9]{20,}"), # OpenAI-style secret keys + re.compile(r"(?i)ghp_[a-zA-Z0-9]{36}"), # GitHub tokens + re.compile(r"(?i)prompt\s*[:=]\s*['\"].+?['\"]"), # Raw prompt content +] + +ENVIRONMENT = os.environ.get("ENVIRONMENT", os.environ.get("TENT_ENV", "production")) + +# --------------------------------------------------------------------------- +# REDACTION HELPERS +# --------------------------------------------------------------------------- + + +def redact_secrets(text: str) -> str: + """Replace secret-looking values with '[REDACTED]'.""" + # Replace known patterns directly + text = re.sub(r"(?i)(password|secret|api[_-]?key|token|auth|credential)\s*[:=]\s*['\"]?[^\s,'\"}]+", + r"\1=[REDACTED]", text) + text = re.sub(r"(?i)\b(bearer|basic)\s+[a-z0-9+/=_-]{10,}", r"\1 [REDACTED]", text) + text = re.sub(r"(?i)-----BEGIN\s+(RSA\s+)?PRIVATE\s+KEY-----", + "-----BEGIN PRIVATE KEY----- [REDACTED]", text) + text = re.sub(r"(?i)sk-[a-z0-9]{20,}", "[REDACTED-KEY]", text) + text = re.sub(r"(?i)ghp_[a-zA-Z0-9]{36}", "[REDACTED-TOKEN]", text) + text = re.sub(r"(?i)prompt\s*[:=]\s*['\"].+?['\"]", "prompt=[REDACTED]", text) + return text + + +def redact_dict(d: Dict[str, Any]) -> Dict[str, Any]: + """Recursively redact secret-looking values from a dictionary.""" + result = {} + for k, v in d.items(): + if isinstance(v, str): + result[k] = redact_secrets(v) + elif isinstance(v, dict): + result[k] = redact_dict(v) + elif isinstance(v, list): + result[k] = [redact_dict(item) if isinstance(item, dict) else + redact_secrets(str(item)) if isinstance(item, str) else item + for item in v] + else: + result[k] = v + return result + + +# --------------------------------------------------------------------------- +# STALE METRIC DETECTION +# --------------------------------------------------------------------------- + + +def parse_prometheus_timestamp(metric_line: str) -> Optional[float]: + """ + Extract a Unix timestamp from a Prometheus metric exposition line. + Supports both OpenMetrics (# HELP / # TYPE / # EOF) and the classic format. + Looks for a trailing timestamp after the value. + """ + line = metric_line.strip() + # Skip comment lines + if line.startswith("#"): + return None + + # Prometheus classic format: metric_name{labels} value [timestamp] + # The timestamp is the third whitespace-separated token (after metric+labels and value) + parts = line.split() + if len(parts) >= 3: + try: + # parts[0] = metric_name{labels}, parts[1] = value, parts[2] = timestamp + return float(parts[-1]) + except (ValueError, IndexError): + return None + return None + + +def check_stale_metrics( + metrics_text: str, + service_name: str = "unknown", + max_age: int = STALE_METRIC_AGE_SECONDS, + critical_age: int = STALE_METRIC_CRITICAL_AGE_SECONDS, +) -> Tuple[str, str, List[Dict[str, Any]]]: + """ + Check Prometheus metrics for staleness. + + Returns: + (overall_status, detail_message, stale_metric_list) + """ + now = time.time() + stale_metrics = [] + lines = metrics_text.split("\n") + + for line in lines: + line = line.strip() + if not line or line.startswith("#"): + continue + + ts = parse_prometheus_timestamp(line) + if ts is None: + continue + + age = now - ts + if age < 0: + # Future timestamps are likely clock skew, not stale + continue + + if age > critical_age: + stale_metrics.append({ + "service": service_name, + "environment": ENVIRONMENT, + "metric_name": line.split("{")[0].split()[0] if "{" in line else line.split()[0], + "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(), + "age_seconds": round(age, 1), + "stale": True, + "critical": True, + }) + elif age > max_age: + stale_metrics.append({ + "service": service_name, + "environment": ENVIRONMENT, + "metric_name": line.split("{")[0].split()[0] if "{" in line else line.split()[0], + "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(), + "age_seconds": round(age, 1), + "stale": True, + "critical": False, + }) + + if not stale_metrics: + return "OK", "No stale metrics detected", [] + + critical_count = sum(1 for m in stale_metrics if m.get("critical")) + warning_count = sum(1 for m in stale_metrics if not m.get("critical")) + + if critical_count > 0: + status = "CRITICAL" + detail = f"{critical_count} stale metric(s) exceed critical age ({critical_age}s), {warning_count} exceed warning age ({max_age}s)" + else: + status = "WARNING" + detail = f"{warning_count} stale metric(s) exceed warning age ({max_age}s)" + + return status, detail, stale_metrics + + +def fetch_metrics_from_service(host: str, port: int, timeout: int = 5) -> Optional[str]: + """Fetch Prometheus metrics from a service's /metrics endpoint.""" + import http.client + try: + conn = http.client.HTTPConnection(host, port, timeout=timeout) + conn.request("GET", "/metrics") + resp = conn.getresponse() + if resp.status != 200: + conn.close() + return None + body = resp.read().decode("utf-8", errors="replace") + conn.close() + return body + except Exception: + return None + + +def run_stale_metric_checks() -> Dict[str, Any]: + """ + Run stale metric detection across all configured services. + Returns a dictionary suitable for inclusion in the health check results. + """ + results = {} + for name, config in SERVICES.items(): + metrics_text = fetch_metrics_from_service(config["host"], config["port"], config["timeout"]) + if metrics_text is None: + results[name] = { + "status": "WARNING", + "detail": f"Could not fetch /metrics from {name}", + "stale_metrics": [], + } + continue + status, detail, stale_list = check_stale_metrics( + metrics_text, service_name=name + ) + results[name] = { + "status": status, + "detail": detail, + "stale_metrics_count": len(stale_list), + "stale_metrics": stale_list, + } + return results + + # --------------------------------------------------------------------------- # CHECK FUNCTIONS # --------------------------------------------------------------------------- + def check_http_service(host: str, port: int, path: str, timeout: int) -> Tuple[str, str, int]: import http.client try: @@ -200,10 +398,13 @@ def check_load_average() -> Tuple[str, str, float]: # HEALTH CHECK RUNNER # --------------------------------------------------------------------------- -def run_health_checks(service: Optional[str] = None, json_output: bool = False) -> Dict[str, Any]: + +def run_health_checks(service: Optional[str] = None, json_output: bool = False, + check_stale: bool = False) -> Dict[str, Any]: results: Dict[str, Any] = { "timestamp": datetime.now().isoformat(), "hostname": socket.gethostname(), + "environment": ENVIRONMENT, "services": {}, "infrastructure": {}, "system": {}, @@ -269,6 +470,22 @@ def run_health_checks(service: Optional[str] = None, json_output: bool = False) if cert_status == "CRITICAL": all_ok = False + # Stale metric checks (when --stale-metrics is passed) + if check_stale: + stale_results = run_stale_metric_checks() + results["stale_metrics"] = stale_results + for svc, sr in stale_results.items(): + if sr["status"] == "CRITICAL": + all_ok = False + # Also add a summary + total_stale = sum(sr.get("stale_metrics_count", 0) for sr in stale_results.values()) + results["stale_metrics_summary"] = { + "total_stale_metrics": total_stale, + "stale_metric_age_threshold_seconds": STALE_METRIC_AGE_SECONDS, + "stale_metric_critical_age_threshold_seconds": STALE_METRIC_CRITICAL_AGE_SECONDS, + "environment": ENVIRONMENT, + } + results["overall_status"] = "OK" if all_ok else "DEGRADED" return results @@ -278,6 +495,7 @@ def print_health_report(results: Dict[str, Any]): print(f"\n{'='*60}") print(f" HEALTH CHECK REPORT") print(f" Host: {results['hostname']}") + print(f" Env: {results.get('environment', 'unknown')}") print(f" Time: {results['timestamp']}") print(f" Overall: {results['overall_status']}") print(f"{'='*60}") @@ -297,6 +515,27 @@ def print_health_report(results: Dict[str, Any]): if isinstance(sub_check, dict) and "status" in sub_check: sub_icon = {"OK": "✓", "WARNING": "⚠", "CRITICAL": "✗"}.get(sub_check["status"], "?") print(f" {sub_icon} {sub_name}: {sub_check['detail']}") + + # Print stale metrics summary + if "stale_metrics_summary" in results: + sm = results["stale_metrics_summary"] + print(f"\n Stale Metrics Guard:") + if sm["total_stale_metrics"] > 0: + print(f" ✗ {sm['total_stale_metrics']} stale metric(s) detected") + print(f" Threshold: {sm['stale_metric_age_threshold_seconds']}s, Critical: {sm['stale_metric_critical_age_threshold_seconds']}s") + if "stale_metrics" in results: + for svc_name, svc_data in results["stale_metrics"].items(): + if svc_data["stale_metrics"]: + icon = {"OK": "✓", "WARNING": "⚠", "CRITICAL": "✗"}.get(svc_data["status"], "?") + print(f" {icon} {svc_name}: {svc_data['detail']}") + for m in svc_data["stale_metrics"][:5]: # Show top 5 + age_str = f"{m['age_seconds']}s" + print(f" - {m['metric_name']} (age: {age_str})") + if len(svc_data["stale_metrics"]) > 5: + print(f" ... and {len(svc_data['stale_metrics']) - 5} more") + else: + print(f" ✓ No stale metrics detected (threshold: {sm['stale_metric_age_threshold_seconds']}s)") + print() @@ -307,17 +546,29 @@ def parse_args(): parser.add_argument("--watch", "-w", action="store_true", help="Continuous monitoring") parser.add_argument("--interval", "-i", type=int, default=30, help="Check interval in seconds") parser.add_argument("--output", "-o", help="Output file path") + parser.add_argument("--stale-metrics", action="store_true", help="Check for stale Prometheus metrics") + parser.add_argument("--stale-max-age", type=int, default=STALE_METRIC_AGE_SECONDS, + help=f"Stale metric warning age threshold in seconds (default: {STALE_METRIC_AGE_SECONDS})") + parser.add_argument("--stale-critical-age", type=int, default=STALE_METRIC_CRITICAL_AGE_SECONDS, + help=f"Stale metric critical age threshold in seconds (default: {STALE_METRIC_CRITICAL_AGE_SECONDS})") return parser.parse_args() def main(): args = parse_args() + # Override stale metric ages from CLI args if provided + global STALE_METRIC_AGE_SECONDS, STALE_METRIC_CRITICAL_AGE_SECONDS + STALE_METRIC_AGE_SECONDS = args.stale_max_age + STALE_METRIC_CRITICAL_AGE_SECONDS = args.stale_critical_age + if args.watch: print(f"Continuous monitoring (interval: {args.interval}s). Press Ctrl+C to stop.") try: while True: - results = run_health_checks(args.service, args.json) + results = run_health_checks(args.service, args.json, args.stale_metrics) + # Redact secrets from output + results = redact_dict(results) if args.json: print(json.dumps(results, indent=2)) else: @@ -326,7 +577,9 @@ def main(): except KeyboardInterrupt: print("\nMonitoring stopped") else: - results = run_health_checks(args.service, args.json) + results = run_health_checks(args.service, args.json, args.stale_metrics) + # Redact secrets from output + results = redact_dict(results) if args.json: output = json.dumps(results, indent=2) print(output) @@ -335,10 +588,7 @@ def main(): if args.output: with open(args.output, "w") as f: - if args.json: - json.dump(results, f, indent=2) - else: - json.dump(results, f, indent=2) + json.dump(results, f, indent=2) print(f"Report saved to {args.output}") if results["overall_status"] == "DEGRADED":