Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/nene2/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

from .api_key import ApiKeyAuthMiddleware
from .bearer_token import BearerTokenMiddleware
from .interfaces import TokenVerifierProtocol
from .exceptions import TokenVerificationException
from .interfaces import TokenIssuerProtocol, TokenVerifierProtocol
from .local_verifier import LocalTokenVerifier

__all__ = [
"ApiKeyAuthMiddleware",
"BearerTokenMiddleware",
"LocalTokenVerifier",
"TokenIssuerProtocol",
"TokenVerificationException",
"TokenVerifierProtocol",
]
7 changes: 6 additions & 1 deletion src/nene2/auth/bearer_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from nene2.http.problem_details import problem_details_response

from .exceptions import TokenVerificationException
from .interfaces import TokenVerifierProtocol

_WWW_AUTH = 'Bearer realm="api"'
Expand All @@ -34,7 +35,11 @@ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -
response.headers["WWW-Authenticate"] = _WWW_AUTH
return response
token = auth[len("Bearer "):]
if not self._verifier.verify(token):
try:
verified = self._verifier.verify(token)
except TokenVerificationException:
verified = False
if not verified:
response = problem_details_response(
"unauthorized",
"Unauthorized",
Expand Down
8 changes: 8 additions & 0 deletions src/nene2/auth/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Authentication exceptions."""


class TokenVerificationException(Exception):
"""Raised by TokenVerifierProtocol implementations when a token is invalid.

BearerTokenMiddleware maps this to a 401 Problem Details response.
"""
15 changes: 14 additions & 1 deletion src/nene2/auth/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@

@runtime_checkable
class TokenVerifierProtocol(Protocol):
"""Verify an authentication token."""
"""Verify an authentication token.

Implementations may raise TokenVerificationException instead of returning False.
"""

def verify(self, token: str) -> bool: ...


@runtime_checkable
class TokenIssuerProtocol(Protocol):
"""Issue a signed bearer token from the given claims.

Production implementations wrap a JWT library (e.g. PyJWT).
"""

def issue(self, claims: dict[str, object]) -> str: ...
60 changes: 60 additions & 0 deletions tests/nene2/auth/test_token_issuer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Tests for TokenIssuerProtocol and TokenVerificationException."""

import pytest
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.testclient import TestClient

from nene2.auth import (
BearerTokenMiddleware,
TokenIssuerProtocol,
TokenVerificationException,
TokenVerifierProtocol,
)


class _StubIssuer:
"""Minimal TokenIssuerProtocol implementation for testing."""

def issue(self, claims: dict[str, object]) -> str:
return f"stub-token-{claims.get('sub', 'anon')}"


class _RaisingVerifier:
"""Verifier that raises TokenVerificationException."""

def verify(self, token: str) -> bool:
raise TokenVerificationException("token expired")


def test_stub_issuer_satisfies_protocol() -> None:
assert isinstance(_StubIssuer(), TokenIssuerProtocol)


def test_stub_issuer_returns_token_string() -> None:
token = _StubIssuer().issue({"sub": "user-1"})
assert isinstance(token, str)
assert "user-1" in token


def test_token_verification_exception_is_exception() -> None:
with pytest.raises(TokenVerificationException):
raise TokenVerificationException("bad token")


def test_raising_verifier_satisfies_verifier_protocol() -> None:
assert isinstance(_RaisingVerifier(), TokenVerifierProtocol)


def test_bearer_middleware_maps_token_verification_exception_to_401() -> None:
app = FastAPI()
app.add_middleware(BearerTokenMiddleware, verifier=_RaisingVerifier())

@app.get("/secret")
async def secret() -> JSONResponse:
return JSONResponse({"ok": True})

client = TestClient(app, raise_server_exceptions=False)
response = client.get("/secret", headers={"Authorization": "Bearer any-token"})
assert response.status_code == 401
assert response.json()["type"].endswith("unauthorized")
Loading