diff --git a/config/config.yaml b/config/config.yaml index 2f71762..f8acc1a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -91,46 +91,68 @@ fuel_mapping: "fossil gas: unknown": "natural gas" imputation: - shape_overlap_correction: "split_capacity" # strict or split_capacity - scenario: "near-future" # near-future, far-future, far-off-future - lifetime_years: - # General lifetime assumptions. Adjust as needed! - bioenergy turbine: 25 - ccgt: 20 - coal turbine: 50 - concentrating solar power: 25 - high enthalpy geothermal: 30 - igcc: 25 - low enthalpy geothermal: 30 - nuclear reactor: 50 - ocgt: 20 - offshore: 25 - onshore: 25 - pumped storage: 80 - reciprocating engine: 20 - reservoir: 80 - rooftop pv: 25 - run of river: 80 - steam turbine: 30 - utility pv: 25 - retirement_delay_years: - # Assumed extension past reference year. - # Only applies if a powerplant is still operating past the lifetime_years assumption - bioenergy turbine: 5 - ccgt: 5 - coal turbine: 5 - concentrating solar power: 2 - high enthalpy geothermal: 5 - igcc: 5 - low enthalpy geothermal: 5 - nuclear reactor: 5 - ocgt: 5 - offshore: 2 - onshore: 2 - pumped storage: 10 - reciprocating engine: 5 - reservoir: 10 - rooftop pv: 5 - run of river: 10 - steam turbine: 5 - utility pv: 2 + location: + inner_distance: 100 # matches projected CRS unit. + on_overlap: "split_capacity" + on_forced_class_error: "drop" # either 'drop', 'ignore', or 'raise' + forced_class: + bioenergy turbine: "land" + ccgt: "land" + coal turbine: "land" + concentrating solar power: "land" + high enthalpy geothermal: "land" + igcc: "land" + low enthalpy geothermal: "land" + nuclear reactor: "land" + ocgt: "land" + offshore: "maritime" + onshore: "land" + pumped storage: "land" + reciprocating engine: "land" + reservoir: "land" + run of river: "land" + steam turbine: "land" + utility pv: "land" + time: + scenario: "construction" # Options: historical->construction->pre_construction->announced + lifetime_years: + # General lifetime assumptions. Adjust as needed! + bioenergy turbine: 25 + ccgt: 20 + coal turbine: 50 + concentrating solar power: 25 + high enthalpy geothermal: 30 + igcc: 25 + low enthalpy geothermal: 30 + nuclear reactor: 50 + ocgt: 20 + offshore: 25 + onshore: 25 + pumped storage: 80 + reciprocating engine: 20 + reservoir: 80 + rooftop pv: 25 + run of river: 80 + steam turbine: 30 + utility pv: 25 + retirement_delay_years: + # Assumed extension past reference year. + # Only applies if a powerplant is still operating past the lifetime_years assumption + bioenergy turbine: 5 + ccgt: 5 + coal turbine: 5 + concentrating solar power: 2 + high enthalpy geothermal: 5 + igcc: 5 + low enthalpy geothermal: 5 + nuclear reactor: 5 + ocgt: 5 + offshore: 2 + onshore: 2 + pumped storage: 10 + reciprocating engine: 5 + reservoir: 10 + rooftop pv: 5 + run of river: 10 + steam turbine: 5 + utility pv: 2 diff --git a/pixi.lock b/pixi.lock index 37f887f..196b3c7 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1082,7 +1082,7 @@ environments: - conda: https://conda.anaconda.org/conda-forge/linux-64/msgpack-python-1.1.2-py312hd9148b4_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/munkres-1.1.4-pyhd8ed1ab_1.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/muparser-2.3.5-h5888daf_0.conda - - conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.0-py312h4c3975b_0.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.1-py312h4c3975b_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/narwhals-2.18.1-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/ncurses-6.5-h2d0b736_3.conda @@ -1352,7 +1352,7 @@ environments: - conda: https://conda.anaconda.org/conda-forge/osx-arm64/msgpack-python-1.1.2-py312h84eede6_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/munkres-1.1.4-pyhd8ed1ab_1.conda - conda: https://conda.anaconda.org/conda-forge/osx-arm64/muparser-2.3.5-h11e0b38_0.conda - - conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.0-py312hefc2c51_0.conda + - conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.1-py312hefc2c51_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/narwhals-2.18.1-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/osx-arm64/ncurses-6.5-h5e97a16_3.conda @@ -1599,7 +1599,7 @@ environments: - conda: https://conda.anaconda.org/conda-forge/win-64/msgpack-python-1.1.2-py312hf90b1b7_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/munkres-1.1.4-pyhd8ed1ab_1.conda - conda: https://conda.anaconda.org/conda-forge/win-64/muparser-2.3.5-he0c23c2_0.conda - - conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.0-py312he06e257_0.conda + - conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.1-py312he06e257_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/narwhals-2.18.1-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/noarch/networkx-3.6.1-pyhcf101f3_0.conda @@ -11973,9 +11973,9 @@ packages: - pkg:pypi/mypy?source=hash-mapping size: 18931157 timestamp: 1752534887158 -- conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.0-py312h4c3975b_0.conda - sha256: bb9ab670f94099843167170b81b597921890ffb0f56cb2222c1e611213b09f7e - md5: a1f58c8babb356d64f70c380dde5df2b +- conda: https://conda.anaconda.org/conda-forge/linux-64/mypy-1.20.1-py312h4c3975b_0.conda + sha256: b9dc656d1f78e75ae07d57aefae6613ac94d55a4b33aabb35daa9451e624ef2f + md5: 672833a9f4e00c5e4ddbfa53b2aad6ee depends: - __glibc >=2.17,<3.0.a0 - libgcc >=14 @@ -11989,9 +11989,9 @@ packages: license: MIT license_family: MIT purls: - - pkg:pypi/mypy?source=hash-mapping - size: 22053001 - timestamp: 1775234215387 + - pkg:pypi/mypy?source=compressed-mapping + size: 21984603 + timestamp: 1776069494897 - conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.17.0-py312h163523d_0.conda sha256: 0fb78906dbcb3daa1c9b9aa9047985ef1cacf2d2e64d271c1b9e7570ae4ed1fc md5: 77b10261bad8e87c9ebb6cebd493c394 @@ -12010,9 +12010,9 @@ packages: - pkg:pypi/mypy?source=hash-mapping size: 10410020 timestamp: 1752534951317 -- conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.0-py312hefc2c51_0.conda - sha256: 002e080ca7731b473e41eed2ab79357b070419ff8613b6ad531ef756aabbb649 - md5: 16e653b99c021b140cfe6f2cc23ee02a +- conda: https://conda.anaconda.org/conda-forge/osx-arm64/mypy-1.20.1-py312hefc2c51_0.conda + sha256: 66a5cae2d9107025d42f8fe013450c8fdd110afe1c8723420d1ca296445eb337 + md5: a84404f9fb0eeb402eec5ef06bc2522c depends: - __osx >=11.0 - mypy_extensions >=1.0.0 @@ -12026,9 +12026,9 @@ packages: license: MIT license_family: MIT purls: - - pkg:pypi/mypy?source=hash-mapping - size: 11840059 - timestamp: 1775234973068 + - pkg:pypi/mypy?source=compressed-mapping + size: 11876678 + timestamp: 1776070219837 - conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.17.0-py312he06e257_0.conda sha256: 93fc4abec9042deff9f882135601f90159fec0e88b9f2ac939a99a491b8c9be7 md5: 7a66ebebe26ae807bc24c1bc7661ad33 @@ -12048,9 +12048,9 @@ packages: - pkg:pypi/mypy?source=hash-mapping size: 10032459 timestamp: 1752534651646 -- conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.0-py312he06e257_0.conda - sha256: ebadc3b1c357ac2a8f2c68666f675e7004923f7f83faf22fbf239f404894b927 - md5: d461b246f4afe6f77f8ce37b8f66c23d +- conda: https://conda.anaconda.org/conda-forge/win-64/mypy-1.20.1-py312he06e257_0.conda + sha256: 142c45e7cd0ede5ffadf693f40c868f638b89b94a9983b4c8844c93342274255 + md5: 0e38dda3b76708bbc4bbfaff5bacf820 depends: - mypy_extensions >=1.0.0 - pathspec >=1.0.0 @@ -12065,9 +12065,9 @@ packages: license: MIT license_family: MIT purls: - - pkg:pypi/mypy?source=hash-mapping - size: 11766260 - timestamp: 1775234467870 + - pkg:pypi/mypy?source=compressed-mapping + size: 11666119 + timestamp: 1776069830794 - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda sha256: 6ed158e4e5dd8f6a10ad9e525631e35cee8557718f83de7a4e3966b1f772c4b1 md5: e9c622e0d00fa24a6292279af3ab6d06 diff --git a/pixi.toml b/pixi.toml index 91df14c..3ebf745 100644 --- a/pixi.toml +++ b/pixi.toml @@ -46,7 +46,7 @@ python = "3.12.9.*" pyyaml = "6.0.2.*" xlrd = "2.0.1.*" pip = "25.1.1.*" -mypy = "==1.20.0" +mypy = "1.20.1.*" [feature.powerplants.pypi-dependencies] gregor = { git = "https://github.com/jnnr/gregor.git", rev = "fix-memory-explosion" } diff --git a/workflow/internal/config.schema.yaml b/workflow/internal/config.schema.yaml index d3adbb5..609be80 100644 --- a/workflow/internal/config.schema.yaml +++ b/workflow/internal/config.schema.yaml @@ -17,10 +17,9 @@ $defs: type: string minLength: 1 - nonzero_number: + positive_number: type: number - not: - const: 0 + exclusiveMinimum: 0 year_map: type: object @@ -246,7 +245,7 @@ properties: additionalProperties: false properties: utility_pv: - $ref: '#/$defs/nonzero_number' + $ref: '#/$defs/positive_number' required: - utility_pv required: @@ -353,36 +352,85 @@ properties: type: object additionalProperties: false required: - - shape_overlap_correction - - scenario - - lifetime_years - - retirement_delay_years + - location + - time properties: - shape_overlap_correction: - title: Shape overlap correction method - description: | - Approach to handling powerplants that land on shape overlaps. - Either 'strict' (raise errors) or 'split_capacity' (divide powerplant capacity equally across overlapping shapes). - type: string - enum: - - strict - - split_capacity - scenario: - title: Scenario - description: > - Determines which 'future' facilities to keep beyond the adjustment year. From less to more: - - - historical: only keep existing capacities. - - near-future: keeps the above plus projects in construction (RECOMMENDED). - - far-future: keeps the above plus projects in pre-construction stages (e.g., those processing permits). - - far-off-future: keeps the above plus announced projects that have not reached permitting stages. Highly speculative. - type: string - enum: - - historical - - near-future - - far-future - - far-off-future - lifetime_years: - $ref: '#/$defs/year_map' - retirement_delay_years: - $ref: '#/$defs/year_map' + location: + title: Location imputation + description: Settings in this section affect how the module modifies the location of powerplants. + type: object + additionalProperties: false + required: + - inner_distance + - on_overlap + - on_forced_class_error + properties: + inner_distance: + title: Inner distance + description: | + Distance, in projected CRS units, used when relocating powerplant points into valid placement areas. + Applies both to overlap-split plants and to plants forced into a target shape class. + The point is placed approximately this far inside the selected polygon from the nearest boundary/contact point. + If the available interior segment is shorter, the midpoint of that segment is used. + $ref: '#/$defs/positive_number' + on_overlap: + title: Overlap adjustment method + description: | + Controls how to handle powerplants that overlap on more than one shape. + - "raise" stops the workflow with an error listing the overlapping powerplants. + - "split_capacity" creates smaller duplicate powerplants in each intersecting shape, moved using `inner_distance`. + type: string + enum: + - raise + - split_capacity + on_forced_class_error: + title: Forced shape class adjustment error handling + description: | + Controls how to handle powerplants whose technology is configured for a forced `shape_class`, but no matching shape exists in that plant's country. + - "raise": Stop the workflow with an error listing the failing country/shape_class/technology combinations. + - "ignore": Leave those powerplants unchanged. + - "drop": Remove those powerplants. + type: string + enum: + - raise + - ignore + - drop + forced_class: + title: Forced `shape_class` adjustment. + description: | + Optional. Maps powerplant technologies to a `shape_class`. + Powerplants with a listed technology are moved to the nearest matching shape class within their assigned country. + Allowed values are "land" and "maritime". + type: object + additionalProperties: + type: string + enum: + - land + - maritime + time: + title: Time imputation + description: Settings in this section relate to temporal aspects of powerplants + additionalProperties: false + required: + - scenario + - lifetime_years + - retirement_delay_years + properties: + scenario: + title: Scenario + description: | + Determines which 'future' facilities to keep beyond the adjustment year. In order of lower to higher uncertainty: + - historical: only keep existing capacities. + - construction: keeps the above plus projects already in construction (RECOMMENDED). + - pre_construction: keeps the above plus projects in pre-construction stages (e.g., those processing permits). + - announced: keeps the above plus announced projects that have not reached permitting stages. Highly speculative. + type: string + enum: + - historical + - construction + - pre_construction + - announced + lifetime_years: + $ref: '#/$defs/year_map' + retirement_delay_years: + $ref: '#/$defs/year_map' diff --git a/workflow/internal/settings.yaml b/workflow/internal/settings.yaml index 43236bc..168c0d0 100644 --- a/workflow/internal/settings.yaml +++ b/workflow/internal/settings.yaml @@ -1,4 +1,6 @@ # Module settings that users cannot modify. +crs: + geographic: "EPSG:4326" resources: automatic: # Links for automatically downloaded files diff --git a/workflow/report/impute_location.rst b/workflow/report/impute_location.rst new file mode 100644 index 0000000..9681fa7 --- /dev/null +++ b/workflow/report/impute_location.rst @@ -0,0 +1 @@ +Powerplant relocation based on the imputation->location configuration. diff --git a/workflow/report/impute_category_combination_map.rst b/workflow/report/impute_time_explorer.rst similarity index 100% rename from workflow/report/impute_category_combination_map.rst rename to workflow/report/impute_time_explorer.rst diff --git a/workflow/report/impute_category_combination_histogram.rst b/workflow/report/impute_time_histogram.rst similarity index 52% rename from workflow/report/impute_category_combination_histogram.rst rename to workflow/report/impute_time_histogram.rst index 67a0fd4..625cf9b 100644 --- a/workflow/report/impute_category_combination_histogram.rst +++ b/workflow/report/impute_time_histogram.rst @@ -1,5 +1 @@ Histogram of evolving {{ snakemake.wildcards.category }} capacity in each country in {{ snakemake.wildcards.shapes }}. -Determined by: - -* Imputed lifetimes and retirement delay. -* Imputed powerplant exclusions and inclusions. diff --git a/workflow/report/prepare_shapes.rst b/workflow/report/prepare_shapes.rst deleted file mode 100644 index 6e34e5a..0000000 --- a/workflow/report/prepare_shapes.rst +++ /dev/null @@ -1 +0,0 @@ -Dissolved country shapes, used during imputation. diff --git a/workflow/rules/_utils.smk b/workflow/rules/_utils.smk index df78411..c4c0ed3 100644 --- a/workflow/rules/_utils.smk +++ b/workflow/rules/_utils.smk @@ -32,8 +32,10 @@ IMPUTED_CAT_WITHOUT_ADJUSTMENT = {"large_solar"} def additional_config_validation(): """Ensures technology mapping and lifetime-related names match.""" - lifetime_set = set(config["imputation"]["lifetime_years"].keys()) - mismatch = lifetime_set ^ set(config["imputation"]["retirement_delay_years"]) + lifetime_set = set(config["imputation"]["time"]["lifetime_years"].keys()) + mismatch = lifetime_set ^ set( + config["imputation"]["time"]["retirement_delay_years"] + ) if mismatch: raise ValueError( f"Technologies in lifetimes and retirement delays mismatch: {mismatch}." diff --git a/workflow/rules/aggregate.smk b/workflow/rules/aggregate.smk index b5577c0..83e14b4 100644 --- a/workflow/rules/aggregate.smk +++ b/workflow/rules/aggregate.smk @@ -8,7 +8,7 @@ rule aggregate_capacity: output: aggregated="", plot=report( - "/{shapes}/aggregated/{adjustment}/{category}.pdf", + "/{shapes}/aggregated/{adjustment}/{category}_aggregation.png", caption="../report/aggregate_capacity.rst", category="Powerplants module", subcategory="{category}", @@ -22,6 +22,7 @@ rule aggregate_capacity: "../envs/powerplants.yaml" params: category=lambda wc: wc.category, + proj_crs=config["crs"]["projected"], message: "Aggregating capacity for {wildcards.shapes}-{wildcards.adjustment}-{wildcards.category}." script: diff --git a/workflow/rules/impute.smk b/workflow/rules/impute.smk index 6bf92ac..a4d5704 100644 --- a/workflow/rules/impute.smk +++ b/workflow/rules/impute.smk @@ -1,73 +1,81 @@ """Rules related to internal and user-provided imputations.""" -rule impute_years: +rule impute_location: input: - prepared="/automatic/prepared/{category}.parquet", - dissolved_shapes=rules.prepare_shapes.output.dissolved, + internal="/automatic/prepared/{category}.parquet", + shapes="", + user=branch( + exists(""), + then=[""], + otherwise=[], + ), output: - imputed="/automatic/shapes/{shapes}/imputed/{category}.parquet", - plot="/automatic/shapes/{shapes}/imputed/{category}.pdf", + relocated=temp( + "/automatic/shapes/{shapes}/impute_location/{category}.parquet" + ), + plot=report( + "/automatic/shapes/{shapes}/impute_location/{category}_relocation.png", + caption="../report/impute_location.rst", + category="Powerplants module", + subcategory="{category}", + ), log: - "/{shapes}/{category}/impute_years.log", + "/{shapes}/{category}/impute_location.log", wildcard_constraints: - dataset="|".join(IMPUTED_CAT), + category="|".join(IMPUTED_CAT), conda: "../envs/powerplants.yaml" params: - imputation=config["imputation"], - projected_crs=config["crs"]["projected"], - tech_map=lambda wc: get_technology_mapping(wc.category), + crs=internal["crs"] | config["crs"], + location_cnf=config["imputation"]["location"], + excluded=lambda wc: get_excluded_powerplant_ids(f"{wc.category}"), + tech_mapping=lambda wc: get_technology_mapping(f"{wc.category}"), message: - "National-level imputation of missing years for all powerplants in {wildcards.shapes}-{wildcards.category} dataset." + "Imputation of user-configured location adjustment for {wildcards.shapes}-{wildcards.category}." script: - "../scripts/impute_years.py" + "../scripts/impute_location.py" -rule impute_category_combination: +rule impute_time: input: - internal=rules.impute_years.output.imputed, - user=branch( - exists(""), - then=[""], - otherwise=[], - ), + relocated=rules.impute_location.output.relocated, output: - combined=workflow.pathvars.apply("").format( + aged=workflow.pathvars.apply("").format( shapes="{shapes}", adjustment="unadjusted", category="{category}", ), - plot=report( - "/{shapes}/powerplants/unadjusted/{category}.pdf", - caption="../report/impute_category_combination_histogram.rst", + histogram=report( + "/{shapes}/powerplants/unadjusted/{category}_histogram.pdf", + caption="../report/impute_time_histogram.rst", category="Powerplants module", subcategory="{category}", ), - explore=report( - "/{shapes}/powerplants/unadjusted/{category}.html", - caption="../report/impute_category_combination_map.rst", + explorer=report( + "/{shapes}/powerplants/unadjusted/{category}_explorer.html", + caption="../report/impute_time_explorer.rst", category="Powerplants module", subcategory="{category}", ), log: - "/{shapes}/{category}/impute_category_combination.log", + "/{shapes}/{category}/impute_time.log", wildcard_constraints: - category="|".join(IMPUTED_CAT), + dataset="|".join(IMPUTED_CAT), conda: "../envs/powerplants.yaml" params: - tech_map=lambda wc: get_technology_mapping(f"{wc.category}"), - excluded=lambda wc: get_excluded_powerplant_ids(f"{wc.category}"), + imputation=config["imputation"]["time"], + tech_map=lambda wc: get_technology_mapping(wc.category), message: - "National-level imputation of user-configured inclusions and exclusions for {wildcards.shapes}-{wildcards.category}." + "National-level imputation of missing powerplant ages in {wildcards.shapes}-{wildcards.category} dataset." script: - "../scripts/impute_category_combination.py" + "../scripts/impute_ages.py" rule impute_capacity_adjustment: input: - unadjusted=rules.impute_category_combination.output.combined, + unadjusted=rules.impute_time.output.aged, stats=rules.prepare_statistics.output.categories, output: adjusted=workflow.pathvars.apply("").format( @@ -76,7 +84,7 @@ rule impute_capacity_adjustment: category="{category}", ), plot=report( - "/{shapes}/powerplants/adjusted/{category}.pdf", + "/{shapes}/powerplants/adjusted/{category}_adjustment.pdf", caption="../report/impute_capacity_adjustment.rst", category="Powerplants module", subcategory="{category}", diff --git a/workflow/rules/prepare.smk b/workflow/rules/prepare.smk index d7fe38c..4fe76b0 100644 --- a/workflow/rules/prepare.smk +++ b/workflow/rules/prepare.smk @@ -1,29 +1,6 @@ """Rules in this file focus on parsing and cleaning data into shared schemas.""" -rule prepare_shapes: - input: - shapes="", - output: - dissolved="/automatic/shapes/{shapes}/dissolved.parquet", - dissolved_plt=report( - "/automatic/shapes/{shapes}/dissolved.png", - caption="../report/prepare_shapes.rst", - category="Powerplants module", - subcategory="preparation", - ), - log: - "/{shapes}/prepare_shapes.log", - conda: - "../envs/powerplants.yaml" - params: - crs=config["crs"]["projected"], - message: - "Preparing intermediate shapefile versions to speed up processing." - script: - "../scripts/prepare_shapes.py" - - rule prepare_hydropower: input: glohydrores_path=rules.download_glohydrores.output.path, @@ -35,6 +12,7 @@ rule prepare_hydropower: "../envs/powerplants.yaml" params: technology_mapping=config["category"]["hydropower"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"], message: "Preparing hydropower powerplants using the GloHydroRes dataset." script: @@ -55,6 +33,7 @@ rule prepare_large_solar: dc_ac_ratio=config["category"]["solar"]["dc_ac_ratio"]["utility_pv"], utility_pv_name=config["category"]["solar"]["technology_mapping"]["utility_pv"], csp_name=config["category"]["solar"]["technology_mapping"]["csp"], + geo_crs=internal["crs"]["geographic"], message: "Preparing utility PV powerplants using the TZ-SAM and GEM-GSPT datasets." script: @@ -65,7 +44,6 @@ if config["category"]["wind"]["source"] == "gem": rule prepare_wind_gem: input: - script=workflow.source_path("../scripts/prepare_wind_gwpt.py"), gem_gwpt=rules.download_gem.output.path.format(dataset="GWPT"), output: path="/automatic/prepared/wind.parquet", @@ -75,19 +53,16 @@ if config["category"]["wind"]["source"] == "gem": "../envs/powerplants.yaml" params: tech_map=config["category"]["wind"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"], message: "Preparing wind powerplants using the Global Wind Power Tracker (GEM-GWPT) dataset." - shell: - """ - python {input.script:q} {input.gem_gwpt:q} \ - -t "{params.tech_map}" -o {output:q} 2> {log:q} - """ + script: + "../scripts/prepare_wind_gwpt.py" elif config["category"]["wind"]["source"] == "wemi": rule prepare_wind_wemi: input: - script=workflow.source_path("../scripts/prepare_wind_wemi.py"), wemi="", output: path="/automatic/prepared/wind.parquet", @@ -96,14 +71,12 @@ elif config["category"]["wind"]["source"] == "wemi": conda: "../envs/powerplants.yaml" params: + geo_crs=internal["crs"]["geographic"], tech_map=config["category"]["wind"]["technology_mapping"], message: "Preparing wind powerplants using the Wind Energy Market Intelligence dataset." - shell: - """ - python {input.script:q} {input.wemi:q} \ - -t "{params.tech_map}" -o {output.path:q} 2> {log:q} - """ + script: + "../scripts/prepare_wind_wemi.py" else: raise ValueError( @@ -122,8 +95,9 @@ rule prepare_bioenergy: conda: "../envs/powerplants.yaml" params: - technology_mapping=config["category"]["bioenergy"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"], fuel_mapping=internal["fuel_mapping"] | config["fuel_mapping"], + technology_mapping=config["category"]["bioenergy"]["technology_mapping"], message: "Preparing bioenergy powerplants using the Global Bioenergy Power Tracker (GBPT) dataset." script: @@ -144,8 +118,9 @@ rule prepare_fossil: conda: "../envs/powerplants.yaml" params: - technology_mapping=config["category"]["fossil"]["technology_mapping"], + geo_crs=internal["crs"]["geographic"], fuel_mapping=internal["fuel_mapping"] | config["fuel_mapping"], + technology_mapping=config["category"]["fossil"]["technology_mapping"], message: "Preparing fossil powerplants using the GOGPT and GCPT datasets." script: @@ -162,6 +137,7 @@ rule prepare_nuclear: conda: "../envs/powerplants.yaml" params: + geo_crs=internal["crs"]["geographic"], technology_mapping=config["category"]["nuclear"]["technology_mapping"], message: "Preparing nuclear powerplants using the Global Nuclear Power Tracker (GNPT) dataset." @@ -179,6 +155,7 @@ rule prepare_geothermal: conda: "../envs/powerplants.yaml" params: + geo_crs=internal["crs"]["geographic"], technology_mapping=config["category"]["geothermal"]["technology_mapping"], message: "Preparing geothermal powerplants using the Global Geothermal Power Tracker (GGPT) dataset." diff --git a/workflow/rules/solar.smk b/workflow/rules/solar.smk index 15db9b6..52691f3 100644 --- a/workflow/rules/solar.smk +++ b/workflow/rules/solar.smk @@ -25,7 +25,7 @@ rule proxy_rooftop_pv: output: proxy="/automatic/shapes/{shapes}/proxies/rooftop_pv.tif", plot=report( - "/automatic/shapes/{shapes}/proxies/rooftop_pv.pdf", + "/automatic/shapes/{shapes}/proxies/rooftop_pv.png", caption="../report/proxy_rooftop_pv.rst", category="Powerplants module", subcategory="solar", @@ -58,14 +58,14 @@ rule impute_adjustment_solar: adjustment="adjusted", category="solar", ), - plot_map=report( - "/{shapes}/aggregated/adjusted/solar_map.pdf", + plot_aggregation=report( + "/{shapes}/aggregated/adjusted/solar_aggregation.png", caption="../report/aggregate_capacity.rst", category="Powerplants module", subcategory="solar", ), - plot_stats=report( - "/{shapes}/aggregated/adjusted/solar_stats.pdf", + plot_adjustment=report( + "/{shapes}/aggregated/adjusted/solar_adjustment.pdf", caption="../report/impute_capacity_adjustment.rst", category="Powerplants module", subcategory="solar", @@ -76,6 +76,7 @@ rule impute_adjustment_solar: "../envs/powerplants.yaml" params: category="solar", + proj_crs=config["crs"]["projected"], technology=config["category"]["solar"]["technology_mapping"]["rooftop_pv"], message: "Aggregating capacity for {wildcards.shapes}-adjusted-solar." diff --git a/workflow/scripts/_plots.py b/workflow/scripts/_plots.py index 38138af..c917883 100644 --- a/workflow/scripts/_plots.py +++ b/workflow/scripts/_plots.py @@ -10,13 +10,15 @@ from cmap import Colormap from matplotlib import pyplot as plt from matplotlib import ticker as mticker +from matplotlib.axes import Axes from matplotlib.patches import Patch -def draw_empty(ax, title, message="No data available"): +def draw_empty(ax: Axes, title: str = "", message="No data available"): """Helper to render an empty-data placeholder.""" ax.text(0.5, 0.5, message, ha="center", va="center", fontsize=12, alpha=0.7) - ax.set_title(title) + if title: + ax.set_title(title) ax.set_axis_off() @@ -284,10 +286,15 @@ def plot_capacity_adjustment( def plot_capacity_aggregation( - aggregated_file: str, shapes_file: str, output_file: str, category: str + aggregated_file: str, + shapes_file: str, + output_file: str, + category: str, + crs: int | str, ): """Plot aggregated capacity per region.""" shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(shapes_file)) + shapes = shapes.to_crs(_utils.check_crs(crs, "projected")) agg = _schemas.AggregatedPlantSchema.validate(pd.read_parquet(aggregated_file)) title = f"Aggregated {category} capacity" @@ -300,12 +307,12 @@ def plot_capacity_aggregation( shapes = shapes.set_index("shape_id") shapes["output_capacity_mw"] = cap_by_shape.replace(0, np.nan) - fig, ax = plt.subplots(figsize=(6, 6), dpi=300, rasterized=True) + fig, ax = plt.subplots(dpi=200) ax = shapes.plot( ax=ax, column="output_capacity_mw", - cmap="magma", + cmap="plasma", edgecolor="grey", linewidth=0.5, legend=True, @@ -313,6 +320,4 @@ def plot_capacity_aggregation( missing_kwds={"color": "lightgrey", "alpha": 0.2}, ) ax.set_title(title + f" in year {agg.attrs['year']}") - ax.set_xlabel("Longitude ($deg$)") - ax.set_ylabel("Latitude ($deg$)") fig.savefig(output_file, bbox_inches="tight") diff --git a/workflow/scripts/_schemas.py b/workflow/scripts/_schemas.py index b83d83d..ecdac54 100644 --- a/workflow/scripts/_schemas.py +++ b/workflow/scripts/_schemas.py @@ -55,7 +55,7 @@ class Config: country_id: Series[str] category: Series[str] technology: Series[str] - output_capacity_mw: Series[float] + output_capacity_mw: Series[float] = Field(gt=0) chp: Series[bool] | None ccs: Series[bool] | None fuel_class: Series[str] | None @@ -78,7 +78,7 @@ class Config: "General category of the powerplant." technology: Series[str] "Subcategory of the powerplant, if necessary." - output_capacity_mw: Series[float] = Field(ge=0) + output_capacity_mw: Series[float] = Field(gt=0) "Powerplant gross output capacity in Megawatts." # Temporal aspects start_year: Series[float] diff --git a/workflow/scripts/_utils.py b/workflow/scripts/_utils.py index 34ce3eb..b1a8d04 100644 --- a/workflow/scripts/_utils.py +++ b/workflow/scripts/_utils.py @@ -5,12 +5,34 @@ import geopandas as gpd import pandas as pd from pandas.api.types import is_list_like +from pyproj import CRS # Average year where powerplant datasets were last updated. # MUST BE ADJUSTED WHENEVER DATASOURCES ARE UPDATED! DATASET_YEAR = 2023 +def check_crs( + crs: int | str, how: Literal["projected", "geographic", "geocentric"] +) -> CRS: + """Helper to verify user-provided CRS codes.""" + parsed = CRS.from_user_input(crs) + correct = False + match how: + case "projected": + if parsed.is_projected: + correct = True + case "geographic": + if parsed.is_geographic: + correct = True + case "geocentric": + if parsed.is_geocentric: + correct = True + if not correct: + raise ValueError(f"{crs!r} is not {how!r}.") + return parsed + + def listify(item) -> list: """Avoids ambiguity in YAML list parameters.""" return item if is_list_like(item) else [item] @@ -41,7 +63,9 @@ def get_point_col( raw: pd.DataFrame, lon_col: str, lat_col: str, crs: str = "EPSG:4326" ) -> gpd.GeoSeries: """Converts latitude / longitude columns to a point geometry.""" - return gpd.points_from_xy(raw[lon_col], raw[lat_col], crs=crs) + return gpd.points_from_xy( + raw[lon_col], raw[lat_col], crs=check_crs(crs, "geographic") + ) def get_combined_text_col( @@ -69,42 +93,50 @@ def check_single_category(df: pd.DataFrame) -> str: def filter_years( - powerplants_df: pd.DataFrame, - year: int, - how: Literal["operating", "future"] = "operating", + powerplants_df: pd.DataFrame, year: int, how: Literal["operating", "future", "past"] ) -> pd.DataFrame: - """Standardised filtering of powerplants based on start / end years. + """Filter powerplants based on start/end year. + + Assumptions: + - A powerplant comes online on January 1 of `start_year`. + - A powerplant goes offline on January 1 of `end_year`. + - `end_year` > `start_year`. Args: - powerplants_df (pd.DataFrame): powerplant dataset to filter. - year (int): year to filter. - how (Literal["operating", "future"], optional): filtering approach. - Defaults to "operating". - - operating: only active powerplants in the given year. - - future: active and planned powerplant projects in the given year. + powerplants_df: Powerplant dataset to filter. + year: Reference year. + how: + - "operating": plants active during `year` + - "future": plants not yet online in `year` + - "past": plants already offline by `year` Returns: - pd.DataFrame: copy of the given dataframe after filtering applied. + A copy of the filtered dataframe. """ - if how == "operating": - filtered = powerplants_df[ - (powerplants_df["start_year"] <= year) & (year < powerplants_df["end_year"]) - ].copy() - elif how == "future": - # FIXME: this does not filter 'future' years at all! - filtered = powerplants_df[(powerplants_df["start_year"] <= year)].copy() - return filtered - - -def _clean_positive_capacity(plants: pd.DataFrame) -> pd.DataFrame: + match how: + case "operating": + mask = (powerplants_df["start_year"] <= year) & ( + year < powerplants_df["end_year"] + ) + case "future": + mask = powerplants_df["start_year"] > year + case "past": + mask = year >= powerplants_df["end_year"] + case _: + raise ValueError(f"Invalid request {how!r}.") + + return powerplants_df.loc[mask].copy() + + +def ensure_positive_capacity(df: pd.DataFrame) -> pd.DataFrame: """Remove rows with non-positive capacity.""" - return plants[plants["output_capacity_mw"] > 0].copy() + return df[df["output_capacity_mw"] > 0].copy() def get_adjusted_capacity( operating_plants: pd.DataFrame, expected_capacity: pd.Series ) -> pd.Series: - """Adjust powerplant capacity the total expected capacity per country. + """Adjust powerplant capacity to the total expected capacity per country. Args: operating_plants (pd.DataFrame): dataframe with all operating plants to adjust. @@ -120,40 +152,13 @@ def get_adjusted_capacity( return adjusted_cap_mw -def adjust_powerplant_capacity(plants, stats, year): - """Adjust powerplant capacity to national statistics in the given year. - - Keeps "future" projects beyond the given year. - """ - category = check_single_category(plants) - cat_stats = get_eia_stats_in_cat_yr(stats, year, category) - expected_capacity = cat_stats.groupby(["country_id"])["capacity_mw"].sum() - adjusted = _clean_positive_capacity(filter_years(plants, year, how="future")) - operating = filter_years(adjusted, year, how="operating") - operating = operating[operating["country_id"].isin(expected_capacity.index)] - - # Remove operating rows that cannot be matched to the requested year's stats. - adjusted = adjusted.drop( - index=filter_years(adjusted, year, how="operating").index.difference( - operating.index - ) - ) - - if operating.empty: - return adjusted.reset_index(drop=True) - - adjusted_cap = get_adjusted_capacity(operating, expected_capacity) - adjusted.loc[adjusted_cap.index, "output_capacity_mw"] = adjusted_cap - return adjusted.reset_index(drop=True) - - def adjust_aggregated_capacity(plants, stats, year): """Adjust capacity to national statistics in the given year.""" category = check_single_category(plants) stats = get_eia_stats_in_cat_yr(stats, year, category) expected_capacity = stats.groupby(["country_id"])["capacity_mw"].sum() - adjusted = _clean_positive_capacity(plants) + adjusted = ensure_positive_capacity(plants) adjusted = adjusted[adjusted["country_id"].isin(expected_capacity.index)] if adjusted.empty: diff --git a/workflow/scripts/aggregate_capacity.py b/workflow/scripts/aggregate_capacity.py index 11ecc00..49f0875 100644 --- a/workflow/scripts/aggregate_capacity.py +++ b/workflow/scripts/aggregate_capacity.py @@ -57,7 +57,7 @@ def capacity(powerplant_file: str, shapes_file: str, year: int, output_file: str agg_plants_df = pd.concat(agg_plants_arr, axis="index", ignore_index=True) agg_plants_df.attrs["year"] = year - agg_plants_df = _utils._clean_positive_capacity(agg_plants_df) + agg_plants_df = _utils.ensure_positive_capacity(agg_plants_df) _schemas.AggregatedPlantSchema.validate(agg_plants_df).to_parquet(output_file) @@ -74,4 +74,5 @@ def capacity(powerplant_file: str, shapes_file: str, year: int, output_file: str shapes_file=snakemake.input.shapes, output_file=snakemake.output.plot, category=snakemake.params.category, + crs=snakemake.params.proj_crs, ) diff --git a/workflow/scripts/impute_adjustment_solar.py b/workflow/scripts/impute_adjustment_solar.py index c029cdd..beecf52 100644 --- a/workflow/scripts/impute_adjustment_solar.py +++ b/workflow/scripts/impute_adjustment_solar.py @@ -33,7 +33,7 @@ def capacity_solar( # Combine and clean the data solar_mw = pd.concat([agg_roof_pv_cap, large_pv], ignore_index=True) - solar_mw = _utils._clean_positive_capacity(solar_mw) + solar_mw = _utils.ensure_positive_capacity(solar_mw) solar_mw.attrs = large_pv.attrs | agg_roof_pv_cap.attrs return _schemas.AggregatedPlantSchema.validate(solar_mw) @@ -50,15 +50,16 @@ def main(): _plots.plot_capacity_aggregation( aggregated_file=snakemake.output.aggregated, shapes_file=snakemake.input.shapes, - output_file=snakemake.output.plot_map, + output_file=snakemake.output.plot_aggregation, category=snakemake.params.category, + crs=snakemake.params.proj_crs, ) _plots.plot_capacity_adjustment( stats_file=snakemake.input.stats, unadjusted_file=snakemake.input.large_solar, adjusted_file=snakemake.output.aggregated, year=_utils.DATASET_YEAR, - output_file=snakemake.output.plot_stats, + output_file=snakemake.output.plot_adjustment, is_disagg=False, ) diff --git a/workflow/scripts/impute_ages.py b/workflow/scripts/impute_ages.py new file mode 100644 index 0000000..cd85627 --- /dev/null +++ b/workflow/scripts/impute_ages.py @@ -0,0 +1,160 @@ +"""Imputation of missing values.""" + +import sys +from typing import TYPE_CHECKING, Any + +import _plots +import _schemas +import _utils +import geopandas as gpd +import pandas as pd + +if TYPE_CHECKING: + snakemake: Any + +HISTORICAL = {"operating", "retired"} +SCENARIO_MAP = { + "historical": HISTORICAL, + "construction": HISTORICAL | {"construction"}, + "pre_construction": HISTORICAL | {"construction", "pre-construction"}, + "announced": HISTORICAL | {"construction", "pre-construction", "announced"}, +} + + +def _impute_start_year( + prepared_df: pd.DataFrame, lifetimes: dict[str, int] +) -> pd.Series: + """Fill start year using reasonable assumptions and user settings.""" + start_year = prepared_df["start_year"].copy() + + # Impute using the lifetime if possible. + mask_life = start_year.isna() & prepared_df["end_year"].notna() + start_year[mask_life] = prepared_df.loc[mask_life, "end_year"] - prepared_df.loc[ + mask_life, "technology" + ].map(lifetimes) + # Impute using country averages. + averages = ( + prepared_df.groupby(["country_id", "category", "technology", "status"])[ + "start_year" + ] + .transform("mean") + .round() + ) + mask_na = start_year.isna() + start_year.loc[mask_na] = averages[mask_na] + + return start_year + + +def _impute_end_year( + df: pd.DataFrame, lifetimes: dict[str, int], delay: dict[str, int] +) -> pd.Series: + """Impute end_year using lifetime. + + Old plants operating beyond lifetime will retired with a given delay. + """ + ref_year = _utils.DATASET_YEAR + + # Impute expected end year only if no data is present. + expected_end = df["start_year"] + df["technology"].map(lifetimes) + result = df["end_year"].copy().fillna(expected_end) + + # Plants operating beyond expected lifetime will be retired after a delay of >=1 yr + needs_delay = (result <= ref_year) & (df["status"] == "operating") + delayed_end = result + df["technology"].map(delay).fillna(0).astype(int) + delayed_end = delayed_end.clip(lower=ref_year + 1) + result.loc[needs_delay] = delayed_end.loc[needs_delay] + + return result + + +def _impute_status(df: pd.DataFrame) -> pd.Series: + """Impute powerplant status. + + Must be called after start/end years are complete. + """ + status = df["status"].copy() + ref_year = _utils.DATASET_YEAR + status.loc[ref_year < df["start_year"]] = "planned" + status.loc[(df["start_year"] <= ref_year) & (ref_year < df["end_year"])] = ( + "operating" + ) + status.loc[df["end_year"] <= ref_year] = "retired" + + if status.isna().any(): + raise ValueError("Entries with ambiguous states were left in the dataframe.") + + return status + + +def impute( + relocated_gdf: gpd.GeoDataFrame, imputation: dict, technology_mapping: dict +) -> gpd.GeoDataFrame: + """Add automatic and user imputations to fill missing data. + + Args: + relocated_gdf (gpd.GeoDataFrame): relocated powerplants (must have country_id). + imputation (str): imputation configuration. + technology_mapping (str): technology mapping configuration. + """ + if relocated_gdf.empty: + imputed = relocated_gdf + else: + _utils.check_single_category(relocated_gdf) + if (relocated_gdf.geometry.geom_type != "Point").any(): + raise ValueError( + "Polygon powerplant geometries detected. Only Points are supported." + ) + + lifetimes = imputation["lifetime_years"] + retirement_delay_years = imputation["retirement_delay_years"] + scenario = SCENARIO_MAP[imputation["scenario"]] + + # Get facilities within the requested scenario + imputed = relocated_gdf[relocated_gdf["status"].isin(scenario)].copy() + + if not imputed.empty: + # Adjust project dates + imputed["start_year"] = _impute_start_year(imputed, lifetimes) + imputed["end_year"] = _impute_end_year( + imputed, lifetimes, retirement_delay_years + ) + # Drop projects with insufficient date data + imputed = imputed.dropna(subset=["start_year", "end_year"]) + # Update the powerplant status + imputed["status"] = _impute_status(imputed) + + schema = _schemas.build_schema(technology_mapping, "impute") + return schema.validate(imputed) + + +def explore(imputed: gpd.GeoDataFrame, output_path: str, colormap="tab20"): + """Create a HTML map for users to explore.""" + if imputed.empty: + with open(output_path, "w") as f: + f.write("No data") + else: + explorer = imputed.explore( + column="technology", legend=True, popup=True, cmap=colormap + ) + explorer.save(output_path) + + +def main() -> None: + """Main snakemake process.""" + imputed_gdf = impute( + relocated_gdf=gpd.read_parquet(snakemake.input.relocated), + imputation=snakemake.params.imputation, + technology_mapping=snakemake.params.tech_map, + ) + imputed_gdf.to_parquet(snakemake.output.aged) + + _plots.plot_powerplant_capacity_buildup( + imputed_gdf, snakemake.output.histogram, "seaborn:tab20" + ) + explore(imputed_gdf, snakemake.output.explorer) + + +if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") + main() diff --git a/workflow/scripts/impute_capacity_adjustment.py b/workflow/scripts/impute_capacity_adjustment.py index 02eee99..f27b8f0 100644 --- a/workflow/scripts/impute_capacity_adjustment.py +++ b/workflow/scripts/impute_capacity_adjustment.py @@ -11,7 +11,43 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") + + +def adjustment(plants: pd.DataFrame, stats: pd.DataFrame, year: int) -> pd.DataFrame: + """Adjust operating plant capacity to national statistics in the given year. + + Rules: + - Past plants are kept unchanged. + - Future plants are kept unchanged. + - Operating plants are adjusted to match the statistics. + - If a country's expected operating capacity is missing or zero in that year, + operating plants in that country are removed. + """ + category = _utils.check_single_category(plants) + cat_stats = _utils.get_eia_stats_in_cat_yr(stats, year, category) + expected_capacity = cat_stats.groupby("country_id")["capacity_mw"].sum() + + # isolate powerplants operating in the requested year + adjusted = plants.copy() + operating = _utils.filter_years(adjusted, year, how="operating") + non_operating = adjusted.drop(index=operating.index) + + # empty placeholder for currently operating powerplants + adjusted_operating = operating.head(0).copy() + if not operating.empty and not expected_capacity.empty: + positive_expected = expected_capacity[expected_capacity > 0] + operating_to_adjust = operating[ + operating["country_id"].isin(positive_expected.index) + ].copy() + + if not operating_to_adjust.empty: + operating_to_adjust["output_capacity_mw"] = _utils.get_adjusted_capacity( + operating_to_adjust, positive_expected + ) + adjusted_operating = operating_to_adjust + + adjusted = pd.concat([non_operating, adjusted_operating], ignore_index=True) + return adjusted def adjust_powerplant_capacity( @@ -27,19 +63,19 @@ def adjust_powerplant_capacity( if plants.empty: adjusted_plants = plants else: - adjusted_plants = _utils.adjust_powerplant_capacity(plants, stats, year) + adjusted_plants = adjustment(plants, stats, year) _schemas.PlantSchema.validate(adjusted_plants).to_parquet(output_file) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") adjust_powerplant_capacity( stats_file=snakemake.input.stats, unadjusted_file=snakemake.input.unadjusted, year=_utils.DATASET_YEAR, output_file=snakemake.output.adjusted, ) - _plots.plot_capacity_adjustment( stats_file=snakemake.input.stats, unadjusted_file=snakemake.input.unadjusted, diff --git a/workflow/scripts/impute_category_combination.py b/workflow/scripts/impute_category_combination.py deleted file mode 100644 index 6d8fd57..0000000 --- a/workflow/scripts/impute_category_combination.py +++ /dev/null @@ -1,75 +0,0 @@ -"""Combine a given number of files belonging to a technology category. - -Optionally, remove a given number of powerplant_id. -""" - -import sys -from typing import TYPE_CHECKING, Any - -import _plots -import _schemas -import _utils -import geopandas as gpd -import pandas as pd - -if TYPE_CHECKING: - snakemake: Any -sys.stderr = open(snakemake.log[0], "w") - - -def impute( - input_paths: list[str], - output_path: str, - tech_mapping: dict[str, str], - excluded: list[str] | None = None, -): - """Combine a given number of category files into a final dataset.""" - combined_capacity = pd.concat( - (gpd.read_parquet(f) for f in input_paths), - ignore_index=True, - sort=False, - axis="index", - ) - if excluded: - to_drop = _utils.listify(excluded) - combined_capacity = combined_capacity[ - ~combined_capacity["powerplant_id"].isin(to_drop) - ] - - if not combined_capacity.empty: - _utils.check_single_category(combined_capacity) - - schema = _schemas.build_schema(tech_mapping, "impute") - schema.validate(combined_capacity).to_parquet(output_path) - - -def plot(imputed_path: str, output_path: str, colormap="tab20"): - """Plot stacked bar charts of active powerplant capacity over time per country.""" - df = pd.read_parquet(imputed_path) - _plots.plot_powerplant_capacity_buildup(df, output_path, colormap) - - -def explore(imputed_path: str, output_path: str, colormap="tab20"): - """Create a HTML map for users to explore.""" - df = gpd.read_parquet(imputed_path) - if df.empty: - with open(output_path, "w") as f: - f.write("No data") - else: - explorer = df.explore( - column="technology", legend=True, popup=True, cmap=colormap - ) - explorer.save(output_path) - - -if __name__ == "__main__": - impute( - input_paths=[snakemake.input.internal, *snakemake.input.user], - output_path=snakemake.output.combined, - tech_mapping=snakemake.params.tech_map, - excluded=snakemake.params.excluded, - ) - plot(imputed_path=snakemake.output.combined, output_path=snakemake.output.plot) - explore( - imputed_path=snakemake.output.combined, output_path=snakemake.output.explore - ) diff --git a/workflow/scripts/impute_location.py b/workflow/scripts/impute_location.py new file mode 100644 index 0000000..bfac9db --- /dev/null +++ b/workflow/scripts/impute_location.py @@ -0,0 +1,525 @@ +"""Adjust powerplant point locations to valid placement shapes. + +First, this will: +- combine powerplant categories and user provided files +- apply optional exclusions +- assign country IDs to in-scope powerplants +- split or reject powerplants falling in overlapping shapes + +Then, configured technologies are moved (per country) to the nearest shape matching +their requested shape class. +- Plants already inside a valid shape are kept in place. +- Plants outside are nudged just inside the nearest valid shape while minimizing +movement from their original location. +""" + +import sys +from collections.abc import Iterator, Mapping +from dataclasses import dataclass +from math import hypot +from typing import TYPE_CHECKING, Any, Literal + +import _schemas +import _utils +import geopandas as gpd +import pandas as pd +from matplotlib import pyplot as plt +from shapely import union_all +from shapely.geometry import LineString, MultiPolygon, Point, Polygon +from shapely.geometry.base import BaseGeometry +from shapely.ops import nearest_points + +if TYPE_CHECKING: + snakemake: Any + + +type OnError = Literal["raise", "ignore", "drop"] +type CountryOverlapMethod = Literal["raise", "split_capacity"] +type PolygonLike = Polygon | MultiPolygon + + +@dataclass(frozen=True) +class CountryAssignmentResult: + """Assigned powerplants and split-created IDs.""" + + powerplants: gpd.GeoDataFrame + split_powerplant_ids: pd.Index + + +@dataclass(frozen=True) +class LocationAdjustmentResult: + """Adjusted powerplants and forced-moved IDs.""" + + powerplants: gpd.GeoDataFrame + moved_powerplant_ids: pd.Index + + +def _nearest_part(point: Point, geometry: PolygonLike) -> Polygon: + """Return the polygon part closest to point.""" + if isinstance(geometry, MultiPolygon): + return min(geometry.geoms, key=point.distance) + return geometry + + +def _line_strings(geometry: BaseGeometry) -> Iterator[LineString]: + """Yield non-empty LineStrings from a possibly multipart geometry.""" + if geometry.is_empty: + return + + if isinstance(geometry, LineString) and geometry.length > 0: + yield geometry + return + + for part in getattr(geometry, "geoms", []): + yield from _line_strings(part) + + +def _entry_cut(ray: LineString, polygon: Polygon, contact: Point) -> LineString | None: + """Return the first interior segment entered from contact.""" + cut = min( + _line_strings(ray.intersection(polygon)), + key=lambda line: line.distance(contact), + default=None, + ) + + if cut is not None: + coords = list(cut.coords) + if Point(coords[-1]).distance(contact) < Point(coords[0]).distance(contact): + cut = LineString(coords[::-1]) + + return cut + + +def _polygon_span(polygon: Polygon) -> float: + """Return a conservative span length for extending a ray.""" + minx, miny, maxx, maxy = polygon.bounds + return hypot(maxx - minx, maxy - miny) + + +def _inward_ray( + point: Point, polygon: Polygon, inner_distance: float +) -> tuple[LineString, Point, Point]: + """Build a straight ray from the nearest contact point into the polygon.""" + contact = nearest_points(point, polygon)[1] + fallback = polygon.representative_point() + + dx = contact.x - point.x + dy = contact.y - point.y + length = hypot(dx, dy) + + if length == 0: + dx = fallback.x - contact.x + dy = fallback.y - contact.y + length = hypot(dx, dy) + + if length == 0: + return LineString([contact, fallback]), contact, fallback + + extension = _polygon_span(polygon) + inner_distance + end = Point( + contact.x + extension * dx / length, contact.y + extension * dy / length + ) + + return LineString([contact, end]), contact, fallback + + +def _point_on_cut(cut: LineString, inner_distance: float) -> Point: + """Return inner-distance point, or midpoint if the cut is too short.""" + distance = inner_distance if inner_distance < cut.length else cut.length / 2 + return cut.interpolate(distance) + + +def _place_slightly_inside( + point: Point, polygon: PolygonLike, inner_distance: float +) -> Point: + """Place point just inside polygon using the nearest inward cut.""" + polygon = _nearest_part(point, polygon) + ray, contact, fallback = _inward_ray(point, polygon, inner_distance) + + cut = _entry_cut(ray, polygon, contact) + if cut is None: + cut = _entry_cut(LineString([contact, fallback]), polygon, contact) + + placed = _point_on_cut(cut, inner_distance) if cut is not None else fallback + return placed if placed.within(polygon) else fallback + + +def _polygonal_geometry(geometry: BaseGeometry) -> PolygonLike | None: + """Return polygonal geometry, or None if no polygonal area remains.""" + if geometry.is_empty: + return None + + if isinstance(geometry, Polygon | MultiPolygon): + return geometry + + polygons = [] + for part in getattr(geometry, "geoms", []): + if isinstance(part, Polygon): + polygons.append(part) + elif isinstance(part, MultiPolygon): + polygons.extend(part.geoms) + + if not polygons: + return None + + merged = union_all(polygons) + return merged if isinstance(merged, Polygon | MultiPolygon) else None + + +def _raise_country_overlaps(overlaps: gpd.GeoDataFrame) -> None: + """Raise an error for powerplants assigned to multiple countries.""" + duplicate_ids = overlaps["powerplant_id"].drop_duplicates() + raise ValueError( + "Found powerplants intersecting multiple countries: " + f"{', '.join(map(str, duplicate_ids))}. " + "Please adjust your shapes, or enable the 'split_capacity' correction." + ) + + +def _split_shape_overlaps( + overlaps: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame, inner_distance: float +) -> gpd.GeoDataFrame: + """Split overlapping rows and move them into shape-exclusive areas.""" + overlaps = overlaps.copy() + + counts = overlaps["powerplant_id"].map(overlaps["powerplant_id"].value_counts()) + overlaps["output_capacity_mw"] /= counts + + shape_geometry = shapes.set_index("shape_id")["geometry"] + exclusive_geometry = { + shape_id: _polygonal_geometry( + shape_geometry.loc[shape_id].difference( + union_all(shape_geometry.drop(index=shape_id).to_list()) + ) + ) + for shape_id in overlaps["shape_id"].unique() + } + + missing = [ + shape_id for shape_id, geom in exclusive_geometry.items() if geom is None + ] + if missing: + raise ValueError(f"Shapes with no exclusive placement area: {missing}.") + + overlaps["geometry"] = overlaps.apply( + lambda row: _place_slightly_inside( + row.geometry, exclusive_geometry[row["shape_id"]], inner_distance + ), + axis=1, + ) + + suffixes = overlaps.groupby("powerplant_id").cumcount().astype(str) + overlaps["powerplant_id"] += "_duplicate_" + suffixes + + return overlaps + + +def assign_country_id( + prepared_powerplants: gpd.GeoDataFrame, + shapes: gpd.GeoDataFrame, + projected_crs: _utils.CRS, + inner_distance: float, + overlap_method: CountryOverlapMethod, +) -> CountryAssignmentResult: + """Assign country_id, dropping out-of-scope plants and handling shape overlaps.""" + output_columns = [*prepared_powerplants.columns, "country_id"] + shapes = shapes.to_crs(projected_crs)[["shape_id", "country_id", "geometry"]] + split_powerplant_ids = pd.Index([], dtype="object", name="powerplant_id") + + assigned = ( + gpd.sjoin( + prepared_powerplants.to_crs(projected_crs), + shapes, + predicate="intersects", + how="inner", + ) + .drop(columns="index_right") + .reset_index(drop=True) + ) + + overlaps = assigned[assigned["powerplant_id"].duplicated(keep=False)] + if not overlaps.empty: + if overlap_method == "raise": + _raise_country_overlaps(overlaps) + elif overlap_method == "split_capacity": + split_rows = _split_shape_overlaps( + overlaps=overlaps, shapes=shapes, inner_distance=inner_distance + ) + assigned.loc[overlaps.index] = split_rows + split_powerplant_ids = pd.Index( + split_rows["powerplant_id"], name="powerplant_id" + ) + else: + raise ValueError(f"Invalid overlap method {overlap_method!r}.") + + return CountryAssignmentResult( + powerplants=assigned.to_crs(prepared_powerplants.crs)[output_columns], + split_powerplant_ids=split_powerplant_ids, + ) + + +def _has_target_shape( + candidates: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame +) -> pd.Series: + """Return whether each candidate has a matching shape class in-country.""" + available = pd.MultiIndex.from_frame( + shapes[["country_id", "shape_class"]].drop_duplicates() + ) + requested = pd.MultiIndex.from_frame( + candidates[["country_id", "target_shape_class"]] + ) + + return pd.Series(requested.isin(available), index=candidates.index) + + +def _raise_missing_shapes(missing: gpd.GeoDataFrame) -> None: + """Raise a compact error describing missing country/class combinations.""" + summary = ( + missing[["technology", "country_id", "target_shape_class"]] + .value_counts() + .rename("count") + .reset_index() + ) + details = "; ".join( + f"technology={row.technology!r}, country_id={row.country_id!r}, " + f"shape_class={row.target_shape_class!r} ({row['count']} plants)" + for _, row in summary.iterrows() + ) + raise ValueError(f"No matching shapes found: {details}.") + + +def _already_correct_index( + candidates: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame +) -> pd.Index: + """Return candidate rows already inside a valid target shape.""" + if candidates.empty: + return pd.Index([]) + + shape_lookup = shapes[["shape_id", "country_id", "shape_class", "geometry"]].rename( + columns={"country_id": "shape_country_id", "shape_class": "matched_shape_class"} + ) + inside = gpd.sjoin(candidates, shape_lookup, how="inner", predicate="within") + inside = inside[ + (inside["country_id"] == inside["shape_country_id"]) + & (inside["target_shape_class"] == inside["matched_shape_class"]) + ] + + return inside.index.unique() + + +def _adjust_group( + group: gpd.GeoDataFrame, eligible_shapes: gpd.GeoDataFrame, inner_distance: float +) -> gpd.GeoDataFrame: + """Adjust one country/class group to each point's nearest eligible shape.""" + matched = gpd.sjoin_nearest( + group, + eligible_shapes[["shape_id", "geometry"]], + how="left", + distance_col="_distance", + ) + + geometry_by_shape_id = eligible_shapes.set_index("shape_id")["geometry"] + matched["matched_geometry"] = matched["shape_id"].map(geometry_by_shape_id) + + matched = matched.sort_values(["powerplant_id", "_distance", "shape_id"]).loc[ + lambda df: ~df.index.duplicated(keep="first") + ] + + matched["geometry"] = matched.apply( + lambda row: _place_slightly_inside( + row.geometry, row.matched_geometry, inner_distance + ), + axis=1, + ) + + return matched[["geometry"]] + + +def _adjust_candidates( + candidates: gpd.GeoDataFrame, shapes: gpd.GeoDataFrame, inner_distance: float +) -> gpd.GeoDataFrame: + """Adjust all misplaced candidates to nearest valid shapes.""" + adjusted_parts = [] + + for (country_id, shape_class), group in candidates.groupby( + ["country_id", "target_shape_class"], sort=False + ): + eligible_shapes = shapes.loc[ + (shapes["country_id"] == country_id) + & (shapes["shape_class"] == shape_class), + ["shape_id", "geometry"], + ] + adjusted_parts.append(_adjust_group(group, eligible_shapes, inner_distance)) + + adjusted = ( + pd.concat(adjusted_parts) + if adjusted_parts + else gpd.GeoDataFrame( + columns=["geometry"], geometry="geometry", crs=candidates.crs + ) + ) + + return gpd.GeoDataFrame(adjusted, geometry="geometry", crs=candidates.crs) + + +def adjust_powerplant_location( + powerplants: gpd.GeoDataFrame, + shapes: gpd.GeoDataFrame, + forced_shape_class: Mapping[str, str], + projected_crs: _utils.CRS, + inner_distance: float, + on_error: OnError = "raise", +) -> LocationAdjustmentResult: + """Adjust configured technologies to nearest valid shape in the same country.""" + plants = powerplants.to_crs(projected_crs).copy() + plants["target_shape_class"] = plants["technology"].map(forced_shape_class) + + shapes = shapes.to_crs(projected_crs).copy() + candidates = plants[plants["target_shape_class"].notna()].copy() + + has_target = _has_target_shape(candidates, shapes) + missing = candidates[~has_target] + + if not missing.empty and on_error == "raise": + _raise_missing_shapes(missing) + + drop_index = missing.index if on_error == "drop" else pd.Index([]) + valid = candidates[has_target & ~candidates.index.isin(drop_index)] + + already_correct = _already_correct_index(valid, shapes) + to_adjust = valid[~valid.index.isin(already_correct)] + moved_powerplant_ids = pd.Index(to_adjust["powerplant_id"], name="powerplant_id") + + adjusted = _adjust_candidates(to_adjust, shapes, inner_distance) + + result = plants.drop(index=drop_index) + if not adjusted.empty: + result.loc[adjusted.index, "geometry"] = adjusted["geometry"] + + result = result.drop(columns="target_shape_class") + result = result[powerplants.columns] + result = result.to_crs(powerplants.crs) + + return LocationAdjustmentResult( + powerplants=result, moved_powerplant_ids=moved_powerplant_ids + ) + + +def combine_powerplants( + file_paths: list[str], geo_crs: _utils.CRS, excluded: list[str] +) -> gpd.GeoDataFrame: + """Combine internal category files and user files into a final dataset.""" + combined = pd.concat( + (gpd.read_parquet(f).to_crs(geo_crs) for f in file_paths), + ignore_index=True, + sort=False, + ) + combined = combined[~combined["powerplant_id"].isin(_utils.listify(excluded))] + if not combined.empty: + _utils.check_single_category(combined) + return combined + + +def plot( + adjustment: LocationAdjustmentResult, + assignment: CountryAssignmentResult, + shapes: gpd.GeoDataFrame, + projected_crs: _utils.CRS, +): + """Plot final powerplants, highlighting split and forced-moved plants.""" + fig, ax = plt.subplots(layout="constrained") + + adjusted = adjustment.powerplants.to_crs(projected_crs).set_index( + "powerplant_id", drop=False + ) + shapes = shapes.to_crs(projected_crs) + + split = adjusted.loc[adjusted.index.intersection(assignment.split_powerplant_ids)] + moved = adjusted.loc[adjusted.index.intersection(adjustment.moved_powerplant_ids)] + + highlighted = split.index.union(moved.index) + other = adjusted.loc[adjusted.index.difference(highlighted)] + + if not shapes.empty: + shapes.boundary.plot( + ax=ax, linewidth=0.5, color="black", label=f"Shapes (n={len(shapes)})" + ) + + layers = [ + (other, "Valid plants"), + (split, "Split due to overlaps"), + (moved, "Forced shape class"), + ] + + for data, label in layers: + if not data.empty: + data.plot( + ax=ax, + markersize=8 if label == "Valid plants" else 28, + marker=".", + alpha=0.5 if label == "Valid plants" else 0.9, + label=f"{label} (n={len(data)})", + ) + + ax.set_aspect("equal") + ax.set_axis_off() + + handles, labels = ax.get_legend_handles_labels() + if handles: + ax.legend( + handles, labels, loc="upper left", bbox_to_anchor=(1, 1), borderaxespad=0 + ) + + return fig, ax + + +def main() -> None: + """Main snakemake process.""" + shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(snakemake.input.shapes)) + + projected_crs = _utils.check_crs(snakemake.params.crs["projected"], "projected") + geographic_crs = _utils.check_crs(snakemake.params.crs["geographic"], "geographic") + + location_cnf = snakemake.params.location_cnf + inner_distance = location_cnf["inner_distance"] + + combined = combine_powerplants( + file_paths=[snakemake.input.internal, *snakemake.input.user], + geo_crs=geographic_crs, + excluded=snakemake.params.excluded, + ) + schema = _schemas.build_schema(snakemake.params.tech_mapping, "prepare") + combined = schema.validate(combined) + + assignment = assign_country_id( + prepared_powerplants=combined, + shapes=shapes, + projected_crs=projected_crs, + inner_distance=inner_distance, + overlap_method=location_cnf["on_overlap"], + ) + assigned = schema.validate(assignment.powerplants) + + adjustment = adjust_powerplant_location( + powerplants=assigned, + shapes=shapes, + forced_shape_class=location_cnf.get("forced_class", {}), + projected_crs=projected_crs, + inner_distance=inner_distance, + on_error=location_cnf["on_forced_class_error"], + ) + adjusted = schema.validate(adjustment.powerplants) + adjusted.to_parquet(snakemake.output.relocated) + + fig, _ = plot( + adjustment=adjustment, + assignment=assignment, + shapes=shapes, + projected_crs=projected_crs, + ) + fig.savefig(snakemake.output.plot, dpi=200, bbox_inches="tight") + + +if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") + main() diff --git a/workflow/scripts/impute_years.py b/workflow/scripts/impute_years.py deleted file mode 100644 index 792f6c7..0000000 --- a/workflow/scripts/impute_years.py +++ /dev/null @@ -1,207 +0,0 @@ -"""Imputation of missing values.""" - -import sys -from typing import TYPE_CHECKING, Any - -import _plots -import _schemas -import _utils -import geopandas as gpd -import pandas as pd - -if TYPE_CHECKING: - snakemake: Any - -HISTORICAL = {"operating", "retired"} -SCENARIO_MAP = { - "historical": HISTORICAL, - "near-future": HISTORICAL | {"construction"}, - "far-future": HISTORICAL | {"construction", "pre-construction"}, - "far-off-future": HISTORICAL | {"construction", "pre-construction", "announced"}, -} - - -def _impute_start_year( - prepared_df: pd.DataFrame, lifetimes: dict[str, int] -) -> pd.Series: - """Fill start year using reasonable assumptions and user settings.""" - start_year = prepared_df["start_year"].copy() - - # Impute using the lifetime if possible. - mask_life = start_year.isna() & prepared_df["end_year"].notna() - start_year[mask_life] = prepared_df.loc[mask_life, "end_year"] - prepared_df.loc[ - mask_life, "technology" - ].map(lifetimes) - # Impute using country averages. - averages = ( - prepared_df.groupby(["country_id", "category", "technology", "status"])[ - "start_year" - ] - .transform("mean") - .round() - ) - mask_na = start_year.isna() - start_year.loc[mask_na] = averages[mask_na] - - return start_year - - -def _impute_end_year( - df: pd.DataFrame, lifetimes: dict[str, int], delay: dict[str, int] -) -> pd.Series: - """Impute end_year using lifetime. - - Old plants operating beyond lifetime will retired with a given delay. - """ - ref_year = _utils.DATASET_YEAR - - # Impute expected end year only if no data is present. - expected_end = df["start_year"] + df["technology"].map(lifetimes) - result = df["end_year"].copy().fillna(expected_end) - - # Plants operating beyond expected lifetime will be retired after a delay of >=1 yr - needs_delay = (result <= ref_year) & (df["status"] == "operating") - delayed_end = result + df["technology"].map(delay).fillna(0).astype(int) - delayed_end = delayed_end.clip(lower=ref_year + 1) - result.loc[needs_delay] = delayed_end.loc[needs_delay] - - return result - - -def _impute_status(df: pd.DataFrame) -> pd.Series: - """Impute powerplant status. - - Must be called after start/end years are complete. - """ - status = df["status"].copy() - ref_year = _utils.DATASET_YEAR - status.loc[ref_year < df["start_year"]] = "planned" - status.loc[(df["start_year"] <= ref_year) & (ref_year < df["end_year"])] = ( - "operating" - ) - status.loc[df["end_year"] <= ref_year] = "retired" - - if status.isna().any(): - raise ValueError("Entries with ambiguous states were left in the dataframe.") - - return status - - -def impute( - prepared_cat_gdf: gpd.GeoDataFrame, - countries_gdf: gpd.GeoDataFrame, - imputation: dict, - technology_mapping: dict, - projected_crs: str, -) -> gpd.GeoDataFrame: - """Add automatic and user imputations to fill missing data. - - Args: - prepared_cat_gdf (gpd.GeoDataFrame): cleaned category dataset following our schema. - countries_gdf (str): country-level shapes to use. - output_path (str): resulting dataset. - imputation (str): imputation configuration. - technology_mapping (str): technology mapping configuration. - projected_crs (str): crs used to calculate centroids. - """ - _utils.check_single_category(prepared_cat_gdf) - - # Re-map polygons to their centroid to simplify further processing. - # TODO: consider splitting them between shapes instead? - polygon_mask = prepared_cat_gdf[ - prepared_cat_gdf.geometry.geom_type != "Point" - ].index - if polygon_mask.any(): - if projected_crs: - prev_crs = prepared_cat_gdf.crs - prepared_cat_gdf.loc[polygon_mask, "geometry"] = ( - prepared_cat_gdf.loc[polygon_mask, "geometry"] - .to_crs(projected_crs) - .centroid.to_crs(prev_crs) - ) - else: - raise ValueError( - "Polygon powerplant geometries detected. Specify a projected CRS." - ) - - lifetimes = imputation["lifetime_years"] - retirement_delay_years = imputation["retirement_delay_years"] - scenario = SCENARIO_MAP[imputation["scenario"]] - - # Get facilities within the provided regions and for the given scenario - imputed = gpd.sjoin( - prepared_cat_gdf[prepared_cat_gdf["status"].isin(scenario)], - countries_gdf, - predicate="intersects", - how="inner", - ).drop("index_right", axis="columns") - - if not imputed.empty: - # Adjust project dates - imputed["start_year"] = _impute_start_year(imputed, lifetimes) - imputed["end_year"] = _impute_end_year( - imputed, lifetimes, retirement_delay_years - ) - - # Drop projects with insufficient date data and then adjust status. - imputed = imputed.dropna(subset=["start_year", "end_year"]) - imputed["status"] = _impute_status(imputed) - - imputed = handle_polygon_overlaps( - imputed, imputation["shape_overlap_correction"] - ) - - schema = _schemas.build_schema(technology_mapping, "impute") - return schema.validate(imputed) - - -def handle_polygon_overlaps(df: gpd.GeoDataFrame, method: str) -> gpd.GeoDataFrame: - """Handle duplicate powerplant assignments caused by overlapping shapes.""" - if method not in {"strict", "split_capacity"}: - raise ValueError( - f"Unsupported polygon overlap method {method!r}. " - "Expected 'strict' or 'split_capacity'." - ) - - dup_mask = df["powerplant_id"].duplicated(keep=False) - - if dup_mask.any(): - duplicate_ids = df.loc[dup_mask, "powerplant_id"] - - if method == "strict": - raise ValueError( - "Found duplicate IDs, likely due to overlapping polygons: " - f"{', '.join(map(str, duplicate_ids.unique()))}. " - "Please adjust your shapes, or enable the 'split_capacity' correction." - ) - - counts = duplicate_ids.map(df["powerplant_id"].value_counts()) - - df.loc[dup_mask, "output_capacity_mw"] = ( - df.loc[dup_mask, "output_capacity_mw"] / counts - ) - - suffixes = df.loc[dup_mask].groupby("powerplant_id").cumcount().astype(str) - df.loc[dup_mask, "powerplant_id"] = duplicate_ids + "_duplicate_" + suffixes - - return df.reset_index(drop=True) - - -def main() -> None: - """Main snakemake process.""" - imputed_gdf = impute( - prepared_cat_gdf=gpd.read_parquet(snakemake.input.prepared), - countries_gdf=gpd.read_parquet(snakemake.input.dissolved_shapes), - imputation=snakemake.params.imputation, - technology_mapping=snakemake.params.tech_map, - projected_crs=snakemake.params.projected_crs, - ) - imputed_gdf.to_parquet(snakemake.output.imputed) - _plots.plot_powerplant_capacity_buildup( - imputed_gdf, snakemake.output.plot, "seaborn:tab20" - ) - - -if __name__ == "__main__": - sys.stderr = open(snakemake.log[0], "w") - main() diff --git a/workflow/scripts/prepare_bioenergy.py b/workflow/scripts/prepare_bioenergy.py index 1d042b3..f4f4f76 100644 --- a/workflow/scripts/prepare_bioenergy.py +++ b/workflow/scripts/prepare_bioenergy.py @@ -32,6 +32,7 @@ def main( gem_gbpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str], + crs: str, output_plants_path: str, output_fuels_path: str, ): @@ -55,11 +56,12 @@ def main( "start_year": _start_year(raw_df), "end_year": gem.year_col(raw_df, "end"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), "ccs": False, "chp": False, "fuel_class": fuel_class, - } + }, + crs=crs, ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(bioenergy_df).to_parquet(output_plants_path) @@ -71,6 +73,7 @@ def main( gem_gbpt_path=snakemake.input.gem_gbpt, technology_mapping=snakemake.params.technology_mapping, fuel_mapping=snakemake.params.fuel_mapping, + crs=snakemake.params.geo_crs, output_plants_path=snakemake.output.plants, output_fuels_path=snakemake.output.fuels, ) diff --git a/workflow/scripts/prepare_fossil.py b/workflow/scripts/prepare_fossil.py index e290a53..d8bfd8f 100644 --- a/workflow/scripts/prepare_fossil.py +++ b/workflow/scripts/prepare_fossil.py @@ -73,7 +73,10 @@ def _get_end_year( def prepare_gem_gcpt( - gem_gcpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str] + gem_gcpt_path: str, + technology_mapping: dict[str, str], + fuel_mapping: dict[str, str], + crs: str, ) -> tuple[gpd.GeoDataFrame, pd.DataFrame]: """Obtain coal power locations using GEM-GCPT data.""" raw_df = gem.read_gem_dataset(gem_gcpt_path, ["Units"]) @@ -97,12 +100,14 @@ def prepare_gem_gcpt( "start_year": gem.year_col(raw_df, "start"), "end_year": _get_end_year(raw_df, planned_ret_col="planned_retirement"), "status": gem.status_col(raw_df, mapping=STATUS_MAPPING_COAL), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), "ccs": _get_coal_ccs(raw_df), "chp": False, # Not specified in GCPT "fuel_class": fuel_class, - } + }, + crs=crs, ).reset_index(drop=True) + coal_df = _utils.ensure_positive_capacity(coal_df) schema = _schemas.build_schema(technology_mapping, "prepare") return schema.validate(coal_df), _schemas.FuelSchema.validate(fuels_df) @@ -111,6 +116,7 @@ def prepare_gem_gogpt( gem_gogpt_path: str, technology_mapping: dict[str, str], fuel_mapping: dict[str, str], + crs: str, ) -> tuple[gpd.GeoDataFrame, pd.DataFrame]: """Obtain oil and gas power plants using GEM-GOGPT data.""" raw_df = gem.read_gem_dataset( @@ -135,12 +141,14 @@ def prepare_gem_gogpt( "start_year": gem.year_col(raw_df, "start"), "end_year": _get_end_year(raw_df, planned_ret_col="planned_retire"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), "ccs": raw_df["ccs_attachment?"] == "yes", "chp": raw_df["chp"] == "yes", "fuel_class": fuel_class, - } + }, + crs=crs, ).reset_index(drop=True) + oil_gas_df = _utils.ensure_positive_capacity(oil_gas_df) schema = _schemas.build_schema(technology_mapping, "prepare") return schema.validate(oil_gas_df), _schemas.FuelSchema.validate(fuels_df) @@ -152,6 +160,7 @@ def main() -> None: gem_gogpt_path=snakemake.input.gem_gogpt, technology_mapping=snakemake.params.technology_mapping["oil_gas"], fuel_mapping=snakemake.params.fuel_mapping, + crs=snakemake.params.geo_crs, ) og_plants.to_parquet(snakemake.output.og_plants) og_fuels.to_parquet(snakemake.output.og_fuels) @@ -161,6 +170,7 @@ def main() -> None: gem_gcpt_path=snakemake.input.gem_gcpt, technology_mapping=snakemake.params.technology_mapping["coal"], fuel_mapping=snakemake.params.fuel_mapping, + crs=snakemake.params.geo_crs, ) coal_plants.to_parquet(snakemake.output.coal_plants) coal_fuels.to_parquet(snakemake.output.coal_fuels) diff --git a/workflow/scripts/prepare_geothermal.py b/workflow/scripts/prepare_geothermal.py index 12fec71..3d29517 100644 --- a/workflow/scripts/prepare_geothermal.py +++ b/workflow/scripts/prepare_geothermal.py @@ -10,11 +10,13 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") def main( - gem_ggpt_path: str, technology_mapping: dict[str, str], output_plants_path: str + gem_ggpt_path: str, + technology_mapping: dict[str, str], + crs: str, + output_plants_path: str, ): """Obtain geothermal power plants using GEM-GGPT data.""" raw_df = gem.read_gem_dataset( @@ -35,16 +37,19 @@ def main( "start_year": gem.year_col(raw_df, "start"), "end_year": gem.year_col(raw_df, "end"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), - } + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), + }, + crs=crs, ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(geo_df).to_parquet(output_plants_path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main( gem_ggpt_path=snakemake.input.gem_ggpt, technology_mapping=snakemake.params.technology_mapping, + crs=snakemake.params.geo_crs, output_plants_path=snakemake.output.plants, ) diff --git a/workflow/scripts/prepare_hydropower.py b/workflow/scripts/prepare_hydropower.py index f8ff632..c7315b2 100644 --- a/workflow/scripts/prepare_hydropower.py +++ b/workflow/scripts/prepare_hydropower.py @@ -11,7 +11,6 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") def _technology(gem_df: pd.DataFrame, tech_mapping: dict[str, str]) -> pd.Series: @@ -37,9 +36,12 @@ def _reservoir_km3(gem_df: pd.DataFrame) -> pd.Series: return gem_df["res_vol_km3"].where(gem_df["res_vol_km3"] >= 0, np.nan) -def main(input_path: str, output_path: str, technology_mapping: dict[str, str]): +def main(): """Prepare a cleaned hydropower dataset.""" - raw_df = pd.read_csv(input_path) + raw_df = pd.read_csv(snakemake.input.glohydrores_path) + technology_mapping = snakemake.params.technology_mapping + crs = snakemake.params.geo_crs + raw_df = raw_df.dropna(subset=["capacity_mw", "plant_lon", "plant_lat"]) hydro_df = gpd.GeoDataFrame( { @@ -51,17 +53,15 @@ def main(input_path: str, output_path: str, technology_mapping: dict[str, str]): "start_year": raw_df["year"], "end_year": np.nan, "status": "operating", - "geometry": _utils.get_point_col(raw_df, "plant_lon", "plant_lat"), + "geometry": _utils.get_point_col(raw_df, "plant_lon", "plant_lat", crs), "reservoir_km3": _reservoir_km3(raw_df), - } + }, + crs=crs, ) schema = _schemas.build_schema(technology_mapping, "prepare") - schema.validate(hydro_df).to_parquet(output_path) + schema.validate(hydro_df).to_parquet(snakemake.output.output_path) if __name__ == "__main__": - main( - input_path=snakemake.input.glohydrores_path, - output_path=snakemake.output.output_path, - technology_mapping=snakemake.params.technology_mapping, - ) + sys.stderr = open(snakemake.log[0], "w") + main() diff --git a/workflow/scripts/prepare_large_solar.py b/workflow/scripts/prepare_large_solar.py index 6077d85..daef7e0 100644 --- a/workflow/scripts/prepare_large_solar.py +++ b/workflow/scripts/prepare_large_solar.py @@ -59,9 +59,9 @@ def fill_tz_with_gem( def get_gem_v_tz_mismatch( gem_df: gpd.GeoDataFrame, tz_sam_df: gpd.GeoDataFrame, - valid_status: list[str] | None = None, + valid_status: list[str], buffer=1000, - proj_crs="epsg:3857", + proj_crs="epsg:8857", ) -> gpd.GeoDataFrame: """Estimation of future projects missed by TZ-SAM's satellite-based approach. @@ -69,6 +69,7 @@ def get_gem_v_tz_mismatch( - Only projects with known pre-operation states are accepted. - Only projects outside a buffer are accepted (default 1 km). """ + _utils.check_crs(proj_crs, "projected") if gem_df.crs != tz_sam_df.crs: raise ValueError("GEM and TZ-SAM CRS mismatch.") @@ -76,7 +77,7 @@ def get_gem_v_tz_mismatch( if valid_status: future_gem_df = gem_df[gem_df["status"].isin(valid_status)] else: - future_gem_df = gem_df + raise ValueError(f"Must specify valid status to get. Got {valid_status!r}.") # Buffer around the points using a projected CRS future_gem_buffered = future_gem_df.copy() @@ -149,9 +150,9 @@ def prepare_solar_utility_pv( valid_status=["announced", "pre-construction", "construction", "retired"], ) - utility_pv = pd.concat([filled_tz_df, gem_mismatch_df], axis="index") - utility_pv = utility_pv.reset_index(drop=True) - + utility_pv = pd.concat([filled_tz_df, gem_mismatch_df], ignore_index=True) + # Convert to points to make further processing easier. + utility_pv["geometry"] = utility_pv["geometry"].representative_point() schema = _schemas.build_schema({"utility_pv": tech_name}, "prepare") return schema.validate(utility_pv) @@ -188,7 +189,6 @@ def prepare_solar_csp( def main() -> None: """Main snakemake process.""" - sys.stderr = open(snakemake.log[0], "w") utility_pv_gdf = prepare_solar_utility_pv( snakemake.input.tz_sam, snakemake.input.gem_gspt, @@ -197,12 +197,15 @@ def main() -> None: ) csp_gdf = prepare_solar_csp(snakemake.input.gem_gspt, snakemake.params.csp_name) + crs = _utils.check_crs(snakemake.params.geo_crs, "geographic") + # Combine into one large category large_solar_gdf = pd.concat( [utility_pv_gdf, csp_gdf], ignore_index=True, sort=False, axis="index" ) - large_solar_gdf.to_parquet(snakemake.output.large_solar) + large_solar_gdf.to_crs(crs).to_parquet(snakemake.output.large_solar) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main() diff --git a/workflow/scripts/prepare_nuclear.py b/workflow/scripts/prepare_nuclear.py index f7adbd1..4985320 100644 --- a/workflow/scripts/prepare_nuclear.py +++ b/workflow/scripts/prepare_nuclear.py @@ -11,11 +11,13 @@ if TYPE_CHECKING: snakemake: Any -sys.stderr = open(snakemake.log[0], "w") def main( - gem_gnpt_path: str, technology_mapping: dict[str, str], output_plants_path: str + gem_gnpt_path: str, + technology_mapping: dict[str, str], + crs: str, + output_plants_path: str, ): """Obtain nuclear power plants using GEM-GNPT data.""" raw_df = gem.read_gem_dataset(gem_gnpt_path, ["Data"]) @@ -38,16 +40,19 @@ def main( raw_df["retirement_date"], format="mixed" ).dt.year, "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), - } + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), + }, + crs=crs, ).reset_index(drop=True) schema = _schemas.build_schema(technology_mapping, "prepare") schema.validate(nuclear_df).to_parquet(output_plants_path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main( gem_gnpt_path=snakemake.input.gem_gnpt, technology_mapping=snakemake.params.technology_mapping, + crs=snakemake.params.geo_crs, output_plants_path=snakemake.output.plants, ) diff --git a/workflow/scripts/prepare_shapes.py b/workflow/scripts/prepare_shapes.py deleted file mode 100644 index f8f8654..0000000 --- a/workflow/scripts/prepare_shapes.py +++ /dev/null @@ -1,72 +0,0 @@ -"""Prepare intermediate versions of the shape file used during processing.""" - -import sys -from typing import TYPE_CHECKING, Any - -import _schemas -import geopandas as gpd -from cmap import Colormap -from matplotlib import pyplot as plt -from matplotlib.axes import Axes -from matplotlib.figure import Figure - -if TYPE_CHECKING: - snakemake: Any - - -def dissolve_by_country(shapes: gpd.GeoDataFrame) -> gpd.GeoDataFrame: - """Dissolve shapes by their country.""" - countries_gdf = ( - shapes[["country_id", "geometry"]].dissolve("country_id").reset_index() - ) - countries_gdf["geometry"] = countries_gdf.buffer(0) - return countries_gdf - - -def plot( - original: gpd.GeoDataFrame, - new: gpd.GeoDataFrame, - *, - name: str, - crs: str, - cmap: str, - col: str, -) -> tuple[Figure, Axes]: - """Helper to view computed changes.""" - fig, axes = plt.subplots(1, 2, layout="constrained") - original.to_crs(crs).plot("shape_class", ax=axes[0]) - axes[0].set_title("User") - new_proj = new.to_crs(crs) - if len(new[col].unique()) == 1: - # FIXME: odd issue when plotting a single category - # Upgrading geopandas might fix it. - new_proj.plot(ax=axes[1], color="lightcoral") - else: - new_proj.plot(col, ax=axes[1], cmap=Colormap(cmap).to_mpl()) - new_proj.boundary.plot(ax=axes[1], color="black", lw=0.05) - axes[1].set_title(name) - for ax in axes: - ax.set(xticks=[], yticks=[], xlabel="", ylabel="") - return fig, axes - - -def main(): - """Prepare a cleaned hydropower dataset.""" - shapes = _schemas.ShapeSchema.validate(gpd.read_parquet(snakemake.input.shapes)) - dissolved = dissolve_by_country(shapes) - dissolved.to_parquet(snakemake.output.dissolved) - - fig, _ = plot( - shapes, - dissolved, - col="country_id", - name="Dissolved", - crs=snakemake.params.crs, - cmap="glasbey:glasbey", - ) - fig.savefig(snakemake.output.dissolved_plt, dpi=200, bbox_inches="tight") - - -if __name__ == "__main__": - sys.stderr = open(snakemake.log[0], "w") - main() diff --git a/workflow/scripts/prepare_wind_gwpt.py b/workflow/scripts/prepare_wind_gwpt.py index 7eb5fb1..3eebd8d 100644 --- a/workflow/scripts/prepare_wind_gwpt.py +++ b/workflow/scripts/prepare_wind_gwpt.py @@ -4,13 +4,17 @@ A free alternative to the WEMI dataset. """ +import sys +from typing import TYPE_CHECKING, Any + import _gem as gem import _schemas import _utils -import click import geopandas as gpd import pandas as pd -import yaml + +if TYPE_CHECKING: + snakemake: Any INTERNAL_TECH_MAPPING = { "Onshore": "onshore", @@ -24,19 +28,15 @@ def _technology(gem_df: pd.DataFrame): return gem_df["installation_type"].apply(lambda x: INTERNAL_TECH_MAPPING[x]) -@click.command() -@click.argument("gem_gwpt_path", type=click.Path(dir_okay=False)) -@click.option("-o", "output_path", type=click.Path(dir_okay=False), required=True) -@click.option("-t", "tech_mapping", type=str, required=True) -def main(gem_gwpt_path: str, output_path: str, tech_mapping: str): +def prepare_gem_gwpt( + gem_gwpt_path: str, tech_mapping: dict, crs: str +) -> gpd.GeoDataFrame: """Obtain wind power locations using GEM-GWPT data.""" raw_df = gem.read_gem_dataset(gem_gwpt_path, gem.GEM_GWPT_SHEETS) # Remove unknown installation types to avoid misplacement raw_df = raw_df[raw_df["installation_type"] != "Unknown"] raw_df = raw_df.dropna(subset=["installation_type"]) - tech_map = yaml.safe_load(tech_mapping) - wind_df = gpd.GeoDataFrame( { "powerplant_id": _utils.get_combined_text_col( @@ -46,17 +46,27 @@ def main(gem_gwpt_path: str, output_path: str, tech_mapping: str): raw_df, ["project_name", "phase_name"] ), "category": "wind", - "technology": _technology(raw_df).map(tech_map), + "technology": _technology(raw_df).map(tech_mapping), "output_capacity_mw": raw_df["capacity_(mw)"], "start_year": gem.year_col(raw_df, "start"), "end_year": gem.year_col(raw_df, "end"), "status": gem.status_col(raw_df), - "geometry": _utils.get_point_col(raw_df, "longitude", "latitude"), - } + "geometry": _utils.get_point_col(raw_df, "longitude", "latitude", crs), + }, + crs=crs, + ) + schema = _schemas.build_schema(tech_mapping, "prepare") + return schema.validate(wind_df) + + +def main(): + """Main snakemake process.""" + gem_gwpt = prepare_gem_gwpt( + snakemake.input.gem_gwpt, snakemake.params.tech_map, snakemake.params.geo_crs ) - schema = _schemas.build_schema(tech_map, "prepare") - schema.validate(wind_df).to_parquet(output_path) + gem_gwpt.to_parquet(snakemake.output.path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main() diff --git a/workflow/scripts/prepare_wind_wemi.py b/workflow/scripts/prepare_wind_wemi.py index a7f3c60..f0bfcd3 100644 --- a/workflow/scripts/prepare_wind_wemi.py +++ b/workflow/scripts/prepare_wind_wemi.py @@ -4,14 +4,18 @@ Available here: https://www.thewindpower.net/about_en.php """ +import sys +from typing import TYPE_CHECKING, Any + import _schemas -import click import geopandas as gpd import numpy as np import pandas as pd -import yaml from _utils import get_point_col +if TYPE_CHECKING: + snakemake: Any + WEMI_CRS = "EPSG:4326" KW_TO_MW = 1 / 1000 STATUS_MAPPING = { @@ -42,19 +46,14 @@ def _start_year(raw_df: pd.DataFrame) -> pd.Series: return pd.to_numeric(year, errors="coerce") -@click.command() -@click.argument("input_path", type=click.Path(dir_okay=False)) -@click.option("-o", "output_path", type=click.Path(dir_okay=False), required=True) -@click.option("-t", "technology_mapping", type=str, required=True) -def main(input_path: str, output_path: str, technology_mapping: str): - """Saves a standardised and validated version of the WEMI dataset.""" +def prepare_wemi(wemi_path: str, tech_map: dict, crs: str) -> gpd.GeoDataFrame: + """Standardised and validated version of the WEMI dataset.""" raw_df = pd.read_excel( - input_path, + wemi_path, sheet_name="Windfarms", skiprows=[1], na_values=["#ND"], # codespell:ignore ) - tech_map = yaml.safe_load(technology_mapping) # Cleanup columns with problematic empty values. raw_df = raw_df.dropna(subset=["Total power", "Latitude", "Longitude"]) @@ -69,13 +68,22 @@ def main(input_path: str, output_path: str, technology_mapping: str): "start_year": start_year, "end_year": np.nan, "status": raw_df["Status"].replace(STATUS_MAPPING), - "geometry": get_point_col(raw_df, "Longitude", "Latitude", crs=WEMI_CRS), + "geometry": get_point_col(raw_df, "Longitude", "Latitude", crs=crs), }, - crs=WEMI_CRS, + crs=crs, ) schema = _schemas.build_schema(tech_map, "prepare") - schema.validate(processed_df).to_parquet(output_path) + return schema.validate(processed_df) + + +def main(): + """Saves a standardised and validated version of the WEMI dataset.""" + wemi_gdf = prepare_wemi( + snakemake.input.wemi, snakemake.params.tech_map, snakemake.params.geo_crs + ) + wemi_gdf.to_parquet(snakemake.output.path) if __name__ == "__main__": + sys.stderr = open(snakemake.log[0], "w") main() diff --git a/workflow/scripts/proxy.py b/workflow/scripts/proxy.py index 4c151ff..41b62b5 100644 --- a/workflow/scripts/proxy.py +++ b/workflow/scripts/proxy.py @@ -12,18 +12,11 @@ import rioxarray as rxr from cmap import Colormap from matplotlib import pyplot as plt -from xarray import DataArray if TYPE_CHECKING: snakemake: Any -def _check_crs(raster: DataArray): - crs = raster.rio.crs - if crs is None or not crs.is_valid: - raise ValueError(f"The provided raster has an invalid CRS: {crs}.") - - def _get_borders_gdf(shapes_file: str) -> gpd.GeoDataFrame: """Constructs country borders by removing marine regions and dissolving land regions. @@ -69,7 +62,6 @@ def proxy_rooftop_pv_capacity( borders_df = _get_borders_gdf(shapes_file).set_index("country_id") stats_df = pd.read_parquet(stats_file) area_potential_da = rxr.open_rasterio(proxy_file).squeeze() # type: ignore[union-attr] - _check_crs(area_potential_da) agg_unadjusted_df = pd.read_parquet(aggregated_unadjusted_file) unadj_cap_mw = agg_unadjusted_df.groupby("country_id")["output_capacity_mw"].sum() @@ -107,7 +99,6 @@ def plot(proxy_file: str, shapes_file: str, output_file: str, pixels: int = 500_ shapes_gdf = gpd.read_parquet(shapes_file) area_potential_da = rxr.open_rasterio(proxy_file).squeeze() # type: ignore[union-attr] - _check_crs(area_potential_da) # Compute a coarsening factor to avoid memory limits nx, ny = area_potential_da.sizes["x"], area_potential_da.sizes["y"] @@ -117,7 +108,7 @@ def plot(proxy_file: str, shapes_file: str, output_file: str, pixels: int = 500_ # Coarsen the proxy data coarse = area_potential_da.coarsen(x=factor, y=factor, boundary="trim").mean() - fig, ax = plt.subplots(figsize=(6, 6), dpi=300, rasterized=True) + fig, ax = plt.subplots(figsize=(6, 6), dpi=300) coarse.plot.imshow( ax=ax, cmap=Colormap("seaborn:rocket").to_matplotlib(), @@ -125,11 +116,10 @@ def plot(proxy_file: str, shapes_file: str, output_file: str, pixels: int = 500_ cbar_kwargs={"location": "right", "label": "Proxied potential"}, alpha=1, ) + # project to the raster's CRS for speed shapes_gdf.to_crs(area_potential_da.rio.crs).geometry.boundary.plot( ax=ax, color="lightgrey", linewidth=0.3, alpha=0.5 ) - ax.set_xlabel("Longitude") - ax.set_ylabel("Latitude") ax.set_title(f"Aggregation proxy (coarsened ~{pixel_count:.1e} pixels)") fig.savefig(output_file, bbox_inches="tight")