Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ fuel_mapping:

imputation:
location:
remove_shape_overlaps: false
inner_distance: 100 # matches projected CRS unit.
on_overlap: "split_capacity"
on_overlap: "raise" # either 'split_capacity', or 'raise'
on_forced_class_error: "drop" # either 'drop', 'ignore', or 'raise'
forced_class:
bioenergy turbine: "land"
Expand Down
7 changes: 7 additions & 0 deletions workflow/internal/config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,17 @@ properties:
type: object
additionalProperties: false
required:
- remove_shape_overlaps
- inner_distance
- on_overlap
- on_forced_class_error
properties:
remove_shape_overlaps:
title: Remove shape overlaps
description: |
If enabled, run algorithms to remove overlaps between user provided shapes.
WARNING: slows down processing significantly. Avoid if possible!
type: boolean
inner_distance:
title: Inner distance
description: |
Expand Down
5 changes: 5 additions & 0 deletions workflow/rules/impute.smk
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
rule impute_location:
input:
internal="<resources>/automatic/prepared/{category}.parquet",
exclusive_shapes=branch(
config["imputation"]["location"]["remove_shape_overlaps"],
then=rules.prepare_shapes.output.exclusive,
otherwise="<shapes>",
),
shapes="<shapes>",
user=branch(
exists("<imputed_powerplants>"),
Expand Down
17 changes: 17 additions & 0 deletions workflow/rules/prepare.smk
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,20 @@ rule remap_fuel_classes:
"Remap fuel classes of combustion plants to harmonised ones."
script:
"../scripts/remap_fuel_classes.py"


rule prepare_shapes:
input:
shapes="<shapes>",
output:
exclusive="<resources>/automatic/shapes/{shapes}/exclusive_shapes.parquet",
log:
"<logs>/{shapes}/prepare_shapes.log",
conda:
"../envs/powerplants.yaml"
params:
crs=config["crs"]["projected"],
message:
"Prepare checks to ensure exclusive area."
script:
"../scripts/prepare_shapes.py"
90 changes: 35 additions & 55 deletions workflow/scripts/impute_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import geopandas as gpd
import pandas as pd
from matplotlib import pyplot as plt
from shapely import union_all
from pyproj import CRS
from shapely.geometry import LineString, MultiPolygon, Point, Polygon
from shapely.geometry.base import BaseGeometry
from shapely.ops import nearest_points
Expand Down Expand Up @@ -144,28 +144,6 @@ def _place_slightly_inside(
return placed if placed.within(polygon) else fallback


def _polygonal_geometry(geometry: BaseGeometry) -> PolygonLike | None:
"""Return polygonal geometry, or None if no polygonal area remains."""
if geometry.is_empty:
return None

if isinstance(geometry, Polygon | MultiPolygon):
return geometry

polygons = []
for part in getattr(geometry, "geoms", []):
if isinstance(part, Polygon):
polygons.append(part)
elif isinstance(part, MultiPolygon):
polygons.extend(part.geoms)

if not polygons:
return None

merged = union_all(polygons)
return merged if isinstance(merged, Polygon | MultiPolygon) else None


def _raise_country_overlaps(overlaps: gpd.GeoDataFrame) -> None:
"""Raise an error for powerplants assigned to multiple countries."""
duplicate_ids = overlaps["powerplant_id"].drop_duplicates()
Expand All @@ -177,35 +155,22 @@ def _raise_country_overlaps(overlaps: gpd.GeoDataFrame) -> None:


def _split_shape_overlaps(
overlaps: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame, inner_distance: float
overlaps: gpd.GeoDataFrame,
placement_shapes: gpd.GeoDataFrame,
inner_distance: float,
) -> gpd.GeoDataFrame:
"""Split overlapping rows and move them into shape-exclusive areas."""
"""Split overlapping rows and move them into precomputed exclusive shapes."""
overlaps = overlaps.copy()

counts = overlaps["powerplant_id"].map(overlaps["powerplant_id"].value_counts())
overlaps["output_capacity_mw"] /= counts

shape_geometry = shapes.set_index("shape_id")["geometry"]
exclusive_geometry = {
shape_id: _polygonal_geometry(
shape_geometry.loc[shape_id].difference(
union_all(shape_geometry.drop(index=shape_id).to_list())
)
)
for shape_id in overlaps["shape_id"].unique()
}

missing = [
shape_id for shape_id, geom in exclusive_geometry.items() if geom is None
]
if missing:
raise ValueError(f"Shapes with no exclusive placement area: {missing}.")

placement_geometry = placement_shapes.set_index("shape_id")["geometry"]
overlaps["geometry"] = overlaps.apply(
lambda row: _place_slightly_inside(
row.geometry, exclusive_geometry[row["shape_id"]], inner_distance
row.geometry, placement_geometry.loc[row["shape_id"]], inner_distance
),
axis=1,
axis="columns",
)

suffixes = overlaps.groupby("powerplant_id").cumcount().astype(str)
Expand All @@ -217,13 +182,15 @@ def _split_shape_overlaps(
def assign_country_id(
prepared_powerplants: gpd.GeoDataFrame,
shapes: gpd.GeoDataFrame,
projected_crs: _utils.CRS,
exclusive_shapes: gpd.GeoDataFrame,
projected_crs: CRS,
inner_distance: float,
overlap_method: CountryOverlapMethod,
) -> CountryAssignmentResult:
"""Assign country_id, dropping out-of-scope plants and handling shape overlaps."""
output_columns = [*prepared_powerplants.columns, "country_id"]
shapes = shapes.to_crs(projected_crs)[["shape_id", "country_id", "geometry"]]
exclusive_shapes = exclusive_shapes[["shape_id", "geometry"]]
split_powerplant_ids = pd.Index([], dtype="object", name="powerplant_id")

assigned = (
Expand All @@ -243,7 +210,9 @@ def assign_country_id(
_raise_country_overlaps(overlaps)
elif overlap_method == "split_capacity":
split_rows = _split_shape_overlaps(
overlaps=overlaps, shapes=shapes, inner_distance=inner_distance
overlaps=overlaps,
placement_shapes=exclusive_shapes,
inner_distance=inner_distance,
)
assigned.loc[overlaps.index] = split_rows
split_powerplant_ids = pd.Index(
Expand Down Expand Up @@ -364,20 +333,19 @@ def _adjust_candidates(

def adjust_powerplant_location(
powerplants: gpd.GeoDataFrame,
shapes: gpd.GeoDataFrame,
exclusive_shapes: gpd.GeoDataFrame,
forced_shape_class: Mapping[str, str],
projected_crs: _utils.CRS,
projected_crs: CRS,
inner_distance: float,
on_error: OnError = "raise",
) -> LocationAdjustmentResult:
"""Adjust configured technologies to nearest valid shape in the same country."""
"""Adjust configured technologies to nearest valid exclusive shape in-country."""
plants = powerplants.to_crs(projected_crs).copy()
plants["target_shape_class"] = plants["technology"].map(forced_shape_class)

shapes = shapes.to_crs(projected_crs).copy()
candidates = plants[plants["target_shape_class"].notna()].copy()

has_target = _has_target_shape(candidates, shapes)
has_target = _has_target_shape(candidates, exclusive_shapes)
missing = candidates[~has_target]

if not missing.empty and on_error == "raise":
Expand All @@ -386,11 +354,11 @@ def adjust_powerplant_location(
drop_index = missing.index if on_error == "drop" else pd.Index([])
valid = candidates[has_target & ~candidates.index.isin(drop_index)]

already_correct = _already_correct_index(valid, shapes)
already_correct = _already_correct_index(valid, exclusive_shapes)
to_adjust = valid[~valid.index.isin(already_correct)]
moved_powerplant_ids = pd.Index(to_adjust["powerplant_id"], name="powerplant_id")

adjusted = _adjust_candidates(to_adjust, shapes, inner_distance)
adjusted = _adjust_candidates(to_adjust, exclusive_shapes, inner_distance)

result = plants.drop(index=drop_index)
if not adjusted.empty:
Expand All @@ -406,7 +374,7 @@ def adjust_powerplant_location(


def combine_powerplants(
file_paths: list[str], geo_crs: _utils.CRS, excluded: list[str]
file_paths: list[str], geo_crs: CRS, excluded: list[str]
) -> gpd.GeoDataFrame:
"""Combine internal category files and user files into a final dataset."""
combined = pd.concat(
Expand All @@ -424,7 +392,7 @@ def plot(
adjustment: LocationAdjustmentResult,
assignment: CountryAssignmentResult,
shapes: gpd.GeoDataFrame,
projected_crs: _utils.CRS,
projected_crs: CRS,
):
"""Plot final powerplants, highlighting split and forced-moved plants."""
fig, ax = plt.subplots(layout="constrained")
Expand Down Expand Up @@ -475,10 +443,21 @@ def plot(

def main() -> None:
"""Main snakemake process."""
# Open both shape files and ensure they match
shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(snakemake.input.shapes))
exclusive = _schemas.ShapeSchema.validate(
gpd.read_parquet(snakemake.input.exclusive_shapes)
)
mismatch = set(shapes["shape_id"].unique()) ^ set(exclusive["shape_id"].unique())
if mismatch:
raise ValueError(
f"Mismatched IDs between user and exclusive shapes {sorted(mismatch)}"
)

projected_crs = _utils.check_crs(snakemake.params.crs["projected"], "projected")
geographic_crs = _utils.check_crs(snakemake.params.crs["geographic"], "geographic")
if not projected_crs.equals(exclusive.crs):
exclusive = exclusive.to_crs(projected_crs)

location_cnf = snakemake.params.location_cnf
inner_distance = location_cnf["inner_distance"]
Expand All @@ -494,6 +473,7 @@ def main() -> None:
assignment = assign_country_id(
prepared_powerplants=combined,
shapes=shapes,
exclusive_shapes=exclusive,
projected_crs=projected_crs,
inner_distance=inner_distance,
overlap_method=location_cnf["on_overlap"],
Expand All @@ -502,7 +482,7 @@ def main() -> None:

adjustment = adjust_powerplant_location(
powerplants=assigned,
shapes=shapes,
exclusive_shapes=exclusive,
forced_shape_class=location_cnf.get("forced_class", {}),
projected_crs=projected_crs,
inner_distance=inner_distance,
Expand Down
107 changes: 107 additions & 0 deletions workflow/scripts/prepare_shapes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Precompute non-overlapping / exclusive shapes.

For each input shape, subtract all other shapes that intersect it to ensure exclusivity.
"""

import sys
from typing import TYPE_CHECKING, Any

import _schemas
import _utils
import geopandas as gpd
from shapely import make_valid, union_all
from shapely.geometry import MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry

if TYPE_CHECKING:
snakemake: Any


type PolygonLike = Polygon | MultiPolygon


def _polygonal_geometry(geometry: BaseGeometry) -> PolygonLike | None:
"""Return valid polygonal geometry, or None if no polygonal area remains."""
result = None

if not geometry.is_empty:
if not geometry.is_valid:
geometry = make_valid(geometry)

if isinstance(geometry, Polygon | MultiPolygon):
result = geometry
else:
polygons = [
part
for part in getattr(geometry, "geoms", [])
if isinstance(part, Polygon)
]

if polygons:
merged = union_all(polygons)
if isinstance(merged, Polygon | MultiPolygon):
result = merged

return result


def build_exclusive_shapes(shapes: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Subtract intersecting neighbor shapes from each shape.

Raises if any shape_id has no exclusive polygonal area left.
"""
shapes = shapes.reset_index(drop=True)
sindex = shapes.sindex

exclusive_geometries: list[PolygonLike] = []
removed_shape_ids: list[str] = []

for i, shape in shapes.iterrows():
neighbor_positions = [
j for j in sindex.query(shape.geometry, predicate="intersects") if j != i
]

exclusive = (
shape.geometry.difference(
union_all(shapes.geometry.iloc[neighbor_positions].to_list())
)
if neighbor_positions
else shape.geometry
)
exclusive = _polygonal_geometry(exclusive)

if exclusive is None:
removed_shape_ids.append(shape["shape_id"])
continue

exclusive_geometries.append(exclusive)

if removed_shape_ids:
raise ValueError(
"The following shapes have no exclusive placement area after removing "
"overlaps: "
f"{', '.join(map(str, removed_shape_ids))}. "
"This indicates overlapping input shapes that fully cover one or more "
"shape_id geometries."
)

shapes["geometry"] = exclusive_geometries
return shapes[["shape_id", "country_id", "shape_class", "geometry"]]


def main() -> None:
"""Main Snakemake process."""
projected_crs = _utils.check_crs(snakemake.params.crs, "projected")

shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(snakemake.input.shapes))
shapes = shapes.to_crs(projected_crs)

exclusive_shapes = build_exclusive_shapes(shapes)
exclusive_shapes = _schemas.ShapeSchema.validate(exclusive_shapes)

exclusive_shapes.to_parquet(snakemake.output.exclusive)


if __name__ == "__main__":
sys.stderr = open(snakemake.log[0], "w")
main()
Loading