Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/tests/test_stac.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@ def test_stac_api(self):

def test_stac_to_raster(self):
"""test link to raster api."""
tile_matrix_set_id = "WebMercatorQuad"

# tilejson
resp = httpx.get(
f"{self.stac_endpoint}/{self.collections_route}/{self.seeded_collection}/items/{self.seeded_id}/tilejson.json",
f"{self.stac_endpoint}/{self.collections_route}/{self.seeded_collection}/items/{self.seeded_id}/{tile_matrix_set_id}/tilejson.json",
params={"assets": "cog"},
)
assert resp.status_code == 307
Expand Down
13 changes: 7 additions & 6 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
Aspects.of(self).add(PermissionsBoundaryAspect(permissions_boundary_policy))


git_sha = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()
try:
git_tag = subprocess.check_output(["git", "describe", "--tags"]).decode().strip()
except subprocess.CalledProcessError:
git_tag = "no-tag"

veda_stack = VedaStack(
app,
f"{veda_app_settings.app_name}-{veda_app_settings.stage_name()}",
Expand Down Expand Up @@ -110,6 +116,7 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
stac_db_security_group_id=db_security_group.security_group_id,
stac_api_url=stac_api.stac_api.url,
raster_api_url=raster_api.raster_api.url,
git_sha=git_sha,
)

ingest_api = ingest_api_construct(
Expand All @@ -129,12 +136,6 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
db_vpc=vpc.vpc,
)

git_sha = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()
try:
git_tag = subprocess.check_output(["git", "describe", "--tags"]).decode().strip()
except subprocess.CalledProcessError:
git_tag = "no-tag"

for key, value in {
"Project": veda_app_settings.app_name,
"Stack": veda_app_settings.stage_name(),
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services:
# - TITILER_ENDPOINT=raster
- TITILER_ENDPOINT=http://0.0.0.0:8082
- VEDA_STAC_ENABLE_TRANSACTIONS=True
- VEDA_STAC_GIT_SHA=local123
depends_on:
- database
- raster
Expand Down Expand Up @@ -84,6 +85,7 @@ services:
# API Config
- VEDA_RASTER_ENABLE_MOSAIC_SEARCH=TRUE
- VEDA_RASTER_EXPORT_ASSUME_ROLE_CREDS_AS_ENVS=TRUE
- VEDA_RASTER_GIT_SHA=local123


depends_on:
Expand Down Expand Up @@ -151,6 +153,7 @@ services:
- STAC_URL=http://0.0.0.0:8081
- USERPOOL_ID=us-west-2_123456789
- CLIENT_ID=123456789
- GIT_SHA=local123
ports:
- "8083:8083"
command: bash -c "bash /tmp/scripts/wait-for-it.sh -t 120 -h database -p 5432 && python /asset/local.py"
Expand Down
6 changes: 6 additions & 0 deletions ingest_api/infrastructure/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import subprocess
from getpass import getuser
from typing import List, Optional

Expand Down Expand Up @@ -93,6 +94,11 @@ class IngestorConfig(BaseSettings):
case_sensitive=False, env_file=".env", env_prefix="VEDA_", extra="ignore"
)

git_sha: Optional[str] = Field(
subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode("utf-8"),
description="Git SHA of the current commit, used to track deployment version",
)

@property
def stack_name(self) -> str:
return f"veda-stac-ingestion-{self.stage}"
Expand Down
2 changes: 2 additions & 0 deletions ingest_api/infrastructure/construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
"STAGE": config.stage,
"CLIENT_ID": config.keycloak_ingest_api_client_id,
"OPENID_CONFIGURATION_URL": str(config.openid_configuration_url),
"GIT_SHA": config.git_sha,
}

build_api_lambda_params = {
Expand Down Expand Up @@ -236,6 +237,7 @@ def __init__(
"RASTER_URL": config.veda_raster_api_cf_url,
"CLIENT_ID": config.keycloak_ingest_api_client_id,
"OPENID_CONFIGURATION_URL": str(config.openid_configuration_url),
"GIT_SHA": config.git_sha,
}

if config.raster_data_access_role_arn:
Expand Down
3 changes: 3 additions & 0 deletions ingest_api/runtime/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class Settings(BaseSettings):
stac_url: AnyHttpUrl = Field(description="URL of STAC API")
root_path: Optional[str] = None
stage: Optional[str] = Field(None, description="API stage")
git_sha: Optional[str] = Field(
"", description="Git SHA of the deployed service"
) # default to str so that docker compose tests work


settings = Settings()
7 changes: 4 additions & 3 deletions ingest_api/runtime/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from src.collection_publisher import CollectionPublisher, ItemPublisher
from src.config import settings
from src.doc import DESCRIPTION
from src.monitoring import LoggerRouteHandler, logger, metrics, tracer
from src.monitoring import ObservabilityMiddleware, logger, metrics, tracer

from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.exceptions import RequestValidationError
Expand All @@ -32,8 +32,6 @@
},
)

app.router.route_class = LoggerRouteHandler

collection_publisher = CollectionPublisher()
item_publisher = ItemPublisher()

Expand Down Expand Up @@ -217,6 +215,9 @@ def who_am_i(claims=Depends(oidc_auth.valid_token_dependency)):
return claims


app.add_middleware(ObservabilityMiddleware)


# If the correlation header is used in the UI, we can analyze traces that originate from a given user or client
@app.middleware("http")
async def add_correlation_id(request: Request, call_next):
Expand Down
196 changes: 154 additions & 42 deletions ingest_api/runtime/src/monitoring.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,169 @@
"""Observability utils"""
"""Observability middleware for logging and tracing requests."""
import json
from typing import Callable
import time
from typing import Callable, Optional

from aws_lambda_powertools import Logger, Metrics, Tracer, single_metric
from aws_lambda_powertools.metrics import MetricUnit # noqa: F401
from src.config import settings
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricResolution, MetricUnit
from src.config import Settings

from fastapi import Request, Response
from fastapi.routing import APIRoute
settings = Settings()

logger: Logger = Logger(service="ingest-api", namespace="veda-backend")
metrics: Metrics = Metrics(namespace="veda-backend")
metrics.set_default_dimensions(environment=settings.stage, service="ingest-api")
tracer: Tracer = Tracer()

try:
version = settings.git_sha[:6] # short git sha
except TypeError:
version = "unknown"

class LoggerRouteHandler(APIRoute):
"""Extension of base APIRoute to add context to log statements, as well as record usage metrics"""

def get_route_handler(self) -> Callable:
"""Overide route handler method to add logs, metrics, tracing"""
original_route_handler = super().get_route_handler()
class ObservabilityMiddleware:
"""Observability middleware for logging and tracing requests."""

async def route_handler(request: Request) -> Response:
# Add fastapi context to logs
body = await request.body()
def __init__(self, app: Callable):
"""Observability middleware for logging and tracing requests."""
self.app = app

async def __call__(self, scope, receive, send): # noqa: C901
"""Observability middleware for logging and tracing requests."""
# Only handle HTTP requests
if scope["type"] != "http":
await self.app(scope, receive, send)
return

method: str = scope.get("method", "GET")
raw_path: str = scope.get("path", "")

# --- Buffer the incoming body so we can log it but still pass it downstream ---
body = b""
more_body = True

# Consume the body from the original receive channel
while more_body:
message = await receive()
if message["type"] != "http.request":
# Pass through non-http.request messages
await self.app(scope, _make_receive_replay([message]), send)
return
body += message.get("body", b"")
more_body = message.get("more_body", False)

# Prepare a receive wrapper that replays the buffered body to the app
receive_replayed = _make_receive_replay(
[{"type": "http.request", "body": body, "more_body": False}]
)

# Try to parse JSON body for structured logging (non-JSON becomes None)
body_json = None
if body:
try:
body_json = json.loads(body)
except json.decoder.JSONDecodeError:
except json.JSONDecodeError:
body_json = None
ctx = {
"path": request.url.path,
"path_params": request.path_params,
"body": body_json,
"route": self.path,
"method": request.method,
}
logger.append_keys(fastapi=ctx)
logger.info("Received request")

with single_metric(
name="RequestCount",
unit=MetricUnit.Count,
value=1,
default_dimensions=metrics.default_dimensions,
namespace="veda-backend",
) as metric:
metric.add_dimension(
name="route", value=f"{request.method} {self.path}"
)

tracer.put_annotation(key="path", value=request.url.path)
tracer.capture_method(original_route_handler)(request)
return await original_route_handler(request)

return route_handler

# Initial context logging (route template not yet resolved here)
ctx = {
"path": raw_path,
"method": method,
"path_params": None, # will try to resolve after routing
"route": None, # will try to resolve after routing
"body": body_json,
}
logger.append_keys(fastapi=ctx)
logger.info("Received request")

# Add X-Ray annotations early
tracer.put_annotation(key="path", value=raw_path)
tracer.put_annotation(key="method", value=method)

# Capture status code via send wrapper
status_holder = {"status": 500}
resp_size_holder = {"bytes": 0}

start = time.perf_counter_ns()

async def send_wrapper(message):
if message["type"] == "http.response.start":
status_holder["status"] = message.get("status", 500)
# If Content-Length is set, remember it; otherwise we’ll sum body chunks
for (h, v) in message.get("headers", []) or []:
if h.lower() == b"content-length":
try:
resp_size_holder["bytes"] = int(v.decode("latin1"))
except Exception:
pass
elif message["type"] == "http.response.body":
# If no Content-Length, accumulate bytes
if resp_size_holder["bytes"] == 0:
resp_size_holder["bytes"] += len(message.get("body") or b"")
await send(message)

# Route/execute the request with tracing around the app call
@tracer.capture_method(capture_response=False)
async def _call_downstream():
await self.app(scope, receive_replayed, send_wrapper)

await _call_downstream()

elapsed_ms = (time.perf_counter_ns() - start) / 1_000_000.0
status = status_holder["status"]
status_family = f"{status // 100}xx"

# After downstream handled routing, try to resolve route template & path params
route_template: Optional[str] = None
path_params = None
route_obj = scope.get("route")
if route_obj is not None:
# FastAPI/Starlette exposes a path_format like "/items/{item_id}"
route_template = getattr(route_obj, "path_format", None) or getattr(
route_obj, "path", None
)
path_params = scope.get("path_params", None)

# Update log context with resolved info and status
final_ctx = {
"path": raw_path,
"method": method,
"path_params": path_params,
"route": route_template or raw_path,
"status_code": status_holder["status"],
"body": body_json,
}
logger.append_keys(fastapi=final_ctx)
logger.info("Completed request")

metrics.add_dimension("route_template", route_template)
metrics.add_dimension("status_family", status_family)
metrics.add_dimension("git_sha", version)
metrics.add_metric(name="http_requests_total", value=1, unit=MetricUnit.Count)
metrics.add_metric(
name="http_request_duration_ms",
value=elapsed_ms,
unit=MetricUnit.Milliseconds,
resolution=MetricResolution.High,
)
metrics.add_metric(
name="http_response_size_bytes",
value=resp_size_holder["bytes"],
unit=MetricUnit.Bytes,
)


def _make_receive_replay(messages):
"""
Build a 'receive' callable that replays given ASGI messages once.
"""

async def _receive():
if _receive._idx < len(messages):
msg = messages[_receive._idx]
_receive._idx += 1
return msg
# No more data
return {"type": "http.request", "body": b"", "more_body": False}

_receive._idx = 0 # type: ignore[attr-defined]
return _receive
7 changes: 4 additions & 3 deletions ingest_api/runtime/src/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import src.schemas as schemas
from boto3.dynamodb import conditions
from boto3.dynamodb.types import DYNAMODB_CONTEXT
from pydantic import parse_obj_as
from pydantic import TypeAdapter

if TYPE_CHECKING:
from mypy_boto3_dynamodb.service_resource import Table
Expand All @@ -25,7 +25,7 @@ def fetch_one(self, username: str, ingestion_id: str):
Key={"created_by": username, "id": ingestion_id},
)
try:
return schemas.Ingestion.parse_obj(response["Item"])
return schemas.Ingestion.model_validate(response["Item"])
except KeyError:
raise NotInDb("Record not found")

Expand All @@ -38,8 +38,9 @@ def fetch_many(
**{"Limit": limit} if limit else {},
**{"ExclusiveStartKey": next} if next else {},
)
list_of_ingestions = TypeAdapter(List[schemas.Ingestion])
return {
"items": parse_obj_as(List[schemas.Ingestion], response["Items"]),
"items": list_of_ingestions.validate_python(response["Items"]),
"next": response.get("LastEvaluatedKey"),
}

Expand Down
2 changes: 1 addition & 1 deletion ingest_api/runtime/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,6 @@ def example_ingestion(example_stac_item):
id=example_stac_item["id"],
created_by="test-user",
status=schemas.Status.queued,
item=Item.parse_obj(example_stac_item),
item=Item.model_validate(example_stac_item),
created_at=datetime.now(),
)
Loading
Loading