-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathspores_algorithm.py
More file actions
183 lines (148 loc) · 7.41 KB
/
spores_algorithm.py
File metadata and controls
183 lines (148 loc) · 7.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import calliope
import xarray as xr
import numpy as np
###---------------------------------------------
# SPORES algorithm
###---------------------------------------------
def run_spores(number_of_spores, cost_optimal_model, scoring_method='integer'):
# Create some lists to store results as they get generated
spores = [] # full results
scores = [] # scores only
spores_counter = 1
number_of_spores = number_of_spores
model = cost_optimal_model
least_feasible_cost = model.results.cost.loc[{"costs": "monetary"}].sum().sum()
for i in range(spores_counter, spores_counter + number_of_spores):
if spores_counter == 1:
# Store the cost-optimal results
spores.append(model.results.expand_dims(spores=[0]))
scores.append(
model.backend.get_parameter("cost_flow_cap", as_backend_objs=False)
.sel(costs="spores_score")
.expand_dims(spores=[0])
)
# Update the slack-cost backend parameter based on the calculated minimum feasible system design cost
least_feasible_cost
model.backend.update_input("spores_cost_max", least_feasible_cost)
# Update the objective_cost_weights to reflect the ones defined for the SPORES mode
model.backend.update_input(
"objective_cost_weights", model.inputs.spores_objective_cost_weights
)
else:
pass
# Calculate weights based on a scoring method
spores_score = score_via_method(scoring_method, model)
# Assign a new score based on the calculated penalties
model.backend.update_input(
"cost_flow_cap", spores_score.reindex_like(model.inputs.cost_flow_cap)
)
# Run the model again to get a solution that reflects the new penalties
model.solve(force=True)
results = model.results.expand_dims(spores=[i])
# Store the results
spores.append(results)
scores.append(
model.backend.get_parameter("cost_flow_cap", as_backend_objs=False)
.sel(costs="spores_score")
.expand_dims(spores=[i])
)
spores_counter += 1
# Concatenate the results in the storage lists into xarray objects
spore_ds = xr.concat(spores, dim="spores", combine_attrs="drop")
score_da = xr.concat(scores, dim="spores")
backend = model.backend
return spore_ds, score_da, backend
###--------------------------------------------
# SCORING methods
###--------------------------------------------
def score_via_method(scoring_method, model):
scoring_method=scoring_method
results=model.results
backend=model.backend
def scoring_integer(results, backend):
# Filter for technologies of interest
spores_techs = model.inputs["spores_tracker"].notnull()
# Look at capacity deployment in the previous iteration
previous_cap = results.flow_cap
# Make sure that penalties are applied only to non-negligible deployments of capacity
min_relevant_size = 0.1 * previous_cap.where(spores_techs).max(
["nodes", "carriers", "techs"]
)
# Where capacity was deployed more than the minimal relevant size, assign an integer penalty (score)
new_score = previous_cap.copy()
new_score = new_score.where(spores_techs, other=0)
new_score = new_score.where(new_score > min_relevant_size, other=0)
new_score = new_score.where(new_score == 0, other=1000)
# Transform the score into a "cost" parameter
new_score.rename("cost_flow_cap")
new_score = new_score.expand_dims(costs=["spores_score"]).copy()
new_score = new_score.sum("carriers")
# Extract the existing cost parameters from the backend
all_costs = backend.get_parameter("cost_flow_cap", as_backend_objs=False)
try:
all_costs = all_costs.expand_dims(nodes=results.nodes).copy()
except:
pass
# Create a new version of the cost parameters by adding up the calculated scores
new_all_costs = all_costs
new_all_costs.loc[{"costs":"spores_score"}] += new_score.loc[{"costs":"spores_score"}]
return new_all_costs
def scoring_random(results, backend):
# Filter for technologies of interest
spores_techs = model.inputs["spores_tracker"].notnull()
# Look at capacity deployment in the previous iteration
previous_cap = results.flow_cap
# Assign a random penalty (score)
new_score = previous_cap.copy()
new_score = new_score.where(spores_techs, other=0)
new_score = new_score.where(
new_score == 0,
other=np.random.choice([0,1000],size=(previous_cap.shape)))
# Transform the score into a "cost" parameter
new_score.rename("cost_flow_cap")
new_score = new_score.expand_dims(costs=["spores_score"]).copy()
new_score = new_score.sum("carriers")
# Extract the existing cost parameters from the backend
all_costs = backend.get_parameter("cost_flow_cap", as_backend_objs=False)
try:
all_costs = all_costs.expand_dims(nodes=results.nodes).copy()
except:
pass
# Create a new version of the cost parameters by adding up the calculated scores
new_all_costs = all_costs
new_all_costs.loc[{"costs":"spores_score"}] += new_score.loc[{"costs":"spores_score"}]
return new_all_costs
def scoring_relative_deployment(results, backend):
# Get bigM or max limit for any tech as 'realistic infinite' limit
bigM = float(min(backend.inputs.bigM,backend.inputs['flow_cap_max'].max()))
# Filter for technologies of interest and that exist at a given node
existence = backend.inputs.definition_matrix
spores_techs = model.inputs["spores_tracker"].notnull()
# Look at capacity deployment in the previous iteration and calculate the relative deployment
previous_cap = results.flow_cap
potential_max_cap = backend.inputs['flow_cap_max']
potential_max_cap = potential_max_cap.fillna(float(bigM)) * existence.astype(float) * spores_techs.astype(float)
relative_cap = previous_cap / potential_max_cap
# Assign a penalty (score) based on the relative deployment
new_score = relative_cap.copy()
# Transform the score into a "cost" parameter
new_score.rename("cost_flow_cap")
new_score = new_score.expand_dims(costs=["spores_score"]).copy()
new_score = new_score.sum("carriers")
# Extract the existing cost parameters from the backend
all_costs = backend.get_parameter("cost_flow_cap", as_backend_objs=False)
try:
all_costs = all_costs.expand_dims(nodes=results.nodes).copy()
except:
pass
# Create a new version of the cost parameters by adding up the calculated scores
new_all_costs = all_costs
new_all_costs.loc[{"costs":"spores_score"}] += new_score.loc[{"costs":"spores_score"}]
return new_all_costs
allowed_methods = {
'integer': scoring_integer,
'relative_deployment': scoring_relative_deployment,
'random':scoring_random,
# 'evolving_average': scoring_evolving_average
}
return(allowed_methods[scoring_method](results, backend))