From 44d1976838634f2fcaee3a4e089815a35a0c277f Mon Sep 17 00:00:00 2001 From: iscarelli Date: Wed, 10 Jun 2026 18:24:52 -0300 Subject: [PATCH] =?UTF-8?q?feat(security):=20RBAC=20de=20escrita,=20revoga?= =?UTF-8?q?=C3=A7=C3=A3o=20de=20sess=C3=A3o,=20chave=20de=20API=20fora=20d?= =?UTF-8?q?o=20DOM=20(v1.36.0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Endurecimento a partir de um teste externo (caixa-preta). Achados reais corrigidos, cada um com teste automatizado (26 novos; 150 no total, sem regressão): - #2 RBAC: novo @write_required enforça papel no servidor nas rotas de mutação de inventário (viewer = 403 no GET-form e POST); botões somem na UI via can_write. - #3 Revogação de sessão (CWE-613): session_token por usuário revalidado a cada request; logout e troca de senha rotacionam o token, invalidando cookies antigos. - #4 Chave de API fora do DOM (CWE-200): HTML só com versão mascarada; cleartext buscado sob demanda em endpoint admin (no-store). - #5 HTTPException não vira mais 500: handler repassa HTTPException + página 405. - #9 /.well-known/security.txt (RFC 9116). Avaliados e mantidos: throttle de login por IP e msg de erro genérica já existiam (#1); rotas admin já exigem @admin_required (#8). Sem lockout por conta (DoS no admin conhecido) — 2FA opt-in é o controle forte. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 41 +++++++ VERSION | 2 +- app.py | 72 +++++++++++ database.py | 33 +++++ routes/account.py | 3 + routes/admin.py | 4 + routes/filaments.py | 10 +- routes/integrations.py | 22 +++- routes/main.py | 15 ++- routes/spool_models.py | 8 +- routes/spools.py | 12 +- templates/admin/integrations.html | 41 +++++-- templates/base.html | 2 + templates/dashboard.html | 2 + templates/filaments/detail.html | 4 +- templates/filaments/list.html | 4 + templates/reports/low_stock.html | 2 + templates/spool_models/list.html | 4 + templates/spools/detail.html | 4 +- templates/spools/list.html | 6 + tests/conftest.py | 19 +++ tests/test_security_rbac.py | 193 ++++++++++++++++++++++++++++++ translations.py | 2 + 23 files changed, 475 insertions(+), 30 deletions(-) create mode 100644 tests/test_security_rbac.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f23f7a..d39bdcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,47 @@ Versioning follows [SemVer](https://semver.org/): **MAJOR.MINOR.PATCH** --- +## [1.36.0] — 2026-06-10 + +Endurecimento de segurança a partir de um teste externo (caixa-preta). Vários +achados do relatório já estavam cobertos pelo código (throttle de login por IP, +mensagem de erro genérica, guarda anti-SSRF e anti open-redirect); o que segue são +as lacunas reais corrigidas, cada uma com teste automatizado. + +### Security +- **Controle de acesso por papel nas rotas de escrita (RBAC).** O papel `viewer` + é rotulado "somente leitura", mas as rotas de mutação de inventário + (`/spools/new` e `/edit`/`/weigh`/`/deactivate`, `/weigh`, `/filaments/*`, + `/spool-models/*`) eram protegidas apenas por `@login_required` — um viewer + conseguia criar/editar/excluir/pesar. Novo decorator `@write_required` enforça o + papel **no servidor** (403 para viewer, GET do formulário e POST). Os botões de + escrita também somem na UI para viewer (`can_write`), como defesa em profundidade. +- **Revogação de sessão server-side (CWE-613).** O cookie de sessão do Flask é + *stateless*: `logout` só apagava o cookie do navegador, então um cookie capturado + seguia válido até expirar (até ~30 dias com "manter conectado"); trocar a senha + também não derrubava sessões antigas. Cada usuário agora tem um `session_token` + no banco, revalidado a cada requisição autenticada; **logout e troca de senha + rotacionam o token**, invalidando na hora qualquer cookie antigo (a troca de senha + derruba as *outras* sessões e mantém a atual). Reset de senha por admin derruba a + sessão do alvo. Cookies anteriores a esta versão caem uma vez e o usuário reloga. +- **Chave de API não trafega mais no DOM (CWE-200).** A página Admin → Integrações + embutia a chave inteira num atributo `data-key` (visível em *view-source*). Agora + o HTML só recebe a versão mascarada; o valor em claro é buscado **sob demanda** + por um endpoint admin (`/admin/integrations//key`, resposta `no-store`) ao + clicar em Revelar/Copiar. +- **Exceções de roteamento não viram mais 500.** O `errorhandler(Exception)` + capturava `HTTPException` e a transformava em 500 — um GET em `/logout` + (POST-only) ou um POST em `/spool-models` (GET-only) retornava 500 em vez de 405. + O handler agora repassa `HTTPException` e há uma página 405 dedicada. +- **`/.well-known/security.txt`** (RFC 9116) com canal de divulgação responsável. + +### Notas +- Achados de severidade baixa do relatório foram avaliados e mantidos como estão + (versão no rodapé / IDs sequenciais — atrás de auth e mitigados pelo RBAC; rotas + admin potentes já exigem `@admin_required`). Brute-force: o throttle por IP + (10 falhas / 15 min) já existia; não foi adicionado *lockout por conta* para não + abrir um DoS contra o usuário `admin` conhecido — o 2FA opt-in é o controle forte. + ## [1.35.2] — 2026-06-10 ### Changed diff --git a/VERSION b/VERSION index 0035f2a..39fc130 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.35.2 +1.36.0 diff --git a/app.py b/app.py index 774654b..8004753 100644 --- a/app.py +++ b/app.py @@ -18,6 +18,7 @@ ) from markupsafe import Markup, escape from werkzeug.security import generate_password_hash, check_password_hash +from werkzeug.exceptions import HTTPException from werkzeug.middleware.proxy_fix import ProxyFix from flask_wtf import CSRFProtect from flask_wtf.csrf import CSRFError @@ -194,6 +195,27 @@ def _req_start(): g._t0 = time.perf_counter() +@app.before_request +def _validate_session(): + """Revogação de sessão server-side (CWE-613). O cookie de sessão do Flask é + *stateless* (assinado), então `session.clear()` no logout só apaga o cookie do + navegador — um cookie capturado continuaria válido até expirar. Aqui amarramos + cada sessão a um `session_token` guardado no usuário: o login grava o token na + sessão e toda requisição autenticada revalida contra o banco. Logout e troca de + senha ROTACIONAM o token, invalidando imediatamente quaisquer cookies antigos. + + Cookies legados (anteriores a esta versão, sem `auth_token`) não batem com o + token recém-semeado → a sessão é encerrada uma única vez e o usuário reloga.""" + if request.endpoint in ("static", "health") or "user_id" not in session: + return + stored = db.get_session_token(session["user_id"]) + if not stored or session.get("auth_token") != stored: + session.clear() + if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest": + return jsonify(error="Unauthorized"), 401 + return redirect(url_for("login")) + + # Endpoints liberados mesmo com troca de senha pendente (o próprio formulário, # trocar idioma, sair, assets e o health-check). _PWCHANGE_ALLOWED = {"account_password", "logout", "set_lang", "static", "health"} @@ -587,6 +609,9 @@ def inject_globals(): count = db.queue_count() if "user_id" in session else 0 lang = session.get("lang", "pt") is_admin = session.get("role") == "admin" + # Espelha o gate server-side (write_required/WRITE_ROLES) na UI: esconde botões de + # escrita p/ `viewer`. É só defesa em profundidade — quem barra de verdade é o servidor. + can_write = session.get("role") in WRITE_ROLES # Alerta de backup (só admin): última rotação diária falhou ou a cópia externa falhou. backup_alert = bool(is_admin and ( db.get_setting("backup_last_result", "") == "error" @@ -598,6 +623,7 @@ def inject_globals(): "lang": lang, "_": i18n.get_translator(lang), "update_available": is_update_available() if is_admin else False, + "can_write": can_write, "backup_alert": backup_alert, "nonce": getattr(g, "_nonce", ""), "demo_mode": DEMO_MODE, @@ -671,6 +697,28 @@ def decorated(*args, **kwargs): return decorated +# Papéis com permissão de ESCRITA no inventário (criar/editar/excluir/pesar spools, +# filamentos e modelos). `viewer` é somente-leitura — o rótulo no cadastro de usuário +# é literalmente "Viewer (somente leitura)", então a regra é enforçada no servidor, +# não só escondendo botões. Fica numa constante p/ facilitar um futuro papel "editor". +WRITE_ROLES = ("admin",) + + +def write_required(f): + """Exige um papel com permissão de escrita (CWE-285). Aplicado em TODA rota que + muta dados do inventário — o gate é no servidor, independente da UI.""" + @wraps(f) + def decorated(*args, **kwargs): + if "user_id" not in session: + if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest": + return jsonify(error="Unauthorized"), 401 + return redirect(url_for("login", next=request.path)) + if session.get("role") not in WRITE_ROLES: + abort(403) + return f(*args, **kwargs) + return decorated + + def demo_blocked(f): @wraps(f) def decorated(*args, **kwargs): @@ -691,6 +739,9 @@ def _promote_session(user, remember, ip, next_url=""): session["username"] = user["username"] session["role"] = user["role"] session["must_change_password"] = bool(user["must_change_password"]) + # Amarra o cookie a um token server-side (ver _validate_session): permite revogar + # esta sessão no logout / troca de senha rotacionando o token no banco. + session["auth_token"] = db.get_or_create_session_token(user["id"]) db.log_login(user["username"], ip) # Open redirect (CWE-601): valida o destino no PRÓPRIO ponto do redirect (o # analisador não reconhece a barreira interprocedural do _safe_next). Segue @@ -778,6 +829,11 @@ def login_2fa(): @app.route("/logout", methods=["POST"]) def logout(): + # Rotaciona o token server-side ANTES de limpar o cookie: invalida na hora + # qualquer outra cópia deste cookie de sessão (replay pós-logout — CWE-613). + uid = session.get("user_id") + if uid: + db.rotate_session_token(uid) session.clear() return redirect(url_for("login")) @@ -834,6 +890,17 @@ def err_404(e): return render_template("error.html", code=404, message=t("Página não encontrada")), 404 +@app.errorhandler(405) +def err_405(e): + # Método errado numa rota válida (ex.: GET em /logout, que é POST-only). Sem este + # handler, o errorhandler(Exception) abaixo capturaria o 405 e o transformaria em + # 500 — ver `err_unhandled`, que repassa HTTPException justamente por isso. + log.warning("http.405", path=request.path, method=request.method) + if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest": + return jsonify(ok=False, error="method_not_allowed"), 405 + return render_template("error.html", code=405, message=t("Método não permitido")), 405 + + @app.errorhandler(422) def err_422(e): log.warning("http.422", detail=str(e)) @@ -852,6 +919,11 @@ def err_500(e): @app.errorhandler(Exception) def err_unhandled(e): + # HTTPExceptions (404/405/403/abort(...) etc.) NÃO são falhas internas: repassa + # para o handler específico / resposta padrão do Flask. Sem isto, um simples 405 + # (método errado) viraria 500. Só erros realmente não tratados caem como 500. + if isinstance(e, HTTPException): + return e log.critical("unhandled_exception", exc=str(e), exc_info=True) if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest": return jsonify(ok=False, error="internal_error"), 500 diff --git a/database.py b/database.py index 010de2f..5df1eaa 100644 --- a/database.py +++ b/database.py @@ -214,6 +214,10 @@ def init_db(): "ALTER TABLE users ADD COLUMN totp_secret TEXT NOT NULL DEFAULT ''", "ALTER TABLE users ADD COLUMN totp_enabled INTEGER NOT NULL DEFAULT 0", "ALTER TABLE filaments ADD COLUMN color_name TEXT NOT NULL DEFAULT ''", + # Token de sessão server-side (revogação de cookie — ver app._validate_session). + # Default '' p/ instalações existentes: na próxima requisição o cookie legado + # (sem auth_token) não bate e o usuário reloga, recebendo um token gerado. + "ALTER TABLE users ADD COLUMN session_token TEXT NOT NULL DEFAULT ''", ]: try: db.execute(sql) @@ -294,6 +298,35 @@ def log_login(username, ip): db.commit() +# ── Token de sessão (revogação server-side — ver app._validate_session) ────── +# Cada usuário tem um token; o login grava na sessão e toda requisição autenticada +# revalida contra o banco. Rotacionar (logout / troca de senha) invalida na hora +# qualquer cookie de sessão antigo. + +def get_session_token(user_id): + """Token de sessão atual do usuário ('' se nenhum/usuário inexistente).""" + with closing(get_db()) as db: + row = db.execute("SELECT session_token FROM users WHERE id=?", (user_id,)).fetchone() + return (row["session_token"] if row else "") or "" + + +def get_or_create_session_token(user_id): + """Token atual; gera e persiste um se ainda vazio (usuário/instalação legados).""" + token = get_session_token(user_id) + if token: + return token + return rotate_session_token(user_id) + + +def rotate_session_token(user_id): + """Gera um token novo, persiste e o devolve — invalida sessões anteriores.""" + token = secrets.token_hex(16) + with closing(get_db()) as db: + db.execute("UPDATE users SET session_token=? WHERE id=?", (token, user_id)) + db.commit() + return token + + # ── 2FA (TOTP opcional por usuário) ────────────────────────────────────────── # Segredo TOTP em texto (igual às API keys; preciso p/ validar). Códigos de # recuperação ficam HASHEADOS na tabela recovery_codes. Tudo opt-in: off até o diff --git a/routes/account.py b/routes/account.py index 5db0de8..1c11d89 100644 --- a/routes/account.py +++ b/routes/account.py @@ -55,6 +55,9 @@ def account_password(): db.update_user_password(session["user_id"], generate_password_hash(new), must_change=False) session.pop("must_change_password", None) + # Troca de senha revoga TODAS as outras sessões (CWE-613): rotaciona o + # token server-side e re-amarra ESTA sessão ao token novo (continua logado). + session["auth_token"] = db.rotate_session_token(session["user_id"]) log.info("account.password_changed", forced=forced) flash(t("Senha alterada com sucesso"), "success") # Primeiro login (senha temporária recém-trocada): oferece — sem obrigar — diff --git a/routes/admin.py b/routes/admin.py index 042d6b2..b554680 100644 --- a/routes/admin.py +++ b/routes/admin.py @@ -65,8 +65,12 @@ def admin_users_password(user_id): is_self = (user_id == session["user_id"]) db.update_user_password(user_id, generate_password_hash(password), must_change=not is_self) + # Rotaciona o token de sessão do alvo (CWE-613): se for outro usuário, derruba + # imediatamente as sessões dele; se for o próprio admin, re-amarra a sessão atual. + new_token = db.rotate_session_token(user_id) if is_self: session.pop("must_change_password", None) + session["auth_token"] = new_token flash(t("Senha atualizada"), "success") return redirect(url_for("admin_users")) diff --git a/routes/filaments.py b/routes/filaments.py index 24078ba..a54feb8 100644 --- a/routes/filaments.py +++ b/routes/filaments.py @@ -3,7 +3,7 @@ import database as db import filament_catalog as catalog import logger as log_cfg -from app import app, login_required, t, _safe_next, get_ordered_materials +from app import app, login_required, write_required, t, _safe_next, get_ordered_materials log = log_cfg.get_logger() @@ -42,7 +42,7 @@ def _resolve_material(form) -> str: @app.route("/filaments/new", methods=["GET", "POST"]) -@login_required +@write_required def filaments_new(): if request.method == "POST": try: @@ -78,7 +78,7 @@ def filaments_detail(filament_id): @app.route("/filaments//edit", methods=["GET", "POST"]) -@login_required +@write_required def filaments_edit(filament_id): filament = db.get_filament(filament_id) if not filament: @@ -107,7 +107,7 @@ def filaments_edit(filament_id): @app.route("/filaments//duplicate", methods=["POST"]) -@login_required +@write_required def filaments_duplicate(filament_id): src = db.get_filament(filament_id) if not src: @@ -126,7 +126,7 @@ def filaments_duplicate(filament_id): @app.route("/filaments//delete", methods=["POST"]) -@login_required +@write_required def filaments_delete(filament_id): try: db.delete_filament(filament_id) diff --git a/routes/integrations.py b/routes/integrations.py index 54b42f5..2df6b59 100644 --- a/routes/integrations.py +++ b/routes/integrations.py @@ -4,7 +4,7 @@ leitura de estoque) têm chaves SEPARADAS: rotacionar uma não afeta a outra. A balança fica PRONTA mas OCULTA por enquanto (VISIBLE_INTEGRATIONS) — seu código, auth e seed já funcionam; só não aparece na UI ainda.""" -from flask import render_template, request, redirect, url_for, flash +from flask import render_template, request, redirect, url_for, flash, jsonify, abort import database as db import logger as log_cfg from app import app, admin_required, demo_blocked, t, public_base_url @@ -31,10 +31,13 @@ def admin_integrations(): row = rows.get(slug) if not row: continue + # NUNCA mande a chave inteira para o template/DOM (CWE-200): só os 4 últimos + # dígitos mascarados. O valor em claro é buscado sob demanda via reveal_key(). + key = row["key"] or "" items.append({ "slug": slug, "label": row["label"] or slug, - "key": row["key"], + "key_masked": ("•" * 24) + key[-4:], "scope": row["scope"], "enabled": bool(row["enabled"]), "created_at": row["created_at"], @@ -44,6 +47,21 @@ def admin_integrations(): items=items, ha_endpoints=_ha_endpoints()) +@app.route("/admin/integrations//key") +@admin_required +def admin_integration_key(integration): + """Revela a chave em claro SOB DEMANDA (botão "Revelar"/"Copiar"), só p/ admin e + nunca embutida no HTML servido. Resposta no-store p/ não ficar em cache.""" + if integration not in VISIBLE_INTEGRATIONS: + abort(404) + row = db.get_api_key(integration) + if not row: + abort(404) + resp = jsonify(key=row["key"]) + resp.headers["Cache-Control"] = "no-store" + return resp + + @app.route("/admin/integrations//regenerate", methods=["POST"]) @admin_required @demo_blocked diff --git a/routes/main.py b/routes/main.py index e0a659c..b170213 100644 --- a/routes/main.py +++ b/routes/main.py @@ -1,6 +1,6 @@ """Rotas gerais: dashboard, busca e health check.""" from datetime import datetime, timezone -from flask import render_template, request, jsonify +from flask import render_template, request, jsonify, Response import database as db import logger as log_cfg from app import app, login_required, APP_VERSION, DEMO_MODE @@ -33,6 +33,19 @@ def search(): # ── Health ───────────────────────────────────────────────────────────────── +@app.route("/.well-known/security.txt") +@app.route("/security.txt") +def security_txt(): + """Canal de divulgação responsável de vulnerabilidades (RFC 9116). Público (sem + auth) — é o ponto de contato p/ quem encontrar uma falha.""" + body = ( + "Contact: mailto:iscarelli@gmail.com\n" + "Preferred-Languages: pt, en\n" + "Canonical: https://spool.lojinharacer.com.br/.well-known/security.txt\n" + ) + return Response(body, mimetype="text/plain") + + @app.route("/health") def health(): checks: dict = {} diff --git a/routes/spool_models.py b/routes/spool_models.py index 8786d2e..982989c 100644 --- a/routes/spool_models.py +++ b/routes/spool_models.py @@ -2,7 +2,7 @@ from flask import render_template, request, redirect, url_for, flash, abort import database as db import logger as log_cfg -from app import app, login_required, t +from app import app, login_required, write_required, t log = log_cfg.get_logger() @@ -15,7 +15,7 @@ def spool_models_list(): @app.route("/spool-models/new", methods=["GET", "POST"]) -@login_required +@write_required def spool_models_new(): if request.method == "POST": try: @@ -33,7 +33,7 @@ def spool_models_new(): @app.route("/spool-models//edit", methods=["GET", "POST"]) -@login_required +@write_required def spool_models_edit(model_id): model = db.get_spool_model(model_id) if not model: @@ -55,7 +55,7 @@ def spool_models_edit(model_id): @app.route("/spool-models//delete", methods=["POST"]) -@login_required +@write_required def spool_models_delete(model_id): try: db.delete_spool_model(model_id) diff --git a/routes/spools.py b/routes/spools.py index 7d46929..718bc60 100644 --- a/routes/spools.py +++ b/routes/spools.py @@ -9,7 +9,7 @@ import labels as lbl import niimbot_registry as reg import logger as log_cfg -from app import app, login_required, t, _parse_price, public_base_url, _label_spool, _currency_meta +from app import app, login_required, write_required, t, _parse_price, public_base_url, _label_spool, _currency_meta log = log_cfg.get_logger() @@ -45,7 +45,7 @@ def spools_list(): @app.route("/spools/new", methods=["GET", "POST"]) -@login_required +@write_required def spools_new(): filament_id = request.args.get("filament_id", type=int) if request.method == "POST": @@ -89,7 +89,7 @@ def spools_detail(spool_id): @app.route("/spools//edit", methods=["GET", "POST"]) -@login_required +@write_required def spools_edit(spool_id): spool = db.get_spool(spool_id) if not spool: @@ -124,7 +124,7 @@ def spools_edit(spool_id): @app.route("/spools//weigh", methods=["GET", "POST"]) -@login_required +@write_required def spools_weigh(spool_id): spool = db.get_spool(spool_id) if not spool: @@ -163,7 +163,7 @@ def spools_weigh(spool_id): @app.route("/spools//deactivate", methods=["POST"]) -@login_required +@write_required def spools_deactivate(spool_id): db.deactivate_spool(spool_id) flash(t("Rolo marcado como finalizado"), "success") @@ -248,7 +248,7 @@ def niimbot_registry(): # ── Pesagem rápida ─────────────────────────────────────────────────────────── @app.route("/weigh", methods=["GET", "POST"]) -@login_required +@write_required def quick_weigh(): if request.method == "POST": code = request.form.get("spool_code", "").strip() diff --git a/templates/admin/integrations.html b/templates/admin/integrations.html index d75ae11..9c283c8 100644 --- a/templates/admin/integrations.html +++ b/templates/admin/integrations.html @@ -30,7 +30,7 @@
{{ it.label }}
+ data-slug="{{ it.slug }}" value="{{ it.key_masked }}">
@@ -64,23 +64,46 @@
{% block scripts %}