diff --git a/src/nene2/auth/__init__.py b/src/nene2/auth/__init__.py index afe28ac..61186f0 100644 --- a/src/nene2/auth/__init__.py +++ b/src/nene2/auth/__init__.py @@ -2,12 +2,15 @@ from .api_key import ApiKeyAuthMiddleware from .bearer_token import BearerTokenMiddleware -from .interfaces import TokenVerifierProtocol +from .exceptions import TokenVerificationException +from .interfaces import TokenIssuerProtocol, TokenVerifierProtocol from .local_verifier import LocalTokenVerifier __all__ = [ "ApiKeyAuthMiddleware", "BearerTokenMiddleware", "LocalTokenVerifier", + "TokenIssuerProtocol", + "TokenVerificationException", "TokenVerifierProtocol", ] diff --git a/src/nene2/auth/bearer_token.py b/src/nene2/auth/bearer_token.py index a85b28c..b2e9566 100644 --- a/src/nene2/auth/bearer_token.py +++ b/src/nene2/auth/bearer_token.py @@ -10,6 +10,7 @@ from nene2.http.problem_details import problem_details_response +from .exceptions import TokenVerificationException from .interfaces import TokenVerifierProtocol _WWW_AUTH = 'Bearer realm="api"' @@ -34,7 +35,11 @@ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) - response.headers["WWW-Authenticate"] = _WWW_AUTH return response token = auth[len("Bearer "):] - if not self._verifier.verify(token): + try: + verified = self._verifier.verify(token) + except TokenVerificationException: + verified = False + if not verified: response = problem_details_response( "unauthorized", "Unauthorized", diff --git a/src/nene2/auth/exceptions.py b/src/nene2/auth/exceptions.py new file mode 100644 index 0000000..3dc7160 --- /dev/null +++ b/src/nene2/auth/exceptions.py @@ -0,0 +1,8 @@ +"""Authentication exceptions.""" + + +class TokenVerificationException(Exception): + """Raised by TokenVerifierProtocol implementations when a token is invalid. + + BearerTokenMiddleware maps this to a 401 Problem Details response. + """ diff --git a/src/nene2/auth/interfaces.py b/src/nene2/auth/interfaces.py index 140a3c6..95c030b 100644 --- a/src/nene2/auth/interfaces.py +++ b/src/nene2/auth/interfaces.py @@ -5,6 +5,19 @@ @runtime_checkable class TokenVerifierProtocol(Protocol): - """Verify an authentication token.""" + """Verify an authentication token. + + Implementations may raise TokenVerificationException instead of returning False. + """ def verify(self, token: str) -> bool: ... + + +@runtime_checkable +class TokenIssuerProtocol(Protocol): + """Issue a signed bearer token from the given claims. + + Production implementations wrap a JWT library (e.g. PyJWT). + """ + + def issue(self, claims: dict[str, object]) -> str: ... diff --git a/tests/nene2/auth/test_token_issuer.py b/tests/nene2/auth/test_token_issuer.py new file mode 100644 index 0000000..cd734e2 --- /dev/null +++ b/tests/nene2/auth/test_token_issuer.py @@ -0,0 +1,60 @@ +"""Tests for TokenIssuerProtocol and TokenVerificationException.""" + +import pytest +from fastapi import FastAPI +from fastapi.responses import JSONResponse +from fastapi.testclient import TestClient + +from nene2.auth import ( + BearerTokenMiddleware, + TokenIssuerProtocol, + TokenVerificationException, + TokenVerifierProtocol, +) + + +class _StubIssuer: + """Minimal TokenIssuerProtocol implementation for testing.""" + + def issue(self, claims: dict[str, object]) -> str: + return f"stub-token-{claims.get('sub', 'anon')}" + + +class _RaisingVerifier: + """Verifier that raises TokenVerificationException.""" + + def verify(self, token: str) -> bool: + raise TokenVerificationException("token expired") + + +def test_stub_issuer_satisfies_protocol() -> None: + assert isinstance(_StubIssuer(), TokenIssuerProtocol) + + +def test_stub_issuer_returns_token_string() -> None: + token = _StubIssuer().issue({"sub": "user-1"}) + assert isinstance(token, str) + assert "user-1" in token + + +def test_token_verification_exception_is_exception() -> None: + with pytest.raises(TokenVerificationException): + raise TokenVerificationException("bad token") + + +def test_raising_verifier_satisfies_verifier_protocol() -> None: + assert isinstance(_RaisingVerifier(), TokenVerifierProtocol) + + +def test_bearer_middleware_maps_token_verification_exception_to_401() -> None: + app = FastAPI() + app.add_middleware(BearerTokenMiddleware, verifier=_RaisingVerifier()) + + @app.get("/secret") + async def secret() -> JSONResponse: + return JSONResponse({"ok": True}) + + client = TestClient(app, raise_server_exceptions=False) + response = client.get("/secret", headers={"Authorization": "Bearer any-token"}) + assert response.status_code == 401 + assert response.json()["type"].endswith("unauthorized")