From 7960855db6c8f863b099eaf65de64c63d6bd2628 Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Sun, 12 Apr 2026 13:14:09 -0600 Subject: [PATCH 01/11] keep future projects in adjustment --- workflow/scripts/_utils.py | 83 +++++++------------ .../scripts/impute_capacity_adjustment.py | 42 +++++++++- 2 files changed, 71 insertions(+), 54 deletions(-) diff --git a/workflow/scripts/_utils.py b/workflow/scripts/_utils.py index 34ce3eb..daff6cf 100644 --- a/workflow/scripts/_utils.py +++ b/workflow/scripts/_utils.py @@ -69,42 +69,50 @@ def check_single_category(df: pd.DataFrame) -> str: def filter_years( - powerplants_df: pd.DataFrame, - year: int, - how: Literal["operating", "future"] = "operating", + powerplants_df: pd.DataFrame, year: int, how: Literal["operating", "future", "past"] ) -> pd.DataFrame: - """Standardised filtering of powerplants based on start / end years. + """Filter powerplants based on start/end year. + + Assumptions: + - A powerplant comes online on January 1 of `start_year`. + - A powerplant goes offline on January 1 of `end_year`. + - `end_year` > `start_year`. Args: - powerplants_df (pd.DataFrame): powerplant dataset to filter. - year (int): year to filter. - how (Literal["operating", "future"], optional): filtering approach. - Defaults to "operating". - - operating: only active powerplants in the given year. - - future: active and planned powerplant projects in the given year. + powerplants_df: Powerplant dataset to filter. + year: Reference year. + how: + - "operating": plants active during `year` + - "future": plants not yet online in `year` + - "past": plants already offline by `year` Returns: - pd.DataFrame: copy of the given dataframe after filtering applied. + A copy of the filtered dataframe. """ - if how == "operating": - filtered = powerplants_df[ - (powerplants_df["start_year"] <= year) & (year < powerplants_df["end_year"]) - ].copy() - elif how == "future": - # FIXME: this does not filter 'future' years at all! - filtered = powerplants_df[(powerplants_df["start_year"] <= year)].copy() - return filtered - - -def _clean_positive_capacity(plants: pd.DataFrame) -> pd.DataFrame: + match how: + case "operating": + mask = (powerplants_df["start_year"] <= year) & ( + year < powerplants_df["end_year"] + ) + case "future": + mask = powerplants_df["start_year"] > year + case "past": + mask = year >= powerplants_df["end_year"] + case _: + raise ValueError(f"Invalid request {how!r}.") + + return powerplants_df.loc[mask].copy() + + +def _clean_positive_capacity(df: pd.DataFrame) -> pd.DataFrame: """Remove rows with non-positive capacity.""" - return plants[plants["output_capacity_mw"] > 0].copy() + return df[df["output_capacity_mw"] > 0].copy() def get_adjusted_capacity( operating_plants: pd.DataFrame, expected_capacity: pd.Series ) -> pd.Series: - """Adjust powerplant capacity the total expected capacity per country. + """Adjust powerplant capacity to the total expected capacity per country. Args: operating_plants (pd.DataFrame): dataframe with all operating plants to adjust. @@ -120,33 +128,6 @@ def get_adjusted_capacity( return adjusted_cap_mw -def adjust_powerplant_capacity(plants, stats, year): - """Adjust powerplant capacity to national statistics in the given year. - - Keeps "future" projects beyond the given year. - """ - category = check_single_category(plants) - cat_stats = get_eia_stats_in_cat_yr(stats, year, category) - expected_capacity = cat_stats.groupby(["country_id"])["capacity_mw"].sum() - adjusted = _clean_positive_capacity(filter_years(plants, year, how="future")) - operating = filter_years(adjusted, year, how="operating") - operating = operating[operating["country_id"].isin(expected_capacity.index)] - - # Remove operating rows that cannot be matched to the requested year's stats. - adjusted = adjusted.drop( - index=filter_years(adjusted, year, how="operating").index.difference( - operating.index - ) - ) - - if operating.empty: - return adjusted.reset_index(drop=True) - - adjusted_cap = get_adjusted_capacity(operating, expected_capacity) - adjusted.loc[adjusted_cap.index, "output_capacity_mw"] = adjusted_cap - return adjusted.reset_index(drop=True) - - def adjust_aggregated_capacity(plants, stats, year): """Adjust capacity to national statistics in the given year.""" category = check_single_category(plants) diff --git a/workflow/scripts/impute_capacity_adjustment.py b/workflow/scripts/impute_capacity_adjustment.py index 02eee99..f27b8f0 100644 --- a/workflow/scripts/impute_capacity_adjustment.py +++ b/workflow/scripts/impute_capacity_adjustment.py @@ -11,7 +11,43 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") + + +def adjustment(plants: pd.DataFrame, stats: pd.DataFrame, year: int) -> pd.DataFrame: + """Adjust operating plant capacity to national statistics in the given year. + + Rules: + - Past plants are kept unchanged. + - Future plants are kept unchanged. + - Operating plants are adjusted to match the statistics. + - If a country's expected operating capacity is missing or zero in that year, + operating plants in that country are removed. + """ + category = _utils.check_single_category(plants) + cat_stats = _utils.get_eia_stats_in_cat_yr(stats, year, category) + expected_capacity = cat_stats.groupby("country_id")["capacity_mw"].sum() + + # isolate powerplants operating in the requested year + adjusted = plants.copy() + operating = _utils.filter_years(adjusted, year, how="operating") + non_operating = adjusted.drop(index=operating.index) + + # empty placeholder for currently operating powerplants + adjusted_operating = operating.head(0).copy() + if not operating.empty and not expected_capacity.empty: + positive_expected = expected_capacity[expected_capacity > 0] + operating_to_adjust = operating[ + operating["country_id"].isin(positive_expected.index) + ].copy() + + if not operating_to_adjust.empty: + operating_to_adjust["output_capacity_mw"] = _utils.get_adjusted_capacity( + operating_to_adjust, positive_expected + ) + adjusted_operating = operating_to_adjust + + adjusted = pd.concat([non_operating, adjusted_operating], ignore_index=True) + return adjusted def adjust_powerplant_capacity( @@ -27,19 +63,19 @@ def adjust_powerplant_capacity( if plants.empty: adjusted_plants = plants else: - adjusted_plants = _utils.adjust_powerplant_capacity(plants, stats, year) + adjusted_plants = adjustment(plants, stats, year) _schemas.PlantSchema.validate(adjusted_plants).to_parquet(output_file) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") adjust_powerplant_capacity( stats_file=snakemake.input.stats, unadjusted_file=snakemake.input.unadjusted, year=_utils.DATASET_YEAR, output_file=snakemake.output.adjusted, ) - _plots.plot_capacity_adjustment( stats_file=snakemake.input.stats, unadjusted_file=snakemake.input.unadjusted, From b9a13f80658fd2d5a01da3549ae397a214845f42 Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Sun, 12 Apr 2026 18:52:20 -0600 Subject: [PATCH 02/11] Schema check for zero capacity --- workflow/scripts/_schemas.py | 4 ++-- workflow/scripts/_utils.py | 4 ++-- workflow/scripts/aggregate_capacity.py | 2 +- workflow/scripts/impute_adjustment_solar.py | 2 +- workflow/scripts/prepare_fossil.py | 2 ++ 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/workflow/scripts/_schemas.py b/workflow/scripts/_schemas.py index b83d83d..ecdac54 100644 --- a/workflow/scripts/_schemas.py +++ b/workflow/scripts/_schemas.py @@ -55,7 +55,7 @@ class Config: country_id: Series[str] category: Series[str] technology: Series[str] - output_capacity_mw: Series[float] + output_capacity_mw: Series[float] = Field(gt=0) chp: Series[bool] | None ccs: Series[bool] | None fuel_class: Series[str] | None @@ -78,7 +78,7 @@ class Config: "General category of the powerplant." technology: Series[str] "Subcategory of the powerplant, if necessary." - output_capacity_mw: Series[float] = Field(ge=0) + output_capacity_mw: Series[float] = Field(gt=0) "Powerplant gross output capacity in Megawatts." # Temporal aspects start_year: Series[float] diff --git a/workflow/scripts/_utils.py b/workflow/scripts/_utils.py index daff6cf..ec635ad 100644 --- a/workflow/scripts/_utils.py +++ b/workflow/scripts/_utils.py @@ -104,7 +104,7 @@ def filter_years( return powerplants_df.loc[mask].copy() -def _clean_positive_capacity(df: pd.DataFrame) -> pd.DataFrame: +def ensure_positive_capacity(df: pd.DataFrame) -> pd.DataFrame: """Remove rows with non-positive capacity.""" return df[df["output_capacity_mw"] > 0].copy() @@ -134,7 +134,7 @@ def adjust_aggregated_capacity(plants, stats, year): stats = get_eia_stats_in_cat_yr(stats, year, category) expected_capacity = stats.groupby(["country_id"])["capacity_mw"].sum() - adjusted = _clean_positive_capacity(plants) + adjusted = ensure_positive_capacity(plants) adjusted = adjusted[adjusted["country_id"].isin(expected_capacity.index)] if adjusted.empty: diff --git a/workflow/scripts/aggregate_capacity.py b/workflow/scripts/aggregate_capacity.py index 11ecc00..55f6af0 100644 --- a/workflow/scripts/aggregate_capacity.py +++ b/workflow/scripts/aggregate_capacity.py @@ -57,7 +57,7 @@ def capacity(powerplant_file: str, shapes_file: str, year: int, output_file: str agg_plants_df = pd.concat(agg_plants_arr, axis="index", ignore_index=True) agg_plants_df.attrs["year"] = year - agg_plants_df = _utils._clean_positive_capacity(agg_plants_df) + agg_plants_df = _utils.ensure_positive_capacity(agg_plants_df) _schemas.AggregatedPlantSchema.validate(agg_plants_df).to_parquet(output_file) diff --git a/workflow/scripts/impute_adjustment_solar.py b/workflow/scripts/impute_adjustment_solar.py index c029cdd..c4e9d97 100644 --- a/workflow/scripts/impute_adjustment_solar.py +++ b/workflow/scripts/impute_adjustment_solar.py @@ -33,7 +33,7 @@ def capacity_solar( # Combine and clean the data solar_mw = pd.concat([agg_roof_pv_cap, large_pv], ignore_index=True) - solar_mw = _utils._clean_positive_capacity(solar_mw) + solar_mw = _utils.ensure_positive_capacity(solar_mw) solar_mw.attrs = large_pv.attrs | agg_roof_pv_cap.attrs return _schemas.AggregatedPlantSchema.validate(solar_mw) diff --git a/workflow/scripts/prepare_fossil.py b/workflow/scripts/prepare_fossil.py index e290a53..eb7e0e2 100644 --- a/workflow/scripts/prepare_fossil.py +++ b/workflow/scripts/prepare_fossil.py @@ -103,6 +103,7 @@ def prepare_gem_gcpt( "fuel_class": fuel_class, } ).reset_index(drop=True) + coal_df = _utils.ensure_positive_capacity(coal_df) schema = _schemas.build_schema(technology_mapping, "prepare") return schema.validate(coal_df), _schemas.FuelSchema.validate(fuels_df) @@ -141,6 +142,7 @@ def prepare_gem_gogpt( "fuel_class": fuel_class, } ).reset_index(drop=True) + oil_gas_df = _utils.ensure_positive_capacity(oil_gas_df) schema = _schemas.build_schema(technology_mapping, "prepare") return schema.validate(oil_gas_df), _schemas.FuelSchema.validate(fuels_df) From 4b8bf5a7c3387de94e17ec405754536e3c94aea9 Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Mon, 13 Apr 2026 10:08:40 -0600 Subject: [PATCH 03/11] use representative points for utility PV powerplants --- workflow/rules/impute.smk | 1 - workflow/scripts/_utils.py | 20 ++++++++++++++++++++ workflow/scripts/impute_years.py | 16 +++------------- workflow/scripts/prepare_large_solar.py | 8 ++++---- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index 6bf92ac..e891c06 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -16,7 +16,6 @@ rule impute_years: "../envs/powerplants.yaml" params: imputation=config["imputation"], - projected_crs=config["crs"]["projected"], tech_map=lambda wc: get_technology_mapping(wc.category), message: "National-level imputation of missing years for all powerplants in {wildcards.shapes}-{wildcards.category} dataset." diff --git a/workflow/scripts/_utils.py b/workflow/scripts/_utils.py index ec635ad..5c0a15f 100644 --- a/workflow/scripts/_utils.py +++ b/workflow/scripts/_utils.py @@ -5,12 +5,32 @@ import geopandas as gpd import pandas as pd from pandas.api.types import is_list_like +from pyproj import CRS # Average year where powerplant datasets were last updated. # MUST BE ADJUSTED WHENEVER DATASOURCES ARE UPDATED! DATASET_YEAR = 2023 +def check_crs(crs: int | str, how: Literal["projected", "geographic", "geocentric"]) -> CRS: + """Helper to verify user-provided CRS codes.""" + parsed = CRS.from_user_input(crs) + correct = False + match how: + case "projected": + if parsed.is_projected: + correct = True + case "geographic": + if parsed.is_geographic: + correct = True + case "geocentric": + if parsed.is_geocentric: + correct = True + if not correct: + raise ValueError(f"{crs!r} is not {how!r}.") + return parsed + + def listify(item) -> list: """Avoids ambiguity in YAML list parameters.""" return item if is_list_like(item) else [item] diff --git a/workflow/scripts/impute_years.py b/workflow/scripts/impute_years.py index 792f6c7..d4fd07a 100644 --- a/workflow/scripts/impute_years.py +++ b/workflow/scripts/impute_years.py @@ -92,7 +92,6 @@ def impute( countries_gdf: gpd.GeoDataFrame, imputation: dict, technology_mapping: dict, - projected_crs: str, ) -> gpd.GeoDataFrame: """Add automatic and user imputations to fill missing data. @@ -112,17 +111,9 @@ def impute( prepared_cat_gdf.geometry.geom_type != "Point" ].index if polygon_mask.any(): - if projected_crs: - prev_crs = prepared_cat_gdf.crs - prepared_cat_gdf.loc[polygon_mask, "geometry"] = ( - prepared_cat_gdf.loc[polygon_mask, "geometry"] - .to_crs(projected_crs) - .centroid.to_crs(prev_crs) - ) - else: - raise ValueError( - "Polygon powerplant geometries detected. Specify a projected CRS." - ) + raise ValueError( + "Polygon powerplant geometries detected. Only Points are supported." + ) lifetimes = imputation["lifetime_years"] retirement_delay_years = imputation["retirement_delay_years"] @@ -194,7 +185,6 @@ def main() -> None: countries_gdf=gpd.read_parquet(snakemake.input.dissolved_shapes), imputation=snakemake.params.imputation, technology_mapping=snakemake.params.tech_map, - projected_crs=snakemake.params.projected_crs, ) imputed_gdf.to_parquet(snakemake.output.imputed) _plots.plot_powerplant_capacity_buildup( diff --git a/workflow/scripts/prepare_large_solar.py b/workflow/scripts/prepare_large_solar.py index 6077d85..ab0cc50 100644 --- a/workflow/scripts/prepare_large_solar.py +++ b/workflow/scripts/prepare_large_solar.py @@ -149,9 +149,9 @@ def prepare_solar_utility_pv( valid_status=["announced", "pre-construction", "construction", "retired"], ) - utility_pv = pd.concat([filled_tz_df, gem_mismatch_df], axis="index") - utility_pv = utility_pv.reset_index(drop=True) - + utility_pv = pd.concat([filled_tz_df, gem_mismatch_df], ignore_index=True) + # Convert to points to make further processing easier. + utility_pv["geometry"] = utility_pv["geometry"].representative_point() schema = _schemas.build_schema({"utility_pv": tech_name}, "prepare") return schema.validate(utility_pv) @@ -188,7 +188,6 @@ def prepare_solar_csp( def main() -> None: """Main snakemake process.""" - sys.stderr = open(snakemake.log[0], "w") utility_pv_gdf = prepare_solar_utility_pv( snakemake.input.tz_sam, snakemake.input.gem_gspt, @@ -205,4 +204,5 @@ def main() -> None: if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main() From c1b34b567e2664ffa444c3e36325a1461f99a193 Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:26:18 -0600 Subject: [PATCH 04/11] standardise CRS management to internal setting --- workflow/internal/settings.yaml | 2 + workflow/rules/impute.smk | 3 +- workflow/rules/prepare.smk | 28 +++++++------- workflow/scripts/_utils.py | 2 +- .../scripts/impute_category_combination.py | 7 +++- workflow/scripts/impute_years.py | 1 - workflow/scripts/prepare_bioenergy.py | 7 +++- workflow/scripts/prepare_fossil.py | 15 +++++--- workflow/scripts/prepare_geothermal.py | 10 +++-- workflow/scripts/prepare_hydropower.py | 22 +++++------ workflow/scripts/prepare_large_solar.py | 11 ++++-- workflow/scripts/prepare_nuclear.py | 10 +++-- workflow/scripts/prepare_wind_gwpt.py | 38 ++++++++++++------- workflow/scripts/prepare_wind_wemi.py | 32 +++++++++------- workflow/scripts/proxy.py | 12 +----- 15 files changed, 113 insertions(+), 87 deletions(-) diff --git a/workflow/internal/settings.yaml b/workflow/internal/settings.yaml index 43236bc..168c0d0 100644 --- a/workflow/internal/settings.yaml +++ b/workflow/internal/settings.yaml @@ -1,4 +1,6 @@ # Module settings that users cannot modify. +crs: + geographic: "EPSG:4326" resources: automatic: # Links for automatically downloaded files diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index e891c06..b0d7115 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -56,8 +56,9 @@ rule impute_category_combination: conda: "../envs/powerplants.yaml" params: - tech_map=lambda wc: get_technology_mapping(f"{wc.category}"), + geo_crs=internal["crs"]["geographic"], excluded=lambda wc: get_excluded_powerplant_ids(f"{wc.category}"), + tech_map=lambda wc: get_technology_mapping(f"{wc.category}"), message: "National-level imputation of user-configured inclusions and exclusions for {wildcards.shapes}-{wildcards.category}." script: diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk index d7fe38c..fcb4eda 100644 --- a/workflow/rules/prepare.smk +++ b/workflow/rules/prepare.smk @@ -35,6 +35,7 @@ rule prepare_hydropower: "../envs/powerplants.yaml" params: technology_mapping=config["category"]["hydropower"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"] message: "Preparing hydropower powerplants using the GloHydroRes dataset." script: @@ -55,6 +56,7 @@ rule prepare_large_solar: dc_ac_ratio=config["category"]["solar"]["dc_ac_ratio"]["utility_pv"], utility_pv_name=config["category"]["solar"]["technology_mapping"]["utility_pv"], csp_name=config["category"]["solar"]["technology_mapping"]["csp"], + geo_crs=internal["crs"]["geographic"] message: "Preparing utility PV powerplants using the TZ-SAM and GEM-GSPT datasets." script: @@ -65,7 +67,6 @@ if config["category"]["wind"]["source"] == "gem": rule prepare_wind_gem: input: - script=workflow.source_path("../scripts/prepare_wind_gwpt.py"), gem_gwpt=rules.download_gem.output.path.format(dataset="GWPT"), output: path="/automatic/prepared/wind.parquet", @@ -75,19 +76,16 @@ if config["category"]["wind"]["source"] == "gem": "../envs/powerplants.yaml" params: tech_map=config["category"]["wind"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"] message: "Preparing wind powerplants using the Global Wind Power Tracker (GEM-GWPT) dataset." - shell: - """ - python {input.script:q} {input.gem_gwpt:q} \ - -t "{params.tech_map}" -o {output:q} 2> {log:q} - """ + script: + "../scripts/prepare_wind_gwpt.py" elif config["category"]["wind"]["source"] == "wemi": rule prepare_wind_wemi: input: - script=workflow.source_path("../scripts/prepare_wind_wemi.py"), wemi="", output: path="/automatic/prepared/wind.parquet", @@ -96,14 +94,12 @@ elif config["category"]["wind"]["source"] == "wemi": conda: "../envs/powerplants.yaml" params: + geo_crs=internal["crs"]["geographic"], tech_map=config["category"]["wind"]["technology_mapping"], message: "Preparing wind powerplants using the Wind Energy Market Intelligence dataset." - shell: - """ - python {input.script:q} {input.wemi:q} \ - -t "{params.tech_map}" -o {output.path:q} 2> {log:q} - """ + script: + "../scripts/prepare_wind_wemi.py" else: raise ValueError( @@ -122,8 +118,9 @@ rule prepare_bioenergy: conda: "../envs/powerplants.yaml" params: - technology_mapping=config["category"]["bioenergy"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"], fuel_mapping=internal["fuel_mapping"] | config["fuel_mapping"], + technology_mapping=config["category"]["bioenergy"]["technology_mapping"], message: "Preparing bioenergy powerplants using the Global Bioenergy Power Tracker (GBPT) dataset." script: @@ -144,8 +141,9 @@ rule prepare_fossil: conda: "../envs/powerplants.yaml" params: - technology_mapping=config["category"]["fossil"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"], fuel_mapping=internal["fuel_mapping"] | config["fuel_mapping"], + technology_mapping=config["category"]["fossil"]["technology_mapping"], message: "Preparing fossil powerplants using the GOGPT and GCPT datasets." script: @@ -162,6 +160,7 @@ rule prepare_nuclear: conda: "../envs/powerplants.yaml" params: + geo_crs=internal["crs"]["geographic"], technology_mapping=config["category"]["nuclear"]["technology_mapping"], message: "Preparing nuclear powerplants using the Global Nuclear Power Tracker (GNPT) dataset." @@ -179,6 +178,7 @@ rule prepare_geothermal: conda: "../envs/powerplants.yaml" params: + geo_crs=internal["crs"]["geographic"], technology_mapping=config["category"]["geothermal"]["technology_mapping"], message: "Preparing geothermal powerplants using the Global Geothermal Power Tracker (GGPT) dataset." diff --git a/workflow/scripts/_utils.py b/workflow/scripts/_utils.py index 5c0a15f..2258b95 100644 --- a/workflow/scripts/_utils.py +++ b/workflow/scripts/_utils.py @@ -61,7 +61,7 @@ def get_point_col( raw: pd.DataFrame, lon_col: str, lat_col: str, crs: str = "EPSG:4326" ) -> gpd.GeoSeries: """Converts latitude / longitude columns to a point geometry.""" - return gpd.points_from_xy(raw[lon_col], raw[lat_col], crs=crs) + return gpd.points_from_xy(raw[lon_col], raw[lat_col], crs=check_crs(crs, "geographic")) def get_combined_text_col( diff --git a/workflow/scripts/impute_category_combination.py b/workflow/scripts/impute_category_combination.py index 6d8fd57..a467ec3 100644 --- a/workflow/scripts/impute_category_combination.py +++ b/workflow/scripts/impute_category_combination.py @@ -14,18 +14,19 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") def impute( input_paths: list[str], output_path: str, tech_mapping: dict[str, str], + geo_crs: str, excluded: list[str] | None = None, ): """Combine a given number of category files into a final dataset.""" + crs = _utils.check_crs(geo_crs, "geographic") combined_capacity = pd.concat( - (gpd.read_parquet(f) for f in input_paths), + (gpd.read_parquet(f).to_crs(crs) for f in input_paths), ignore_index=True, sort=False, axis="index", @@ -63,10 +64,12 @@ def explore(imputed_path: str, output_path: str, colormap="tab20"): if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") impute( input_paths=[snakemake.input.internal, *snakemake.input.user], output_path=snakemake.output.combined, tech_mapping=snakemake.params.tech_map, + geo_crs=snakemake.params.geo_crs, excluded=snakemake.params.excluded, ) plot(imputed_path=snakemake.output.combined, output_path=snakemake.output.plot) diff --git a/workflow/scripts/impute_years.py b/workflow/scripts/impute_years.py index d4fd07a..a625a1d 100644 --- a/workflow/scripts/impute_years.py +++ b/workflow/scripts/impute_years.py @@ -101,7 +101,6 @@ def impute( output_path (str): resulting dataset. imputation (str): imputation configuration. technology_mapping (str): technology mapping configuration. - projected_crs (str): crs used to calculate centroids. """ _utils.check_single_category(prepared_cat_gdf) diff --git a/workflow/scripts/prepare_bioenergy.py b/workflow/scripts/prepare_bioenergy.py index 1d042b3..547e88e 100644 --- a/workflow/scripts/prepare_bioenergy.py +++ b/workflow/scripts/prepare_bioenergy.py @@ -32,6 +32,7 @@ def main( gem_gbpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str], + crs: str, output_plants_path: str, output_fuels_path: str, ): @@ -55,11 +56,12 @@ def main( "start_year": _start_year(raw_df), "end_year": gem.year_col(raw_df, "end"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), "ccs": False, "chp": False, "fuel_class": fuel_class, - } + }, + crs=crs ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(bioenergy_df).to_parquet(output_plants_path) @@ -71,6 +73,7 @@ def main( gem_gbpt_path=snakemake.input.gem_gbpt, technology_mapping=snakemake.params.technology_mapping, fuel_mapping=snakemake.params.fuel_mapping, + crs=snakemake.params.geo_crs, output_plants_path=snakemake.output.plants, output_fuels_path=snakemake.output.fuels, ) diff --git a/workflow/scripts/prepare_fossil.py b/workflow/scripts/prepare_fossil.py index eb7e0e2..1940889 100644 --- a/workflow/scripts/prepare_fossil.py +++ b/workflow/scripts/prepare_fossil.py @@ -73,7 +73,7 @@ def _get_end_year( def prepare_gem_gcpt( - gem_gcpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str] + gem_gcpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str], crs:str ) -> tuple[gpd.GeoDataFrame, pd.DataFrame]: """Obtain coal power locations using GEM-GCPT data.""" raw_df = gem.read_gem_dataset(gem_gcpt_path, ["Units"]) @@ -97,11 +97,12 @@ def prepare_gem_gcpt( "start_year": gem.year_col(raw_df, "start"), "end_year": _get_end_year(raw_df, planned_ret_col="planned_retirement"), "status": gem.status_col(raw_df, mapping=STATUS_MAPPING_COAL), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), "ccs": _get_coal_ccs(raw_df), "chp": False, # Not specified in GCPT "fuel_class": fuel_class, - } + }, + crs=crs ).reset_index(drop=True) coal_df = _utils.ensure_positive_capacity(coal_df) schema = _schemas.build_schema(technology_mapping, "prepare") @@ -112,6 +113,7 @@ def prepare_gem_gogpt( gem_gogpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str], + crs: str ) -> tuple[gpd.GeoDataFrame, pd.DataFrame]: """Obtain oil and gas power plants using GEM-GOGPT data.""" raw_df = gem.read_gem_dataset( @@ -136,11 +138,12 @@ def prepare_gem_gogpt( "start_year": gem.year_col(raw_df, "start"), "end_year": _get_end_year(raw_df, planned_ret_col="planned_retire"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), "ccs": raw_df["ccs_attachment?"] == "yes", "chp": raw_df["chp"] == "yes", "fuel_class": fuel_class, - } + }, + crs=crs ).reset_index(drop=True) oil_gas_df = _utils.ensure_positive_capacity(oil_gas_df) schema = _schemas.build_schema(technology_mapping, "prepare") @@ -154,6 +157,7 @@ def main() -> None: gem_gogpt_path=snakemake.input.gem_gogpt, technology_mapping=snakemake.params.technology_mapping["oil_gas"], fuel_mapping=snakemake.params.fuel_mapping, + crs=snakemake.params.geo_crs ) og_plants.to_parquet(snakemake.output.og_plants) og_fuels.to_parquet(snakemake.output.og_fuels) @@ -163,6 +167,7 @@ def main() -> None: gem_gcpt_path=snakemake.input.gem_gcpt, technology_mapping=snakemake.params.technology_mapping["coal"], fuel_mapping=snakemake.params.fuel_mapping, + crs=snakemake.params.geo_crs ) coal_plants.to_parquet(snakemake.output.coal_plants) coal_fuels.to_parquet(snakemake.output.coal_fuels) diff --git a/workflow/scripts/prepare_geothermal.py b/workflow/scripts/prepare_geothermal.py index 12fec71..7ddf8eb 100644 --- a/workflow/scripts/prepare_geothermal.py +++ b/workflow/scripts/prepare_geothermal.py @@ -10,11 +10,10 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") def main( - gem_ggpt_path: str, technology_mapping: dict[str, str], output_plants_path: str + gem_ggpt_path: str, technology_mapping: dict[str, str], crs: str, output_plants_path: str ): """Obtain geothermal power plants using GEM-GGPT data.""" raw_df = gem.read_gem_dataset( @@ -35,16 +34,19 @@ def main( "start_year": gem.year_col(raw_df, "start"), "end_year": gem.year_col(raw_df, "end"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), - } + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), + }, + crs=crs ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(geo_df).to_parquet(output_plants_path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main( gem_ggpt_path=snakemake.input.gem_ggpt, technology_mapping=snakemake.params.technology_mapping, + crs=snakemake.params.geo_crs, output_plants_path=snakemake.output.plants, ) diff --git a/workflow/scripts/prepare_hydropower.py b/workflow/scripts/prepare_hydropower.py index f8ff632..c7315b2 100644 --- a/workflow/scripts/prepare_hydropower.py +++ b/workflow/scripts/prepare_hydropower.py @@ -11,7 +11,6 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") def _technology(gem_df: pd.DataFrame, tech_mapping: dict[str, str]) -> pd.Series: @@ -37,9 +36,12 @@ def _reservoir_km3(gem_df: pd.DataFrame) -> pd.Series: return gem_df["res_vol_km3"].where(gem_df["res_vol_km3"] >= 0, np.nan) -def main(input_path: str, output_path: str, technology_mapping: dict[str, str]): +def main(): """Prepare a cleaned hydropower dataset.""" - raw_df = pd.read_csv(input_path) + raw_df = pd.read_csv(snakemake.input.glohydrores_path) + technology_mapping = snakemake.params.technology_mapping + crs = snakemake.params.geo_crs + raw_df = raw_df.dropna(subset=["capacity_mw", "plant_lon", "plant_lat"]) hydro_df = gpd.GeoDataFrame( { @@ -51,17 +53,15 @@ def main(input_path: str, output_path: str, technology_mapping: dict[str, str]): "start_year": raw_df["year"], "end_year": np.nan, "status": "operating", - "geometry": _utils.get_point_col(raw_df, "plant_lon", "plant_lat"), + "geometry": _utils.get_point_col(raw_df, "plant_lon", "plant_lat", crs), "reservoir_km3": _reservoir_km3(raw_df), - } + }, + crs=crs, ) schema = _schemas.build_schema(technology_mapping, "prepare") - schema.validate(hydro_df).to_parquet(output_path) + schema.validate(hydro_df).to_parquet(snakemake.output.output_path) if __name__ == "__main__": - main( - input_path=snakemake.input.glohydrores_path, - output_path=snakemake.output.output_path, - technology_mapping=snakemake.params.technology_mapping, - ) + sys.stderr = open(snakemake.log[0], "w") + main() diff --git a/workflow/scripts/prepare_large_solar.py b/workflow/scripts/prepare_large_solar.py index ab0cc50..daef7e0 100644 --- a/workflow/scripts/prepare_large_solar.py +++ b/workflow/scripts/prepare_large_solar.py @@ -59,9 +59,9 @@ def fill_tz_with_gem( def get_gem_v_tz_mismatch( gem_df: gpd.GeoDataFrame, tz_sam_df: gpd.GeoDataFrame, - valid_status: list[str] | None = None, + valid_status: list[str], buffer=1000, - proj_crs="epsg:3857", + proj_crs="epsg:8857", ) -> gpd.GeoDataFrame: """Estimation of future projects missed by TZ-SAM's satellite-based approach. @@ -69,6 +69,7 @@ def get_gem_v_tz_mismatch( - Only projects with known pre-operation states are accepted. - Only projects outside a buffer are accepted (default 1 km). """ + _utils.check_crs(proj_crs, "projected") if gem_df.crs != tz_sam_df.crs: raise ValueError("GEM and TZ-SAM CRS mismatch.") @@ -76,7 +77,7 @@ def get_gem_v_tz_mismatch( if valid_status: future_gem_df = gem_df[gem_df["status"].isin(valid_status)] else: - future_gem_df = gem_df + raise ValueError(f"Must specify valid status to get. Got {valid_status!r}.") # Buffer around the points using a projected CRS future_gem_buffered = future_gem_df.copy() @@ -196,11 +197,13 @@ def main() -> None: ) csp_gdf = prepare_solar_csp(snakemake.input.gem_gspt, snakemake.params.csp_name) + crs = _utils.check_crs(snakemake.params.geo_crs, "geographic") + # Combine into one large category large_solar_gdf = pd.concat( [utility_pv_gdf, csp_gdf], ignore_index=True, sort=False, axis="index" ) - large_solar_gdf.to_parquet(snakemake.output.large_solar) + large_solar_gdf.to_crs(crs).to_parquet(snakemake.output.large_solar) if __name__ == "__main__": diff --git a/workflow/scripts/prepare_nuclear.py b/workflow/scripts/prepare_nuclear.py index f7adbd1..0018c5f 100644 --- a/workflow/scripts/prepare_nuclear.py +++ b/workflow/scripts/prepare_nuclear.py @@ -11,11 +11,10 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") def main( - gem_gnpt_path: str, technology_mapping: dict[str, str], output_plants_path: str + gem_gnpt_path: str, technology_mapping: dict[str, str], crs:str, output_plants_path: str ): """Obtain nuclear power plants using GEM-GNPT data.""" raw_df = gem.read_gem_dataset(gem_gnpt_path, ["Data"]) @@ -38,16 +37,19 @@ def main( raw_df["retirement_date"], format="mixed" ).dt.year, "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), - } + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), + }, + crs=crs ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(nuclear_df).to_parquet(output_plants_path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main( gem_gnpt_path=snakemake.input.gem_gnpt, technology_mapping=snakemake.params.technology_mapping, + crs=snakemake.params.geo_crs, output_plants_path=snakemake.output.plants, ) diff --git a/workflow/scripts/prepare_wind_gwpt.py b/workflow/scripts/prepare_wind_gwpt.py index 7eb5fb1..3eebd8d 100644 --- a/workflow/scripts/prepare_wind_gwpt.py +++ b/workflow/scripts/prepare_wind_gwpt.py @@ -4,13 +4,17 @@ A free alternative to the WEMI dataset. """ +import sys +from typing import TYPE_CHECKING, Any + import _gem as gem import _schemas import _utils -import click import geopandas as gpd import pandas as pd -import yaml + +if TYPE_CHECKING: + snakemake: Any INTERNAL_TECH_MAPPING = { "Onshore": "onshore", @@ -24,19 +28,15 @@ def _technology(gem_df: pd.DataFrame): return gem_df["installation_type"].apply(lambda x: INTERNAL_TECH_MAPPING[x]) -@click.command() -@click.argument("gem_gwpt_path", type=click.Path(dir_okay=False)) -@click.option("-o", "output_path", type=click.Path(dir_okay=False), required=True) -@click.option("-t", "tech_mapping", type=str, required=True) -def main(gem_gwpt_path: str, output_path: str, tech_mapping: str): +def prepare_gem_gwpt( + gem_gwpt_path: str, tech_mapping: dict, crs: str +) -> gpd.GeoDataFrame: """Obtain wind power locations using GEM-GWPT data.""" raw_df = gem.read_gem_dataset(gem_gwpt_path, gem.GEM_GWPT_SHEETS) # Remove unknown installation types to avoid misplacement raw_df = raw_df[raw_df["installation_type"] != "Unknown"] raw_df = raw_df.dropna(subset=["installation_type"]) - tech_map = yaml.safe_load(tech_mapping) - wind_df = gpd.GeoDataFrame( { "powerplant_id": _utils.get_combined_text_col( @@ -46,17 +46,27 @@ def main(gem_gwpt_path: str, output_path: str, tech_mapping: str): raw_df, ["project_name", "phase_name"] ), "category": "wind", - "technology": _technology(raw_df).map(tech_map), + "technology": _technology(raw_df).map(tech_mapping), "output_capacity_mw": raw_df["capacity_(mw)"], "start_year": gem.year_col(raw_df, "start"), "end_year": gem.year_col(raw_df, "end"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), - } + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), + }, + crs=crs, + ) + schema = _schemas.build_schema(tech_mapping, "prepare") + return schema.validate(wind_df) + + +def main(): + """Main snakemake process.""" + gem_gwpt = prepare_gem_gwpt( + snakemake.input.gem_gwpt, snakemake.params.tech_map, snakemake.params.geo_crs ) - schema = _schemas.build_schema(tech_map, "prepare") - schema.validate(wind_df).to_parquet(output_path) + gem_gwpt.to_parquet(snakemake.output.path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main() diff --git a/workflow/scripts/prepare_wind_wemi.py b/workflow/scripts/prepare_wind_wemi.py index a7f3c60..81fbde6 100644 --- a/workflow/scripts/prepare_wind_wemi.py +++ b/workflow/scripts/prepare_wind_wemi.py @@ -4,14 +4,18 @@ Available here: https://www.thewindpower.net/about_en.php """ +import sys +from typing import TYPE_CHECKING, Any + import _schemas -import click import geopandas as gpd import numpy as np import pandas as pd -import yaml from _utils import get_point_col +if TYPE_CHECKING: + snakemake: Any + WEMI_CRS = "EPSG:4326" KW_TO_MW = 1 / 1000 STATUS_MAPPING = { @@ -42,19 +46,14 @@ def _start_year(raw_df: pd.DataFrame) -> pd.Series: return pd.to_numeric(year, errors="coerce") -@click.command() -@click.argument("input_path", type=click.Path(dir_okay=False)) -@click.option("-o", "output_path", type=click.Path(dir_okay=False), required=True) -@click.option("-t", "technology_mapping", type=str, required=True) -def main(input_path: str, output_path: str, technology_mapping: str): - """Saves a standardised and validated version of the WEMI dataset.""" +def prepare_wemi(wemi_path: str, tech_map: dict, crs: str)-> gpd.GeoDataFrame: + """Standardised and validated version of the WEMI dataset.""" raw_df = pd.read_excel( - input_path, + wemi_path, sheet_name="Windfarms", skiprows=[1], na_values=["#ND"], # codespell:ignore ) - tech_map = yaml.safe_load(technology_mapping) # Cleanup columns with problematic empty values. raw_df = raw_df.dropna(subset=["Total power", "Latitude", "Longitude"]) @@ -69,13 +68,20 @@ def main(input_path: str, output_path: str, technology_mapping: str): "start_year": start_year, "end_year": np.nan, "status": raw_df["Status"].replace(STATUS_MAPPING), - "geometry": get_point_col(raw_df, "Longitude", "Latitude", crs=WEMI_CRS), + "geometry": get_point_col(raw_df, "Longitude", "Latitude", crs=crs), }, - crs=WEMI_CRS, + crs=crs, ) schema = _schemas.build_schema(tech_map, "prepare") - schema.validate(processed_df).to_parquet(output_path) + return schema.validate(processed_df) + + +def main(): + """Saves a standardised and validated version of the WEMI dataset.""" + wemi_gdf = prepare_wemi(snakemake.input.wemi, snakemake.params.tech_map, snakemake.params.geo_crs) + wemi_gdf.to_parquet(snakemake.output.path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main() diff --git a/workflow/scripts/proxy.py b/workflow/scripts/proxy.py index 4c151ff..6dfd4ea 100644 --- a/workflow/scripts/proxy.py +++ b/workflow/scripts/proxy.py @@ -12,18 +12,11 @@ import rioxarray as rxr from cmap import Colormap from matplotlib import pyplot as plt -from xarray import DataArray if TYPE_CHECKING: snakemake: Any -def _check_crs(raster: DataArray): - crs = raster.rio.crs - if crs is None or not crs.is_valid: - raise ValueError(f"The provided raster has an invalid CRS: {crs}.") - - def _get_borders_gdf(shapes_file: str) -> gpd.GeoDataFrame: """Constructs country borders by removing marine regions and dissolving land regions. @@ -69,7 +62,6 @@ def proxy_rooftop_pv_capacity( borders_df = _get_borders_gdf(shapes_file).set_index("country_id") stats_df = pd.read_parquet(stats_file) area_potential_da = rxr.open_rasterio(proxy_file).squeeze() # type: ignore[union-attr] - _check_crs(area_potential_da) agg_unadjusted_df = pd.read_parquet(aggregated_unadjusted_file) unadj_cap_mw = agg_unadjusted_df.groupby("country_id")["output_capacity_mw"].sum() @@ -107,7 +99,6 @@ def plot(proxy_file: str, shapes_file: str, output_file: str, pixels: int = 500_ shapes_gdf = gpd.read_parquet(shapes_file) area_potential_da = rxr.open_rasterio(proxy_file).squeeze() # type: ignore[union-attr] - _check_crs(area_potential_da) # Compute a coarsening factor to avoid memory limits nx, ny = area_potential_da.sizes["x"], area_potential_da.sizes["y"] @@ -125,11 +116,10 @@ def plot(proxy_file: str, shapes_file: str, output_file: str, pixels: int = 500_ cbar_kwargs={"location": "right", "label": "Proxied potential"}, alpha=1, ) + # project to the raster's CRS for speed shapes_gdf.to_crs(area_potential_da.rio.crs).geometry.boundary.plot( ax=ax, color="lightgrey", linewidth=0.3, alpha=0.5 ) - ax.set_xlabel("Longitude") - ax.set_ylabel("Latitude") ax.set_title(f"Aggregation proxy (coarsened ~{pixel_count:.1e} pixels)") fig.savefig(output_file, bbox_inches="tight") From 9d2bc7d866862161fce3e3d52ef537ce3eb8cd8a Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Wed, 15 Apr 2026 08:05:30 -0600 Subject: [PATCH 05/11] New impute_location rule --- config/config.yaml | 23 +- pixi.lock | 42 +-- pixi.toml | 2 +- workflow/internal/config.schema.yaml | 70 +++- workflow/rules/impute.smk | 28 ++ workflow/scripts/_plots.py | 6 +- workflow/scripts/impute_location.py | 530 +++++++++++++++++++++++++++ 7 files changed, 662 insertions(+), 39 deletions(-) create mode 100644 workflow/scripts/impute_location.py diff --git a/config/config.yaml b/config/config.yaml index 2f71762..f1804d6 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -91,7 +91,28 @@ fuel_mapping: "fossil gas: unknown": "natural gas" imputation: - shape_overlap_correction: "split_capacity" # strict or split_capacity + location: + inner_distance: 100 # matches projected CRS unit. + on_overlap: "split_capacity" + on_forced_class_error: "drop" # either 'drop', 'ignore', or 'raise' + forced_class: + bioenergy turbine: "land" + ccgt: "land" + coal turbine: "land" + concentrating solar power: "land" + high enthalpy geothermal: "land" + igcc: "land" + low enthalpy geothermal: "land" + nuclear reactor: "land" + ocgt: "land" + offshore: "maritime" + onshore: "land" + pumped storage: "land" + reciprocating engine: "land" + reservoir: "land" + run of river: "land" + steam turbine: "land" + utility pv: "land" scenario: "near-future" # near-future, far-future, far-off-future lifetime_years: # General lifetime assumptions. Adjust as needed! diff --git a/pixi.lock b/pixi.lock index 37f887f..196b3c7 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1082,7 +1082,7 @@ environments: - conda: https://conda.anaconda.org/conda-forge/linux-64/msgpack-python-1.1.2-py312hd9148b4_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/munkres-1.1.4-pyhd8ed1ab_1.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/muparser-2.3.5-h5888daf_0.conda - - conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.0-py312h4c3975b_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.1-py312h4c3975b_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/narwhals-2.18.1-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/ncurses-6.5-h2d0b736_3.conda @@ -1352,7 +1352,7 @@ environments: - conda: https://conda.anaconda.org/conda-forge/osx-arm64/msgpack-python-1.1.2-py312h84eede6_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/munkres-1.1.4-pyhd8ed1ab_1.conda - conda: https://conda.anaconda.org/conda-forge/osx-arm64/muparser-2.3.5-h11e0b38_0.conda - - conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.0-py312hefc2c51_0.conda + - conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.1-py312hefc2c51_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/narwhals-2.18.1-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/osx-arm64/ncurses-6.5-h5e97a16_3.conda @@ -1599,7 +1599,7 @@ environments: - conda: https://conda.anaconda.org/conda-forge/win-64/msgpack-python-1.1.2-py312hf90b1b7_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/munkres-1.1.4-pyhd8ed1ab_1.conda - conda: https://conda.anaconda.org/conda-forge/win-64/muparser-2.3.5-he0c23c2_0.conda - - conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.0-py312he06e257_0.conda + - conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.1-py312he06e257_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/narwhals-2.18.1-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/networkx-3.6.1-pyhcf101f3_0.conda @@ -11973,9 +11973,9 @@ packages: - pkg:pypi/mypy?source=hash-mapping size: 18931157 timestamp: 1752534887158 -- conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.0-py312h4c3975b_0.conda - sha256: bb9ab670f94099843167170b81b597921890ffb0f56cb2222c1e611213b09f7e - md5: a1f58c8babb356d64f70c380dde5df2b +- conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.1-py312h4c3975b_0.conda + sha256: b9dc656d1f78e75ae07d57aefae6613ac94d55a4b33aabb35daa9451e624ef2f + md5: 672833a9f4e00c5e4ddbfa53b2aad6ee depends: - __glibc >=2.17,<3.0.a0 - libgcc >=14 @@ -11989,9 +11989,9 @@ packages: license: MIT license_family: MIT purls: - - pkg:pypi/mypy?source=hash-mapping - size: 22053001 - timestamp: 1775234215387 + - pkg:pypi/mypy?source=compressed-mapping + size: 21984603 + timestamp: 1776069494897 - conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.17.0-py312h163523d_0.conda sha256: 0fb78906dbcb3daa1c9b9aa9047985ef1cacf2d2e64d271c1b9e7570ae4ed1fc md5: 77b10261bad8e87c9ebb6cebd493c394 @@ -12010,9 +12010,9 @@ packages: - pkg:pypi/mypy?source=hash-mapping size: 10410020 timestamp: 1752534951317 -- conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.0-py312hefc2c51_0.conda - sha256: 002e080ca7731b473e41eed2ab79357b070419ff8613b6ad531ef756aabbb649 - md5: 16e653b99c021b140cfe6f2cc23ee02a +- conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.1-py312hefc2c51_0.conda + sha256: 66a5cae2d9107025d42f8fe013450c8fdd110afe1c8723420d1ca296445eb337 + md5: a84404f9fb0eeb402eec5ef06bc2522c depends: - __osx >=11.0 - mypy_extensions >=1.0.0 @@ -12026,9 +12026,9 @@ packages: license: MIT license_family: MIT purls: - - pkg:pypi/mypy?source=hash-mapping - size: 11840059 - timestamp: 1775234973068 + - pkg:pypi/mypy?source=compressed-mapping + size: 11876678 + timestamp: 1776070219837 - conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.17.0-py312he06e257_0.conda sha256: 93fc4abec9042deff9f882135601f90159fec0e88b9f2ac939a99a491b8c9be7 md5: 7a66ebebe26ae807bc24c1bc7661ad33 @@ -12048,9 +12048,9 @@ packages: - pkg:pypi/mypy?source=hash-mapping size: 10032459 timestamp: 1752534651646 -- conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.0-py312he06e257_0.conda - sha256: ebadc3b1c357ac2a8f2c68666f675e7004923f7f83faf22fbf239f404894b927 - md5: d461b246f4afe6f77f8ce37b8f66c23d +- conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.1-py312he06e257_0.conda + sha256: 142c45e7cd0ede5ffadf693f40c868f638b89b94a9983b4c8844c93342274255 + md5: 0e38dda3b76708bbc4bbfaff5bacf820 depends: - mypy_extensions >=1.0.0 - pathspec >=1.0.0 @@ -12065,9 +12065,9 @@ packages: license: MIT license_family: MIT purls: - - pkg:pypi/mypy?source=hash-mapping - size: 11766260 - timestamp: 1775234467870 + - pkg:pypi/mypy?source=compressed-mapping + size: 11666119 + timestamp: 1776069830794 - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda sha256: 6ed158e4e5dd8f6a10ad9e525631e35cee8557718f83de7a4e3966b1f772c4b1 md5: e9c622e0d00fa24a6292279af3ab6d06 diff --git a/pixi.toml b/pixi.toml index 91df14c..3ebf745 100644 --- a/pixi.toml +++ b/pixi.toml @@ -46,7 +46,7 @@ python = "3.12.9.*" pyyaml = "6.0.2.*" xlrd = "2.0.1.*" pip = "25.1.1.*" -mypy = "==1.20.0" +mypy = "1.20.1.*" [feature.powerplants.pypi-dependencies] gregor = { git = "https://github.com/jnnr/gregor.git", rev = "fix-memory-explosion" } diff --git a/workflow/internal/config.schema.yaml b/workflow/internal/config.schema.yaml index d3adbb5..fce56d2 100644 --- a/workflow/internal/config.schema.yaml +++ b/workflow/internal/config.schema.yaml @@ -17,10 +17,9 @@ $defs: type: string minLength: 1 - nonzero_number: + positive_number: type: number - not: - const: 0 + exclusiveMinimum: 0 year_map: type: object @@ -246,7 +245,7 @@ properties: additionalProperties: false properties: utility_pv: - $ref: '#/$defs/nonzero_number' + $ref: '#/$defs/positive_number' required: - utility_pv required: @@ -353,20 +352,63 @@ properties: type: object additionalProperties: false required: - - shape_overlap_correction + - location - scenario - lifetime_years - retirement_delay_years properties: - shape_overlap_correction: - title: Shape overlap correction method - description: | - Approach to handling powerplants that land on shape overlaps. - Either 'strict' (raise errors) or 'split_capacity' (divide powerplant capacity equally across overlapping shapes). - type: string - enum: - - strict - - split_capacity + location: + title: Location imputation + description: Settings in this section affect how the module modifies the location of powerplants. + type: object + additionalProperties: false + required: + - inner_distance + - on_overlap + - on_forced_class_error + properties: + inner_distance: + title: Inner distance + description: | + Distance, in projected CRS units, used when relocating powerplant points into valid placement areas. + Applies both to overlap-split plants and to plants forced into a target shape class. + The point is placed approximately this far inside the selected polygon from the nearest boundary/contact point. + If the available interior segment is shorter, the midpoint of that segment is used. + $ref: '#/$defs/positive_number' + on_overlap: + title: Overlap adjustment method + description: | + Controls how to handle powerplants that overlap on more than one shape. + - "raise" stops the workflow with an error listing the overlapping powerplants. + - "split_capacity" creates smaller duplicate powerplants in each intersecting shape, moved using `inner_distance`. + type: string + enum: + - raise + - split_capacity + on_forced_class_error: + title: Forced shape class adjustment error handling + description: | + Controls how to handle powerplants whose technology is configured for a forced `shape_class`, but no matching shape exists in that plant's country. + - "raise": Stop the workflow with an error listing the failing country/shape_class/technology combinations. + - "ignore": Leave those powerplants unchanged. + - "drop": Remove those powerplants. + type: string + enum: + - raise + - ignore + - drop + forced_class: + title: Forced `shape_class` adjustment. + description: | + Optional. Maps powerplant technologies to a `shape_class`. + Powerplants with a listed technology are moved to the nearest matching shape class within their assigned country. + Allowed values are "land" and "maritime". + type: object + additionalProperties: + type: string + enum: + - land + - maritime scenario: title: Scenario description: > diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index b0d7115..ae2a126 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -1,5 +1,33 @@ """Rules related to internal and user-provided imputations.""" +rule impute_location: + input: + internal="/automatic/prepared/{category}.parquet", + shapes="", + user=branch( + exists(""), + then=[""], + otherwise=[], + ), + output: + adjusted="/automatic/shapes/{shapes}/impute_location/{category}.parquet", + plot="/automatic/shapes/{shapes}/impute_location/{category}_location_adjustment.png", + log: + "/{shapes}/{category}/impute_location.log", + wildcard_constraints: + category="|".join(IMPUTED_CAT), + conda: + "../envs/powerplants.yaml" + params: + crs=internal["crs"]|config["crs"], + location_cnf=config["imputation"]["location"], + excluded=lambda wc: get_excluded_powerplant_ids(f"{wc.category}"), + tech_mapping=lambda wc: get_technology_mapping(f"{wc.category}"), + message: + "Imputation of user-configured location adjustment for {wildcards.shapes}-{wildcards.category}." + script: + "../scripts/impute_location.py" + rule impute_years: input: diff --git a/workflow/scripts/_plots.py b/workflow/scripts/_plots.py index 38138af..353bd66 100644 --- a/workflow/scripts/_plots.py +++ b/workflow/scripts/_plots.py @@ -10,13 +10,15 @@ from cmap import Colormap from matplotlib import pyplot as plt from matplotlib import ticker as mticker +from matplotlib.axes import Axes from matplotlib.patches import Patch -def draw_empty(ax, title, message="No data available"): +def draw_empty(ax: Axes, title: str="", message="No data available"): """Helper to render an empty-data placeholder.""" ax.text(0.5, 0.5, message, ha="center", va="center", fontsize=12, alpha=0.7) - ax.set_title(title) + if title: + ax.set_title(title) ax.set_axis_off() diff --git a/workflow/scripts/impute_location.py b/workflow/scripts/impute_location.py new file mode 100644 index 0000000..a0eceff --- /dev/null +++ b/workflow/scripts/impute_location.py @@ -0,0 +1,530 @@ +"""Adjust powerplant point locations to valid placement shapes. + +First, this will: +- combine powerplant categories and user provided files +- apply optional exclusions +- assign country IDs to in-scope powerplants +- split or reject powerplants falling in overlapping shapes + +Then, configured technologies are moved (per country) to the nearest shape matching +their requested shape class. +- Plants already inside a valid shape are kept in place. +- Plants outside are nudged just inside the nearest valid shape while minimizing +movement from their original location. +""" + +import sys +from collections.abc import Iterator, Mapping +from dataclasses import dataclass +from math import hypot +from typing import TYPE_CHECKING, Any, Literal + +import _schemas +import _utils +import geopandas as gpd +import pandas as pd +from matplotlib import pyplot as plt +from shapely import union_all +from shapely.geometry import LineString, MultiPolygon, Point, Polygon +from shapely.geometry.base import BaseGeometry +from shapely.ops import nearest_points + +if TYPE_CHECKING: + snakemake: Any + + +type OnError = Literal["raise", "ignore", "drop"] +type CountryOverlapMethod = Literal["raise", "split_capacity"] +type PolygonLike = Polygon | MultiPolygon + + +@dataclass(frozen=True) +class CountryAssignmentResult: + """Assigned powerplants and split-created IDs.""" + + powerplants: gpd.GeoDataFrame + split_powerplant_ids: pd.Index + + +@dataclass(frozen=True) +class LocationAdjustmentResult: + """Adjusted powerplants and forced-moved IDs.""" + + powerplants: gpd.GeoDataFrame + moved_powerplant_ids: pd.Index + + +def _nearest_part(point: Point, geometry: PolygonLike) -> Polygon: + """Return the polygon part closest to point.""" + if isinstance(geometry, MultiPolygon): + return min(geometry.geoms, key=point.distance) + return geometry + + +def _line_strings(geometry: BaseGeometry) -> Iterator[LineString]: + """Yield non-empty LineStrings from a possibly multipart geometry.""" + if geometry.is_empty: + return + + if isinstance(geometry, LineString) and geometry.length > 0: + yield geometry + return + + for part in getattr(geometry, "geoms", []): + yield from _line_strings(part) + + +def _entry_cut(ray: LineString, polygon: Polygon, contact: Point) -> LineString | None: + """Return the first interior segment entered from contact.""" + cut = min( + _line_strings(ray.intersection(polygon)), + key=lambda line: line.distance(contact), + default=None, + ) + + if cut is not None: + coords = list(cut.coords) + if Point(coords[-1]).distance(contact) < Point(coords[0]).distance(contact): + cut = LineString(coords[::-1]) + + return cut + + +def _polygon_span(polygon: Polygon) -> float: + """Return a conservative span length for extending a ray.""" + minx, miny, maxx, maxy = polygon.bounds + return hypot(maxx - minx, maxy - miny) + + +def _inward_ray( + point: Point, polygon: Polygon, inner_distance: float +) -> tuple[LineString, Point, Point]: + """Build a straight ray from the nearest contact point into the polygon.""" + contact = nearest_points(point, polygon)[1] + fallback = polygon.representative_point() + + dx = contact.x - point.x + dy = contact.y - point.y + length = hypot(dx, dy) + + if length == 0: + dx = fallback.x - contact.x + dy = fallback.y - contact.y + length = hypot(dx, dy) + + if length == 0: + return LineString([contact, fallback]), contact, fallback + + extension = _polygon_span(polygon) + inner_distance + end = Point( + contact.x + extension * dx / length, contact.y + extension * dy / length + ) + + return LineString([contact, end]), contact, fallback + + +def _point_on_cut(cut: LineString, inner_distance: float) -> Point: + """Return inner-distance point, or midpoint if the cut is too short.""" + distance = inner_distance if inner_distance < cut.length else cut.length / 2 + return cut.interpolate(distance) + + +def _place_slightly_inside( + point: Point, polygon: PolygonLike, inner_distance: float +) -> Point: + """Place point just inside polygon using the nearest inward cut.""" + polygon = _nearest_part(point, polygon) + ray, contact, fallback = _inward_ray(point, polygon, inner_distance) + + cut = _entry_cut(ray, polygon, contact) + if cut is None: + cut = _entry_cut(LineString([contact, fallback]), polygon, contact) + + placed = _point_on_cut(cut, inner_distance) if cut is not None else fallback + return placed if placed.within(polygon) else fallback + + +def _polygonal_geometry(geometry: BaseGeometry) -> PolygonLike | None: + """Return polygonal geometry, or None if no polygonal area remains.""" + if geometry.is_empty: + return None + + if isinstance(geometry, Polygon | MultiPolygon): + return geometry + + polygons = [] + for part in getattr(geometry, "geoms", []): + if isinstance(part, Polygon): + polygons.append(part) + elif isinstance(part, MultiPolygon): + polygons.extend(part.geoms) + + if not polygons: + return None + + merged = union_all(polygons) + return merged if isinstance(merged, Polygon | MultiPolygon) else None + + +def _raise_country_overlaps(overlaps: gpd.GeoDataFrame) -> None: + """Raise an error for powerplants assigned to multiple countries.""" + duplicate_ids = overlaps["powerplant_id"].drop_duplicates() + raise ValueError( + "Found powerplants intersecting multiple countries: " + f"{', '.join(map(str, duplicate_ids))}. " + "Please adjust your shapes, or enable the 'split_capacity' correction." + ) + + +def _split_shape_overlaps( + overlaps: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame, inner_distance: float +) -> gpd.GeoDataFrame: + """Split overlapping rows and move them into shape-exclusive areas.""" + overlaps = overlaps.copy() + + counts = overlaps["powerplant_id"].map(overlaps["powerplant_id"].value_counts()) + overlaps["output_capacity_mw"] /= counts + + shape_geometry = shapes.set_index("shape_id")["geometry"] + exclusive_geometry = { + shape_id: _polygonal_geometry( + shape_geometry.loc[shape_id].difference( + union_all(shape_geometry.drop(index=shape_id).to_list()) + ) + ) + for shape_id in overlaps["shape_id"].unique() + } + + missing = [ + shape_id for shape_id, geom in exclusive_geometry.items() if geom is None + ] + if missing: + raise ValueError(f"Shapes with no exclusive placement area: {missing}.") + + overlaps["geometry"] = overlaps.apply( + lambda row: _place_slightly_inside( + row.geometry, exclusive_geometry[row["shape_id"]], inner_distance + ), + axis=1, + ) + + suffixes = overlaps.groupby("powerplant_id").cumcount().astype(str) + overlaps["powerplant_id"] += "_duplicate_" + suffixes + + return overlaps + + +def assign_country_id( + prepared_powerplants: gpd.GeoDataFrame, + shapes: gpd.GeoDataFrame, + projected_crs: _utils.CRS, + inner_distance: float, + overlap_method: CountryOverlapMethod, +) -> CountryAssignmentResult: + """Assign country_id, dropping out-of-scope plants and handling shape overlaps.""" + output_columns = [*prepared_powerplants.columns, "country_id"] + shapes = shapes.to_crs(projected_crs)[["shape_id", "country_id", "geometry"]] + split_powerplant_ids = pd.Index([], dtype="object", name="powerplant_id") + + assigned = ( + gpd.sjoin( + prepared_powerplants.to_crs(projected_crs), + shapes, + predicate="intersects", + how="inner", + ) + .drop(columns="index_right") + .reset_index(drop=True) + ) + + overlaps = assigned[assigned["powerplant_id"].duplicated(keep=False)] + if not overlaps.empty: + if overlap_method == "raise": + _raise_country_overlaps(overlaps) + elif overlap_method == "split_capacity": + split_rows = _split_shape_overlaps( + overlaps=overlaps, shapes=shapes, inner_distance=inner_distance + ) + assigned.loc[overlaps.index] = split_rows + split_powerplant_ids = pd.Index( + split_rows["powerplant_id"], name="powerplant_id" + ) + else: + raise ValueError(f"Invalid overlap method {overlap_method!r}.") + + return CountryAssignmentResult( + powerplants=assigned.to_crs(prepared_powerplants.crs)[output_columns], + split_powerplant_ids=split_powerplant_ids, + ) + + +def _has_target_shape( + candidates: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame +) -> pd.Series: + """Return whether each candidate has a matching shape class in-country.""" + available = pd.MultiIndex.from_frame( + shapes[["country_id", "shape_class"]].drop_duplicates() + ) + requested = pd.MultiIndex.from_frame( + candidates[["country_id", "target_shape_class"]] + ) + + return pd.Series(requested.isin(available), index=candidates.index) + + +def _raise_missing_shapes(missing: gpd.GeoDataFrame) -> None: + """Raise a compact error describing missing country/class combinations.""" + summary = ( + missing[["technology", "country_id", "target_shape_class"]] + .value_counts() + .rename("count") + .reset_index() + ) + details = "; ".join( + f"technology={row.technology!r}, country_id={row.country_id!r}, " + f"shape_class={row.target_shape_class!r} ({row['count']} plants)" + for _, row in summary.iterrows() + ) + raise ValueError(f"No matching shapes found: {details}.") + + +def _already_correct_index( + candidates: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame +) -> pd.Index: + """Return candidate rows already inside a valid target shape.""" + if candidates.empty: + return pd.Index([]) + + shape_lookup = shapes[["shape_id", "country_id", "shape_class", "geometry"]].rename( + columns={"country_id": "shape_country_id", "shape_class": "matched_shape_class"} + ) + inside = gpd.sjoin(candidates, shape_lookup, how="inner", predicate="within") + inside = inside[ + (inside["country_id"] == inside["shape_country_id"]) + & (inside["target_shape_class"] == inside["matched_shape_class"]) + ] + + return inside.index.unique() + + +def _adjust_group( + group: gpd.GeoDataFrame, eligible_shapes: gpd.GeoDataFrame, inner_distance: float +) -> gpd.GeoDataFrame: + """Adjust one country/class group to each point's nearest eligible shape.""" + matched = gpd.sjoin_nearest( + group, + eligible_shapes[["shape_id", "geometry"]], + how="left", + distance_col="_distance", + ) + + geometry_by_shape_id = eligible_shapes.set_index("shape_id")["geometry"] + matched["matched_geometry"] = matched["shape_id"].map(geometry_by_shape_id) + + matched = matched.sort_values(["powerplant_id", "_distance", "shape_id"]).loc[ + lambda df: ~df.index.duplicated(keep="first") + ] + + matched["geometry"] = matched.apply( + lambda row: _place_slightly_inside( + row.geometry, row.matched_geometry, inner_distance + ), + axis=1, + ) + + return matched[["geometry"]] + + +def _adjust_candidates( + candidates: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame, inner_distance: float +) -> gpd.GeoDataFrame: + """Adjust all misplaced candidates to nearest valid shapes.""" + adjusted_parts = [] + + for (country_id, shape_class), group in candidates.groupby( + ["country_id", "target_shape_class"], sort=False + ): + eligible_shapes = shapes.loc[ + (shapes["country_id"] == country_id) + & (shapes["shape_class"] == shape_class), + ["shape_id", "geometry"], + ] + adjusted_parts.append(_adjust_group(group, eligible_shapes, inner_distance)) + + adjusted = ( + pd.concat(adjusted_parts) + if adjusted_parts + else gpd.GeoDataFrame( + columns=["geometry"], geometry="geometry", crs=candidates.crs + ) + ) + + return gpd.GeoDataFrame(adjusted, geometry="geometry", crs=candidates.crs) + + +def adjust_powerplant_location( + powerplants: gpd.GeoDataFrame, + shapes: gpd.GeoDataFrame, + forced_shape_class: Mapping[str, str], + projected_crs: _utils.CRS, + inner_distance: float, + on_error: OnError = "raise", +) -> LocationAdjustmentResult: + """Adjust configured technologies to nearest valid shape in the same country.""" + plants = powerplants.to_crs(projected_crs).copy() + plants["target_shape_class"] = plants["technology"].map(forced_shape_class) + + shapes = shapes.to_crs(projected_crs).copy() + candidates = plants[plants["target_shape_class"].notna()].copy() + + has_target = _has_target_shape(candidates, shapes) + missing = candidates[~has_target] + + if not missing.empty and on_error == "raise": + _raise_missing_shapes(missing) + + drop_index = missing.index if on_error == "drop" else pd.Index([]) + valid = candidates[has_target & ~candidates.index.isin(drop_index)] + + already_correct = _already_correct_index(valid, shapes) + to_adjust = valid[~valid.index.isin(already_correct)] + moved_powerplant_ids = pd.Index(to_adjust["powerplant_id"], name="powerplant_id") + + adjusted = _adjust_candidates(to_adjust, shapes, inner_distance) + + result = plants.drop(index=drop_index) + if not adjusted.empty: + result.loc[adjusted.index, "geometry"] = adjusted["geometry"] + + result = result.drop(columns="target_shape_class") + result = result[powerplants.columns] + result = result.to_crs(powerplants.crs) + + return LocationAdjustmentResult( + powerplants=result, moved_powerplant_ids=moved_powerplant_ids + ) + + +def combine_powerplants( + file_paths: list[str], geo_crs: _utils.CRS, excluded: list[str] +) -> gpd.GeoDataFrame: + """Combine internal category files and user files into a final dataset.""" + combined = pd.concat( + (gpd.read_parquet(f).to_crs(geo_crs) for f in file_paths), + ignore_index=True, + sort=False, + ) + combined = combined[~combined["powerplant_id"].isin(_utils.listify(excluded))] + if not combined.empty: + _utils.check_single_category(combined) + return combined + + +def plot( + adjustment: LocationAdjustmentResult, + assignment: CountryAssignmentResult, + shapes: gpd.GeoDataFrame, + projected_crs: _utils.CRS, +): + """Plot final powerplants, highlighting split and forced-moved plants.""" + fig, ax = plt.subplots(layout="constrained") + + adjusted = adjustment.powerplants.to_crs(projected_crs).set_index( + "powerplant_id", drop=False + ) + shapes = shapes.to_crs(projected_crs) + + split = adjusted.loc[adjusted.index.intersection(assignment.split_powerplant_ids)] + moved = adjusted.loc[adjusted.index.intersection(adjustment.moved_powerplant_ids)] + + highlighted = split.index.union(moved.index) + other = adjusted.loc[adjusted.index.difference(highlighted)] + + if not shapes.empty: + shapes.boundary.plot( + ax=ax, + linewidth=0.5, + color="black", + label=f"Shapes (n={len(shapes)})", + rasterized=True, + ) + + layers = [ + (other, "Valid plants"), + (split, "Split due to overlaps"), + (moved, "Forced shape class"), + ] + + for data, label in layers: + if not data.empty: + data.plot( + ax=ax, + markersize=8 if label == "Valid plants" else 28, + marker=".", + alpha=0.25 if label == "Valid plants" else 0.9, + label=f"{label} (n={len(data)})", + rasterized=True, + ) + + ax.set_aspect("equal") + ax.set_axis_off() + + handles, labels = ax.get_legend_handles_labels() + if handles: + ax.legend( + handles, labels, loc="upper left", bbox_to_anchor=(1, 1), borderaxespad=0 + ) + + return fig, ax + + +def main() -> None: + """Main snakemake process.""" + shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(snakemake.input.shapes)) + + projected_crs = _utils.check_crs(snakemake.params.crs["projected"], "projected") + geographic_crs = _utils.check_crs(snakemake.params.crs["geographic"], "geographic") + + location_cnf = snakemake.params.location_cnf + inner_distance = location_cnf["inner_distance"] + + combined = combine_powerplants( + file_paths=[snakemake.input.internal, *snakemake.input.user], + geo_crs=geographic_crs, + excluded=snakemake.params.excluded, + ) + schema = _schemas.build_schema(snakemake.params.tech_mapping, "prepare") + combined = schema.validate(combined) + + assignment = assign_country_id( + prepared_powerplants=combined, + shapes=shapes, + projected_crs=projected_crs, + inner_distance=inner_distance, + overlap_method=location_cnf["on_overlap"], + ) + assigned = schema.validate(assignment.powerplants) + + adjustment = adjust_powerplant_location( + powerplants=assigned, + shapes=shapes, + forced_shape_class=location_cnf.get("forced_class", {}), + projected_crs=projected_crs, + inner_distance=inner_distance, + on_error=location_cnf["on_forced_class_error"], + ) + adjusted = schema.validate(adjustment.powerplants) + adjusted.to_parquet(snakemake.output.adjusted) + + fig, _ = plot( + adjustment=adjustment, + assignment=assignment, + shapes=shapes, + projected_crs=projected_crs, + ) + fig.savefig(snakemake.output.plot, dpi=200, bbox_inches="tight") + + +if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") + main() From 77f0dfe3719d81dfba6aa20191a65cf6fbe9e89c Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Wed, 15 Apr 2026 08:27:59 -0600 Subject: [PATCH 06/11] add report for location adjustment --- workflow/report/impute_location.rst | 1 + workflow/rules/impute.smk | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 workflow/report/impute_location.rst diff --git a/workflow/report/impute_location.rst b/workflow/report/impute_location.rst new file mode 100644 index 0000000..9681fa7 --- /dev/null +++ b/workflow/report/impute_location.rst @@ -0,0 +1 @@ +Powerplant relocation based on the imputation->location configuration. diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index ae2a126..f4a02f0 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -11,7 +11,12 @@ rule impute_location: ), output: adjusted="/automatic/shapes/{shapes}/impute_location/{category}.parquet", - plot="/automatic/shapes/{shapes}/impute_location/{category}_location_adjustment.png", + plot=report( + "/automatic/shapes/{shapes}/impute_location/{category}_location_adjustment.png", + caption="../report/impute_location.rst", + category="Powerplants module", + subcategory="{category}", + ) log: "/{shapes}/{category}/impute_location.log", wildcard_constraints: From 083b8c8c6bf4396d94eae30e9f7db7615f51b648 Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Wed, 15 Apr 2026 09:23:32 -0600 Subject: [PATCH 07/11] Simplified year imputation: clearer scenario names, no location adjustment --- config/config.yaml | 2 +- workflow/internal/config.schema.yaml | 17 ++-- ...tion_map.rst => impute_years_explorer.rst} | 0 ...stogram.rst => impute_years_histogram.rst} | 4 - workflow/rules/impute.smk | 22 +++-- workflow/scripts/impute_location.py | 4 +- workflow/scripts/impute_years.py | 86 ++++++------------- 7 files changed, 51 insertions(+), 84 deletions(-) rename workflow/report/{impute_category_combination_map.rst => impute_years_explorer.rst} (100%) rename workflow/report/{impute_category_combination_histogram.rst => impute_years_histogram.rst} (52%) diff --git a/config/config.yaml b/config/config.yaml index f1804d6..eda0aed 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -113,7 +113,7 @@ imputation: run of river: "land" steam turbine: "land" utility pv: "land" - scenario: "near-future" # near-future, far-future, far-off-future + scenario: "construction" # Options: historical->construction->pre_construction->announced lifetime_years: # General lifetime assumptions. Adjust as needed! bioenergy turbine: 25 diff --git a/workflow/internal/config.schema.yaml b/workflow/internal/config.schema.yaml index fce56d2..71f3197 100644 --- a/workflow/internal/config.schema.yaml +++ b/workflow/internal/config.schema.yaml @@ -411,19 +411,18 @@ properties: - maritime scenario: title: Scenario - description: > - Determines which 'future' facilities to keep beyond the adjustment year. From less to more: - + description: | + Determines which 'future' facilities to keep beyond the adjustment year. In order of lower to higher uncertainty: - historical: only keep existing capacities. - - near-future: keeps the above plus projects in construction (RECOMMENDED). - - far-future: keeps the above plus projects in pre-construction stages (e.g., those processing permits). - - far-off-future: keeps the above plus announced projects that have not reached permitting stages. Highly speculative. + - construction: keeps the above plus projects already in construction (RECOMMENDED). + - pre_construction: keeps the above plus projects in pre-construction stages (e.g., those processing permits). + - announced: keeps the above plus announced projects that have not reached permitting stages. Highly speculative. type: string enum: - historical - - near-future - - far-future - - far-off-future + - construction + - pre_construction + - announced lifetime_years: $ref: '#/$defs/year_map' retirement_delay_years: diff --git a/workflow/report/impute_category_combination_map.rst b/workflow/report/impute_years_explorer.rst similarity index 100% rename from workflow/report/impute_category_combination_map.rst rename to workflow/report/impute_years_explorer.rst diff --git a/workflow/report/impute_category_combination_histogram.rst b/workflow/report/impute_years_histogram.rst similarity index 52% rename from workflow/report/impute_category_combination_histogram.rst rename to workflow/report/impute_years_histogram.rst index 67a0fd4..625cf9b 100644 --- a/workflow/report/impute_category_combination_histogram.rst +++ b/workflow/report/impute_years_histogram.rst @@ -1,5 +1 @@ Histogram of evolving {{ snakemake.wildcards.category }} capacity in each country in {{ snakemake.wildcards.shapes }}. -Determined by: - -* Imputed lifetimes and retirement delay. -* Imputed powerplant exclusions and inclusions. diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index f4a02f0..17d2a32 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -10,9 +10,9 @@ rule impute_location: otherwise=[], ), output: - adjusted="/automatic/shapes/{shapes}/impute_location/{category}.parquet", + relocated="/automatic/shapes/{shapes}/impute_location/{category}.parquet", plot=report( - "/automatic/shapes/{shapes}/impute_location/{category}_location_adjustment.png", + "/automatic/shapes/{shapes}/impute_location/{category}_relocation.png", caption="../report/impute_location.rst", category="Powerplants module", subcategory="{category}", @@ -36,11 +36,21 @@ rule impute_location: rule impute_years: input: - prepared="/automatic/prepared/{category}.parquet", - dissolved_shapes=rules.prepare_shapes.output.dissolved, + relocated=rules.impute_location.output.relocated, output: - imputed="/automatic/shapes/{shapes}/imputed/{category}.parquet", - plot="/automatic/shapes/{shapes}/imputed/{category}.pdf", + imputed="/{shapes}/powerplants/unadjusted/{category}.parquet", + plot=report( + "/{shapes}/powerplants/unadjusted/{category}_histogram.pdf", + caption="../report/impute_years_histogram.rst", + category="Powerplants module", + subcategory="{category}", + ), + explorer=report( + "/{shapes}/powerplants/unadjusted/{category}_explorer.html", + caption="../report/impute_years_explorer.rst", + category="Powerplants module", + subcategory="{category}", + ), log: "/{shapes}/{category}/impute_years.log", wildcard_constraints: diff --git a/workflow/scripts/impute_location.py b/workflow/scripts/impute_location.py index a0eceff..4d96b03 100644 --- a/workflow/scripts/impute_location.py +++ b/workflow/scripts/impute_location.py @@ -427,7 +427,7 @@ def plot( projected_crs: _utils.CRS, ): """Plot final powerplants, highlighting split and forced-moved plants.""" - fig, ax = plt.subplots(layout="constrained") + fig, ax = plt.subplots(figsize=(8,8), layout="constrained") adjusted = adjustment.powerplants.to_crs(projected_crs).set_index( "powerplant_id", drop=False @@ -514,7 +514,7 @@ def main() -> None: on_error=location_cnf["on_forced_class_error"], ) adjusted = schema.validate(adjustment.powerplants) - adjusted.to_parquet(snakemake.output.adjusted) + adjusted.to_parquet(snakemake.output.relocated) fig, _ = plot( adjustment=adjustment, diff --git a/workflow/scripts/impute_years.py b/workflow/scripts/impute_years.py index a625a1d..84b704c 100644 --- a/workflow/scripts/impute_years.py +++ b/workflow/scripts/impute_years.py @@ -15,9 +15,9 @@ HISTORICAL = {"operating", "retired"} SCENARIO_MAP = { "historical": HISTORICAL, - "near-future": HISTORICAL | {"construction"}, - "far-future": HISTORICAL | {"construction", "pre-construction"}, - "far-off-future": HISTORICAL | {"construction", "pre-construction", "announced"}, + "construction": HISTORICAL | {"construction"}, + "pre_construction": HISTORICAL | {"construction", "pre-construction"}, + "announced": HISTORICAL | {"construction", "pre-construction", "announced"}, } @@ -88,28 +88,18 @@ def _impute_status(df: pd.DataFrame) -> pd.Series: def impute( - prepared_cat_gdf: gpd.GeoDataFrame, - countries_gdf: gpd.GeoDataFrame, - imputation: dict, - technology_mapping: dict, + relocated_gdf: gpd.GeoDataFrame, imputation: dict, technology_mapping: dict ) -> gpd.GeoDataFrame: """Add automatic and user imputations to fill missing data. Args: - prepared_cat_gdf (gpd.GeoDataFrame): cleaned category dataset following our schema. - countries_gdf (str): country-level shapes to use. - output_path (str): resulting dataset. + relocated_gdf (gpd.GeoDataFrame): relocated powerplants (must have country_id). imputation (str): imputation configuration. technology_mapping (str): technology mapping configuration. """ - _utils.check_single_category(prepared_cat_gdf) - - # Re-map polygons to their centroid to simplify further processing. - # TODO: consider splitting them between shapes instead? - polygon_mask = prepared_cat_gdf[ - prepared_cat_gdf.geometry.geom_type != "Point" - ].index - if polygon_mask.any(): + _utils.check_single_category(relocated_gdf) + + if (relocated_gdf.geometry.geom_type != "Point").any(): raise ValueError( "Polygon powerplant geometries detected. Only Points are supported." ) @@ -118,13 +108,8 @@ def impute( retirement_delay_years = imputation["retirement_delay_years"] scenario = SCENARIO_MAP[imputation["scenario"]] - # Get facilities within the provided regions and for the given scenario - imputed = gpd.sjoin( - prepared_cat_gdf[prepared_cat_gdf["status"].isin(scenario)], - countries_gdf, - predicate="intersects", - how="inner", - ).drop("index_right", axis="columns") + # Get facilities within the requested scenario + imputed = relocated_gdf[relocated_gdf["status"].isin(scenario)].copy() if not imputed.empty: # Adjust project dates @@ -132,63 +117,40 @@ def impute( imputed["end_year"] = _impute_end_year( imputed, lifetimes, retirement_delay_years ) - - # Drop projects with insufficient date data and then adjust status. + # Drop projects with insufficient date data imputed = imputed.dropna(subset=["start_year", "end_year"]) + # Update the powerplant status imputed["status"] = _impute_status(imputed) - imputed = handle_polygon_overlaps( - imputed, imputation["shape_overlap_correction"] - ) - schema = _schemas.build_schema(technology_mapping, "impute") return schema.validate(imputed) -def handle_polygon_overlaps(df: gpd.GeoDataFrame, method: str) -> gpd.GeoDataFrame: - """Handle duplicate powerplant assignments caused by overlapping shapes.""" - if method not in {"strict", "split_capacity"}: - raise ValueError( - f"Unsupported polygon overlap method {method!r}. " - "Expected 'strict' or 'split_capacity'." - ) - - dup_mask = df["powerplant_id"].duplicated(keep=False) - - if dup_mask.any(): - duplicate_ids = df.loc[dup_mask, "powerplant_id"] - - if method == "strict": - raise ValueError( - "Found duplicate IDs, likely due to overlapping polygons: " - f"{', '.join(map(str, duplicate_ids.unique()))}. " - "Please adjust your shapes, or enable the 'split_capacity' correction." - ) - - counts = duplicate_ids.map(df["powerplant_id"].value_counts()) - - df.loc[dup_mask, "output_capacity_mw"] = ( - df.loc[dup_mask, "output_capacity_mw"] / counts +def explore(imputed: gpd.GeoDataFrame, output_path: str, colormap="tab20"): + """Create a HTML map for users to explore.""" + if imputed.empty: + with open(output_path, "w") as f: + f.write("No data") + else: + explorer = imputed.explore( + column="technology", legend=True, popup=True, cmap=colormap ) - - suffixes = df.loc[dup_mask].groupby("powerplant_id").cumcount().astype(str) - df.loc[dup_mask, "powerplant_id"] = duplicate_ids + "_duplicate_" + suffixes - - return df.reset_index(drop=True) + explorer.save(output_path) def main() -> None: """Main snakemake process.""" imputed_gdf = impute( - prepared_cat_gdf=gpd.read_parquet(snakemake.input.prepared), - countries_gdf=gpd.read_parquet(snakemake.input.dissolved_shapes), + relocated_gdf=gpd.read_parquet(snakemake.input.relocated), imputation=snakemake.params.imputation, technology_mapping=snakemake.params.tech_map, ) imputed_gdf.to_parquet(snakemake.output.imputed) + _plots.plot_powerplant_capacity_buildup( imputed_gdf, snakemake.output.plot, "seaborn:tab20" ) + explore(imputed_gdf, snakemake.output.explorer) if __name__ == "__main__": From 013db41d5577697e702a510dc68e572e6742f42f Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:26:54 -0600 Subject: [PATCH 08/11] plot naming cleanup --- ..._explorer.rst => impute_ages_explorer.rst} | 0 ...istogram.rst => impute_ages_histogram.rst} | 0 workflow/report/prepare_shapes.rst | 1 - workflow/rules/aggregate.smk | 3 +- workflow/rules/impute.smk | 62 +++------------ workflow/rules/prepare.smk | 23 ------ workflow/rules/solar.smk | 11 +-- workflow/scripts/_plots.py | 7 +- workflow/scripts/aggregate_capacity.py | 1 + workflow/scripts/impute_adjustment_solar.py | 5 +- .../{impute_years.py => impute_ages.py} | 4 +- .../scripts/impute_category_combination.py | 78 ------------------- workflow/scripts/impute_location.py | 4 +- workflow/scripts/prepare_shapes.py | 72 ----------------- workflow/scripts/proxy.py | 2 +- 15 files changed, 30 insertions(+), 243 deletions(-) rename workflow/report/{impute_years_explorer.rst => impute_ages_explorer.rst} (100%) rename workflow/report/{impute_years_histogram.rst => impute_ages_histogram.rst} (100%) delete mode 100644 workflow/report/prepare_shapes.rst rename workflow/scripts/{impute_years.py => impute_ages.py} (97%) delete mode 100644 workflow/scripts/impute_category_combination.py delete mode 100644 workflow/scripts/prepare_shapes.py diff --git a/workflow/report/impute_years_explorer.rst b/workflow/report/impute_ages_explorer.rst similarity index 100% rename from workflow/report/impute_years_explorer.rst rename to workflow/report/impute_ages_explorer.rst diff --git a/workflow/report/impute_years_histogram.rst b/workflow/report/impute_ages_histogram.rst similarity index 100% rename from workflow/report/impute_years_histogram.rst rename to workflow/report/impute_ages_histogram.rst diff --git a/workflow/report/prepare_shapes.rst b/workflow/report/prepare_shapes.rst deleted file mode 100644 index 6e34e5a..0000000 --- a/workflow/report/prepare_shapes.rst +++ /dev/null @@ -1 +0,0 @@ -Dissolved country shapes, used during imputation. diff --git a/workflow/rules/aggregate.smk b/workflow/rules/aggregate.smk index b5577c0..83e14b4 100644 --- a/workflow/rules/aggregate.smk +++ b/workflow/rules/aggregate.smk @@ -8,7 +8,7 @@ rule aggregate_capacity: output: aggregated="", plot=report( - "/{shapes}/aggregated/{adjustment}/{category}.pdf", + "/{shapes}/aggregated/{adjustment}/{category}_aggregation.png", caption="../report/aggregate_capacity.rst", category="Powerplants module", subcategory="{category}", @@ -22,6 +22,7 @@ rule aggregate_capacity: "../envs/powerplants.yaml" params: category=lambda wc: wc.category, + proj_crs=config["crs"]["projected"], message: "Aggregating capacity for {wildcards.shapes}-{wildcards.adjustment}-{wildcards.category}." script: diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index 17d2a32..9159533 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -34,25 +34,25 @@ rule impute_location: "../scripts/impute_location.py" -rule impute_years: +rule impute_ages: input: relocated=rules.impute_location.output.relocated, output: - imputed="/{shapes}/powerplants/unadjusted/{category}.parquet", - plot=report( + aged="/{shapes}/powerplants/unadjusted/{category}.parquet", + histogram=report( "/{shapes}/powerplants/unadjusted/{category}_histogram.pdf", - caption="../report/impute_years_histogram.rst", + caption="../report/impute_ages_histogram.rst", category="Powerplants module", subcategory="{category}", ), explorer=report( "/{shapes}/powerplants/unadjusted/{category}_explorer.html", - caption="../report/impute_years_explorer.rst", + caption="../report/impute_ages_explorer.rst", category="Powerplants module", subcategory="{category}", ), log: - "/{shapes}/{category}/impute_years.log", + "/{shapes}/{category}/impute_ages.log", wildcard_constraints: dataset="|".join(IMPUTED_CAT), conda: @@ -61,56 +61,14 @@ rule impute_years: imputation=config["imputation"], tech_map=lambda wc: get_technology_mapping(wc.category), message: - "National-level imputation of missing years for all powerplants in {wildcards.shapes}-{wildcards.category} dataset." - script: - "../scripts/impute_years.py" - - -rule impute_category_combination: - input: - internal=rules.impute_years.output.imputed, - user=branch( - exists(""), - then=[""], - otherwise=[], - ), - output: - combined=workflow.pathvars.apply("").format( - shapes="{shapes}", - adjustment="unadjusted", - category="{category}", - ), - plot=report( - "/{shapes}/powerplants/unadjusted/{category}.pdf", - caption="../report/impute_category_combination_histogram.rst", - category="Powerplants module", - subcategory="{category}", - ), - explore=report( - "/{shapes}/powerplants/unadjusted/{category}.html", - caption="../report/impute_category_combination_map.rst", - category="Powerplants module", - subcategory="{category}", - ), - log: - "/{shapes}/{category}/impute_category_combination.log", - wildcard_constraints: - category="|".join(IMPUTED_CAT), - conda: - "../envs/powerplants.yaml" - params: - geo_crs=internal["crs"]["geographic"], - excluded=lambda wc: get_excluded_powerplant_ids(f"{wc.category}"), - tech_map=lambda wc: get_technology_mapping(f"{wc.category}"), - message: - "National-level imputation of user-configured inclusions and exclusions for {wildcards.shapes}-{wildcards.category}." + "National-level imputation of missing powerplant ages in {wildcards.shapes}-{wildcards.category} dataset." script: - "../scripts/impute_category_combination.py" + "../scripts/impute_ages.py" rule impute_capacity_adjustment: input: - unadjusted=rules.impute_category_combination.output.combined, + unadjusted=rules.impute_ages.output.aged, stats=rules.prepare_statistics.output.categories, output: adjusted=workflow.pathvars.apply("").format( @@ -119,7 +77,7 @@ rule impute_capacity_adjustment: category="{category}", ), plot=report( - "/{shapes}/powerplants/adjusted/{category}.pdf", + "/{shapes}/powerplants/adjusted/{category}_adjustment.pdf", caption="../report/impute_capacity_adjustment.rst", category="Powerplants module", subcategory="{category}", diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk index fcb4eda..c265b32 100644 --- a/workflow/rules/prepare.smk +++ b/workflow/rules/prepare.smk @@ -1,29 +1,6 @@ """Rules in this file focus on parsing and cleaning data into shared schemas.""" -rule prepare_shapes: - input: - shapes="", - output: - dissolved="/automatic/shapes/{shapes}/dissolved.parquet", - dissolved_plt=report( - "/automatic/shapes/{shapes}/dissolved.png", - caption="../report/prepare_shapes.rst", - category="Powerplants module", - subcategory="preparation", - ), - log: - "/{shapes}/prepare_shapes.log", - conda: - "../envs/powerplants.yaml" - params: - crs=config["crs"]["projected"], - message: - "Preparing intermediate shapefile versions to speed up processing." - script: - "../scripts/prepare_shapes.py" - - rule prepare_hydropower: input: glohydrores_path=rules.download_glohydrores.output.path, diff --git a/workflow/rules/solar.smk b/workflow/rules/solar.smk index 15db9b6..52691f3 100644 --- a/workflow/rules/solar.smk +++ b/workflow/rules/solar.smk @@ -25,7 +25,7 @@ rule proxy_rooftop_pv: output: proxy="/automatic/shapes/{shapes}/proxies/rooftop_pv.tif", plot=report( - "/automatic/shapes/{shapes}/proxies/rooftop_pv.pdf", + "/automatic/shapes/{shapes}/proxies/rooftop_pv.png", caption="../report/proxy_rooftop_pv.rst", category="Powerplants module", subcategory="solar", @@ -58,14 +58,14 @@ rule impute_adjustment_solar: adjustment="adjusted", category="solar", ), - plot_map=report( - "/{shapes}/aggregated/adjusted/solar_map.pdf", + plot_aggregation=report( + "/{shapes}/aggregated/adjusted/solar_aggregation.png", caption="../report/aggregate_capacity.rst", category="Powerplants module", subcategory="solar", ), - plot_stats=report( - "/{shapes}/aggregated/adjusted/solar_stats.pdf", + plot_adjustment=report( + "/{shapes}/aggregated/adjusted/solar_adjustment.pdf", caption="../report/impute_capacity_adjustment.rst", category="Powerplants module", subcategory="solar", @@ -76,6 +76,7 @@ rule impute_adjustment_solar: "../envs/powerplants.yaml" params: category="solar", + proj_crs=config["crs"]["projected"], technology=config["category"]["solar"]["technology_mapping"]["rooftop_pv"], message: "Aggregating capacity for {wildcards.shapes}-adjusted-solar." diff --git a/workflow/scripts/_plots.py b/workflow/scripts/_plots.py index 353bd66..31e2f84 100644 --- a/workflow/scripts/_plots.py +++ b/workflow/scripts/_plots.py @@ -286,10 +286,11 @@ def plot_capacity_adjustment( def plot_capacity_aggregation( - aggregated_file: str, shapes_file: str, output_file: str, category: str + aggregated_file: str, shapes_file: str, output_file: str, category: str, crs: int | str ): """Plot aggregated capacity per region.""" shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(shapes_file)) + shapes = shapes.to_crs(_utils.check_crs(crs, "projected")) agg = _schemas.AggregatedPlantSchema.validate(pd.read_parquet(aggregated_file)) title = f"Aggregated {category} capacity" @@ -302,12 +303,12 @@ def plot_capacity_aggregation( shapes = shapes.set_index("shape_id") shapes["output_capacity_mw"] = cap_by_shape.replace(0, np.nan) - fig, ax = plt.subplots(figsize=(6, 6), dpi=300, rasterized=True) + fig, ax = plt.subplots(dpi=300) ax = shapes.plot( ax=ax, column="output_capacity_mw", - cmap="magma", + cmap="plasma", edgecolor="grey", linewidth=0.5, legend=True, diff --git a/workflow/scripts/aggregate_capacity.py b/workflow/scripts/aggregate_capacity.py index 55f6af0..1257ce6 100644 --- a/workflow/scripts/aggregate_capacity.py +++ b/workflow/scripts/aggregate_capacity.py @@ -74,4 +74,5 @@ def capacity(powerplant_file: str, shapes_file: str, year: int, output_file: str shapes_file=snakemake.input.shapes, output_file=snakemake.output.plot, category=snakemake.params.category, + crs=snakemake.params.proj_crs ) diff --git a/workflow/scripts/impute_adjustment_solar.py b/workflow/scripts/impute_adjustment_solar.py index c4e9d97..4897677 100644 --- a/workflow/scripts/impute_adjustment_solar.py +++ b/workflow/scripts/impute_adjustment_solar.py @@ -50,15 +50,16 @@ def main(): _plots.plot_capacity_aggregation( aggregated_file=snakemake.output.aggregated, shapes_file=snakemake.input.shapes, - output_file=snakemake.output.plot_map, + output_file=snakemake.output.plot_aggregation, category=snakemake.params.category, + crs=snakemake.params.proj_crs ) _plots.plot_capacity_adjustment( stats_file=snakemake.input.stats, unadjusted_file=snakemake.input.large_solar, adjusted_file=snakemake.output.aggregated, year=_utils.DATASET_YEAR, - output_file=snakemake.output.plot_stats, + output_file=snakemake.output.plot_adjustment, is_disagg=False, ) diff --git a/workflow/scripts/impute_years.py b/workflow/scripts/impute_ages.py similarity index 97% rename from workflow/scripts/impute_years.py rename to workflow/scripts/impute_ages.py index 84b704c..7f16943 100644 --- a/workflow/scripts/impute_years.py +++ b/workflow/scripts/impute_ages.py @@ -145,10 +145,10 @@ def main() -> None: imputation=snakemake.params.imputation, technology_mapping=snakemake.params.tech_map, ) - imputed_gdf.to_parquet(snakemake.output.imputed) + imputed_gdf.to_parquet(snakemake.output.aged) _plots.plot_powerplant_capacity_buildup( - imputed_gdf, snakemake.output.plot, "seaborn:tab20" + imputed_gdf, snakemake.output.histogram, "seaborn:tab20" ) explore(imputed_gdf, snakemake.output.explorer) diff --git a/workflow/scripts/impute_category_combination.py b/workflow/scripts/impute_category_combination.py deleted file mode 100644 index a467ec3..0000000 --- a/workflow/scripts/impute_category_combination.py +++ /dev/null @@ -1,78 +0,0 @@ -"""Combine a given number of files belonging to a technology category. - -Optionally, remove a given number of powerplant_id. -""" - -import sys -from typing import TYPE_CHECKING, Any - -import _plots -import _schemas -import _utils -import geopandas as gpd -import pandas as pd - -if TYPE_CHECKING: - snakemake: Any - - -def impute( - input_paths: list[str], - output_path: str, - tech_mapping: dict[str, str], - geo_crs: str, - excluded: list[str] | None = None, -): - """Combine a given number of category files into a final dataset.""" - crs = _utils.check_crs(geo_crs, "geographic") - combined_capacity = pd.concat( - (gpd.read_parquet(f).to_crs(crs) for f in input_paths), - ignore_index=True, - sort=False, - axis="index", - ) - if excluded: - to_drop = _utils.listify(excluded) - combined_capacity = combined_capacity[ - ~combined_capacity["powerplant_id"].isin(to_drop) - ] - - if not combined_capacity.empty: - _utils.check_single_category(combined_capacity) - - schema = _schemas.build_schema(tech_mapping, "impute") - schema.validate(combined_capacity).to_parquet(output_path) - - -def plot(imputed_path: str, output_path: str, colormap="tab20"): - """Plot stacked bar charts of active powerplant capacity over time per country.""" - df = pd.read_parquet(imputed_path) - _plots.plot_powerplant_capacity_buildup(df, output_path, colormap) - - -def explore(imputed_path: str, output_path: str, colormap="tab20"): - """Create a HTML map for users to explore.""" - df = gpd.read_parquet(imputed_path) - if df.empty: - with open(output_path, "w") as f: - f.write("No data") - else: - explorer = df.explore( - column="technology", legend=True, popup=True, cmap=colormap - ) - explorer.save(output_path) - - -if __name__ == "__main__": - sys.stderr = open(snakemake.log[0], "w") - impute( - input_paths=[snakemake.input.internal, *snakemake.input.user], - output_path=snakemake.output.combined, - tech_mapping=snakemake.params.tech_map, - geo_crs=snakemake.params.geo_crs, - excluded=snakemake.params.excluded, - ) - plot(imputed_path=snakemake.output.combined, output_path=snakemake.output.plot) - explore( - imputed_path=snakemake.output.combined, output_path=snakemake.output.explore - ) diff --git a/workflow/scripts/impute_location.py b/workflow/scripts/impute_location.py index 4d96b03..87e7784 100644 --- a/workflow/scripts/impute_location.py +++ b/workflow/scripts/impute_location.py @@ -446,7 +446,6 @@ def plot( linewidth=0.5, color="black", label=f"Shapes (n={len(shapes)})", - rasterized=True, ) layers = [ @@ -461,9 +460,8 @@ def plot( ax=ax, markersize=8 if label == "Valid plants" else 28, marker=".", - alpha=0.25 if label == "Valid plants" else 0.9, + alpha=0.5 if label == "Valid plants" else 0.9, label=f"{label} (n={len(data)})", - rasterized=True, ) ax.set_aspect("equal") diff --git a/workflow/scripts/prepare_shapes.py b/workflow/scripts/prepare_shapes.py deleted file mode 100644 index f8f8654..0000000 --- a/workflow/scripts/prepare_shapes.py +++ /dev/null @@ -1,72 +0,0 @@ -"""Prepare intermediate versions of the shape file used during processing.""" - -import sys -from typing import TYPE_CHECKING, Any - -import _schemas -import geopandas as gpd -from cmap import Colormap -from matplotlib import pyplot as plt -from matplotlib.axes import Axes -from matplotlib.figure import Figure - -if TYPE_CHECKING: - snakemake: Any - - -def dissolve_by_country(shapes: gpd.GeoDataFrame) -> gpd.GeoDataFrame: - """Dissolve shapes by their country.""" - countries_gdf = ( - shapes[["country_id", "geometry"]].dissolve("country_id").reset_index() - ) - countries_gdf["geometry"] = countries_gdf.buffer(0) - return countries_gdf - - -def plot( - original: gpd.GeoDataFrame, - new: gpd.GeoDataFrame, - *, - name: str, - crs: str, - cmap: str, - col: str, -) -> tuple[Figure, Axes]: - """Helper to view computed changes.""" - fig, axes = plt.subplots(1, 2, layout="constrained") - original.to_crs(crs).plot("shape_class", ax=axes[0]) - axes[0].set_title("User") - new_proj = new.to_crs(crs) - if len(new[col].unique()) == 1: - # FIXME: odd issue when plotting a single category - # Upgrading geopandas might fix it. - new_proj.plot(ax=axes[1], color="lightcoral") - else: - new_proj.plot(col, ax=axes[1], cmap=Colormap(cmap).to_mpl()) - new_proj.boundary.plot(ax=axes[1], color="black", lw=0.05) - axes[1].set_title(name) - for ax in axes: - ax.set(xticks=[], yticks=[], xlabel="", ylabel="") - return fig, axes - - -def main(): - """Prepare a cleaned hydropower dataset.""" - shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(snakemake.input.shapes)) - dissolved = dissolve_by_country(shapes) - dissolved.to_parquet(snakemake.output.dissolved) - - fig, _ = plot( - shapes, - dissolved, - col="country_id", - name="Dissolved", - crs=snakemake.params.crs, - cmap="glasbey:glasbey", - ) - fig.savefig(snakemake.output.dissolved_plt, dpi=200, bbox_inches="tight") - - -if __name__ == "__main__": - sys.stderr = open(snakemake.log[0], "w") - main() diff --git a/workflow/scripts/proxy.py b/workflow/scripts/proxy.py index 6dfd4ea..41b62b5 100644 --- a/workflow/scripts/proxy.py +++ b/workflow/scripts/proxy.py @@ -108,7 +108,7 @@ def plot(proxy_file: str, shapes_file: str, output_file: str, pixels: int = 500_ # Coarsen the proxy data coarse = area_potential_da.coarsen(x=factor, y=factor, boundary="trim").mean() - fig, ax = plt.subplots(figsize=(6, 6), dpi=300, rasterized=True) + fig, ax = plt.subplots(figsize=(6, 6), dpi=300) coarse.plot.imshow( ax=ax, cmap=Colormap("seaborn:rocket").to_matplotlib(), From 3b49b43a2ae4bec8e7040ac1d8610a58c80c3b43 Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Wed, 15 Apr 2026 11:19:46 -0600 Subject: [PATCH 09/11] fix integration test --- workflow/rules/impute.smk | 8 +++-- workflow/scripts/_plots.py | 4 +-- workflow/scripts/impute_ages.py | 50 +++++++++++++++-------------- workflow/scripts/impute_location.py | 2 +- 4 files changed, 34 insertions(+), 30 deletions(-) diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index 9159533..f3bbdb0 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -10,7 +10,7 @@ rule impute_location: otherwise=[], ), output: - relocated="/automatic/shapes/{shapes}/impute_location/{category}.parquet", + relocated=temp("/automatic/shapes/{shapes}/impute_location/{category}.parquet"), plot=report( "/automatic/shapes/{shapes}/impute_location/{category}_relocation.png", caption="../report/impute_location.rst", @@ -38,7 +38,11 @@ rule impute_ages: input: relocated=rules.impute_location.output.relocated, output: - aged="/{shapes}/powerplants/unadjusted/{category}.parquet", + aged=workflow.pathvars.apply("").format( + shapes="{shapes}", + adjustment="unadjusted", + category="{category}", + ), histogram=report( "/{shapes}/powerplants/unadjusted/{category}_histogram.pdf", caption="../report/impute_ages_histogram.rst", diff --git a/workflow/scripts/_plots.py b/workflow/scripts/_plots.py index 31e2f84..c4ee512 100644 --- a/workflow/scripts/_plots.py +++ b/workflow/scripts/_plots.py @@ -303,7 +303,7 @@ def plot_capacity_aggregation( shapes = shapes.set_index("shape_id") shapes["output_capacity_mw"] = cap_by_shape.replace(0, np.nan) - fig, ax = plt.subplots(dpi=300) + fig, ax = plt.subplots(dpi=200) ax = shapes.plot( ax=ax, @@ -316,6 +316,4 @@ def plot_capacity_aggregation( missing_kwds={"color": "lightgrey", "alpha": 0.2}, ) ax.set_title(title + f" in year {agg.attrs['year']}") - ax.set_xlabel("Longitude ($deg$)") - ax.set_ylabel("Latitude ($deg$)") fig.savefig(output_file, bbox_inches="tight") diff --git a/workflow/scripts/impute_ages.py b/workflow/scripts/impute_ages.py index 7f16943..cd85627 100644 --- a/workflow/scripts/impute_ages.py +++ b/workflow/scripts/impute_ages.py @@ -97,30 +97,32 @@ def impute( imputation (str): imputation configuration. technology_mapping (str): technology mapping configuration. """ - _utils.check_single_category(relocated_gdf) - - if (relocated_gdf.geometry.geom_type != "Point").any(): - raise ValueError( - "Polygon powerplant geometries detected. Only Points are supported." - ) - - lifetimes = imputation["lifetime_years"] - retirement_delay_years = imputation["retirement_delay_years"] - scenario = SCENARIO_MAP[imputation["scenario"]] - - # Get facilities within the requested scenario - imputed = relocated_gdf[relocated_gdf["status"].isin(scenario)].copy() - - if not imputed.empty: - # Adjust project dates - imputed["start_year"] = _impute_start_year(imputed, lifetimes) - imputed["end_year"] = _impute_end_year( - imputed, lifetimes, retirement_delay_years - ) - # Drop projects with insufficient date data - imputed = imputed.dropna(subset=["start_year", "end_year"]) - # Update the powerplant status - imputed["status"] = _impute_status(imputed) + if relocated_gdf.empty: + imputed = relocated_gdf + else: + _utils.check_single_category(relocated_gdf) + if (relocated_gdf.geometry.geom_type != "Point").any(): + raise ValueError( + "Polygon powerplant geometries detected. Only Points are supported." + ) + + lifetimes = imputation["lifetime_years"] + retirement_delay_years = imputation["retirement_delay_years"] + scenario = SCENARIO_MAP[imputation["scenario"]] + + # Get facilities within the requested scenario + imputed = relocated_gdf[relocated_gdf["status"].isin(scenario)].copy() + + if not imputed.empty: + # Adjust project dates + imputed["start_year"] = _impute_start_year(imputed, lifetimes) + imputed["end_year"] = _impute_end_year( + imputed, lifetimes, retirement_delay_years + ) + # Drop projects with insufficient date data + imputed = imputed.dropna(subset=["start_year", "end_year"]) + # Update the powerplant status + imputed["status"] = _impute_status(imputed) schema = _schemas.build_schema(technology_mapping, "impute") return schema.validate(imputed) diff --git a/workflow/scripts/impute_location.py b/workflow/scripts/impute_location.py index 87e7784..2f17094 100644 --- a/workflow/scripts/impute_location.py +++ b/workflow/scripts/impute_location.py @@ -427,7 +427,7 @@ def plot( projected_crs: _utils.CRS, ): """Plot final powerplants, highlighting split and forced-moved plants.""" - fig, ax = plt.subplots(figsize=(8,8), layout="constrained") + fig, ax = plt.subplots(layout="constrained") adjusted = adjustment.powerplants.to_crs(projected_crs).set_index( "powerplant_id", drop=False From 8bb0efcd6f541e438ca21e3ce4d827cf1c961890 Mon Sep 17 00:00:00 2001 From: Ivan Ruiz Manuel <72193617+irm-codebase@users.noreply.github.com> Date: Wed, 15 Apr 2026 11:29:56 -0600 Subject: [PATCH 10/11] isolate time imputation in schema --- config/config.yaml | 85 ++++++++++--------- workflow/internal/config.schema.yaml | 49 ++++++----- ..._explorer.rst => impute_time_explorer.rst} | 0 ...istogram.rst => impute_time_histogram.rst} | 0 workflow/rules/_utils.smk | 4 +- workflow/rules/impute.smk | 12 +-- 6 files changed, 79 insertions(+), 71 deletions(-) rename workflow/report/{impute_ages_explorer.rst => impute_time_explorer.rst} (100%) rename workflow/report/{impute_ages_histogram.rst => impute_time_histogram.rst} (100%) diff --git a/config/config.yaml b/config/config.yaml index eda0aed..f8acc1a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -113,45 +113,46 @@ imputation: run of river: "land" steam turbine: "land" utility pv: "land" - scenario: "construction" # Options: historical->construction->pre_construction->announced - lifetime_years: - # General lifetime assumptions. Adjust as needed! - bioenergy turbine: 25 - ccgt: 20 - coal turbine: 50 - concentrating solar power: 25 - high enthalpy geothermal: 30 - igcc: 25 - low enthalpy geothermal: 30 - nuclear reactor: 50 - ocgt: 20 - offshore: 25 - onshore: 25 - pumped storage: 80 - reciprocating engine: 20 - reservoir: 80 - rooftop pv: 25 - run of river: 80 - steam turbine: 30 - utility pv: 25 - retirement_delay_years: - # Assumed extension past reference year. - # Only applies if a powerplant is still operating past the lifetime_years assumption - bioenergy turbine: 5 - ccgt: 5 - coal turbine: 5 - concentrating solar power: 2 - high enthalpy geothermal: 5 - igcc: 5 - low enthalpy geothermal: 5 - nuclear reactor: 5 - ocgt: 5 - offshore: 2 - onshore: 2 - pumped storage: 10 - reciprocating engine: 5 - reservoir: 10 - rooftop pv: 5 - run of river: 10 - steam turbine: 5 - utility pv: 2 + time: + scenario: "construction" # Options: historical->construction->pre_construction->announced + lifetime_years: + # General lifetime assumptions. Adjust as needed! + bioenergy turbine: 25 + ccgt: 20 + coal turbine: 50 + concentrating solar power: 25 + high enthalpy geothermal: 30 + igcc: 25 + low enthalpy geothermal: 30 + nuclear reactor: 50 + ocgt: 20 + offshore: 25 + onshore: 25 + pumped storage: 80 + reciprocating engine: 20 + reservoir: 80 + rooftop pv: 25 + run of river: 80 + steam turbine: 30 + utility pv: 25 + retirement_delay_years: + # Assumed extension past reference year. + # Only applies if a powerplant is still operating past the lifetime_years assumption + bioenergy turbine: 5 + ccgt: 5 + coal turbine: 5 + concentrating solar power: 2 + high enthalpy geothermal: 5 + igcc: 5 + low enthalpy geothermal: 5 + nuclear reactor: 5 + ocgt: 5 + offshore: 2 + onshore: 2 + pumped storage: 10 + reciprocating engine: 5 + reservoir: 10 + rooftop pv: 5 + run of river: 10 + steam turbine: 5 + utility pv: 2 diff --git a/workflow/internal/config.schema.yaml b/workflow/internal/config.schema.yaml index 71f3197..609be80 100644 --- a/workflow/internal/config.schema.yaml +++ b/workflow/internal/config.schema.yaml @@ -353,9 +353,7 @@ properties: additionalProperties: false required: - location - - scenario - - lifetime_years - - retirement_delay_years + - time properties: location: title: Location imputation @@ -409,21 +407,30 @@ properties: enum: - land - maritime - scenario: - title: Scenario - description: | - Determines which 'future' facilities to keep beyond the adjustment year. In order of lower to higher uncertainty: - - historical: only keep existing capacities. - - construction: keeps the above plus projects already in construction (RECOMMENDED). - - pre_construction: keeps the above plus projects in pre-construction stages (e.g., those processing permits). - - announced: keeps the above plus announced projects that have not reached permitting stages. Highly speculative. - type: string - enum: - - historical - - construction - - pre_construction - - announced - lifetime_years: - $ref: '#/$defs/year_map' - retirement_delay_years: - $ref: '#/$defs/year_map' + time: + title: Time imputation + description: Settings in this section relate to temporal aspects of powerplants + additionalProperties: false + required: + - scenario + - lifetime_years + - retirement_delay_years + properties: + scenario: + title: Scenario + description: | + Determines which 'future' facilities to keep beyond the adjustment year. In order of lower to higher uncertainty: + - historical: only keep existing capacities. + - construction: keeps the above plus projects already in construction (RECOMMENDED). + - pre_construction: keeps the above plus projects in pre-construction stages (e.g., those processing permits). + - announced: keeps the above plus announced projects that have not reached permitting stages. Highly speculative. + type: string + enum: + - historical + - construction + - pre_construction + - announced + lifetime_years: + $ref: '#/$defs/year_map' + retirement_delay_years: + $ref: '#/$defs/year_map' diff --git a/workflow/report/impute_ages_explorer.rst b/workflow/report/impute_time_explorer.rst similarity index 100% rename from workflow/report/impute_ages_explorer.rst rename to workflow/report/impute_time_explorer.rst diff --git a/workflow/report/impute_ages_histogram.rst b/workflow/report/impute_time_histogram.rst similarity index 100% rename from workflow/report/impute_ages_histogram.rst rename to workflow/report/impute_time_histogram.rst diff --git a/workflow/rules/_utils.smk b/workflow/rules/_utils.smk index df78411..7b4221b 100644 --- a/workflow/rules/_utils.smk +++ b/workflow/rules/_utils.smk @@ -32,8 +32,8 @@ IMPUTED_CAT_WITHOUT_ADJUSTMENT = {"large_solar"} def additional_config_validation(): """Ensures technology mapping and lifetime-related names match.""" - lifetime_set = set(config["imputation"]["lifetime_years"].keys()) - mismatch = lifetime_set ^ set(config["imputation"]["retirement_delay_years"]) + lifetime_set = set(config["imputation"]["time"]["lifetime_years"].keys()) + mismatch = lifetime_set ^ set(config["imputation"]["time"]["retirement_delay_years"]) if mismatch: raise ValueError( f"Technologies in lifetimes and retirement delays mismatch: {mismatch}." diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index f3bbdb0..8bdb97f 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -34,7 +34,7 @@ rule impute_location: "../scripts/impute_location.py" -rule impute_ages: +rule impute_time: input: relocated=rules.impute_location.output.relocated, output: @@ -45,24 +45,24 @@ rule impute_ages: ), histogram=report( "/{shapes}/powerplants/unadjusted/{category}_histogram.pdf", - caption="../report/impute_ages_histogram.rst", + caption="../report/impute_time_histogram.rst", category="Powerplants module", subcategory="{category}", ), explorer=report( "/{shapes}/powerplants/unadjusted/{category}_explorer.html", - caption="../report/impute_ages_explorer.rst", + caption="../report/impute_time_explorer.rst", category="Powerplants module", subcategory="{category}", ), log: - "/{shapes}/{category}/impute_ages.log", + "/{shapes}/{category}/impute_time.log", wildcard_constraints: dataset="|".join(IMPUTED_CAT), conda: "../envs/powerplants.yaml" params: - imputation=config["imputation"], + imputation=config["imputation"]["time"], tech_map=lambda wc: get_technology_mapping(wc.category), message: "National-level imputation of missing powerplant ages in {wildcards.shapes}-{wildcards.category} dataset." @@ -72,7 +72,7 @@ rule impute_ages: rule impute_capacity_adjustment: input: - unadjusted=rules.impute_ages.output.aged, + unadjusted=rules.impute_time.output.aged, stats=rules.prepare_statistics.output.categories, output: adjusted=workflow.pathvars.apply("").format( From e03bb4e13bb888d1d396d3eb98c9ee82d19928e7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 15 Apr 2026 17:31:49 +0000 Subject: [PATCH 11/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- workflow/rules/_utils.smk | 4 +++- workflow/rules/impute.smk | 9 ++++++--- workflow/rules/prepare.smk | 6 +++--- workflow/scripts/_plots.py | 8 ++++++-- workflow/scripts/_utils.py | 8 ++++++-- workflow/scripts/aggregate_capacity.py | 2 +- workflow/scripts/impute_adjustment_solar.py | 2 +- workflow/scripts/impute_location.py | 5 +---- workflow/scripts/prepare_bioenergy.py | 2 +- workflow/scripts/prepare_fossil.py | 15 +++++++++------ workflow/scripts/prepare_geothermal.py | 7 +++++-- workflow/scripts/prepare_nuclear.py | 7 +++++-- workflow/scripts/prepare_wind_wemi.py | 6 ++++-- 13 files changed, 51 insertions(+), 30 deletions(-) diff --git a/workflow/rules/_utils.smk b/workflow/rules/_utils.smk index 7b4221b..c4c0ed3 100644 --- a/workflow/rules/_utils.smk +++ b/workflow/rules/_utils.smk @@ -33,7 +33,9 @@ IMPUTED_CAT_WITHOUT_ADJUSTMENT = {"large_solar"} def additional_config_validation(): """Ensures technology mapping and lifetime-related names match.""" lifetime_set = set(config["imputation"]["time"]["lifetime_years"].keys()) - mismatch = lifetime_set ^ set(config["imputation"]["time"]["retirement_delay_years"]) + mismatch = lifetime_set ^ set( + config["imputation"]["time"]["retirement_delay_years"] + ) if mismatch: raise ValueError( f"Technologies in lifetimes and retirement delays mismatch: {mismatch}." diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index 8bdb97f..a4d5704 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -1,5 +1,6 @@ """Rules related to internal and user-provided imputations.""" + rule impute_location: input: internal="/automatic/prepared/{category}.parquet", @@ -10,13 +11,15 @@ rule impute_location: otherwise=[], ), output: - relocated=temp("/automatic/shapes/{shapes}/impute_location/{category}.parquet"), + relocated=temp( + "/automatic/shapes/{shapes}/impute_location/{category}.parquet" + ), plot=report( "/automatic/shapes/{shapes}/impute_location/{category}_relocation.png", caption="../report/impute_location.rst", category="Powerplants module", subcategory="{category}", - ) + ), log: "/{shapes}/{category}/impute_location.log", wildcard_constraints: @@ -24,7 +27,7 @@ rule impute_location: conda: "../envs/powerplants.yaml" params: - crs=internal["crs"]|config["crs"], + crs=internal["crs"] | config["crs"], location_cnf=config["imputation"]["location"], excluded=lambda wc: get_excluded_powerplant_ids(f"{wc.category}"), tech_mapping=lambda wc: get_technology_mapping(f"{wc.category}"), diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk index c265b32..4fe76b0 100644 --- a/workflow/rules/prepare.smk +++ b/workflow/rules/prepare.smk @@ -12,7 +12,7 @@ rule prepare_hydropower: "../envs/powerplants.yaml" params: technology_mapping=config["category"]["hydropower"]["technology_mapping"], - geo_crs=internal["crs"]["geographic"] + geo_crs=internal["crs"]["geographic"], message: "Preparing hydropower powerplants using the GloHydroRes dataset." script: @@ -33,7 +33,7 @@ rule prepare_large_solar: dc_ac_ratio=config["category"]["solar"]["dc_ac_ratio"]["utility_pv"], utility_pv_name=config["category"]["solar"]["technology_mapping"]["utility_pv"], csp_name=config["category"]["solar"]["technology_mapping"]["csp"], - geo_crs=internal["crs"]["geographic"] + geo_crs=internal["crs"]["geographic"], message: "Preparing utility PV powerplants using the TZ-SAM and GEM-GSPT datasets." script: @@ -53,7 +53,7 @@ if config["category"]["wind"]["source"] == "gem": "../envs/powerplants.yaml" params: tech_map=config["category"]["wind"]["technology_mapping"], - geo_crs=internal["crs"]["geographic"] + geo_crs=internal["crs"]["geographic"], message: "Preparing wind powerplants using the Global Wind Power Tracker (GEM-GWPT) dataset." script: diff --git a/workflow/scripts/_plots.py b/workflow/scripts/_plots.py index c4ee512..c917883 100644 --- a/workflow/scripts/_plots.py +++ b/workflow/scripts/_plots.py @@ -14,7 +14,7 @@ from matplotlib.patches import Patch -def draw_empty(ax: Axes, title: str="", message="No data available"): +def draw_empty(ax: Axes, title: str = "", message="No data available"): """Helper to render an empty-data placeholder.""" ax.text(0.5, 0.5, message, ha="center", va="center", fontsize=12, alpha=0.7) if title: @@ -286,7 +286,11 @@ def plot_capacity_adjustment( def plot_capacity_aggregation( - aggregated_file: str, shapes_file: str, output_file: str, category: str, crs: int | str + aggregated_file: str, + shapes_file: str, + output_file: str, + category: str, + crs: int | str, ): """Plot aggregated capacity per region.""" shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(shapes_file)) diff --git a/workflow/scripts/_utils.py b/workflow/scripts/_utils.py index 2258b95..b1a8d04 100644 --- a/workflow/scripts/_utils.py +++ b/workflow/scripts/_utils.py @@ -12,7 +12,9 @@ DATASET_YEAR = 2023 -def check_crs(crs: int | str, how: Literal["projected", "geographic", "geocentric"]) -> CRS: +def check_crs( + crs: int | str, how: Literal["projected", "geographic", "geocentric"] +) -> CRS: """Helper to verify user-provided CRS codes.""" parsed = CRS.from_user_input(crs) correct = False @@ -61,7 +63,9 @@ def get_point_col( raw: pd.DataFrame, lon_col: str, lat_col: str, crs: str = "EPSG:4326" ) -> gpd.GeoSeries: """Converts latitude / longitude columns to a point geometry.""" - return gpd.points_from_xy(raw[lon_col], raw[lat_col], crs=check_crs(crs, "geographic")) + return gpd.points_from_xy( + raw[lon_col], raw[lat_col], crs=check_crs(crs, "geographic") + ) def get_combined_text_col( diff --git a/workflow/scripts/aggregate_capacity.py b/workflow/scripts/aggregate_capacity.py index 1257ce6..49f0875 100644 --- a/workflow/scripts/aggregate_capacity.py +++ b/workflow/scripts/aggregate_capacity.py @@ -74,5 +74,5 @@ def capacity(powerplant_file: str, shapes_file: str, year: int, output_file: str shapes_file=snakemake.input.shapes, output_file=snakemake.output.plot, category=snakemake.params.category, - crs=snakemake.params.proj_crs + crs=snakemake.params.proj_crs, ) diff --git a/workflow/scripts/impute_adjustment_solar.py b/workflow/scripts/impute_adjustment_solar.py index 4897677..beecf52 100644 --- a/workflow/scripts/impute_adjustment_solar.py +++ b/workflow/scripts/impute_adjustment_solar.py @@ -52,7 +52,7 @@ def main(): shapes_file=snakemake.input.shapes, output_file=snakemake.output.plot_aggregation, category=snakemake.params.category, - crs=snakemake.params.proj_crs + crs=snakemake.params.proj_crs, ) _plots.plot_capacity_adjustment( stats_file=snakemake.input.stats, diff --git a/workflow/scripts/impute_location.py b/workflow/scripts/impute_location.py index 2f17094..bfac9db 100644 --- a/workflow/scripts/impute_location.py +++ b/workflow/scripts/impute_location.py @@ -442,10 +442,7 @@ def plot( if not shapes.empty: shapes.boundary.plot( - ax=ax, - linewidth=0.5, - color="black", - label=f"Shapes (n={len(shapes)})", + ax=ax, linewidth=0.5, color="black", label=f"Shapes (n={len(shapes)})" ) layers = [ diff --git a/workflow/scripts/prepare_bioenergy.py b/workflow/scripts/prepare_bioenergy.py index 547e88e..f4f4f76 100644 --- a/workflow/scripts/prepare_bioenergy.py +++ b/workflow/scripts/prepare_bioenergy.py @@ -61,7 +61,7 @@ def main( "chp": False, "fuel_class": fuel_class, }, - crs=crs + crs=crs, ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(bioenergy_df).to_parquet(output_plants_path) diff --git a/workflow/scripts/prepare_fossil.py b/workflow/scripts/prepare_fossil.py index 1940889..d8bfd8f 100644 --- a/workflow/scripts/prepare_fossil.py +++ b/workflow/scripts/prepare_fossil.py @@ -73,7 +73,10 @@ def _get_end_year( def prepare_gem_gcpt( - gem_gcpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str], crs:str + gem_gcpt_path: str, + technology_mapping: dict[str, str], + fuel_mapping: dict[str, str], + crs: str, ) -> tuple[gpd.GeoDataFrame, pd.DataFrame]: """Obtain coal power locations using GEM-GCPT data.""" raw_df = gem.read_gem_dataset(gem_gcpt_path, ["Units"]) @@ -102,7 +105,7 @@ def prepare_gem_gcpt( "chp": False, # Not specified in GCPT "fuel_class": fuel_class, }, - crs=crs + crs=crs, ).reset_index(drop=True) coal_df = _utils.ensure_positive_capacity(coal_df) schema = _schemas.build_schema(technology_mapping, "prepare") @@ -113,7 +116,7 @@ def prepare_gem_gogpt( gem_gogpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str], - crs: str + crs: str, ) -> tuple[gpd.GeoDataFrame, pd.DataFrame]: """Obtain oil and gas power plants using GEM-GOGPT data.""" raw_df = gem.read_gem_dataset( @@ -143,7 +146,7 @@ def prepare_gem_gogpt( "chp": raw_df["chp"] == "yes", "fuel_class": fuel_class, }, - crs=crs + crs=crs, ).reset_index(drop=True) oil_gas_df = _utils.ensure_positive_capacity(oil_gas_df) schema = _schemas.build_schema(technology_mapping, "prepare") @@ -157,7 +160,7 @@ def main() -> None: gem_gogpt_path=snakemake.input.gem_gogpt, technology_mapping=snakemake.params.technology_mapping["oil_gas"], fuel_mapping=snakemake.params.fuel_mapping, - crs=snakemake.params.geo_crs + crs=snakemake.params.geo_crs, ) og_plants.to_parquet(snakemake.output.og_plants) og_fuels.to_parquet(snakemake.output.og_fuels) @@ -167,7 +170,7 @@ def main() -> None: gem_gcpt_path=snakemake.input.gem_gcpt, technology_mapping=snakemake.params.technology_mapping["coal"], fuel_mapping=snakemake.params.fuel_mapping, - crs=snakemake.params.geo_crs + crs=snakemake.params.geo_crs, ) coal_plants.to_parquet(snakemake.output.coal_plants) coal_fuels.to_parquet(snakemake.output.coal_fuels) diff --git a/workflow/scripts/prepare_geothermal.py b/workflow/scripts/prepare_geothermal.py index 7ddf8eb..3d29517 100644 --- a/workflow/scripts/prepare_geothermal.py +++ b/workflow/scripts/prepare_geothermal.py @@ -13,7 +13,10 @@ def main( - gem_ggpt_path: str, technology_mapping: dict[str, str], crs: str, output_plants_path: str + gem_ggpt_path: str, + technology_mapping: dict[str, str], + crs: str, + output_plants_path: str, ): """Obtain geothermal power plants using GEM-GGPT data.""" raw_df = gem.read_gem_dataset( @@ -36,7 +39,7 @@ def main( "status": gem.status_col(raw_df), "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), }, - crs=crs + crs=crs, ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(geo_df).to_parquet(output_plants_path) diff --git a/workflow/scripts/prepare_nuclear.py b/workflow/scripts/prepare_nuclear.py index 0018c5f..4985320 100644 --- a/workflow/scripts/prepare_nuclear.py +++ b/workflow/scripts/prepare_nuclear.py @@ -14,7 +14,10 @@ def main( - gem_gnpt_path: str, technology_mapping: dict[str, str], crs:str, output_plants_path: str + gem_gnpt_path: str, + technology_mapping: dict[str, str], + crs: str, + output_plants_path: str, ): """Obtain nuclear power plants using GEM-GNPT data.""" raw_df = gem.read_gem_dataset(gem_gnpt_path, ["Data"]) @@ -39,7 +42,7 @@ def main( "status": gem.status_col(raw_df), "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), }, - crs=crs + crs=crs, ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(nuclear_df).to_parquet(output_plants_path) diff --git a/workflow/scripts/prepare_wind_wemi.py b/workflow/scripts/prepare_wind_wemi.py index 81fbde6..f0bfcd3 100644 --- a/workflow/scripts/prepare_wind_wemi.py +++ b/workflow/scripts/prepare_wind_wemi.py @@ -46,7 +46,7 @@ def _start_year(raw_df: pd.DataFrame) -> pd.Series: return pd.to_numeric(year, errors="coerce") -def prepare_wemi(wemi_path: str, tech_map: dict, crs: str)-> gpd.GeoDataFrame: +def prepare_wemi(wemi_path: str, tech_map: dict, crs: str) -> gpd.GeoDataFrame: """Standardised and validated version of the WEMI dataset.""" raw_df = pd.read_excel( wemi_path, @@ -78,7 +78,9 @@ def prepare_wemi(wemi_path: str, tech_map: dict, crs: str)-> gpd.GeoDataFrame: def main(): """Saves a standardised and validated version of the WEMI dataset.""" - wemi_gdf = prepare_wemi(snakemake.input.wemi, snakemake.params.tech_map, snakemake.params.geo_crs) + wemi_gdf = prepare_wemi( + snakemake.input.wemi, snakemake.params.tech_map, snakemake.params.geo_crs + ) wemi_gdf.to_parquet(snakemake.output.path)