Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 57 additions & 51 deletions backend/app/costs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, cast

from app.models import NormalizedRouteObjectiveWeights, RouteObjectiveWeights
from app.value_parsing import coerce_float
from app.models import NormalizedRoutePreferenceWeights, RoutePreferenceWeights
from app.value_parsing import parse_float_or_default

if TYPE_CHECKING:
from app.typing_aliases import EdgeAttributes, MultiDiGraphAny
from app.typing_aliases import EdgeAttributeMap, MultiDiGraphAny

FALLBACK_EDGE_COST = 1e18


def _coerce_edge_attributes(candidate: object) -> EdgeAttributes | None:
def _coerce_edge_attribute_mapping(candidate: object) -> EdgeAttributeMap | None:
"""Convert mapping-like edge payload to a string-keyed dict."""
if not isinstance(candidate, Mapping):
return None
Expand All @@ -27,77 +27,79 @@ def _coerce_edge_attributes(candidate: object) -> EdgeAttributes | None:
}


def extract_parallel_edge_attributes(candidate: object) -> list[EdgeAttributes]:
def _extract_parallel_edge_attribute_mappings(
candidate: object,
) -> list[EdgeAttributeMap]:
"""Extract parallel edge attribute dictionaries from a NetworkX payload."""
parallel_edges: list[EdgeAttributes] = []
parallel_edges: list[EdgeAttributeMap] = []

if not isinstance(candidate, Mapping):
return parallel_edges

for edge_candidate in candidate.values():
edge_attributes = _coerce_edge_attributes(edge_candidate)
edge_attributes = _coerce_edge_attribute_mapping(edge_candidate)

if edge_attributes is not None:
parallel_edges.append(edge_attributes)

return parallel_edges


def normalize_route_objective_weights(
route_objective_weights: RouteObjectiveWeights,
) -> NormalizedRouteObjectiveWeights:
def normalize_route_preference_weights(
route_preference_weights: RoutePreferenceWeights,
) -> NormalizedRoutePreferenceWeights:
"""Normalize objective weights from 0-100 percentages to 0.0-1.0."""
return NormalizedRouteObjectiveWeights(
scenic=route_objective_weights.scenic / 100.0,
avoid_snow=route_objective_weights.avoid_snow / 100.0,
avoid_uphill=route_objective_weights.avoid_uphill / 100.0,
return NormalizedRoutePreferenceWeights(
scenic_weight=route_preference_weights.scenic_weight / 100.0,
snow_free_weight=route_preference_weights.snow_free_weight / 100.0,
flat_weight=route_preference_weights.flat_weight / 100.0,
)


def compute_edge_objective_components(
edge_attributes: EdgeAttributes,
def compute_edge_cost_components(
edge_attributes: EdgeAttributeMap,
) -> tuple[float, float, float, float]:
"""Compute distance and objective-aligned edge costs."""
distance_meters = coerce_float(edge_attributes.get("length"), default=0.0)
distance_meters = parse_float_or_default(edge_attributes.get("length"), default=0.0)

snow_exposure = coerce_float(edge_attributes.get("snow"), default=0.0)
uphill_exposure = coerce_float(edge_attributes.get("uphill"), default=0.0)
scenic_score = coerce_float(edge_attributes.get("scenic"), default=0.0)
snow_intensity = parse_float_or_default(edge_attributes.get("snow"), default=0.0)
hill_intensity = parse_float_or_default(edge_attributes.get("hills"), default=0.0)
scenic_value = parse_float_or_default(edge_attributes.get("scenic"), default=0.0)

snow_penalty_cost = distance_meters * snow_exposure
uphill_penalty_cost = distance_meters * uphill_exposure
scenic_penalty_cost = distance_meters * (1.0 - scenic_score)
snow_penalty = distance_meters * snow_intensity
hill_penalty = distance_meters * hill_intensity
scenic_penalty = distance_meters * (1.0 - scenic_value)

return (
distance_meters,
snow_penalty_cost,
uphill_penalty_cost,
scenic_penalty_cost,
snow_penalty,
hill_penalty,
scenic_penalty,
)


def compute_scalar_edge_cost(
edge_attributes: EdgeAttributes,
normalized_weights: NormalizedRouteObjectiveWeights,
def _compute_weighted_edge_cost(
edge_attributes: EdgeAttributeMap,
normalized_weights: NormalizedRoutePreferenceWeights,
) -> float:
"""Compute a single scalar cost for an edge."""
(
distance_meters,
snow_penalty_cost,
uphill_penalty_cost,
scenic_penalty_cost,
) = compute_edge_objective_components(edge_attributes)
snow_penalty,
hill_penalty,
scenic_penalty,
) = compute_edge_cost_components(edge_attributes)

return (
distance_meters
+ normalized_weights.avoid_snow * snow_penalty_cost
+ normalized_weights.avoid_uphill * uphill_penalty_cost
+ normalized_weights.scenic * scenic_penalty_cost
+ normalized_weights.snow_free_weight * snow_penalty
+ normalized_weights.flat_weight * hill_penalty
+ normalized_weights.scenic_weight * scenic_penalty
)


def build_networkx_weight_function(
normalized_weights: NormalizedRouteObjectiveWeights,
def build_weighted_edge_cost_function(
normalized_weights: NormalizedRoutePreferenceWeights,
) -> Callable[[int, int, object], float]:
"""Create a weight function compatible with NetworkX MultiDiGraph."""

Expand All @@ -106,55 +108,59 @@ def edge_weight(
_target_node_id: int,
networkx_edge_payload: object,
) -> float:
direct_edge_attributes = _coerce_edge_attributes(networkx_edge_payload)
direct_edge_attributes = _coerce_edge_attribute_mapping(networkx_edge_payload)

if direct_edge_attributes is not None and "length" in direct_edge_attributes:
return compute_scalar_edge_cost(direct_edge_attributes, normalized_weights)
return _compute_weighted_edge_cost(
direct_edge_attributes, normalized_weights
)

parallel_edge_attributes = extract_parallel_edge_attributes(
parallel_edge_attributes = _extract_parallel_edge_attribute_mappings(
networkx_edge_payload
)

if not parallel_edge_attributes:
return FALLBACK_EDGE_COST

return min(
compute_scalar_edge_cost(edge_attributes, normalized_weights)
_compute_weighted_edge_cost(edge_attributes, normalized_weights)
for edge_attributes in parallel_edge_attributes
)

return edge_weight


def select_parallel_edge(
def select_parallel_edge_attributes(
graph: MultiDiGraphAny,
source_node_id: int,
target_node_id: int,
*,
ranking_key: Callable[[EdgeAttributes], float],
) -> EdgeAttributes | None:
ranking_key: Callable[[EdgeAttributeMap], float],
) -> EdgeAttributeMap | None:
"""Select one parallel edge according to the provided ranking key."""
parallel_edges_payload = graph.get_edge_data(source_node_id, target_node_id)
parallel_edge_attributes = extract_parallel_edge_attributes(parallel_edges_payload)
parallel_edge_attributes = _extract_parallel_edge_attribute_mappings(
parallel_edges_payload
)

if not parallel_edge_attributes:
return None

return min(parallel_edge_attributes, key=ranking_key)


def select_best_parallel_edge_by_scalar_cost(
def select_lowest_cost_parallel_edge(
graph: MultiDiGraphAny,
source_node_id: int,
target_node_id: int,
normalized_weights: NormalizedRouteObjectiveWeights,
) -> EdgeAttributes | None:
normalized_weights: NormalizedRoutePreferenceWeights,
) -> EdgeAttributeMap | None:
"""Select the lowest scalar-cost parallel edge for a graph segment."""
return select_parallel_edge(
return select_parallel_edge_attributes(
graph,
source_node_id,
target_node_id,
ranking_key=lambda edge_attributes: compute_scalar_edge_cost(
ranking_key=lambda edge_attributes: _compute_weighted_edge_cost(
edge_attributes,
normalized_weights,
),
Expand Down
22 changes: 11 additions & 11 deletions backend/app/geocoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
type JsonObject = dict[str, object]


def reverse_geocode_address(
def reverse_geocode_to_address(
longitude: float,
latitude: float,
*,
zoom_level: int = 18,
) -> ReverseGeocodeResponse:
"""Reverse geocode coordinates to a single-line road + house number string."""
reverse_url = ox.settings.nominatim_url.rstrip("/") + "/reverse"
request_get = cast("Callable[..., requests.Response]", requests.get)
reverse_endpoint_url = ox.settings.nominatim_url.rstrip("/") + "/reverse"
send_get_request = cast("Callable[..., requests.Response]", requests.get)

params: dict[str, str | int | float] = {
"format": "jsonv2",
Expand All @@ -35,27 +35,27 @@ def reverse_geocode_address(
"Accept-Language": ox.settings.http_accept_language,
}

response = request_get(
reverse_url,
response = send_get_request(
reverse_endpoint_url,
params=params,
headers=headers,
timeout=ox.settings.requests_timeout,
**dict(ox.settings.requests_kwargs),
)
response.raise_for_status()

payload = cast("object", response.json())
response_data = cast("object", response.json())

if not isinstance(payload, dict):
if not isinstance(response_data, dict):
return ReverseGeocodeResponse(address="")

payload_object = cast("JsonObject", payload)
raw_address = payload_object.get("address")
payload_object = cast("JsonObject", response_data)
address_payload = payload_object.get("address")

if not isinstance(raw_address, dict):
if not isinstance(address_payload, dict):
return ReverseGeocodeResponse(address="")

address_object = cast("JsonObject", raw_address)
address_object = cast("JsonObject", address_payload)
road = str(address_object.get("road") or "").strip()
house_number = str(address_object.get("house_number") or "").strip()
formatted_address = " ".join(part for part in (road, house_number) if part)
Expand Down
128 changes: 128 additions & 0 deletions backend/app/graph_layer_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Graph layer filtering and serialization logic."""

from typing import TYPE_CHECKING

from fastapi import HTTPException
from geojson_pydantic import LineString as PydanticLineString
from geojson_pydantic import Point as PydanticPoint
from geojson_pydantic.types import Position, Position2D
from shapely.geometry import LineString as ShapelyLineString
from shapely.geometry import Point as ShapelyPoint
from shapely.geometry import Polygon as ShapelyPolygon

from app.models import (
GraphEdgeFeature,
GraphLayerFeatureCollection,
GraphLayerFeatureProperties,
GraphLayerKey,
GraphNodeFeature,
)

if TYPE_CHECKING:
import geopandas as gpd
from shapely.coords import CoordinateSequence

from app.layer_service import parse_bounding_box_string


def filter_graph_features(
geodataframe: gpd.GeoDataFrame,
*,
bounding_box: str | None,
max_features: int,
) -> gpd.GeoDataFrame:
"""Filter graph features by bbox and row limit."""
filtered_features = geodataframe

if bounding_box is not None:
min_longitude, min_latitude, max_longitude, max_latitude = (
parse_bounding_box_string(bounding_box)
)
bounding_geometry = ShapelyPolygon.from_bounds(
min_longitude,
min_latitude,
max_longitude,
max_latitude,
)
matching_indices = filtered_features.sindex.query(
bounding_geometry,
predicate="intersects",
)
filtered_features = filtered_features.iloc[matching_indices]

if len(filtered_features) > max_features:
filtered_features = filtered_features.head(max_features)

return filtered_features


def build_graph_layer_feature_collection(
geodataframe: gpd.GeoDataFrame,
*,
graph_layer_key: GraphLayerKey,
bounding_box: str | None,
max_features: int,
) -> GraphLayerFeatureCollection:
"""Build graph layer FeatureCollection response from node or edge data."""
filtered_features = filter_graph_features(
geodataframe,
bounding_box=bounding_box,
max_features=max_features,
)

graph_features: list[GraphNodeFeature | GraphEdgeFeature] = []

for row in filtered_features.itertuples():
geometry = row.geometry

if isinstance(geometry, ShapelyPoint):
graph_features.append(
GraphNodeFeature(
type="Feature",
properties=GraphLayerFeatureProperties(
graph_layer_key=graph_layer_key
),
geometry=PydanticPoint(
type="Point",
coordinates=Position2D(
float(geometry.x),
float(geometry.y),
),
),
)
)
continue

if isinstance(geometry, ShapelyLineString):
coordinates: CoordinateSequence = geometry.coords

if len(coordinates) == 0:
continue

line_coordinates: list[Position] = [
Position2D(float(longitude), float(latitude))
for longitude, latitude in coordinates
]
graph_features.append(
GraphEdgeFeature(
type="Feature",
properties=GraphLayerFeatureProperties(
graph_layer_key=graph_layer_key
),
geometry=PydanticLineString(
type="LineString",
coordinates=line_coordinates,
),
)
)
continue

raise HTTPException(
status_code=500,
detail="Graph layer contains unsupported geometry.",
)

return GraphLayerFeatureCollection(
type="FeatureCollection",
features=graph_features,
)
Loading
Loading