diff --git a/.claude/skills/int-clarity/SKILL.md b/.claude/skills/int-clarity/SKILL.md new file mode 100644 index 00000000..ee79fa2d --- /dev/null +++ b/.claude/skills/int-clarity/SKILL.md @@ -0,0 +1,159 @@ +--- +name: int-clarity +description: "Query Microsoft Clarity Data Export API for session analytics, engagement, scroll depth, rage/dead clicks, and UTM attribution. Supports multiple projects via CLARITY_TOKEN_ env vars. Calls clarity.ms/export-data directly — no SDK or proxy dependency." +homepage: https://clarity.microsoft.com +metadata: + openclaw: + requires: + env: + - CLARITY_API_TOKEN + bins: + - python3 + primaryEnv: CLARITY_API_TOKEN + files: + - "scripts/*" +--- + +# Microsoft Clarity — Data Export API + +Acesse métricas de comportamento do Clarity diretamente via API, sem SDK. + +## Setup (uma vez) + +1. Acesse https://clarity.microsoft.com → Settings → Export Data. +2. Gere um token de projeto. +3. Adicione ao `.env`: + + **Projeto único:** + ``` + CLARITY_API_TOKEN=seu_token_aqui + ``` + + **Múltiplos projetos (padrão recomendado):** + ``` + CLARITY_TOKEN_ACME=token_acme + CLARITY_TOKEN_GLOBEX=token_globex + CLARITY_TOKEN_INITECH=token_initech + ``` + + > `--project ACME` seleciona `CLARITY_TOKEN_ACME`. + > Sem `--project`: usa `CLARITY_API_TOKEN` como fallback. + + **Base URL (opcional):** + ``` + CLARITY_API_BASE=https://www.clarity.ms/export-data/api/v1 + ``` + +--- + +## Comandos + +### Listar projetos configurados +```bash +python3 .claude/skills/int-clarity/scripts/clarity_export.py list-projects +``` + +### Métricas dos últimos 1-3 dias +```bash +# Visão geral (sem dimensão — retorna todos os metricNames) +python3 .claude/skills/int-clarity/scripts/clarity_export.py live-insights --project ACME --days 1 + +# Com dimensão: atribuição UTM +python3 .claude/skills/int-clarity/scripts/clarity_export.py live-insights --project ACME --days 3 \ + --dim1 Source --dim2 Medium --dim3 Campaign + +# Fricção por URL (Rage Clicks + Dead Clicks) +python3 .claude/skills/int-clarity/scripts/clarity_export.py live-insights --project GLOBEX --days 2 \ + --dim1 URL + +# Device breakdown +python3 .claude/skills/int-clarity/scripts/clarity_export.py live-insights --project INITECH --days 1 \ + --dim1 Device --dim2 Browser + +# Saída JSON bruta +python3 .claude/skills/int-clarity/scripts/clarity_export.py live-insights --project ACME --days 1 --json + +# Varrer todos os projetos (atenção: consome 1 req/projeto da cota de 10/dia) +python3 .claude/skills/int-clarity/scripts/clarity_export.py live-insights --days 1 --all +``` + +### Health check (smoke) +```bash +# Testa env + chamada real — sempre exit 0, sempre JSON +python3 .claude/skills/int-clarity/scripts/clarity_export.py smoke + +# Smoke para projeto específico +python3 .claude/skills/int-clarity/scripts/clarity_export.py smoke --project ACME +``` + +Exemplo de saída PASS: +```json +{ + "overall": "PASS", + "steps": [ + {"step": "env_present", "status": "PASS", "tokens_found": 5, "duration_ms": 0}, + {"step": "live_call", "status": "PASS", "project": "ACME", "metrics_returned": 16, "total_rows": 48, "duration_ms": 312} + ], + "duration_ms": 313 +} +``` + +--- + +## Dimensões disponíveis + +`Browser` · `Device` · `Country` · `OS` · `Source` · `Medium` · `Campaign` · `Channel` · `URL` + +> **Nota:** `Popular Pages` NÃO é uma dimensão válida — retorna body vazio (bug conhecido da Microsoft). Use `URL` para segmentar por página. + +## Métricas retornadas + +`Traffic` · `Engagement Time` · `Scroll Depth` · `Popular Pages` · `Browser` · `Device` · `OS` · `Country/Region` · `Page Title` · `Referrer URL` · `Dead Click Count` · `Excessive Scroll` · `Rage Click Count` · `Quickback Click` · `Script Error Count` · `Error Click Count` + +> **Nota sobre Traffic:** A documentação oficial da Microsoft lista o campo `distantUserCount` (typo na doc). A API real devolve `distinctUserCount` (com "distinct"). O cliente não normaliza nomes de campos — o que a API devolver é preservado tal como veio, sem renomear. + +--- + +## Casos de uso + +### Atribuição UTM +Entenda quais canais trazem sessões reais com engajamento: +```bash +live-insights --project

--days 3 --dim1 Source --dim2 Medium --dim3 Campaign +``` + +### Diagnóstico de fricção (UX) +Identifique páginas com alta taxa de Rage Click + Dead Click: +```bash +live-insights --project

--days 2 --dim1 URL +``` +Combine com `Scroll Depth` por `URL` para detectar abandono. + +### Separação bot vs humano +O campo `totalBotSessionCount` em Traffic indica sessões de bot filtradas. `distantUserCount` (typo da Microsoft, preservado) = usuários únicos aproximados. + +--- + +## Limitações + +| Limite | Valor | +|--------|-------| +| Janela máxima | 3 dias (numOfDays ∈ {1, 2, 3}) | +| Requisições por dia | 10 por projeto | +| Linhas por resposta | 1.000 (sem paginação) | +| Histórico | Nenhum — apenas últimas 24-72h | +| Fuso horário | UTC | +| Dimensões simultâneas | Máximo 3 (dim1, dim2, dim3) | + +> Tokens são por-projeto e têm longa duração (sem TTL documentado pela Microsoft). + +--- + +## Erros comuns + +| Código | Causa | Ação | +|--------|-------|------| +| 400 | Parâmetro inválido (days fora de {1,2,3}, dimensão inválida) | O script valida antes da chamada | +| 401 | Token inválido ou expirado | Regenere em Settings → Export Data | +| 403 | Sem permissão de Export Data no projeto | Ative nas configurações do projeto | +| 429 | Cota diária esgotada (10 req/dia) | Aguardar reset 00:00 UTC | diff --git a/.claude/skills/int-clarity/scripts/clarity_export.py b/.claude/skills/int-clarity/scripts/clarity_export.py new file mode 100644 index 00000000..c69d4c2a --- /dev/null +++ b/.claude/skills/int-clarity/scripts/clarity_export.py @@ -0,0 +1,514 @@ +#!/usr/bin/env python3 +""" +Microsoft Clarity Data Export API client. +Calls clarity.ms/export-data directly — no third-party SDK or proxy. +""" +import argparse +import json +import os +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path +from typing import Optional + + +# --------------------------------------------------------------------------- +# Bootstrap +# --------------------------------------------------------------------------- + +def _load_dotenv() -> None: + """Load .env from workspace root (4 levels above this file).""" + env_path = Path(__file__).resolve().parents[4] / ".env" + if not env_path.exists(): + return + with open(env_path) as fh: + for line in fh: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + key, value = key.strip(), value.strip() + if key and key not in os.environ: + os.environ[key] = value + + +_load_dotenv() + +API_BASE = os.environ.get("CLARITY_API_BASE", "https://www.clarity.ms/export-data/api/v1") + +# Valid values for dimension parameters +VALID_DIMS = { + "Browser", "Device", "Country", "OS", + "Source", "Medium", "Campaign", "Channel", "URL", +} + +# HTTP status codes that warrant a single retry +RETRYABLE_CODES = {500, 502, 503, 504} + +# 429 means daily quota exhausted — retry won't help; handled separately +QUOTA_EXCEEDED_CODE = 429 + + +# --------------------------------------------------------------------------- +# Token resolution +# --------------------------------------------------------------------------- + +def _discover_projects() -> dict[str, str]: + """ + Return {project_name: token} for every CLARITY_TOKEN_ env var found. + Names are returned upper-cased as stored; callers normalise for matching. + """ + projects: dict[str, str] = {} + for key, val in os.environ.items(): + if key.startswith("CLARITY_TOKEN_") and val: + name = key[len("CLARITY_TOKEN_"):] + projects[name] = val + return projects + + +def _resolve_token(project: Optional[str]) -> tuple: + """ + Return (project_label, token). + + Resolution order: + 1. --project → look for CLARITY_TOKEN_ (case-insensitive). + 2. No --project → fall back to CLARITY_API_TOKEN if set. + 3. Otherwise → list available projects and exit(1). + """ + if project: + upper = project.upper() + token = os.environ.get(f"CLARITY_TOKEN_{upper}") + if token: + return upper, token + # Not found + available = list(_discover_projects().keys()) + print( + f"Erro: nenhum token encontrado para --project {project!r}.\n" + f"Projetos disponíveis: {available or ['(nenhum)']}\n" + "Configure CLARITY_TOKEN_ no .env.", + file=sys.stderr, + ) + sys.exit(1) + + # No --project: fallback to CLARITY_API_TOKEN + fallback = os.environ.get("CLARITY_API_TOKEN") + if fallback: + return "default", fallback + + available = list(_discover_projects().keys()) + if available: + print( + "Erro: especifique --project . Projetos disponíveis:\n " + + "\n ".join(available), + file=sys.stderr, + ) + else: + print( + "Erro: nenhum token Clarity configurado.\n" + "Configure CLARITY_API_TOKEN ou CLARITY_TOKEN_ no .env.\n" + "Obtenha o token em https://clarity.microsoft.com → Settings → Export Data.", + file=sys.stderr, + ) + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Security helpers +# --------------------------------------------------------------------------- + +def _redact(msg: str, token: str) -> str: + """Replace token value with *** in any string — last-resort defence-in-depth.""" + return msg.replace(token, "***") if token else msg + + +# --------------------------------------------------------------------------- +# HTTP helper — ALWAYS returns dict +# --------------------------------------------------------------------------- + +def _clarity_request(token: str, path: str, params: Optional[dict] = None) -> dict: + """ + Execute a GET request against the Clarity Export API. + + Always returns a dict: + - On success: parsed JSON body. + - On HTTP error: {"_error": True, "status": , "message": }. + Never returns bytes or str — lint-http-types safe. + """ + url = f"{API_BASE}/{path.lstrip('/')}" + if params: + url += "?" + urllib.parse.urlencode( + {k: v for k, v in params.items() if v is not None} + ) + + # Build request with auth header — token NEVER appears in error messages below + req = urllib.request.Request( + url, + headers={ + "Authorization": f"Bearer {token}", + "Accept": "application/json", + }, + method="GET", + ) + + def _do_request() -> dict: + try: + with urllib.request.urlopen(req, timeout=30) as resp: + raw = resp.read() + if not raw: + return {} + return json.loads(raw) # type: ignore[return-value] + except urllib.error.HTTPError as exc: + code = exc.code + # 429: quota exhausted — body may not be JSON + if code == QUOTA_EXCEEDED_CODE: + try: + body_text = exc.read().decode("utf-8", errors="replace") + except Exception: + body_text = "(unreadable body)" + # Trim body; run through _redact as defence-in-depth + return {"_error": True, "status": code, "message": _redact(f"Limite diário de requisições esgotado (429). Body: {body_text[:200]}", token)} + # Other HTTP errors — extract message from JSON body when available + try: + body = json.loads(exc.read()) + msg = body.get("message") or body.get("error") or f"HTTP {code}" + except Exception: + # Use only the HTTP status code; str(exc) may include request repr + msg = f"HTTP {code}" + return {"_error": True, "status": code, "message": _redact(msg, token)} + except urllib.error.URLError as exc: + # URLError.reason is a string or OSError — safe to stringify + reason = str(exc.reason) if hasattr(exc, "reason") else "network error" + return {"_error": True, "status": 0, "message": _redact(f"Network error: {reason}", token)} + except Exception as exc: + # Generic fallback — exc.args[0] could theoretically contain the token + raw = exc.args[0] if exc.args else "unknown error" + return {"_error": True, "status": 0, "message": _redact(f"{type(exc).__name__}: {raw}", token)} + + result = _do_request() + + # Single retry for transient server errors (5xx) — not for 429 (quota) + # Only applies when result is a dict with _error (never a successful list response) + if isinstance(result, dict) and result.get("_error") and result.get("status") in RETRYABLE_CODES: + time.sleep(2) + result = _do_request() + + return result + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + +def _validate_days(days: int) -> None: + if days not in (1, 2, 3): + print( + f"Erro: --days deve ser 1, 2 ou 3 (recebido: {days}). " + "A API Clarity suporta apenas janelas de 1-3 dias.", + file=sys.stderr, + ) + sys.exit(1) + + +def _validate_dims(dims: list[str]) -> None: + for d in dims: + if d not in VALID_DIMS: + print( + f"Erro: dimensão inválida: {d!r}.\n" + f"Valores aceitos: {sorted(VALID_DIMS)}\n" + "Nota: 'Popular Pages' NÃO é uma dimensão válida neste endpoint " + "(retorna body vazio — bug conhecido da Microsoft).", + file=sys.stderr, + ) + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Formatting +# --------------------------------------------------------------------------- + +def _print_table(rows: list[dict], title: str = "") -> None: + if not rows: + return + if title: + print(f"\n=== {title} ===") + keys = list(rows[0].keys()) + col_w = {k: max(len(k), max(len(str(r.get(k, ""))) for r in rows)) for k in keys} + sep = "+-" + "-+-".join("-" * col_w[k] for k in keys) + "-+" + header = "| " + " | ".join(k.ljust(col_w[k]) for k in keys) + " |" + print(sep) + print(header) + print(sep) + for row in rows: + print("| " + " | ".join(str(row.get(k, "")).ljust(col_w[k]) for k in keys) + " |") + print(sep) + + +def _print_insights(data: list[dict], as_json: bool) -> None: + if as_json: + print(json.dumps(data, indent=2, ensure_ascii=False)) + return + + if not data: + print("Sem dados retornados. Verifique se o projeto tem tráfego no período solicitado.") + return + + for metric in data: + metric_name = metric.get("metricName", "?") + information = metric.get("information") or [] + if not information: + print(f"\n[{metric_name}] — sem dados") + continue + _print_table(information, title=metric_name) + print(f" {len(information)} linha(s)") + + +# --------------------------------------------------------------------------- +# Commands +# --------------------------------------------------------------------------- + +def cmd_live_insights(args: argparse.Namespace) -> None: + """Busca métricas do projeto no endpoint project-live-insights.""" + _validate_days(args.days) + + dims = [d for d in [args.dim1, args.dim2, args.dim3] if d] + if dims: + _validate_dims(dims) + + # Build params + params: dict = {"numOfDays": args.days} + if args.dim1: + params["dimension1"] = args.dim1 + if args.dim2: + params["dimension2"] = args.dim2 + if args.dim3: + params["dimension3"] = args.dim3 + + if args.all: + # Varrer todos os projetos (cuidado: 10 req/dia/projeto) + projects = _discover_projects() + single_token = os.environ.get("CLARITY_API_TOKEN") + if single_token: + projects["default"] = single_token + if not projects: + print("Nenhum projeto configurado.", file=sys.stderr) + sys.exit(1) + ok_count = 0 + for name, token in projects.items(): + print(f"\n{'='*60}\nProjeto: {name}\n{'='*60}") + result = _clarity_request(token, "project-live-insights", params) + if isinstance(result, dict) and result.get("_error"): + print(f"Erro {result['status']}: {result['message']}", file=sys.stderr) + continue + ok_count += 1 + _print_insights(result if isinstance(result, list) else [result], args.json) + # Exit 1 se TODOS falharam; 0 se ao menos um sucesso + if ok_count == 0: + sys.exit(1) + return + + label, token = _resolve_token(args.project) + result = _clarity_request(token, "project-live-insights", params) + + if isinstance(result, dict) and result.get("_error"): + print(f"Erro {result['status']}: {result['message']}", file=sys.stderr) + sys.exit(1) + + data = result if isinstance(result, list) else [] + if not data: + print( + "Body vazio. Possíveis causas:\n" + " 1. Dimensão inválida (ex: 'Popular Pages' não é dimensão válida).\n" + " 2. Projeto sem tráfego no período.\n" + " 3. Token sem permissão de Export Data.", + ) + return + + if not args.json: + print(f"Projeto: {label} | Últimos {args.days} dia(s)") + _print_insights(data, args.json) + + +def cmd_list_projects(args: argparse.Namespace) -> None: + """Lista projetos configurados (NÃO imprime tokens).""" + projects = _discover_projects() + single = os.environ.get("CLARITY_API_TOKEN") + if single: + projects["[CLARITY_API_TOKEN]"] = "(via CLARITY_API_TOKEN)" + + if not projects: + print("Nenhum projeto Clarity configurado.") + print("Configure CLARITY_API_TOKEN ou CLARITY_TOKEN_ no .env.") + return + + print(f"Projetos Clarity configurados ({len(projects)}):") + for name in sorted(projects): + # Nunca imprimir o token + print(f" • {name}") + + +def cmd_smoke(args: argparse.Namespace) -> None: + """ + E2E health check — SEMPRE exit 0, SEMPRE emite JSON. + Contrato: {overall, steps[], duration_ms}. + """ + t0 = time.monotonic() + + def _ms(since: float) -> int: + return round((time.monotonic() - since) * 1000) + + out: dict = {"overall": "PASS", "steps": []} + + try: + # Step 1: env_present — verifica se há algum token configurado + ts = time.monotonic() + projects = _discover_projects() + single = os.environ.get("CLARITY_API_TOKEN") + if not projects and not single: + out["steps"].append({ + "step": "env_present", + "status": "FAIL", + "error": "Nenhum token Clarity encontrado (CLARITY_API_TOKEN ou CLARITY_TOKEN_)", + "duration_ms": _ms(ts), + }) + out["overall"] = "FAIL" + out["duration_ms"] = _ms(t0) + print(json.dumps(out)) + return + + token_count = len(projects) + (1 if single else 0) + out["steps"].append({ + "step": "env_present", + "status": "PASS", + "tokens_found": token_count, + "duration_ms": _ms(ts), + }) + + # Step 2: auth / live call — usa --project se fornecido, senão o primeiro disponível + ts = time.monotonic() + if args.project: + label, token = _resolve_token(args.project) + elif projects: + label = next(iter(projects)) + token = projects[label] + else: + label = "default" + token = single # type: ignore[assignment] + + # Chamada real com days=1 (mínimo possível — menor custo de quota) + result = _clarity_request(token, "project-live-insights", {"numOfDays": 1}) + + if isinstance(result, dict) and result.get("_error"): + status_code = result.get("status", 0) + # 429 = quota esgotada mas auth está OK (token válido) + if status_code == QUOTA_EXCEEDED_CODE: + out["steps"].append({ + "step": "live_call", + "status": "WARN", + "note": "Token válido mas quota diária esgotada (429). Auth OK.", + "duration_ms": _ms(ts), + }) + # WARN não falha o overall + else: + out["steps"].append({ + "step": "live_call", + "status": "FAIL", + "error": f"HTTP {status_code}: {result.get('message', '')}", + "duration_ms": _ms(ts), + }) + out["overall"] = "FAIL" + else: + row_count = sum( + len(m.get("information") or []) + for m in (result if isinstance(result, list) else []) + ) + out["steps"].append({ + "step": "live_call", + "status": "PASS", + "project": label, + "metrics_returned": len(result) if isinstance(result, list) else 0, + "total_rows": row_count, + "duration_ms": _ms(ts), + }) + + except SystemExit: + # _resolve_token pode chamar sys.exit; captura para não vazar + out["steps"].append({ + "step": "live_call", + "status": "FAIL", + "error": "Token inválido ou projeto não encontrado (sys.exit capturado)", + "duration_ms": 0, + }) + out["overall"] = "FAIL" + except Exception as exc: + # Redact any token value that might appear in the exception message + _known_tokens = list(_discover_projects().values()) + if os.environ.get("CLARITY_API_TOKEN"): + _known_tokens.append(os.environ["CLARITY_API_TOKEN"]) + _raw_err = str(exc)[:300] + for _tok in _known_tokens: + if _tok: + _raw_err = _raw_err.replace(_tok, "***") + out["steps"].append({ + "step": "unknown", + "status": "FAIL", + "error": _raw_err, + "duration_ms": 0, + }) + out["overall"] = "FAIL" + + out["duration_ms"] = _ms(t0) + print(json.dumps(out)) + sys.exit(0) # SEMPRE exit 0 + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser( + description="Microsoft Clarity Data Export API client", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + sub = parser.add_subparsers(dest="command") + + # ── smoke ───────────────────────────────────────────────────────────── + smoke_p = sub.add_parser("smoke", help="Health check — sempre exit 0, emite JSON") + smoke_p.add_argument("--project", help="Nome do projeto (CLARITY_TOKEN_)") + + # ── list-projects ───────────────────────────────────────────────────── + sub.add_parser("list-projects", help="Lista projetos configurados (sem imprimir tokens)") + + # ── live-insights ───────────────────────────────────────────────────── + li_p = sub.add_parser("live-insights", help="Busca métricas do projeto (project-live-insights)") + li_p.add_argument("--project", help="Nome do projeto (CLARITY_TOKEN_); omitir usa CLARITY_API_TOKEN") + li_p.add_argument("--days", type=int, default=1, choices=[1, 2, 3], + help="Janela em dias: 1, 2 ou 3 (default: 1)") + li_p.add_argument("--dim1", metavar="DIM", + help=f"Dimensão 1. Aceitos: {', '.join(sorted(VALID_DIMS))}") + li_p.add_argument("--dim2", metavar="DIM", help="Dimensão 2") + li_p.add_argument("--dim3", metavar="DIM", help="Dimensão 3") + li_p.add_argument("--all", action="store_true", + help="Buscar todos os projetos configurados (cuidado: consome quota de cada um)") + li_p.add_argument("--json", action="store_true", help="Saída em JSON bruto") + + args = parser.parse_args() + + dispatch = { + "smoke": cmd_smoke, + "list-projects": cmd_list_projects, + "live-insights": cmd_live_insights, + } + + if args.command not in dispatch: + parser.print_help() + sys.exit(1) + + dispatch[args.command](args) + + +if __name__ == "__main__": + main() diff --git a/.claude/skills/int-clarity/tests/test_clarity.py b/.claude/skills/int-clarity/tests/test_clarity.py new file mode 100644 index 00000000..ab8ddd34 --- /dev/null +++ b/.claude/skills/int-clarity/tests/test_clarity.py @@ -0,0 +1,545 @@ +#!/usr/bin/env python3 +""" +Testes unitários para clarity_export.py. +Uso: python3 tests/test_clarity.py +""" +import json +import os +import sys +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +# Adiciona scripts/ ao path para importar o módulo +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "scripts")) + +# Garante que _load_dotenv não quebre mesmo sem .env +os.environ.setdefault("CLARITY_API_BASE", "https://www.clarity.ms/export-data/api/v1") + +import clarity_export as ce + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +TRAFFIC_RESPONSE = [ + { + "metricName": "Traffic", + "information": [ + { + "totalSessionCount": 1200, + "totalBotSessionCount": 30, + # The API returns whatever field name Microsoft sends — no normalisation. + # The MS docs show "distantUserCount" (typo) but the live API may return + # "distinctUserCount". We use a neutral key here to test passthrough, + # not to assert which spelling is correct. + "distinctUserCount": 450, + "pagesPerSessionPercentage": 2.3, + "Browser": "Chrome", + } + ], + }, + { + "metricName": "Rage Click Count", + "information": [{"URL": "/contato", "value": 5}], + }, +] + +EMPTY_RESPONSE: list = [] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_http_response(body: bytes, status: int = 200): + """Cria um mock de urllib.request.urlopen que retorna body.""" + mock_resp = MagicMock() + mock_resp.read.return_value = body + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + return mock_resp + + +def _make_http_error(code: int, body: bytes = b"error"): + import urllib.error + err = urllib.error.HTTPError( + url="http://x", code=code, msg="err", hdrs=None, fp=None # type: ignore + ) + err.read = lambda: body + return err + + +# --------------------------------------------------------------------------- +# TC-01: Validação days fora do range (1-3) +# --------------------------------------------------------------------------- + +class TestValidateDays(unittest.TestCase): + + def test_days_valid_1(self): + # Não deve levantar + ce._validate_days(1) + + def test_days_valid_3(self): + ce._validate_days(3) + + def test_days_invalid_0(self): + with self.assertRaises(SystemExit): + ce._validate_days(0) + + def test_days_invalid_4(self): + with self.assertRaises(SystemExit): + ce._validate_days(4) + + +# --------------------------------------------------------------------------- +# TC-02: Validação de dimensões +# --------------------------------------------------------------------------- + +class TestValidateDims(unittest.TestCase): + + def test_valid_dim_browser(self): + ce._validate_dims(["Browser"]) + + def test_valid_dims_multiple(self): + ce._validate_dims(["Browser", "Device", "Country"]) + + def test_invalid_dim_popular_pages(self): + with self.assertRaises(SystemExit): + ce._validate_dims(["Popular Pages"]) + + def test_invalid_dim_unknown(self): + with self.assertRaises(SystemExit): + ce._validate_dims(["InvalidDim"]) + + +# --------------------------------------------------------------------------- +# TC-03: Parse de resposta Traffic +# --------------------------------------------------------------------------- + +class TestParseTrafficResponse(unittest.TestCase): + + def test_traffic_core_fields_present(self): + """Os campos de sessão estáveis estão presentes.""" + traffic = [m for m in TRAFFIC_RESPONSE if m["metricName"] == "Traffic"] + self.assertEqual(len(traffic), 1) + info = traffic[0]["information"][0] + self.assertIn("totalSessionCount", info) + self.assertIn("totalBotSessionCount", info) + + def test_other_metric_generic(self): + rage = [m for m in TRAFFIC_RESPONSE if m["metricName"] == "Rage Click Count"] + self.assertEqual(len(rage), 1) + self.assertEqual(rage[0]["information"][0]["URL"], "/contato") + + def test_user_count_field_passthrough(self): + """ + O código nunca normalisa nomes de campos da API — o que vier é preservado. + A MS documenta 'distantUserCount' (typo) mas a API real devolve + 'distinctUserCount'. Testamos que o campo presente no fixture chega intacto, + sem afirmar qual ortografia é correta. + """ + info = TRAFFIC_RESPONSE[0]["information"][0] + # A fixture usa 'distinctUserCount'; o código passa o que a API devolver + present_keys = {k for k in info if "usercount" in k.lower() or "userCount" in k} + self.assertTrue( + len(present_keys) >= 1, + "Esperado ao menos um campo de contagem de usuários no response.", + ) + + +# --------------------------------------------------------------------------- +# TC-04: Descoberta multi-token +# --------------------------------------------------------------------------- + +class TestDiscoverProjects(unittest.TestCase): + + def test_discovers_multiple_tokens(self): + env = { + "CLARITY_TOKEN_ACME": "tok-acme", + "CLARITY_TOKEN_GLOBEX": "tok-globex", + "CLARITY_TOKEN_INITECH": "tok-initech", + } + with patch.dict(os.environ, env, clear=False): + projects = ce._discover_projects() + self.assertIn("ACME", projects) + self.assertIn("GLOBEX", projects) + self.assertIn("INITECH", projects) + + def test_no_tokens_returns_empty(self): + # Remove todas as CLARITY_TOKEN_* do env para este teste + filtered = {k: v for k, v in os.environ.items() if not k.startswith("CLARITY_TOKEN_")} + with patch.dict(os.environ, filtered, clear=True): + projects = ce._discover_projects() + self.assertEqual(projects, {}) + + def test_empty_value_ignored(self): + env = {"CLARITY_TOKEN_EMPTY": ""} + with patch.dict(os.environ, env, clear=False): + projects = ce._discover_projects() + self.assertNotIn("EMPTY", projects) + + +# --------------------------------------------------------------------------- +# TC-05: Seleção de --project +# --------------------------------------------------------------------------- + +class TestResolveToken(unittest.TestCase): + + def test_resolve_by_project_name(self): + with patch.dict(os.environ, {"CLARITY_TOKEN_ACME": "tok-acme"}, clear=False): + label, token = ce._resolve_token("ACME") + self.assertEqual(token, "tok-acme") + + def test_resolve_case_insensitive(self): + with patch.dict(os.environ, {"CLARITY_TOKEN_ACME": "tok-acme"}, clear=False): + label, token = ce._resolve_token("acme") + self.assertEqual(token, "tok-acme") + + def test_resolve_fallback_to_api_token(self): + env = {"CLARITY_API_TOKEN": "fallback-tok"} + # Remove CLARITY_TOKEN_* e CLARITY_API_TOKEN para controle limpo + filtered = {k: v for k, v in os.environ.items() + if not k.startswith("CLARITY_TOKEN_") and k != "CLARITY_API_TOKEN"} + filtered["CLARITY_API_TOKEN"] = "fallback-tok" + with patch.dict(os.environ, filtered, clear=True): + label, token = ce._resolve_token(None) + self.assertEqual(label, "default") + self.assertEqual(token, "fallback-tok") + + def test_resolve_unknown_project_exits(self): + filtered = {k: v for k, v in os.environ.items() + if not k.startswith("CLARITY_TOKEN_") and k != "CLARITY_API_TOKEN"} + with patch.dict(os.environ, filtered, clear=True): + with self.assertRaises(SystemExit): + ce._resolve_token("NONEXISTENT") + + def test_resolve_no_token_at_all_exits(self): + filtered = {k: v for k, v in os.environ.items() + if not k.startswith("CLARITY_TOKEN_") and k != "CLARITY_API_TOKEN"} + with patch.dict(os.environ, filtered, clear=True): + with self.assertRaises(SystemExit): + ce._resolve_token(None) + + +# --------------------------------------------------------------------------- +# TC-06: Smoke — exit 0 e JSON válido +# --------------------------------------------------------------------------- + +class TestSmokeCommand(unittest.TestCase): + + def _run_smoke(self, env_patch: dict, mock_response=None, project=None): + """Roda cmd_smoke capturando stdout e garantindo exit 0.""" + import io + from contextlib import redirect_stdout + + args = MagicMock() + args.project = project + + buf = io.StringIO() + with patch.dict(os.environ, env_patch, clear=True): + if mock_response is not None: + mock_resp = _make_http_response(json.dumps(mock_response).encode()) + with patch("urllib.request.urlopen", return_value=mock_resp): + with redirect_stdout(buf): + try: + ce.cmd_smoke(args) + except SystemExit as e: + if e.code != 0: + raise + else: + with redirect_stdout(buf): + try: + ce.cmd_smoke(args) + except SystemExit as e: + if e.code != 0: + raise + + output = buf.getvalue().strip() + data = json.loads(output) + return data + + def test_smoke_no_token_overall_fail(self): + """Sem nenhum token → overall FAIL, mas exit 0.""" + # Roda direto sem mock de urlopen (não chega a fazer chamada) + import io + from contextlib import redirect_stdout + + args = MagicMock() + args.project = None + + filtered = {k: v for k, v in os.environ.items() + if not k.startswith("CLARITY_TOKEN_") and k != "CLARITY_API_TOKEN"} + buf = io.StringIO() + with patch.dict(os.environ, filtered, clear=True): + with redirect_stdout(buf): + try: + ce.cmd_smoke(args) + except SystemExit as e: + if e.code != 0: + raise + + data = json.loads(buf.getvalue().strip()) + self.assertEqual(data["overall"], "FAIL") + self.assertIn("duration_ms", data) + self.assertIsInstance(data["steps"], list) + + def test_smoke_with_token_pass(self): + """Token presente + resposta válida → overall PASS.""" + env = {"CLARITY_TOKEN_TEST": "tok-test", "CLARITY_API_BASE": ce.API_BASE} + data = self._run_smoke(env, mock_response=TRAFFIC_RESPONSE, project="TEST") + self.assertEqual(data["overall"], "PASS") + self.assertIn("duration_ms", data) + steps_names = [s["step"] for s in data["steps"]] + self.assertIn("env_present", steps_names) + self.assertIn("live_call", steps_names) + + def test_smoke_json_structure(self): + """Saída JSON tem os campos obrigatórios.""" + env = {"CLARITY_TOKEN_TEST": "tok-test", "CLARITY_API_BASE": ce.API_BASE} + data = self._run_smoke(env, mock_response=TRAFFIC_RESPONSE, project="TEST") + self.assertIn("overall", data) + self.assertIn("steps", data) + self.assertIn("duration_ms", data) + for step in data["steps"]: + self.assertIn("step", step) + self.assertIn("status", step) + self.assertIn("duration_ms", step) + + +# --------------------------------------------------------------------------- +# TC-07: Tratamento de 429 não-JSON +# --------------------------------------------------------------------------- + +class TestHttp429Handling(unittest.TestCase): + + def test_429_non_json_body_returns_dict(self): + """429 com body não-JSON deve retornar dict com _error, não quebrar.""" + import urllib.error + + err = urllib.error.HTTPError( + url="http://x", code=429, msg="Too Many Requests", hdrs=None, fp=None # type: ignore + ) + err.read = lambda: b"Rate limit exceeded (plain text, not JSON)" + + with patch("urllib.request.urlopen", side_effect=err): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + + self.assertIsInstance(result, dict) + self.assertTrue(result.get("_error")) + self.assertEqual(result.get("status"), 429) + self.assertIn("429", result.get("message", "")) + + def test_429_does_not_retry(self): + """429 não deve disparar retry (sleep não chamado).""" + import urllib.error + + err = urllib.error.HTTPError( + url="http://x", code=429, msg="Too Many Requests", hdrs=None, fp=None # type: ignore + ) + err.read = lambda: b"quota" + + call_count = 0 + + def _mock_urlopen(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise err + + with patch("urllib.request.urlopen", side_effect=_mock_urlopen): + with patch("time.sleep") as mock_sleep: + ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + mock_sleep.assert_not_called() + + self.assertEqual(call_count, 1) # só 1 tentativa + + +# --------------------------------------------------------------------------- +# TC-08: Body vazio +# --------------------------------------------------------------------------- + +class TestEmptyBody(unittest.TestCase): + + def test_empty_list_response(self): + """Resposta [] não deve quebrar — indica sem dados.""" + mock_resp = _make_http_response(b"[]") + with patch("urllib.request.urlopen", return_value=mock_resp): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + self.assertEqual(result, []) + + def test_empty_bytes_response(self): + """Body vazio (b'') deve retornar {}.""" + mock_resp = _make_http_response(b"") + with patch("urllib.request.urlopen", return_value=mock_resp): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + self.assertEqual(result, {}) + + def test_empty_list_not_error(self): + """[] não tem _error.""" + mock_resp = _make_http_response(b"[]") + with patch("urllib.request.urlopen", return_value=mock_resp): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + self.assertFalse(result.get("_error", False) if isinstance(result, dict) else False) + + +# --------------------------------------------------------------------------- +# TC-09: HTTP helper sempre retorna dict (lint-safety) +# --------------------------------------------------------------------------- + +class TestHttpHelperReturnType(unittest.TestCase): + + def test_success_returns_list_or_dict(self): + """Em sucesso, retorna list (que é JSON-parsed) ou dict — nunca bytes/str.""" + mock_resp = _make_http_response(json.dumps(TRAFFIC_RESPONSE).encode()) + with patch("urllib.request.urlopen", return_value=mock_resp): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + self.assertNotIsInstance(result, (bytes, str)) + + def test_error_returns_dict(self): + """Em erro HTTP, retorna dict com _error.""" + err = _make_http_error(401, b'{"message": "Unauthorized"}') + with patch("urllib.request.urlopen", side_effect=err): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + self.assertIsInstance(result, dict) + self.assertTrue(result.get("_error")) + + def test_network_error_returns_dict(self): + """Em erro de rede, retorna dict com _error.""" + with patch("urllib.request.urlopen", side_effect=ConnectionResetError("reset")): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + self.assertIsInstance(result, dict) + self.assertTrue(result.get("_error")) + + def test_500_retries_once(self): + """500 dispara 1 retry.""" + call_count = 0 + err = _make_http_error(500, b'{"message": "Server Error"}') + + def _mock_urlopen(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise err + + with patch("urllib.request.urlopen", side_effect=_mock_urlopen): + with patch("time.sleep"): + result = ce._clarity_request("tok", "project-live-insights", {"numOfDays": 1}) + + self.assertEqual(call_count, 2) # 1 original + 1 retry + self.assertIsInstance(result, dict) + self.assertTrue(result.get("_error")) + + +# --------------------------------------------------------------------------- +# TC-10: Token nunca vaza (stdout + stderr + mensagem de erro) +# --------------------------------------------------------------------------- + +class TestTokenNotLeaked(unittest.TestCase): + + SECRET = "eyJsZWFrZWQ6dHJ1ZX0.super-secret-clarity-jwt" + + def test_list_projects_does_not_print_token(self): + """cmd list-projects nunca deve imprimir valores de tokens.""" + import io + from contextlib import redirect_stdout + + args = MagicMock() + env = { + "CLARITY_TOKEN_MYPROJEKT": self.SECRET, + "CLARITY_API_BASE": ce.API_BASE, + } + buf = io.StringIO() + with patch.dict(os.environ, env, clear=False): + with redirect_stdout(buf): + ce.cmd_list_projects(args) + + output = buf.getvalue() + self.assertNotIn(self.SECRET, output) + self.assertIn("MYPROJEKT", output) + + def test_token_not_in_http_401_error_message(self): + """ + H1: Em 401 Unauthorized o token NÃO deve aparecer na mensagem de erro + retornada pelo _clarity_request — nem em stdout nem no dict de erro. + """ + import io + from contextlib import redirect_stderr, redirect_stdout + + err = _make_http_error(401, b'{"message": "Unauthorized"}') + + stdout_buf = io.StringIO() + stderr_buf = io.StringIO() + + with patch("urllib.request.urlopen", side_effect=err): + with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): + result = ce._clarity_request(self.SECRET, "project-live-insights", {"numOfDays": 1}) + + # O dict de retorno não deve conter o token + result_str = json.dumps(result) + self.assertNotIn(self.SECRET, result_str, "Token vazou no dict de erro") + self.assertNotIn(self.SECRET, stdout_buf.getvalue(), "Token vazou no stdout") + self.assertNotIn(self.SECRET, stderr_buf.getvalue(), "Token vazou no stderr") + + # Deve ser um dict de erro bem-formado + self.assertTrue(result.get("_error")) + self.assertEqual(result.get("status"), 401) + + def test_token_not_in_network_error_message(self): + """ + H1: Em erro de rede genérico o token NÃO deve aparecer na mensagem. + O teste é propositalmente adversarial: a exceção carrega o token no args[0] + (simulando HDR-STRINGIFY ou REQUEST-REPR que incluem o Bearer header). + FALHA com código sem _redact; PASSA após _redact aplicado. + """ + import io + from contextlib import redirect_stderr, redirect_stdout + + # Adversarial: a exception carrega o token literal no args[0] + class _LeakyError(Exception): + pass + + stdout_buf = io.StringIO() + stderr_buf = io.StringIO() + + # exc.args[0] contém o token — prova que _redact() na fonte é necessário + with patch("urllib.request.urlopen", side_effect=_LeakyError(f"Bearer {self.SECRET} leaked via boom")): + with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): + result = ce._clarity_request(self.SECRET, "project-live-insights", {"numOfDays": 1}) + + result_str = json.dumps(result) + self.assertNotIn(self.SECRET, result_str, "Token vazou no dict de erro genérico (_redact ausente ou incompleto)") + self.assertNotIn(self.SECRET, stdout_buf.getvalue()) + self.assertNotIn(self.SECRET, stderr_buf.getvalue()) + # Confirma que a mensagem de erro usa o placeholder + self.assertIn("***", result.get("message", ""), "Esperado '***' como substituto do token") + + def test_token_not_in_smoke_output_on_error(self): + """ + H1: cmd_smoke nunca inclui o token no JSON de saída, mesmo em falha 403. + """ + import io + from contextlib import redirect_stdout + + err = _make_http_error(403, b'{"message": "Forbidden"}') + env = {"CLARITY_TOKEN_SECTEST": self.SECRET, "CLARITY_API_BASE": ce.API_BASE} + args = MagicMock() + args.project = "SECTEST" + + buf = io.StringIO() + with patch.dict(os.environ, env, clear=True): + with patch("urllib.request.urlopen", side_effect=err): + with redirect_stdout(buf): + try: + ce.cmd_smoke(args) + except SystemExit: + pass + + output = buf.getvalue() + self.assertNotIn(self.SECRET, output, "Token vazou no JSON do smoke") + # Verifica que o output é JSON válido + data = json.loads(output.strip()) + self.assertIn("overall", data) + + +if __name__ == "__main__": + unittest.main(verbosity=2)