diff --git a/src/muse/constraints.py b/src/muse/constraints.py index be0dcbc3..5c2aa687 100644 --- a/src/muse/constraints.py +++ b/src/muse/constraints.py @@ -94,7 +94,6 @@ def constraints( from __future__ import annotations from collections.abc import Mapping, MutableMapping, Sequence -from dataclasses import dataclass from enum import Enum, auto from typing import ( Any, @@ -110,7 +109,7 @@ def constraints( from mypy_extensions import KwArg from muse.registration import registrator -from muse.timeslices import broadcast_timeslice, distribute_timeslice, drop_timeslice +from muse.timeslices import broadcast_timeslice, distribute_timeslice CAPACITY_DIMS = "asset", "replacement", "region" """Default dimensions for capacity decision variables.""" @@ -752,479 +751,3 @@ def minimum_service( dict(capacity=-capacity, production=production, b=b), attrs=dict(kind=ConstraintKind.LOWER_BOUND), ) - - -def lp_costs( - capacity_costs: xr.DataArray, - commodities: list[str], - timeslice_level: str | None = None, -) -> xr.Dataset: - """Creates dataset of costs for solving with scipy's LP solver. - - Importantly, this also defines the decision variables in the linear program. - - The costs applied to the capacity decision variables are provided. This should - have dimensions "asset" and "replacement". In other words, capacity addition - is solved for each replacement technology for each existing asset. - - No cost is applied to the production decision variables. Thus, the production - component of the resulting dataset is zero, with dimensions determining the - production decision variables. This will have dimensions "asset", "replacement", - "commodity", and "timeslice". In other words, production is solved for each - replacement technology for each existing asset, for each commodity, and for each - timeslice. - - Args: - capacity_costs: DataArray with dimensions "asset" and "replacement" defining the - costs of adding capacity to the system. - commodities: List of commodities to create production decision variables for. - timeslice_level: The timeslice level of the linear problem. - """ - assert set(capacity_costs.dims) == {"asset", "replacement"} - - # Start with capacity costs as template (defines "asset" and "replacement" dims) - production_costs = xr.zeros_like(capacity_costs) - - # Add a "timeslice" dimension, convert multiindex to single index - production_costs = broadcast_timeslice(production_costs, level=timeslice_level) - production_costs = drop_timeslice(production_costs) - production_costs["timeslice"] = pd.Index( - production_costs.get_index("timeslice"), tupleize_cols=False - ) - - # Add a "commodity" dimension - production_costs = production_costs.expand_dims(commodity=commodities) - assert set(production_costs.dims) == { - "asset", - "replacement", - "commodity", - "timeslice", - } - - # Result is dataset of provided capacity costs and zero production costs - return xr.Dataset(dict(capacity=capacity_costs, production=production_costs)) - - -def lp_constraint(constraint: Constraint, lpcosts: xr.Dataset) -> Constraint: - """Transforms the constraint to LP data. - - The goal is to create from ``lpcosts.capacity``, ``constraint.capacity``, and - ``constraint.b`` a 2d-matrix ``constraint`` vs ``decision variables``. - - #. The dimensions of ``constraint.b`` are the constraint dimensions. They are - renamed ``"c(xxx)"``. - #. The dimensions of ``lpcosts`` are the decision-variable dimensions. They are - renamed ``"d(xxx)"``. - #. ``set(b.dims).intersection(lpcosts.xxx.dims)`` are diagonal - in constraint dimensions and decision variables dimension, with ``xxx`` the - capacity or the production - #. ``set(constraint.xxx.dims) - set(lpcosts.xxx.dims) - set(b.dims)`` are reduced by - summation, with ``xxx`` the capacity or the production - #. ``set(lpcosts.xxx.dims) - set(constraint.xxx.dims) - set(b.dims)`` are added for - expansion, with ``xxx`` the capacity or the production - - See :py:func:`muse.constraints.lp_constraint_matrix` for a more detailed explanation - of the transformations applied here. - """ - constraint = constraint.copy(deep=False) - - # Deal with timeslice multiindex - if "timeslice" in constraint.dims: - constraint = drop_timeslice(constraint) - constraint["timeslice"] = pd.Index( - constraint.get_index("timeslice"), tupleize_cols=False - ) - - # Rename dimensions in b - b = constraint.b.drop_vars(set(constraint.b.coords) - set(constraint.b.dims)) - b = b.rename({k: f"c({k})" for k in b.dims}) - - # Create capacity constraint matrix - capacity = lp_constraint_matrix(constraint.b, constraint.capacity, lpcosts.capacity) - capacity = capacity.drop_vars(set(capacity.coords) - set(capacity.dims)) - - # Create production constraint matrix - production = lp_constraint_matrix( - constraint.b, constraint.production, lpcosts.production - ) - production = production.drop_vars(set(production.coords) - set(production.dims)) - - # Combine data - result = xr.Dataset( - {"b": b, "capacity": capacity, "production": production}, attrs=constraint.attrs - ) - return result - - -def lp_constraint_matrix( - b: xr.DataArray, constraint: xr.DataArray, lpcosts: xr.DataArray -): - """Transforms one constraint block into an lp matrix. - - The goal is to create from ``lpcosts``, ``constraint``, and ``b`` a 2d-matrix of - constraints vs decision variables. - - #. The dimensions of ``b`` are the constraint dimensions. They are renamed - ``"c(xxx)"``. - #. The dimensions of ``lpcosts`` are the decision-variable dimensions. They are - renamed ``"d(xxx)"``. - #. ``set(b.dims).intersection(lpcosts.dims)`` are diagonal - in constraint dimensions and decision variables dimension - #. ``set(constraint.dims) - set(lpcosts.dims) - set(b.dims)`` are reduced by - summation - #. ``set(lpcosts.dims) - set(constraint.dims) - set(b.dims)`` are added for - expansion - #. ``set(b.dims) - set(constraint.dims) - set(lpcosts.dims)`` are added for - expansion. Such dimensions only make sense if they consist of one point. - - The result is the constraint matrix, expanded, reduced and diagonalized for the - conditions above. - - """ - from functools import reduce - - from numpy import eye - - # Sum over all dimensions that are not in the constraint or the decision variables - result = constraint.sum(set(constraint.dims) - set(lpcosts.dims) - set(b.dims)) - - # Rename dimensions for decision variables - result = result.rename( - {k: f"d({k})" for k in set(result.dims).intersection(lpcosts.dims)} - ) - - # Rename dimensions for constraints - result = result.rename( - {k: f"c({k})" for k in set(result.dims).intersection(b.dims)} - ) - - # Expand dimensions that are in the decision variables but not in the constraint - expand = set(lpcosts.dims) - set(constraint.dims) - set(b.dims) - result = result.expand_dims( - {f"d({k})": lpcosts[k].rename({k: f"d({k})"}).set_index() for k in expand} - ) - - # Expand dimensions that are in the constraint but not in the decision variables - expand = set(b.dims) - set(constraint.dims) - set(lpcosts.dims) - result = result.expand_dims( - {f"c({k})": b[k].rename({k: f"c({k})"}).set_index() for k in expand} - ) - - # Dimensions that are in both the decision variables and the constraint - diag_dims = set(b.dims).intersection(lpcosts.dims) - diag_dims = sorted(diag_dims) - - if diag_dims: - - def get_dimension(dim): - if dim in b.dims: - return b[dim].values - if dim in lpcosts.dims: - return lpcosts[dim].values - return constraint[dim].values - - diagonal_submats = [ - xr.DataArray( - eye(len(b[k])), - coords={f"c({k})": get_dimension(k), f"d({k})": get_dimension(k)}, - dims=(f"c({k})", f"d({k})"), - ) - for k in diag_dims - ] - reduced = reduce(xr.DataArray.__mul__, diagonal_submats) - if "d(timeslice)" in reduced.dims: - reduced = reduced.drop_vars("d(timeslice)") - result = result * reduced - - return result - - -@dataclass -class ScipyAdapter: - """Creates the input for the scipy solvers. - - This adapter is required to convert data (costs and constraints) from a series of - dataarrays to a format that can be used by scipy's linear programming solver. - - The scipy solver requires the following inputs as a set of 1D or 2D numpy arrays: - c: Coefficients of the linear objective function to be minimized. This is a 1D - vector, each element representing the cost of a decision variable. - A_ub: The inequality constraint matrix. This is a 2D matrix containing - coefficients of the linear inequality constraints, with constraints as rows - and decision variables as columns. - b_ub: The inequality constraint vector. This is a 1D vector, each element - representing an upper bound on the corresponding constraint in A_ub. - A_eq: The equality constraint matrix. In practice, since all of the constraints - currently implemented are inequality constraints, this will always be zeros. - However, this may be changed in the future. - b_eq: The equality constraint vector. As above, this will currently always be - zeros. - - See the documentation for `scipy.optimize.linprog` for more details: - https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linprog.html - - This class takes in costs and constraints as xarray dataarrays, and performs the - necessary conversions to create the above inputs. These are saved together in the - `kwargs` property, which is passed to the scipy solver. This has an additional - attribute `bounds`, which is a tuple of the lower and upper bounds for the - decision variables. In practice, we will always use (0, np.inf) as the bounds. - - The output of the scipy solver is a 1D array of decision variables, which must then - be converted back into a format that MUSE can understand. To aid this, we - save templates for the capacity and production decision variables, which are used - to convert the output back into a labelled xarray dataset using the helper function - `to_muse`. - """ - - c: np.ndarray - to_muse: Callable[[np.ndarray], xr.Dataset] - bounds: tuple[float | None, float | None] = (0, np.inf) - A_ub: np.ndarray | None = None - b_ub: np.ndarray | None = None - A_eq: np.ndarray | None = None - b_eq: np.ndarray | None = None - - @classmethod - def factory( - cls, - costs: xr.DataArray, - constraints: list[Constraint], - commodities: list[str], - timeslice_level: str | None = None, - ) -> ScipyAdapter: - # Calculate costs for the linear problem - lpcosts = lp_costs( - capacity_costs=costs, - commodities=commodities, - timeslice_level=timeslice_level, - ) - - # Create dataset from costs and constraints - data = cls._unified_dataset(lpcosts, *constraints) - - # Get capacity constraint matrix / costs - capacities = cls._selected_quantity(data, "capacity") - - # Get production constraint matrix / costs - productions = cls._selected_quantity(data, "production") - - # Get constraint vector - bs = cls._selected_quantity(data, "b") - - # Prepare scipy adapter from constraints - kwargs = cls._to_scipy_adapter(capacities, productions, bs, *constraints) - - def to_muse(x: np.ndarray) -> xr.Dataset: - return ScipyAdapter._back_to_muse( - x, - capacity_template=capacities.costs, - production_template=productions.costs, - ) - - return ScipyAdapter(to_muse=to_muse, **kwargs) - - @property - def kwargs(self): - return { - "c": self.c, - "A_eq": self.A_eq, - "b_eq": self.b_eq, - "A_ub": self.A_ub, - "b_ub": self.b_ub, - "bounds": self.bounds, - } - - @staticmethod - def _unified_dataset(lpcosts: xr.Dataset, *constraints: Constraint) -> xr.Dataset: - """Creates single xr.Dataset from costs and constraints.""" - from xarray import merge - - # Reformat constraints to lp format - lp_constraints = [ - lp_constraint(constraint, lpcosts) for constraint in constraints - ] - - # Rename variables in lp constraints - lp_constraints = [ - constraint.rename( - b=f"b{i}", capacity=f"capacity{i}", production=f"production{i}" - ) - for i, constraint in enumerate(lp_constraints) - ] - - # Rename dimensions in lpcosts - lpcosts = lpcosts.rename({k: f"d({k})" for k in lpcosts.dims}) - - # Merge data - data = merge([lpcosts, *lp_constraints]) - - # An adjustment is required for lower bound constraints - for i, constraint in enumerate(constraints): - if constraint.kind == ConstraintKind.LOWER_BOUND: - data[f"b{i}"] = -data[f"b{i}"] - data[f"capacity{i}"] = -data[f"capacity{i}"] - data[f"production{i}"] = -data[f"production{i}"] - - # Enusure consistent ordering of dimensions - return data.transpose(*data.dims) - - @staticmethod - def _selected_quantity(data: xr.Dataset, name: str) -> xr.Dataset: - # Select data for the specified quantity ("capacity", "production", or "b") - result = cast( - xr.Dataset, data[[u for u in data.data_vars if str(u).startswith(name)]] - ) - - # Rename variables ("costs" for the costs variable, 0/1/2 etc. for constraints) - return result.rename( - { - k: ("costs" if k == name else int(str(k).replace(name, ""))) - for k in result.data_vars - } - ) - - @staticmethod - def _to_scipy_adapter( - capacities: xr.Dataset, productions: xr.Dataset, bs: xr.Dataset, *constraints - ): - """Converts constraints to scipy format. - - The constraints are converted to a format that can be used by scipy's linear - programming solver. The constraints are converted to a 2D matrix of constraints - vs decision variables. The constraints are then converted to a dictionary that - can be used by scipy's linear programming solver. - - Args: - capacities: Dataset with decision variables for capacity constraints. - productions: Dataset with decision variables for production constraints. - bs: Dataset with constraints. - *constraints: List of constraints. - - Returns: - Dictionary with constraints in a format that can be used by scipy's linear - programming solver. - """ - - def reshape(matrix: xr.DataArray) -> np.ndarray: - """Convert constraints matrix to a 2D np array. - - The rows of the constraaints matrix will represent the constraints, and the - columns will represent the decision variables. - """ - # Before building LP we need to sort dimensions for consistency - if list(matrix.dims) != sorted(matrix.dims): - matrix = matrix.transpose(*sorted(matrix.dims)) - - # Size of the first dimension - # This dimension represents the number of constraints - size = np.prod( - [matrix[u].shape[0] for u in matrix.dims if str(u).startswith("c")] - ) - - # Reshape into a 2D array: N constraints x N decision variables - return matrix.values.reshape((size, -1)) - - def extract_bA(constraints, *kinds: ConstraintKind): - """Extracts A and b for constraints of specified kinds. - - These will end up as A_ub and b_ub for inequality constraints, and - A_eq and b_eq for equality constraints (see ScipyAdapter). - """ - # Get indices of constraints of the specified kind - indices = [i for i in range(len(bs)) if constraints[i].kind in kinds] - - # Convert constraints matrices to 2d np arrays - capa_constraints = [reshape(capacities[i]) for i in indices] - prod_constraints = [reshape(productions[i]) for i in indices] - - # Convert constraints vectors to 1d - constraints_vectors = [ - bs[i].stack(constraint=sorted(bs[i].dims)) for i in indices - ] - - # Concatenate constraints - if capa_constraints: - A = np.concatenate( - ( - np.concatenate(capa_constraints, axis=0), - np.concatenate(prod_constraints, axis=0), - ), - axis=1, - ) - b = np.concatenate(constraints_vectors, axis=0) - else: - # If there are no constraints of the given kind, return None - A = None - b = None - return A, b - - # Create costs vector by concatenating capacity and production costs - c = np.concatenate( - ( - capacities["costs"].values.flatten(), - productions["costs"].values.flatten(), - ), - axis=0, - ) - - # Extract A and b for inequality constraints - A_ub, b_ub = extract_bA( - constraints, ConstraintKind.UPPER_BOUND, ConstraintKind.LOWER_BOUND - ) - - # Extract A and b for equality constraints - A_eq, b_eq = extract_bA(constraints, ConstraintKind.EQUALITY) - - # Prepare scipy adapter - return { - "c": c, - "A_ub": A_ub, - "b_ub": b_ub, - "A_eq": A_eq, - "b_eq": b_eq, - "bounds": (0, np.inf), - } - - @staticmethod - def _back_to_muse_quantity( - x: np.ndarray, template: xr.DataArray | xr.Dataset - ) -> xr.DataArray: - """Convert a vector of decision variables to a DataArray. - - Args: - x: 1D vector of decision variables, outputted from the scipy solver. - template: Template for the decision variables. This may be for either - capacity or production variables. - """ - # First create a multidimensional dataarray based on the template - result = xr.DataArray( - x.reshape(template.shape), coords=template.coords, dims=template.dims - ) - - # Then rename the dimensions (e.g. "d(asset)" -> "asset") - return result.rename({k: str(k)[2:-1] for k in result.dims}) - - @staticmethod - def _back_to_muse( - x: np.ndarray, - capacity_template: xr.DataArray, - production_template: xr.DataArray, - ) -> xr.Dataset: - """Convert the full set of decision variables to a Dataset. - - This must have capacity variables first, followed by production variables. - - Args: - x: 1D vector of decision variables, outputted from the scipy solver. - capacity_template: Template for the capacity decision variables. - production_template: Template for the production decision variables. - """ - n_capa = capacity_template.size # number of capacity decision variables - - capa = ScipyAdapter._back_to_muse_quantity( - x[:n_capa], template=capacity_template - ) - prod = ScipyAdapter._back_to_muse_quantity( - x[n_capa:], template=production_template - ) - return xr.Dataset({"capacity": capa, "production": prod}) diff --git a/src/muse/investments.py b/src/muse/investments.py index f5467d91..8a514936 100644 --- a/src/muse/investments.py +++ b/src/muse/investments.py @@ -288,7 +288,7 @@ def scipy_match_demand( from scipy.optimize import linprog - from muse.constraints import ScipyAdapter + from muse.lp_adapter import ScipyAdapter assert "year" not in technologies.dims @@ -296,8 +296,8 @@ def scipy_match_demand( costs = timeslice_max(costs) # Run scipy optimization with highs solver - adapter = ScipyAdapter.factory( - costs=costs, + adapter = ScipyAdapter.from_muse_data( + capacity_costs=costs, constraints=constraints, commodities=commodities, timeslice_level=timeslice_level, diff --git a/src/muse/lp_adapter.py b/src/muse/lp_adapter.py new file mode 100644 index 00000000..5430f368 --- /dev/null +++ b/src/muse/lp_adapter.py @@ -0,0 +1,416 @@ +"""Utilities for adapting MUSE data structures to linear programming solvers. + +This module provides utilities to convert MUSE's xarray-based data structures to and +from the format required by scipy's linear programming solver. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np +import pandas as pd +import xarray as xr + +from muse.constraints import ConstraintKind +from muse.timeslices import broadcast_timeslice, drop_timeslice + + +def unified_dataset(lpcosts: xr.Dataset, *constraints) -> xr.Dataset: + """Creates single xr.Dataset from costs and constraints.""" + from xarray import merge + + # Reformat constraints to lp format + lp_constraints = [lp_constraint(constraint, lpcosts) for constraint in constraints] + + # Rename variables in lp constraints + lp_constraints = [ + constraint.rename( + b=f"b{i}", capacity=f"capacity{i}", production=f"production{i}" + ) + for i, constraint in enumerate(lp_constraints) + ] + + # Rename dimensions in lpcosts + lpcosts = lpcosts.rename({k: f"d({k})" for k in lpcosts.dims}) + + # Merge data + data = merge([lpcosts, *lp_constraints]) + + # An adjustment is required for lower bound constraints + for i, constraint in enumerate(constraints): + if constraint.kind == ConstraintKind.LOWER_BOUND: + data[f"b{i}"] = -data[f"b{i}"] + data[f"capacity{i}"] = -data[f"capacity{i}"] + data[f"production{i}"] = -data[f"production{i}"] + + # Ensure consistent ordering of dimensions + return data.transpose(*data.dims) + + +def selected_quantity(data: xr.Dataset, name: str) -> xr.Dataset: + """Select and rename variables for a specific quantity.""" + # Select data for the specified quantity ("capacity", "production", or "b") + result = data[[u for u in data.data_vars if str(u).startswith(name)]] + + # Rename variables ("costs" for the costs variable, 0/1/2 etc. for constraints) + return result.rename( + { + k: ("costs" if k == name else int(str(k).replace(name, ""))) + for k in result.data_vars + } + ) + + +def reshape_constraint_matrix(matrix: xr.DataArray) -> np.ndarray: + """Convert constraints matrix to a 2D np array. + + The rows of the constraints matrix will represent the constraints, and the + columns will represent the decision variables. + """ + # Before building LP we need to sort dimensions for consistency + if list(matrix.dims) != sorted(matrix.dims): + matrix = matrix.transpose(*sorted(matrix.dims)) + + # Size of the first dimension + # This dimension represents the number of constraints + size = np.prod([matrix[u].shape[0] for u in matrix.dims if str(u).startswith("c")]) + + # Reshape into a 2D array: N constraints x N decision variables + return matrix.values.reshape((size, -1)) + + +def extract_constraint_matrices( + capacities: xr.Dataset, + productions: xr.Dataset, + bs: xr.Dataset, + constraints, + *kinds: ConstraintKind, +): + """Extracts A and b matrices for constraints of specified kinds. + + These will end up as A_ub and b_ub for inequality constraints, and A_eq and b_eq for + equality constraints (see ScipyAdapter). + """ + # Get indices of constraints of the specified kind + indices = [i for i in range(len(bs)) if constraints[i].kind in kinds] + + # Convert constraints matrices to 2d np arrays + capa_constraints = [reshape_constraint_matrix(capacities[i]) for i in indices] + prod_constraints = [reshape_constraint_matrix(productions[i]) for i in indices] + + # Convert constraints vectors to 1d + constraints_vectors = [bs[i].stack(constraint=sorted(bs[i].dims)) for i in indices] + + # Concatenate constraints + if capa_constraints: + A = np.concatenate( + ( + np.concatenate(capa_constraints, axis=0), + np.concatenate(prod_constraints, axis=0), + ), + axis=1, + ) + b = np.concatenate(constraints_vectors, axis=0) + else: + # If there are no constraints of the given kind, return None + A = None + b = None + return A, b + + +def back_to_muse_quantity(x: np.ndarray, template: xr.DataArray) -> xr.DataArray: + """Convert a vector of decision variables to a DataArray. + + Args: + x: 1D vector of decision variables, outputted from the scipy solver. + template: Template for the decision variables. This may be for either + capacity or production variables. + """ + # First create a multidimensional dataarray based on the template + result = xr.DataArray( + x.reshape(template.shape), coords=template.coords, dims=template.dims + ) + + # Then rename the dimensions (e.g. "d(asset)" -> "asset") + return result.rename({k: str(k)[2:-1] for k in result.dims}) + + +@dataclass +class ScipyAdapter: + """Adapts MUSE data structures to scipy's linear programming solver format. + + This adapter converts data (costs and constraints) from xarray DataArrays to + the format required by scipy's linear programming solver, and back. + """ + + c: np.ndarray + capacity_template: xr.DataArray + production_template: xr.DataArray + bounds: tuple[float | None, float | None] = (0, np.inf) + A_ub: np.ndarray | None = None + b_ub: np.ndarray | None = None + A_eq: np.ndarray | None = None + b_eq: np.ndarray | None = None + + @classmethod + def from_muse_data( + cls, + capacity_costs: xr.DataArray, + constraints: list, + commodities: list[str], + timeslice_level: str | None = None, + ) -> ScipyAdapter: + """Creates a ScipyAdapter from MUSE data structures.""" + # Calculate costs for the linear problem + lpcosts = lp_costs(capacity_costs, commodities, timeslice_level) + + # Create dataset from costs and constraints + data = unified_dataset(lpcosts, *constraints) + + # Get capacity constraint matrix / costs + capacities = selected_quantity(data, "capacity") + + # Get production constraint matrix / costs + productions = selected_quantity(data, "production") + + # Get constraint vector + bs = selected_quantity(data, "b") + + # Create costs vector by concatenating capacity and production costs + c = np.concatenate( + ( + capacities["costs"].values.flatten(), + productions["costs"].values.flatten(), + ), + axis=0, + ) + + # Extract A and b for inequality constraints + A_ub, b_ub = extract_constraint_matrices( + capacities, + productions, + bs, + constraints, + ConstraintKind.UPPER_BOUND, + ConstraintKind.LOWER_BOUND, + ) + + # Extract A and b for equality constraints + A_eq, b_eq = extract_constraint_matrices( + capacities, productions, bs, constraints, ConstraintKind.EQUALITY + ) + + return cls( + c=c, + A_ub=A_ub, + b_ub=b_ub, + A_eq=A_eq, + b_eq=b_eq, + capacity_template=capacities["costs"], + production_template=productions["costs"], + ) + + def to_muse(self, x: np.ndarray) -> xr.Dataset: + """Convert scipy solver output back to MUSE format.""" + n_capa = self.capacity_template.size + capa = back_to_muse_quantity(x[:n_capa], self.capacity_template) + prod = back_to_muse_quantity(x[n_capa:], self.production_template) + return xr.Dataset({"capacity": capa, "production": prod}) + + @property + def kwargs(self): + """Dictionary of kwargs for scipy.optimize.linprog.""" + return { + "c": self.c, + "A_eq": self.A_eq, + "b_eq": self.b_eq, + "A_ub": self.A_ub, + "b_ub": self.b_ub, + "bounds": self.bounds, + } + + +def lp_costs( + capacity_costs: xr.DataArray, + commodities: list[str], + timeslice_level: str | None = None, +) -> xr.Dataset: + """Creates dataset of costs for solving with scipy's LP solver. + + Importantly, this also defines the decision variables in the linear program. + + The costs applied to the capacity decision variables are provided. This should + have dimensions "asset" and "replacement". In other words, capacity addition + is solved for each replacement technology for each existing asset. + + No cost is applied to the production decision variables. Thus, the production + component of the resulting dataset is zero, with dimensions determining the + production decision variables. This will have dimensions "asset", "replacement", + "commodity", and "timeslice". In other words, production is solved for each + replacement technology for each existing asset, for each commodity, and for each + timeslice. + + Args: + capacity_costs: DataArray with dimensions "asset" and "replacement" defining the + costs of adding capacity to the system. + commodities: List of commodities to create production decision variables for. + timeslice_level: The timeslice level of the linear problem. + """ + assert set(capacity_costs.dims) == {"asset", "replacement"} + + # Start with capacity costs as template (defines "asset" and "replacement" dims) + production_costs = xr.zeros_like(capacity_costs) + + # Add a "timeslice" dimension, convert multiindex to single index + production_costs = broadcast_timeslice(production_costs, level=timeslice_level) + production_costs = drop_timeslice(production_costs) + production_costs["timeslice"] = pd.Index( + production_costs.get_index("timeslice"), tupleize_cols=False + ) + + # Add a "commodity" dimension + production_costs = production_costs.expand_dims(commodity=commodities) + assert set(production_costs.dims) == { + "asset", + "replacement", + "commodity", + "timeslice", + } + + # Result is dataset of provided capacity costs and zero production costs + return xr.Dataset(dict(capacity=capacity_costs, production=production_costs)) + + +def lp_constraint(constraint, lpcosts: xr.Dataset) -> xr.Dataset: + """Transforms the constraint to LP data. + + The goal is to create from ``lpcosts.capacity``, ``constraint.capacity``, and + ``constraint.b`` a 2d-matrix ``constraint`` vs ``decision variables``. + + #. The dimensions of ``constraint.b`` are the constraint dimensions. They are + renamed ``"c(xxx)"``. + #. The dimensions of ``lpcosts`` are the decision-variable dimensions. They are + renamed ``"d(xxx)"``. + #. ``set(b.dims).intersection(lpcosts.xxx.dims)`` are diagonal + in constraint dimensions and decision variables dimension, with ``xxx`` the + capacity or the production + #. ``set(constraint.xxx.dims) - set(lpcosts.xxx.dims) - set(b.dims)`` are reduced by + summation, with ``xxx`` the capacity or the production + #. ``set(lpcosts.xxx.dims) - set(constraint.xxx.dims) - set(b.dims)`` are added for + expansion, with ``xxx`` the capacity or the production + + See :py:func:`muse.lp_adapter.lp_constraint_matrix` for a more detailed explanation + of the transformations applied here. + + """ + constraint = constraint.copy(deep=False) + + # Deal with timeslice multiindex + if "timeslice" in constraint.dims: + constraint = drop_timeslice(constraint) + constraint["timeslice"] = pd.Index( + constraint.get_index("timeslice"), tupleize_cols=False + ) + + # Rename dimensions in b + b = constraint.b.drop_vars(set(constraint.b.coords) - set(constraint.b.dims)) + b = b.rename({k: f"c({k})" for k in b.dims}) + + # Create capacity constraint matrix + capacity = lp_constraint_matrix(constraint.b, constraint.capacity, lpcosts.capacity) + capacity = capacity.drop_vars(set(capacity.coords) - set(capacity.dims)) + + # Create production constraint matrix + production = lp_constraint_matrix( + constraint.b, constraint.production, lpcosts.production + ) + production = production.drop_vars(set(production.coords) - set(production.dims)) + + # Combine data + result = xr.Dataset( + {"b": b, "capacity": capacity, "production": production}, attrs=constraint.attrs + ) + return result + + +def lp_constraint_matrix( + b: xr.DataArray, constraint: xr.DataArray, lpcosts: xr.DataArray +): + """Transforms one constraint block into an lp matrix. + + The goal is to create from ``lpcosts``, ``constraint``, and ``b`` a 2d-matrix of + constraints vs decision variables. + + #. The dimensions of ``b`` are the constraint dimensions. They are renamed + ``"c(xxx)"``. + #. The dimensions of ``lpcosts`` are the decision-variable dimensions. They are + renamed ``"d(xxx)"``. + #. ``set(b.dims).intersection(lpcosts.dims)`` are diagonal + in constraint dimensions and decision variables dimension + #. ``set(constraint.dims) - set(lpcosts.dims) - set(b.dims)`` are reduced by + summation + #. ``set(lpcosts.dims) - set(constraint.dims) - set(b.dims)`` are added for + expansion + #. ``set(b.dims) - set(constraint.dims) - set(lpcosts.dims)`` are added for + expansion. Such dimensions only make sense if they consist of one point. + + The result is the constraint matrix, expanded, reduced and diagonalized for the + conditions above. + """ + from functools import reduce + + from numpy import eye + + # Sum over all dimensions that are not in the constraint or the decision variables + result = constraint.sum(set(constraint.dims) - set(lpcosts.dims) - set(b.dims)) + + # Rename dimensions for decision variables + result = result.rename( + {k: f"d({k})" for k in set(result.dims).intersection(lpcosts.dims)} + ) + + # Rename dimensions for constraints + result = result.rename( + {k: f"c({k})" for k in set(result.dims).intersection(b.dims)} + ) + + # Expand dimensions that are in the decision variables but not in the constraint + expand = set(lpcosts.dims) - set(constraint.dims) - set(b.dims) + result = result.expand_dims( + {f"d({k})": lpcosts[k].rename({k: f"d({k})"}).set_index() for k in expand} + ) + + # Expand dimensions that are in the constraint but not in the decision variables + expand = set(b.dims) - set(constraint.dims) - set(lpcosts.dims) + result = result.expand_dims( + {f"c({k})": b[k].rename({k: f"c({k})"}).set_index() for k in expand} + ) + + # Dimensions that are in both the decision variables and the constraint + diag_dims = set(b.dims).intersection(lpcosts.dims) + diag_dims = sorted(diag_dims) + + if diag_dims: + + def get_dimension(dim): + if dim in b.dims: + return b[dim].values + if dim in lpcosts.dims: + return lpcosts[dim].values + return constraint[dim].values + + diagonal_submats = [ + xr.DataArray( + eye(len(b[k])), + coords={f"c({k})": get_dimension(k), f"d({k})": get_dimension(k)}, + dims=(f"c({k})", f"d({k})"), + ) + for k in diag_dims + ] + reduced = reduce(xr.DataArray.__mul__, diagonal_submats) + if "d(timeslice)" in reduced.dims: + reduced = reduced.drop_vars("d(timeslice)") + result = result * reduced + + return result diff --git a/tests/test_constraints.py b/tests/test_constraints.py index a55c232a..42bfb934 100644 --- a/tests/test_constraints.py +++ b/tests/test_constraints.py @@ -5,64 +5,61 @@ import xarray as xr from pytest import approx, fixture, raises +from muse import examples +from muse.constraints import ( + demand, + demand_limiting_capacity, + max_capacity_expansion, + max_production, + minimum_service, +) +from muse.lp_adapter import ( + ScipyAdapter, + back_to_muse_quantity, + lp_constraint_matrix, + selected_quantity, + unified_dataset, +) from muse.timeslices import drop_timeslice -from muse.utilities import interpolate_capacity, reduce_assets +from muse.utilities import broadcast_over_assets, interpolate_capacity, reduce_assets CURRENT_YEAR = 2020 INVESTMENT_YEAR = 2025 @fixture -def model(): - return "medium" - - -@fixture -def residential(model): - from muse import examples - - return examples.sector("residential", model=model) - - -@fixture -def technologies(residential): - return residential.technologies.squeeze("region").sel(year=INVESTMENT_YEAR) - - -@fixture -def search_space(model, assets): - from muse import examples - - space = examples.search_space("residential", model) - return space.sel(asset=assets.technology.values) - - -@fixture -def costs(search_space): - shape = search_space.shape - return search_space * np.arange(np.prod(shape)).reshape(shape) +def model_data(): + """Model data required for the constraints. + + Returns: + dict: Contains all necessary data for building constraints: + - technologies: Technology data for a single year + - capacity: Capacity data for assets in the current and investment year + - demand: Demand for the investment year + - search_space: Search space for the assets + """ + from muse.quantities import maximum_production + # Load residential sector data + residential = examples.sector("residential", model="medium") -@fixture -def assets(residential): - return next(a.assets for a in residential.agents) + # Extract technologies and assets + technologies = residential.technologies.squeeze("region").sel(year=INVESTMENT_YEAR) + assets = next(a.assets for a in residential.agents) + # Add minimum service factor data to allow calculation of the constraint + technologies["minimum_service_factor"] = 0.1 * xr.ones_like( + technologies.technology, dtype=float + ) -@fixture -def capacity(assets): - return interpolate_capacity( + # Calculate capacity + capacity = interpolate_capacity( reduce_assets(assets.capacity, coords=("technology", "region")), year=[CURRENT_YEAR, INVESTMENT_YEAR], ) - -@fixture -def market_demand(assets, technologies): - from muse.quantities import maximum_production - from muse.utilities import broadcast_over_assets - - # Set demand just below the maximum production of existing assets - res = 0.8 * maximum_production( + # Create initial market demand as 80% of maximum production + market_demand = 0.8 * maximum_production( broadcast_over_assets(technologies, assets), assets.capacity, ).sel(year=INVESTMENT_YEAR).groupby("technology").sum("asset").rename( @@ -70,115 +67,223 @@ def market_demand(assets, technologies): ) # Remove un-demanded commodities - res = res.sel(commodity=(res > 0).any(dim=["timeslice", "asset"])) - return res + market_demand = market_demand.sel( + commodity=(market_demand > 0).any(dim=["timeslice", "asset"]) + ) + # Create search space + search_space = examples.search_space("residential", "medium") + search_space = search_space.sel(asset=assets.technology.values) -@fixture -def commodities(market_demand): - return list(market_demand.commodity.values) + # Return dictionary of data + return { + "technologies": technologies, + "capacity": capacity, + "demand": market_demand, + "search_space": search_space, + } -def test_fixtures(technologies, search_space, costs, assets, capacity, market_demand): - assert set(technologies.dims) == {"technology", "commodity"} - assert set(search_space.dims) == {"asset", "replacement"} - assert set(costs.dims) == {"asset", "replacement"} - assert set(assets.dims) == {"asset", "year"} - assert set(capacity.dims) == {"asset", "year"} - assert set(market_demand.dims) == {"asset", "commodity", "timeslice"} +@fixture(params=["timeslice_as_list", "timeslice_as_multindex"]) +def constraints(request, model_data): + """Default set of constraints for testing.""" + constraints = { + "max_production": max_production(**model_data), + "demand": demand(**model_data), + "max_capacity_expansion": max_capacity_expansion(**model_data), + "demand_limiting_capacity": demand_limiting_capacity(**model_data), + "minimum_service": minimum_service(**model_data), + } + # Testing two different ways of handling timeslices + if request.param == "timeslice_as_multindex": + constraints = {key: _as_list(cs) for key, cs in constraints.items()} + return constraints -@fixture -def lpcosts(costs, commodities): - from muse.constraints import lp_costs - return lp_costs(costs, commodities=commodities) +def _as_list(data: Union[xr.DataArray, xr.Dataset]) -> Union[xr.DataArray, xr.Dataset]: + """Helper function to convert timeslice data to list format.""" + if "timeslice" in data.dims: + data = data.copy(deep=False) + index = pd.MultiIndex.from_tuples( + data.get_index("timeslice"), names=("month", "day", "hour") + ) + mindex_coords = xr.Coordinates.from_pandas_multiindex(index, "timeslice") + data = drop_timeslice(data).assign_coords(mindex_coords) + return data -@fixture -def max_production(market_demand, capacity, search_space, technologies): - from muse.constraints import max_production +def test_model_data(model_data): + """Validating that the model data has appropriate dimensions.""" + assert set(model_data["technologies"].dims) == {"technology", "commodity"} + assert set(model_data["search_space"].dims) == {"asset", "replacement"} + assert set(model_data["capacity"].dims) == {"asset", "year"} + assert set(model_data["demand"].dims) == { + "asset", + "commodity", + "timeslice", + } - return max_production(market_demand, capacity, search_space, technologies) +def test_constraints_dimensions(constraints): + """Test dimensions of all constraint matrices.""" + # Max production constraint + max_prod_dims = {"asset", "commodity", "replacement", "timeslice"} + assert set(constraints["max_production"].capacity.dims) == max_prod_dims + assert set(constraints["max_production"].production.dims) == max_prod_dims + assert set(constraints["max_production"].b.dims) == max_prod_dims -@fixture -def demand_constraint(market_demand, capacity, search_space, technologies): - from muse.constraints import demand + # Demand constraint + assert set(constraints["demand"].capacity.dims) == set() + assert set(constraints["demand"].production.dims) == set() + assert set(constraints["demand"].b.dims) == {"asset", "commodity", "timeslice"} - return demand(market_demand, capacity, search_space, technologies) + # Demand limiting capacity constraint + assert set(constraints["demand_limiting_capacity"].capacity.dims) == { + "asset", + "commodity", + "replacement", + } + assert set(constraints["demand_limiting_capacity"].production.dims) == set() + assert set(constraints["demand_limiting_capacity"].b.dims) == {"asset", "commodity"} + # Max capacity expansion constraint + assert set(constraints["max_capacity_expansion"].capacity.dims) == set() + assert set(constraints["max_capacity_expansion"].production.dims) == set() + assert set(constraints["max_capacity_expansion"].b.dims) == {"replacement"} -@fixture -def max_capacity_expansion(market_demand, capacity, search_space, technologies): - from muse.constraints import max_capacity_expansion + # Minimum service constraint + assert set(constraints["minimum_service"].capacity.dims) == max_prod_dims + assert set(constraints["minimum_service"].production.dims) == max_prod_dims + assert set(constraints["minimum_service"].b.dims) == max_prod_dims - return max_capacity_expansion(market_demand, capacity, search_space, technologies) +def test_max_capacity_expansion(constraints): + """Checking basic properties of the max capacity expansion constraint.""" + max_capacity_expansion = constraints["max_capacity_expansion"] + assert (max_capacity_expansion.capacity == 1).all() + assert max_capacity_expansion.production == 0 + assert max_capacity_expansion.b.dims == ("replacement",) + assert max_capacity_expansion.b.shape == (4,) + assert ( + max_capacity_expansion.replacement + == ["estove", "gasboiler", "gasstove", "heatpump"] + ).all() -@fixture -def demand_limiting_capacity(market_demand, capacity, search_space, technologies): - from muse.constraints import demand_limiting_capacity - return demand_limiting_capacity(market_demand, capacity, search_space, technologies) +def test_max_production(constraints): + """Checking basic properties of the max production constraint.""" + assert (constraints["max_production"].capacity <= 0).all() -@fixture -def constraint(max_production): - return max_production +def test_demand_limiting_capacity(constraints): + """Checking basic properties of the demand limiting capacity constraint.""" + demand_limiting_capacity = constraints["demand_limiting_capacity"] + max_production = constraints["max_production"] + demand_constraint = constraints["demand"] + # Test capacity values + expected_capacity = ( + -max_production.capacity.max("timeslice").values + if "timeslice" in max_production.capacity.dims + else -max_production.capacity.values + ) + assert demand_limiting_capacity.capacity.values == approx(expected_capacity) -@fixture(params=["timeslice_as_list", "timeslice_as_multindex"]) -def constraints( - request, - max_production, - demand_constraint, - demand_limiting_capacity, - max_capacity_expansion, -): - constraints = [ - max_production, - demand_limiting_capacity, - demand_constraint, - max_capacity_expansion, - ] - if request.param == "timeslice_as_multindex": - constraints = [_as_list(cs) for cs in constraints] - return constraints + # Test production and b values + assert demand_limiting_capacity.production == 0 + expected_b = ( + demand_constraint.b.max("timeslice").values + if "timeslice" in demand_constraint.b.dims + else demand_constraint.b.values + ) + assert demand_limiting_capacity.b.values == approx(expected_b) -def test_constraints_dimensions( - max_production, demand_constraint, demand_limiting_capacity, max_capacity_expansion -): - # Max production constraint - max_prod_dims = {"asset", "commodity", "replacement", "timeslice"} - assert set(max_production.capacity.dims) == max_prod_dims - assert set(max_production.production.dims) == max_prod_dims - assert set(max_production.b.dims) == max_prod_dims +def test_max_capacity_expansion_no_limits(model_data): + """Checking that the constraint is None when no limits are set.""" + technologies = model_data["technologies"].drop_vars( + ["max_capacity_addition", "max_capacity_growth", "total_capacity_limit"] + ) + assert ( + max_capacity_expansion(**{**model_data, "technologies": technologies}) is None + ) - # Demand constraint - assert set(demand_constraint.capacity.dims) == set() - assert set(demand_constraint.production.dims) == set() - assert set(demand_constraint.b.dims) == {"asset", "commodity", "timeslice"} - # Demand limiting capacity constraint - assert set(demand_limiting_capacity.capacity.dims) == { - "asset", - "commodity", - "replacement", +def test_max_capacity_expansion_infinite_limits(model_data): + """Checking that error is raised when infinite limits are set.""" + technologies = model_data["technologies"].copy() + for limit in [ + "max_capacity_addition", + "max_capacity_growth", + "total_capacity_limit", + ]: + technologies[limit] = np.inf + with raises(ValueError): + max_capacity_expansion(**{**model_data, "technologies": technologies}) + + +def test_max_capacity_expansion_seed(model_data): + """Sanity checks for the seed parameter of the max capacity expansion constraint.""" + seed = 10 + technologies = model_data["technologies"].copy() + technologies["growth_seed"] = seed + + # Test different capacity scenarios + scenarios = [0, seed, 2 * seed] + results = [] + for cap in scenarios: + capacity = model_data["capacity"].copy() + capacity.sel(year=2020)[:] = cap + results.append( + max_capacity_expansion( + **{ + **model_data, + "technologies": technologies, + "capacity": capacity, + } + ) + ) + + # Zero capacity should match seed capacity + assert results[0].b.values == approx(results[1].b.values) + # Higher capacity should differ + assert results[0].b.values != approx(results[2].b.values) + + +def test_no_minimum_service(model_data): + """Checking that the constraint is None when no minimum service factor is set.""" + technologies = model_data["technologies"].drop_vars("minimum_service_factor") + assert minimum_service(**{**model_data, "technologies": technologies}) is None + + +@fixture +def lp_inputs(model_data): + """Inputs to the lp adapter, in addition to the constraints.""" + # Make up capacity costs data + shape = model_data["search_space"].shape + costs = model_data["search_space"] * np.arange(np.prod(shape)).reshape(shape) + + # List of commodities + commodities = list(model_data["demand"].commodity.values) + + return { + "capacity_costs": costs, + "commodities": commodities, } - assert set(demand_limiting_capacity.production.dims) == set() - assert set(demand_limiting_capacity.b.dims) == {"asset", "commodity"} - # Max capacity expansion constraint - assert set(max_capacity_expansion.capacity.dims) == set() - assert set(max_capacity_expansion.production.dims) == set() - assert set(max_capacity_expansion.b.dims) == {"replacement"} + +@fixture +def lpcosts(lp_inputs): + """Benchmark lpcosts dataset to test against.""" + from muse.lp_adapter import lp_costs + + return lp_costs(**lp_inputs) -def test_lp_constraints_matrix_b_is_scalar(constraint, lpcosts): +def test_lp_constraints_matrix_b_is_scalar(lpcosts, constraints): """B is a scalar - output should be equivalent to a single row matrix.""" - from muse.constraints import lp_constraint_matrix + constraint = constraints["max_production"] for attr in ["capacity", "production"]: lpconstraint = lp_constraint_matrix( @@ -191,9 +296,9 @@ def test_lp_constraints_matrix_b_is_scalar(constraint, lpcosts): } -def test_max_production_constraint_diagonal(constraint, lpcosts): +def test_max_production_constraint_diagonal(lpcosts, constraints): """Test production side of max capacity production is diagonal.""" - from muse.constraints import lp_constraint_matrix + constraint = constraints["max_production"] # Test capacity constraints result = lp_constraint_matrix(constraint.b, constraint.capacity, lpcosts.capacity) @@ -219,8 +324,10 @@ def test_max_production_constraint_diagonal(constraint, lpcosts): assert stacked.values == approx(np.eye(stacked.shape[0])) -def test_lp_constraint(constraint, lpcosts): - from muse.constraints import lp_constraint +def test_lp_constraint(lpcosts, constraints): + from muse.lp_adapter import lp_constraint + + constraint = constraints["max_production"] result = lp_constraint(constraint, lpcosts) constraint_dims = { @@ -244,13 +351,13 @@ def test_lp_constraint(constraint, lpcosts): assert result.b.values == approx(0) -def test_to_scipy_adapter_maxprod(costs, max_production, commodities, lpcosts): +def test_to_scipy_adapter_maxprod(lp_inputs, lpcosts, constraints): """Test scipy adapter with max production constraint.""" - from muse.constraints import ScipyAdapter - - adapter = ScipyAdapter.factory( - costs, constraints=[max_production], commodities=commodities + adapter = ScipyAdapter.from_muse_data( + **lp_inputs, + constraints=[constraints["max_production"]], ) + assert set(adapter.kwargs) == {"c", "A_ub", "b_ub", "A_eq", "b_eq", "bounds"} assert adapter.bounds == (0, np.inf) assert adapter.A_ub is not None @@ -280,13 +387,13 @@ def test_to_scipy_adapter_maxprod(costs, max_production, commodities, lpcosts): assert adapter.A_ub[:, capsize:] == approx(np.eye(prodsize)) -def test_to_scipy_adapter_demand(costs, demand_constraint, commodities, lpcosts): +def test_to_scipy_adapter_demand(lp_inputs, lpcosts, constraints): """Test scipy adapter with demand constraint.""" - from muse.constraints import ScipyAdapter - - adapter = ScipyAdapter.factory( - costs, constraints=[demand_constraint], commodities=commodities + adapter = ScipyAdapter.from_muse_data( + **lp_inputs, + constraints=[constraints["demand"]], ) + assert set(adapter.kwargs) == {"c", "A_ub", "b_ub", "A_eq", "b_eq", "bounds"} assert adapter.bounds == (0, np.inf) assert adapter.A_ub is not None @@ -310,15 +417,13 @@ def test_to_scipy_adapter_demand(costs, demand_constraint, commodities, lpcosts) assert set(adapter.A_ub[:, capsize:].flatten()) == {0.0, -1.0} -def test_to_scipy_adapter_max_capacity_expansion( - costs, max_capacity_expansion, commodities, lpcosts -): +def test_to_scipy_adapter_max_capacity_expansion(lp_inputs, lpcosts, constraints): """Test scipy adapter with max capacity expansion constraint.""" - from muse.constraints import ScipyAdapter - - adapter = ScipyAdapter.factory( - costs, constraints=[max_capacity_expansion], commodities=commodities + adapter = ScipyAdapter.from_muse_data( + **lp_inputs, + constraints=[constraints["max_capacity_expansion"]], ) + assert set(adapter.kwargs) == {"c", "A_ub", "b_ub", "A_eq", "b_eq", "bounds"} assert adapter.bounds == (0, np.inf) assert adapter.A_ub is not None @@ -342,10 +447,12 @@ def test_to_scipy_adapter_max_capacity_expansion( assert set(adapter.A_ub[:, :capsize].flatten()) == {0.0, 1.0} -def test_scipy_adapter_no_constraint(costs, commodities, lpcosts): - from muse.constraints import ScipyAdapter +def test_scipy_adapter_no_constraint(lp_inputs, lpcosts): + adapter = ScipyAdapter.from_muse_data( + **lp_inputs, + constraints=[], + ) - adapter = ScipyAdapter.factory(costs, constraints=[], commodities=commodities) assert set(adapter.kwargs) == {"c", "A_ub", "b_ub", "A_eq", "b_eq", "bounds"} assert adapter.bounds == (0, np.inf) assert all( @@ -356,38 +463,34 @@ def test_scipy_adapter_no_constraint(costs, commodities, lpcosts): def test_back_to_muse_quantities(lpcosts): - from muse.constraints import ScipyAdapter - - data = ScipyAdapter._unified_dataset(lpcosts) + data = unified_dataset(lpcosts) # Test capacity - lpquantity = ScipyAdapter._selected_quantity(data, "capacity") + lpquantity = selected_quantity(data, "capacity") assert set(lpquantity.dims) == {"d(asset)", "d(replacement)"} - copy = ScipyAdapter._back_to_muse_quantity( + copy = back_to_muse_quantity( lpquantity.costs.values, xr.zeros_like(lpquantity.costs) ) assert (copy == lpcosts.capacity).all() # Test production - lpquantity = ScipyAdapter._selected_quantity(data, "production") + lpquantity = selected_quantity(data, "production") assert set(lpquantity.dims) == { "d(asset)", "d(replacement)", "d(timeslice)", "d(commodity)", } - copy = ScipyAdapter._back_to_muse_quantity( + copy = back_to_muse_quantity( lpquantity.costs.values, xr.zeros_like(lpquantity.costs) ) assert (copy == lpcosts.production).all() -def test_back_to_muse_all(lpcosts): - from muse.constraints import ScipyAdapter - - data = ScipyAdapter._unified_dataset(lpcosts) - lpcapacity = ScipyAdapter._selected_quantity(data, "capacity") - lpproduction = ScipyAdapter._selected_quantity(data, "production") +def test_back_to_muse_all(lpcosts, lp_inputs): + data = unified_dataset(lpcosts) + lpcapacity = selected_quantity(data, "capacity") + lpproduction = selected_quantity(data, "production") x = np.concatenate( ( @@ -400,181 +503,42 @@ def test_back_to_muse_all(lpcosts): ) ) - copy = ScipyAdapter._back_to_muse( - x, xr.zeros_like(lpcapacity.costs), xr.zeros_like(lpproduction.costs) - ) + adapter = ScipyAdapter.from_muse_data(**lp_inputs, constraints=[]) + copy = adapter.to_muse(x) assert copy.capacity.size + copy.production.size == x.size assert (copy.capacity == lpcosts.capacity).all() assert (copy.production == lpcosts.production).all() -def test_scipy_adapter_back_to_muse(costs, constraints, commodities, lpcosts): - """Test converting back from scipy adapter format to MUSE format.""" - from muse.constraints import ScipyAdapter - - data = ScipyAdapter._unified_dataset(lpcosts) - lpcapacity = ScipyAdapter._selected_quantity(data, "capacity") - lpproduction = ScipyAdapter._selected_quantity(data, "production") - - x = np.concatenate( - ( - lpcosts.capacity.transpose( - *[u[2:-1] for u in lpcapacity.dims] - ).values.flatten(), - lpcosts.production.transpose( - *[u[2:-1] for u in lpproduction.dims] - ).values.flatten(), - ) +def test_scipy_adapter_standard_constraints(lp_inputs, constraints): + adapter = ScipyAdapter.from_muse_data( + **lp_inputs, constraints=list(constraints.values()) ) - adapter = ScipyAdapter.factory(costs, constraints, commodities=commodities) - assert (adapter.to_muse(x).capacity == lpcosts.capacity).all() - assert (adapter.to_muse(x).production == lpcosts.production).all() - - -def _as_list(data: Union[xr.DataArray, xr.Dataset]) -> Union[xr.DataArray, xr.Dataset]: - if "timeslice" in data.dims: - data = data.copy(deep=False) - index = pd.MultiIndex.from_tuples( - data.get_index("timeslice"), names=("month", "day", "hour") - ) - mindex_coords = xr.Coordinates.from_pandas_multiindex(index, "timeslice") - data = drop_timeslice(data).assign_coords(mindex_coords) - return data - - -def test_scipy_adapter_standard_constraints(costs, constraints, commodities): - from muse.constraints import ScipyAdapter - - adapter = ScipyAdapter.factory(costs, constraints, commodities=commodities) - constraint_map = {cs.name: cs for cs in constraints} - maxprod = constraint_map["max_production"] - maxcapa = constraint_map["max capacity expansion"] - demand = constraint_map["demand"] - dlc = constraint_map["demand_limiting_capacity"] - n_constraints = adapter.b_ub.size n_decision_vars = adapter.c.size + maxprod_constraint = constraints["max_production"] - assert n_decision_vars == costs.size + maxprod.production.size + assert ( + n_decision_vars + == lp_inputs["capacity_costs"].size + maxprod_constraint.production.size + ) assert adapter.b_eq is None assert adapter.A_eq is None assert adapter.A_ub.shape == (n_constraints, n_decision_vars) - assert n_constraints == sum(c.b.size for c in [demand, maxprod, maxcapa, dlc]) + assert n_constraints == sum(c.b.size for c in constraints.values()) -def test_scipy_solver(technologies, costs, constraints, commodities): +def test_scipy_solver(model_data, lp_inputs, constraints): """Test the scipy solver for demand matching.""" from muse.investments import scipy_match_demand solution = scipy_match_demand( - costs=costs, - search_space=xr.ones_like(costs), - technologies=technologies, - constraints=constraints, - commodities=commodities, + costs=lp_inputs["capacity_costs"], + commodities=lp_inputs["commodities"], + search_space=model_data["search_space"], + technologies=model_data["technologies"], + constraints=list(constraints.values()), ) assert isinstance(solution, xr.DataArray) assert set(solution.dims) == {"asset", "replacement"} - - -def test_minimum_service( - market_demand, capacity, search_space, technologies, constraints -): - from muse.constraints import minimum_service - - # Test with no minimum service factor - assert minimum_service(market_demand, capacity, search_space, technologies) is None - - # Test with minimum service factor - technologies["minimum_service_factor"] = 0.4 * xr.ones_like( - technologies.technology, dtype=float - ) - min_service = minimum_service(market_demand, capacity, search_space, technologies) - constraints.append(min_service) - assert isinstance(min_service, xr.Dataset) - - -def test_max_capacity_expansion_properties(max_capacity_expansion): - assert (max_capacity_expansion.capacity == 1).all() - assert max_capacity_expansion.production == 0 - assert max_capacity_expansion.b.dims == ("replacement",) - assert max_capacity_expansion.b.shape == (4,) - assert ( - max_capacity_expansion.replacement - == ["estove", "gasboiler", "gasstove", "heatpump"] - ).all() - - -def test_max_capacity_expansion_no_limits( - market_demand, capacity, search_space, technologies -): - from muse.constraints import max_capacity_expansion - - techs = technologies.drop_vars( - ["max_capacity_addition", "max_capacity_growth", "total_capacity_limit"] - ) - assert max_capacity_expansion(market_demand, capacity, search_space, techs) is None - - -def test_max_capacity_expansion_seed( - market_demand, capacity, search_space, technologies -): - from muse.constraints import max_capacity_expansion - - seed = 10 - technologies["growth_seed"] = seed - - # Test different capacity scenarios - scenarios = [0, seed, 2 * seed] - results = [] - for cap in scenarios: - capacity.sel(year=2020)[:] = cap - results.append( - max_capacity_expansion(market_demand, capacity, search_space, technologies) - ) - - # Zero capacity should match seed capacity - assert results[0].b.values == approx(results[1].b.values) - # Higher capacity should differ - assert results[0].b.values != approx(results[2].b.values) - - -def test_max_capacity_expansion_infinite_limits( - market_demand, capacity, search_space, technologies -): - from muse.constraints import max_capacity_expansion - - for limit in [ - "max_capacity_addition", - "max_capacity_growth", - "total_capacity_limit", - ]: - technologies[limit] = np.inf - with raises(ValueError): - max_capacity_expansion(market_demand, capacity, search_space, technologies) - - -def test_max_production(max_production): - assert (max_production.capacity <= 0).all() - - -def test_demand_limiting_capacity( - demand_limiting_capacity, max_production, demand_constraint -): - # Test capacity values - expected_capacity = ( - -max_production.capacity.max("timeslice").values - if "timeslice" in max_production.capacity.dims - else -max_production.capacity.values - ) - assert demand_limiting_capacity.capacity.values == approx(expected_capacity) - - # Test production and b values - assert demand_limiting_capacity.production == 0 - expected_b = ( - demand_constraint.b.max("timeslice").values - if "timeslice" in demand_constraint.b.dims - else demand_constraint.b.values - ) - assert demand_limiting_capacity.b.values == approx(expected_b)