Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/batcontrol_config_dummy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ battery_control:
always_allow_discharge_limit: 0.90 # 0.00 to 1.00 above this SOC limit using energy from the battery is always allowed
max_charging_from_grid_limit: 0.89 # 0.00 to 1.00 charging from the grid is only allowed until this SOC limit
# min_grid_charge_soc: 0.55 # optional 0.00 to 1.00 target to preserve/charge before expensive slots
# grid_charge_target_strategy: fixed # fixed = use min_grid_charge_soc unchanged; forecast = raise it from forecasted expensive-slot need
# grid_charge_forecast_pv_factor: 1.0 # forecast strategy only: 0.0 to 1.0 multiplier for PV forecast trust
min_recharge_amount: 100 # in Wh, start & minimum amount of energy to recharge the battery

#--------------------------
Expand Down
15 changes: 14 additions & 1 deletion src/batcontrol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .logic import CalculationInput, CalculationParameters
from .logic import CommonLogic
from .logic import PeakShavingConfig
from .logic.grid_charge_target import GridChargeTargetConfig

from .dynamictariff import DynamicTariff as tariff_factory
from .inverter import Inverter as inverter_factory
Expand Down Expand Up @@ -235,6 +236,13 @@ def __init__(self, configdict: dict):
self.batconfig.get('min_grid_charge_soc', None),
'battery_control.min_grid_charge_soc'
)
self.grid_charge_target_config = (
GridChargeTargetConfig.from_battery_control_config(self.batconfig)
)
self.grid_charge_target_strategy = self.grid_charge_target_config.strategy
self.grid_charge_forecast_pv_factor = (
self.grid_charge_target_config.pv_forecast_factor
)
self.preserve_min_grid_charge_soc = False
if (self.min_grid_charge_soc is not None
and self.min_grid_charge_soc > self.max_charging_from_grid_limit):
Expand Down Expand Up @@ -616,14 +624,16 @@ def run(self):
self.peak_shaving_config,
enabled=peak_shaving_config_enabled and not evcc_disable_peak_shaving,
)
max_capacity = self.get_max_capacity()
calc_parameters = CalculationParameters(
self.max_charging_from_grid_limit,
self.min_price_difference,
self.min_price_difference_rel,
self.get_max_capacity(),
max_capacity,
min_grid_charge_soc=self.min_grid_charge_soc,
preserve_min_grid_charge_soc=self.preserve_min_grid_charge_soc,
peak_shaving=ps_runtime,
grid_charge_target=self.grid_charge_target_config,
)

self.last_logic_instance = this_logic_run
Expand All @@ -643,6 +653,9 @@ def run(self):
if self.mqtt_api is not None:
self.mqtt_api.publish_min_dynamic_price_diff(
calc_output.min_dynamic_price_difference)
if calc_output.effective_min_grid_charge_soc is not None:
self.mqtt_api.publish_effective_min_grid_charge_soc(
calc_output.effective_min_grid_charge_soc)

if self.discharge_blocked and not \
self.general_logic.is_discharge_always_allowed_soc(self.get_SOC()):
Expand Down
61 changes: 52 additions & 9 deletions src/batcontrol/logic/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .logic_interface import CalculationOutput, InverterControlSettings
from .common import CommonLogic
from .decision_logging import GridRechargeDecision, log_grid_recharge_decision
from .grid_charge_target import calculate_effective_min_grid_charge_soc

# Minimum remaining time in hours to prevent division by very small numbers
# when calculating charge rates. This constant serves as a safety threshold:
Expand Down Expand Up @@ -65,6 +66,9 @@ def calculate(self, input_data: CalculationInput, calc_timestamp: Optional[datet
required_recharge_energy=0.0,
min_dynamic_price_difference=0.0
)
self.calculation_output.effective_min_grid_charge_soc = (
self.__calculate_effective_min_grid_charge_soc(input_data)
)

self.inverter_control_settings = self.calculate_inverter_mode(
input_data,
Expand Down Expand Up @@ -304,22 +308,26 @@ def __is_discharge_allowed(self, calc_input: CalculationInput,
# add_remaining required_energy to reserved_storage
reserved_storage += required_energy

forecast_target_active = self.__is_forecast_target_raised()
min_grid_charge_soc_active = (
self.calculation_parameters.preserve_min_grid_charge_soc
and reserved_storage > 0
and self.__has_grid_charge_soc_price_signal(
consumption,
prices,
max_slots,
current_price,
min_dynamic_price_difference
and (reserved_storage > 0 or forecast_target_active)
and (
forecast_target_active
or self.__has_grid_charge_soc_price_signal(
consumption,
prices,
max_slots,
current_price,
min_dynamic_price_difference
)
)
)
reserved_storage = self.common.apply_min_grid_charge_soc_reserve(
reserved_storage,
calc_input.stored_energy,
calc_input.stored_usable_energy,
self.calculation_parameters.min_grid_charge_soc,
self.calculation_output.effective_min_grid_charge_soc,
min_grid_charge_soc_active
)

Expand Down Expand Up @@ -458,13 +466,16 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput,
"[Rule] No additional energy required, because stored energy is sufficient."
)
recharge_energy = 0.0

target_soc = self.calculation_output.effective_min_grid_charge_soc
if required_energy == 0.0 and not self.__is_forecast_target_raised():
self.calculation_output.required_recharge_energy = recharge_energy
return recharge_energy

recharge_energy = self.common.apply_min_grid_charge_soc_target(
recharge_energy,
calc_input.stored_energy,
self.calculation_parameters.min_grid_charge_soc
target_soc
)

free_capacity = calc_input.free_capacity
Expand All @@ -484,6 +495,38 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput,
self.calculation_output.required_recharge_energy = recharge_energy
return recharge_energy

def __calculate_effective_min_grid_charge_soc(
self, calc_input: CalculationInput) -> Optional[float]:
"""Calculate the runtime grid-charge target inside the logic layer."""
effective_soc = calculate_effective_min_grid_charge_soc(
config=self.calculation_parameters.grid_charge_target,
calc_input=calc_input,
configured_min_grid_charge_soc=(
self.calculation_parameters.min_grid_charge_soc),
max_charging_from_grid_limit=(
self.calculation_parameters.max_charging_from_grid_limit),
max_capacity=self.calculation_parameters.max_capacity,
min_price_difference=self.calculation_parameters.min_price_difference,
min_price_difference_rel=(
self.calculation_parameters.min_price_difference_rel),
)
if effective_soc != self.calculation_parameters.min_grid_charge_soc:
logger.info(
'Forecast grid-charge target raised min_grid_charge_soc '
'from %.1f%% to %.1f%%',
self.calculation_parameters.min_grid_charge_soc * 100,
effective_soc * 100,
)
return effective_soc

def __is_forecast_target_raised(self) -> bool:
"""Return True when forecast strategy raised the configured floor."""
effective_soc = self.calculation_output.effective_min_grid_charge_soc
configured_soc = self.calculation_parameters.min_grid_charge_soc
return (effective_soc is not None
and configured_soc is not None
and effective_soc > configured_soc)

def __calculate_min_dynamic_price_difference(self, price: float) -> float:
""" Calculate the dynamic limit for the current price """
return round(
Expand Down
185 changes: 185 additions & 0 deletions src/batcontrol/logic/grid_charge_target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""Effective grid-charge SoC target calculation."""

from dataclasses import dataclass
from typing import Optional, Sequence, Any

GRID_CHARGE_TARGET_STRATEGY_FIXED = 'fixed'
GRID_CHARGE_TARGET_STRATEGY_FORECAST = 'forecast'
GRID_CHARGE_TARGET_STRATEGIES = (
GRID_CHARGE_TARGET_STRATEGY_FIXED,
GRID_CHARGE_TARGET_STRATEGY_FORECAST,
)


@dataclass(frozen=True)
class GridChargeTargetConfig:
"""Configuration for effective grid-charge target calculation."""

strategy: str = GRID_CHARGE_TARGET_STRATEGY_FIXED
pv_forecast_factor: float = 1.0

@classmethod
def from_battery_control_config(cls, config: dict) -> 'GridChargeTargetConfig':
"""Create target strategy configuration from battery_control config."""
return cls(
strategy=_parse_grid_charge_target_strategy(
config.get(
'grid_charge_target_strategy',
GRID_CHARGE_TARGET_STRATEGY_FIXED,
)
),
pv_forecast_factor=_parse_ratio(
config.get('grid_charge_forecast_pv_factor', 1.0),
'battery_control.grid_charge_forecast_pv_factor',
),
)


def _parse_ratio(value, config_key: str) -> float:
"""Parse a required 0..1 ratio config value."""
try:
ratio = float(value)
except (TypeError, ValueError) as exc:
raise ValueError(
f"{config_key} must be numeric between 0 and 1, got {value!r}"
) from exc
if not 0 <= ratio <= 1:
raise ValueError(
f"{config_key} must be between 0 and 1, got {ratio}"
)
return ratio


def _parse_grid_charge_target_strategy(value) -> str:
"""Parse the grid-charge target strategy config value."""
strategy = str(value).strip().lower()
if strategy not in GRID_CHARGE_TARGET_STRATEGIES:
raise ValueError(
f"battery_control.grid_charge_target_strategy must be one of "
f"{GRID_CHARGE_TARGET_STRATEGIES}, got {value!r}"
)
return strategy


def _ordered_values(values) -> Sequence[float]:
if isinstance(values, dict):
if not values:
return []
keys = set(values.keys())
error_message = (
"forecast dict values must use consecutive integer "
"indices starting at 0"
)
if not all(isinstance(index, int) for index in keys):
raise ValueError(error_message)
expected_keys = set(range(max(keys) + 1))
if keys != expected_keys:
raise ValueError(error_message)
return [values[index] for index in range(max(keys) + 1)]
return list(values)


def _calculate_min_dynamic_price_difference(
current_price: float,
min_price_difference: float,
min_price_difference_rel: float) -> float:
return max(min_price_difference,
min_price_difference_rel * abs(current_price))


def calculate_effective_min_grid_charge_soc(
config: GridChargeTargetConfig,
calc_input: Any,
configured_min_grid_charge_soc: Optional[float],
max_charging_from_grid_limit: float,
max_capacity: float,
min_price_difference: float,
min_price_difference_rel: float = 0.0) -> Optional[float]:
"""Calculate the effective minimum grid-charge SoC from logic inputs."""
min_soc_energy = max(
0.0,
calc_input.stored_energy - calc_input.stored_usable_energy,
)
return calculate_effective_grid_charge_soc(
strategy=config.strategy,
configured_min_grid_charge_soc=configured_min_grid_charge_soc,
max_charging_from_grid_limit=max_charging_from_grid_limit,
max_capacity=max_capacity,
min_soc_energy=min_soc_energy,
production=calc_input.production,
consumption=calc_input.consumption,
prices=calc_input.prices,
min_price_difference=min_price_difference,
min_price_difference_rel=min_price_difference_rel,
pv_forecast_factor=config.pv_forecast_factor,
)


def calculate_effective_grid_charge_soc(
strategy: str,
configured_min_grid_charge_soc: Optional[float],
max_charging_from_grid_limit: float,
max_capacity: float,
min_soc_energy: float,
production,
consumption,
prices,
min_price_difference: float,
min_price_difference_rel: float = 0.0,
pv_forecast_factor: float = 1.0) -> Optional[float]:
"""Calculate the effective minimum grid-charge SoC for this evaluation.

``fixed`` returns the configured target unchanged. ``forecast`` treats the
configured target as a floor and raises it when future expensive-slot net
demand implies a higher target. Forecast PV can be discounted with
``pv_forecast_factor`` to account for uncertain PV ramps.
"""
if configured_min_grid_charge_soc is None:
return None
if strategy not in GRID_CHARGE_TARGET_STRATEGIES:
raise ValueError(
f"grid_charge_target_strategy must be one of "
f"{GRID_CHARGE_TARGET_STRATEGIES}, got '{strategy}'"
)
if strategy == GRID_CHARGE_TARGET_STRATEGY_FIXED:
return configured_min_grid_charge_soc
if max_capacity <= 0:
raise ValueError("max_capacity must be greater than 0")
if not 0 <= pv_forecast_factor <= 1:
raise ValueError(
"grid_charge_forecast_pv_factor must be between 0 and 1, "
f"got {pv_forecast_factor}"
)

production_values = _ordered_values(production)
consumption_values = _ordered_values(consumption)
price_values = _ordered_values(prices)
max_slot = min(len(production_values), len(consumption_values), len(price_values))
if max_slot < 2:
return configured_min_grid_charge_soc

current_price = price_values[0]
min_dynamic_price_difference = _calculate_min_dynamic_price_difference(
current_price,
min_price_difference,
min_price_difference_rel,
)

# Evaluate until the next price slot that is no more expensive than the
# current slot. This keeps the target tied to the current cheap/economical
# charging window rather than charging for the whole forecast horizon.
for slot in range(1, max_slot):
if price_values[slot] <= current_price:
max_slot = slot
break

forecast_need = 0.0
for slot in range(1, max_slot):
if price_values[slot] <= current_price + min_dynamic_price_difference:
continue
discounted_pv = production_values[slot] * pv_forecast_factor
forecast_need += max(0.0, consumption_values[slot] - discounted_pv)

forecast_target = (min_soc_energy + forecast_need) / max_capacity
forecast_target = min(forecast_target, max_charging_from_grid_limit)
return max(configured_min_grid_charge_soc, forecast_target)
12 changes: 11 additions & 1 deletion src/batcontrol/logic/logic_interface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, Any
import datetime
import numpy as np

Expand All @@ -11,6 +11,12 @@
PEAK_SHAVING_VALID_MODES = ('time', 'price', 'combined')


def _default_grid_charge_target_config():
"""Create default grid-charge target strategy config lazily."""
from .grid_charge_target import GridChargeTargetConfig # pylint: disable=import-outside-toplevel
return GridChargeTargetConfig()


@dataclass
class PeakShavingConfig:
""" Holds peak shaving configuration parameters, initialized from the config dict.
Expand Down Expand Up @@ -117,6 +123,9 @@ class CalculationParameters:
# Peak shaving sub-configuration. evcc may set ``enabled=False`` for a
# single calculation cycle via ``dataclasses.replace`` in core.py.
peak_shaving: PeakShavingConfig = field(default_factory=PeakShavingConfig)
# Grid-charge target strategy sub-configuration. The concrete config class
# lives in grid_charge_target.py; use a lazy factory to avoid import cycles.
grid_charge_target: Any = field(default_factory=_default_grid_charge_target_config)

def __post_init__(self):
if self.min_grid_charge_soc is not None:
Expand All @@ -138,6 +147,7 @@ class CalculationOutput:
reserved_energy: float = 0.0
required_recharge_energy: float = 0.0
min_dynamic_price_difference: float = 0.05
effective_min_grid_charge_soc: Optional[float] = None

@dataclass
class InverterControlSettings:
Expand Down
Loading