diff --git a/README.md b/README.md index 64adfc34..12f659e5 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ EOS Connect is a tool designed to optimize energy usage by interacting with the * Manages configurations via a user-friendly config.yaml file. * Displays results dynamically on a webpage. * Controlling FRONIUS inverters and battery charging systems interactively. +* Battery charging is dynamicaly limited depending on the SOC (charging curve) ## Current Status diff --git a/src/CONFIG_README.md b/src/CONFIG_README.md index 0eab6663..219074ae 100644 --- a/src/CONFIG_README.md +++ b/src/CONFIG_README.md @@ -23,6 +23,8 @@ A default config file will be created with the first start, if there is no confi - **price.source**: Data source for electricity price. Possible values: `tibber`, `akkudoktor`. - **price.token**: Token for electricity price. +- **price.feed_in_price**: Feed-in price for the grid. +- **price.negative_price_switch**: Switch for handling negative electricity prices (e.g., no payment if negative stock price). ### Battery Configuration - **battery.source**: Data source for battery SOC. Possible values: openhab, homeassistant, default (static data). diff --git a/src/config.py b/src/config.py index 3e335074..b612d947 100644 --- a/src/config.py +++ b/src/config.py @@ -59,6 +59,8 @@ def create_default_config(self): { "source": "default", "token": "tibberBearerToken", # token for electricity price + "feed_in_price": 0.0, # feed in price for the grid + "negative_price_switch": False, # switch for negative price } ), "battery": CommentedMap( @@ -155,6 +157,12 @@ def create_default_config(self): config["price"].yaml_set_comment_before_after_key( "token", before="Token for electricity price" ) + config["price"].yaml_set_comment_before_after_key( + "feed_in_price", before="feed in price for the grid" + ) + config["price"].yaml_set_comment_before_after_key( + "negative_price_switch", before="switch for no payment if negative stock price" + ) # battery configuration config.yaml_set_comment_before_after_key("battery", before="battery configuration") config["battery"].yaml_set_comment_before_after_key( diff --git a/src/eos_connect.py b/src/eos_connect.py index 9b77f652..78a3f918 100644 --- a/src/eos_connect.py +++ b/src/eos_connect.py @@ -9,7 +9,6 @@ import logging import json import threading -import sched import pytz import requests from flask import Flask, Response, render_template_string @@ -21,11 +20,10 @@ from interfaces.inverter_fronius import FroniusWR from interfaces.evcc_interface import EvccInterface from interfaces.eos_interface import EosInterface +from interfaces.price_interface import PriceInterface EOS_TGT_DURATION = 48 EOS_API_GET_PV_FORECAST = "https://api.akkudoktor.net/forecast" -AKKUDOKTOR_API_PRICES = "https://api.akkudoktor.net/prices" -TIBBER_API = "https://api.tibber.com/v1-beta/gql" ################################################################################################### @@ -45,205 +43,108 @@ def formatTime(self, record, datefmt=None): return record_time.strftime(datefmt or self.default_time_format) -# getting data -def get_prices(tgt_duration, start_time=None): - """ - Retrieve prices based on the target duration and optional start time. - - This function fetches prices from different sources based on the configuration. - It supports fetching prices from 'tibber' and 'default' sources. - - Args: - tgt_duration (int): The target duration for which prices are to be fetched. - start_time (datetime, optional): The start time from which prices are to be fetched. - Defaults to None. - - Returns: - list: A list of prices for the specified duration and start time. Returns an empty list - if the price source is not supported. - """ - if config_manager.config["price"]["source"] == "tibber": - return get_prices_from_tibber(tgt_duration, start_time) - if config_manager.config["price"]["source"] == "default": - return get_prices_from_akkudoktor(tgt_duration, start_time) - logger.error("[PRICES] Price source currently not supported.") - return [] - - -def get_prices_from_akkudoktor(tgt_duration, start_time=None): - """ - Fetches and processes electricity prices for today and tomorrow. - - This function retrieves electricity prices for today and tomorrow from an API, - processes the prices, and returns a list of prices for the specified duration starting - from the specified start time. If tomorrow's prices are not available, today's prices are - repeated for tomorrow. +################################################################################################### +LOGLEVEL = logging.DEBUG # start before reading the config file +logger = logging.getLogger(__name__) +formatter = logging.Formatter( + "%(asctime)s %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S" +) +streamhandler = logging.StreamHandler(sys.stdout) - Args: - tgt_duration (int): The target duration in hours for which the prices are needed. - start_time (datetime, optional): The start time for fetching prices. Defaults to None. +streamhandler.setFormatter(formatter) +logger.addHandler(streamhandler) +logger.setLevel(LOGLEVEL) +logger.info("[Main] Starting eos_connect") +################################################################################################### +base_path = os.path.dirname(os.path.abspath(__file__)) +# get param to set a specific path +if len(sys.argv) > 1: + current_dir = sys.argv[1] +else: + current_dir = base_path +################################################################################################### +config_manager = ConfigManager(current_dir) +time_zone = pytz.timezone(config_manager.config["time_zone"]) - Returns: - list: A list of electricity prices for the specified duration starting - from the specified start time. - """ - if config_manager.config["price"]["source"] != "default": - logger.error( - "[PRICES] Price source %s currently not supported.", - config_manager.config["price"]["source"], - ) - return [] - logger.debug("[PRICES] Fetching prices from akkudoktor ...") - if start_time is None: - start_time = datetime.now(time_zone).replace(minute=0, second=0, microsecond=0) - current_hour = start_time.hour - request_url = ( - AKKUDOKTOR_API_PRICES - + "?start=" - + start_time.strftime("%Y-%m-%d") - + "&end=" - + (start_time + timedelta(days=1)).strftime("%Y-%m-%d") +LOGLEVEL = config_manager.config["log_level"].upper() +logger.setLevel(LOGLEVEL) +formatter = TimezoneFormatter( + "%(asctime)s %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S", tz=time_zone +) +streamhandler.setFormatter(formatter) +logger.info( + "[Main] set user defined time zone to %s and loglevel to %s", + config_manager.config["time_zone"], + LOGLEVEL, +) +# initialize eos interface +eos_interface = EosInterface( + eos_server=config_manager.config["eos"]["server"], + eos_port=config_manager.config["eos"]["port"], + timezone=time_zone, +) +# initialize base control +base_control = BaseControl(config_manager.config, time_zone) +# initialize the inverter interface +inverter_interface = None +if config_manager.config["inverter"]["type"] == "fronius_gen24": + inverter_config = { + "address": config_manager.config["inverter"]["address"], + "max_grid_charge_rate": config_manager.config["inverter"][ + "max_grid_charge_rate" + ], + "max_pv_charge_rate": config_manager.config["inverter"]["max_pv_charge_rate"], + "user": config_manager.config["inverter"]["user"], + "password": config_manager.config["inverter"]["password"], + } + inverter_interface = FroniusWR(inverter_config) +else: + logger.info( + "[Inverter] Inverter type %s - no external connection." + + " Changing to show only mode.", + config_manager.config["inverter"]["type"], ) - logger.debug("[PRICES] Requesting prices from akkudoktor: %s", request_url) - try: - response = requests.get(request_url, timeout=10) - response.raise_for_status() - data = response.json() - except requests.exceptions.Timeout: - logger.error( - "[PRICES] Request timed out while fetching prices from akkudoktor." - ) - return [] - except requests.exceptions.RequestException as e: - logger.error( - "[PRICES] Request failed while fetching prices from akkudoktor: %s", e - ) - return [] - - prices = [] - for price in data["values"]: - prices.append(round(price["marketpriceEurocentPerKWh"] / 100000, 9)) - # logger.debug( - # "[Main] day 1 - price for %s -> %s", price["marketpriceEurocentPerKWh"], - # price["start"] - # ) - if start_time is None: - start_time = datetime.now(time_zone).replace(minute=0, second=0, microsecond=0) - current_hour = start_time.hour - extended_prices = prices[current_hour : current_hour + tgt_duration] - - if len(extended_prices) < tgt_duration: - remaining_hours = tgt_duration - len(extended_prices) - extended_prices.extend(prices[:remaining_hours]) - logger.info("[PRICES] Prices from AKKUDOKTOR fetched successfully.") - return extended_prices - - -def get_prices_from_tibber(tgt_duration, start_time=None): - """ - Fetches and processes electricity prices for today and tomorrow. - - This function retrieves electricity prices for today and tomorrow from a web service, - processes the prices, and returns a list of prices for the specified duration starting - from the specified start time. If tomorrow's prices are not available, today's prices are - repeated for tomorrow. - - Args: - tgt_duration (int): The target duration in hours for which the prices are needed. - start_time (datetime, optional): The start time for fetching prices. Defaults to None. - Returns: - list: A list of electricity prices for the specified duration starting - from the specified start time. +# callback function for evcc interface +def charging_state_callback(new_state): """ - logger.debug("[PRICES] Prices fetching from TIBBER started") - if config_manager.config["price"]["source"] != "tibber": - logger.error("[PRICES] Price source currently not supported.") - return [] - headers = { - "Authorization": config_manager.config["price"]["token"], - "Content-Type": "application/json", - } - query = """ - { - viewer { - homes { - currentSubscription { - priceInfo { - today { - total - startsAt - } - tomorrow { - total - startsAt - } - } - } - } - } - } + Callback function that gets triggered when the charging state changes. """ - try: - response = requests.post( - TIBBER_API, headers=headers, json={"query": query}, timeout=10 - ) - response.raise_for_status() - except requests.exceptions.Timeout: - logger.error("[PRICES] Request timed out while fetching prices from Tibber.") - return [] - except requests.exceptions.RequestException as e: - logger.error("[PRICES] Request failed while fetching prices from Tibber: %s", e) - return [] + logger.info("[MAIN] EVCC Event - Charging state changed to: %s", new_state) + change_control_state() - response.raise_for_status() - data = response.json() - if "errors" in data and data["errors"] is not None: - logger.error( - "[PRICES] Error fetching prices - tibber API response: %s", - data["errors"][0]["message"], - ) - return [] - today_prices = json.dumps( - data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]["today"] - ) - tomorrow_prices = json.dumps( - data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"][ - "tomorrow" - ] - ) +evcc_interface = EvccInterface( + url=config_manager.config["evcc"]["url"], + update_interval=10, + on_charging_state_change=charging_state_callback, +) + +# intialize the load interface +load_interface = LoadInterface( + config_manager.config.get("load", {}).get("source", ""), + config_manager.config.get("load", {}).get("url", ""), + config_manager.config.get("load", {}).get("load_sensor", ""), + config_manager.config.get("load", {}).get("car_charge_load_sensor", ""), + config_manager.config.get("load", {}).get("access_token", ""), + time_zone, +) - today_prices_json = json.loads(today_prices) - tomorrow_prices_json = json.loads(tomorrow_prices) - prices = [] - - for price in today_prices_json: - prices.append(round(price["total"] / 1000, 9)) - # logger.debug( - # "[Main] day 1 - price for %s -> %s", price["startsAt"], price["total"] - # ) - if tomorrow_prices_json: - for price in tomorrow_prices_json: - prices.append(round(price["total"] / 1000, 9)) - # logger.debug( - # "[Main] day 2 - price for %s -> %s", price["startsAt"], price["total"] - # ) - else: - prices.extend(prices[:24]) # Repeat today's prices for tomorrow - - if start_time is None: - start_time = datetime.now(time_zone).replace(minute=0, second=0, microsecond=0) - current_hour = start_time.hour - extended_prices = prices[current_hour : current_hour + tgt_duration] - - if len(extended_prices) < tgt_duration: - remaining_hours = tgt_duration - len(extended_prices) - extended_prices.extend(prices[:remaining_hours]) - logger.info("[PRICES] Prices from TIBBER fetched successfully.") - return extended_prices +battery_interface = BatteryInterface( + config_manager.config.get("battery", {}).get("source", ""), + config_manager.config.get("battery", {}).get("url", ""), + config_manager.config.get("battery", {}).get("soc_sensor", ""), + config_manager.config.get("battery", {}).get("access_token", ""), + config_manager.config.get("battery", {}).get("max_charge_power_w", ""), +) +price_interface = PriceInterface( + config_manager.config["price"]["source"], + config_manager.config["price"]["token"], + config_manager.config["price"]["feed_in_price"], + config_manager.config["price"]["negative_price_switch"], +) def create_forecast_request(pv_config_entry): """ @@ -307,7 +208,11 @@ def get_pv_forecast(tgt_value="power", pv_config_entry=None, tgt_duration=24): for forecast in forecast_entry: entry_time = datetime.fromisoformat(forecast["datetime"]).astimezone() if current_time <= entry_time < end_time: - forecast_values.append(forecast.get(tgt_value, 0)) + value = forecast.get(tgt_value, 0) + # if power is negative, set it to 0 (fixing wrong values form api) + if tgt_value == "power" and value < 0: + value = 0 + forecast_values.append(value) request_type = "PV forecast" pv_config_name = "for " + pv_config_entry["name"] if tgt_value == "temperature": @@ -372,62 +277,8 @@ def create_optimize_request(): def get_ems_data(): return { "pv_prognose_wh": get_summarized_pv_forecast(EOS_TGT_DURATION), - "strompreis_euro_pro_wh": get_prices( - EOS_TGT_DURATION, - datetime.now(time_zone).replace( - hour=0, minute=0, second=0, microsecond=0 - ), - ), - "einspeiseverguetung_euro_pro_wh": [ - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - ], + "strompreis_euro_pro_wh": price_interface.get_current_prices(), + "einspeiseverguetung_euro_pro_wh": price_interface.get_current_feedin_prices(), "preis_euro_pro_wh_akku": 0, "gesamtlast": load_interface.get_load_profile(EOS_TGT_DURATION), } @@ -444,7 +295,9 @@ def get_pv_akku_data(): "max_charge_power_w": config_manager.config["battery"][ "max_charge_power_w" ], - "initial_soc_percentage": round(battery_interface.battery_request_current_soc()), + "initial_soc_percentage": round( + battery_interface.battery_request_current_soc() + ), "min_soc_percentage": config_manager.config["battery"][ "min_soc_percentage" ], @@ -508,9 +361,9 @@ def get_dishwasher_data(): class OptimizationScheduler: - ''' + """ A scheduler class that manages the periodic execution of an optimization process - in a background thread. The class is responsible for starting, stopping, and + in a background thread. The class is responsible for starting, stopping, and managing the lifecycle of the optimization service. Attributes: update_interval (int): The interval in seconds between optimization runs. @@ -521,7 +374,8 @@ class OptimizationScheduler: shutdown(): _update_state_loop(): run_optimization(): - ''' + """ + def __init__(self, update_interval): self.update_interval = update_interval self._update_thread = None @@ -570,27 +424,32 @@ def _update_state_loop(self): def run_optimization(self): """ - Executes the optimization process by creating an optimization request, - sending it to the EOS interface, processing the response, and scheduling + Executes the optimization process by creating an optimization request, + sending it to the EOS interface, processing the response, and scheduling the next optimization run. The method performs the following steps: 1. Logs the start of a new optimization run. 2. Creates an optimization request in JSON format and saves it to a file. 3. Sends the optimization request to the EOS interface and retrieves the response. 4. Adds a timestamp to the response and saves it to a file. - 5. Extracts control data from the response and, if no error is detected, + 5. Extracts control data from the response and, if no error is detected, applies the control settings and updates the control state. 6. Calculates the time for the next optimization run and logs the sleep duration. Raises: - Any exceptions raised during file operations, JSON serialization, + Any exceptions raised during file operations, JSON serialization, or EOS interface communication will propagate to the caller. Notes: - - The method assumes the presence of global variables or objects such as + - The method assumes the presence of global variables or objects such as `logger`, `base_path`, `eos_interface`, `config_manager`, and `time_zone`. - - The `config_manager.config` dictionary is expected to contain the + - The `config_manager.config` dictionary is expected to contain the necessary configuration values for "eos.timeout" and "refresh_time". """ logger.info("[Main] start new run") + # update prices + price_interface.update_prices( + EOS_TGT_DURATION, + datetime.now(time_zone).replace(hour=0, minute=0, second=0, microsecond=0), + ) # create optimize request json_optimize_input = create_optimize_request() @@ -632,112 +491,11 @@ def run_optimization(self): ) -################################################################################################### -LOGLEVEL = logging.DEBUG # start before reading the config file -logger = logging.getLogger(__name__) -formatter = logging.Formatter( - "%(asctime)s %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S" -) -streamhandler = logging.StreamHandler(sys.stdout) - -streamhandler.setFormatter(formatter) -logger.addHandler(streamhandler) -logger.setLevel(LOGLEVEL) -logger.info("[Main] Starting eos_connect") -################################################################################################### -base_path = os.path.dirname(os.path.abspath(__file__)) -# get param to set a specific path -if len(sys.argv) > 1: - current_dir = sys.argv[1] -else: - current_dir = base_path -################################################################################################### -config_manager = ConfigManager(current_dir) -time_zone = pytz.timezone(config_manager.config["time_zone"]) - -LOGLEVEL = config_manager.config["log_level"].upper() -logger.setLevel(LOGLEVEL) -formatter = TimezoneFormatter( - "%(asctime)s %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S", tz=time_zone -) -streamhandler.setFormatter(formatter) -logger.info( - "[Main] set user defined time zone to %s and loglevel to %s", - config_manager.config["time_zone"], - LOGLEVEL, -) -# initialize eos interface -eos_interface = EosInterface( - eos_server=config_manager.config["eos"]["server"], - eos_port=config_manager.config["eos"]["port"], - timezone=time_zone, -) -# initialize base control -base_control = BaseControl(config_manager.config, time_zone) -# initialize the inverter interface -inverter_interface = None -if config_manager.config["inverter"]["type"] == "fronius_gen24": - inverter_config = { - "address": config_manager.config["inverter"]["address"], - "max_grid_charge_rate": config_manager.config["inverter"][ - "max_grid_charge_rate" - ], - "max_pv_charge_rate": config_manager.config["inverter"]["max_pv_charge_rate"], - "user": config_manager.config["inverter"]["user"], - "password": config_manager.config["inverter"]["password"], - } - inverter_interface = FroniusWR(inverter_config) -else: - logger.info( - "[Inverter] Inverter type %s - no external connection." - + " Changing to show only mode.", - config_manager.config["inverter"]["type"], - ) - optimization_scheduler = OptimizationScheduler( config_manager.config["refresh_time"] * 60 # convert to seconds ) -# callback function for evcc interface -def charging_state_callback(new_state): - """ - Callback function that gets triggered when the charging state changes. - """ - logger.info("[MAIN] EVCC Event - Charging state changed to: %s", new_state) - change_control_state() - - -evcc_interface = EvccInterface( - url=config_manager.config["evcc"]["url"], - update_interval=10, - on_charging_state_change=charging_state_callback, -) - -# time.sleep(120) - -# evcc_interface.shutdown() - -# sys.exit(0) - -# intialize the load interface -load_interface = LoadInterface( - config_manager.config.get("load", {}).get("source", ""), - config_manager.config.get("load", {}).get("url", ""), - config_manager.config.get("load", {}).get("load_sensor", ""), - config_manager.config.get("load", {}).get("car_charge_load_sensor", ""), - config_manager.config.get("load", {}).get("access_token", ""), - time_zone, -) - -battery_interface = BatteryInterface( - config_manager.config.get("battery", {}).get("source", ""), - config_manager.config.get("battery", {}).get("url", ""), - config_manager.config.get("battery", {}).get("soc_sensor", ""), - config_manager.config.get("battery", {}).get("access_token", ""), -) - - def setting_control_data(ac_charge_demand_rel, dc_charge_demand_rel, discharge_allowed): """ Process the optimized response from EOS and update the load interface. @@ -783,13 +541,17 @@ def change_control_state(): logger.debug("[Main] Overall state changed recently") # MODE_CHARGE_FROM_GRID if base_control.get_current_overall_state() == 0: + # get the current ac charge demand and set it to the inverter according + # to the max dynamic charge power of the battery based on SOC + tgt_charge_power = min( + base_control.get_current_ac_charge_demand(), + round(battery_interface.get_max_charge_power_dyn()) + ) if inverter_en: - inverter_interface.set_mode_force_charge( - base_control.get_current_ac_charge_demand() - ) + inverter_interface.set_mode_force_charge(tgt_charge_power) logger.info( "[Main] Inverter mode set to charge from grid with %s W (_____|||||_____)", - base_control.get_current_ac_charge_demand(), + tgt_charge_power, ) # MODE_AVOID_DISCHARGE elif base_control.get_current_overall_state() == 1: @@ -908,6 +670,9 @@ def serve_current_demands(): current_inverter_mode = base_control.get_current_overall_state(False) current_battery_soc = battery_interface.get_current_soc() base_control.set_current_battery_soc(current_battery_soc) + current_inverter_mode = base_control.get_current_overall_state(False) + current_battery_soc = battery_interface.get_current_soc() + base_control.set_current_battery_soc(current_battery_soc) response_data = { "current_states": { "current_ac_charge_demand": current_ac_charge_demand, @@ -917,58 +682,13 @@ def serve_current_demands(): "evcc_charging_state": base_control.get_current_evcc_charging_state(), }, "battery_soc": current_battery_soc, + "battery_max_charge_power_dyn": battery_interface.get_max_charge_power_dyn(), "timestamp": datetime.now(time_zone).isoformat(), } return Response(json.dumps(response_data), content_type="application/json") if __name__ == "__main__": - # initial config - # set_config_value("latitude", 48.812) - # set_config_value("longitude", 8.907) - - # set_config_value("measurement_load0_name", "Household") - # set_config_value("loadakkudoktor_year_energy", 4600) - - # # set_config_value("pvforecast_provider", "PVForecastAkkudoktor") - # set_config_value("pvforecast_provider", "PVForecast") - # set_config_value("pvforecast0_surface_tilt", 31) - # set_config_value("pvforecast0_surface_azimuth", 13) - # set_config_value("pvforecast0_peakpower", 860.0) - # set_config_value("pvforecast0_inverter_paco", 800) - # # set_config_value("pvforecast0_userhorizon", [0,0]) - - # # persist and update config - # eos_save_config_to_config_file() - - # json_optimize_input = create_optimize_request() - - # with open( - # base_path + "/json/optimize_request.json", "w", encoding="utf-8" - # ) as file: - # json.dump(json_optimize_input, file, indent=4) - - # optimized_response = eos_interface.eos_set_optimize_request( - # json_optimize_input, config_manager.config["eos"]["timeout"] - # ) - # optimized_response["timestamp"] = datetime.now(time_zone).isoformat() - - # with open( - # base_path + "/json/optimize_response.json", "w", encoding="utf-8" - # ) as file: - # json.dump(optimized_response, file, indent=4) - - # time.sleep(30) - - # evcc_interface.shutdown() - # battery_interface.shutdown() - # if ( - # config_manager.config["inverter"]["type"] == "fronius_gen24" - # and inverter_interface is not None - # ): - # inverter_interface.shutdown() - - # sys.exit() http_server = WSGIServer( ("0.0.0.0", config_manager.config["eos_connect_web_port"]), diff --git a/src/interfaces/battery_interface.py b/src/interfaces/battery_interface.py index 989bc2c0..b2f89c96 100644 --- a/src/interfaces/battery_interface.py +++ b/src/interfaces/battery_interface.py @@ -1,8 +1,19 @@ """ -This module provides the `BatteryInterface` class, which serves as an interface for fetching +- BatteryInterface: A class to interact with SOC data sources, retrieve battery SOC, and calculate + dynamic maximum charge power based on SOC. +- threading: For managing background update services. +- time: For managing sleep intervals in the update loop. +sensor identifier, access token (if required), and maximum fixed charge power. Use the +`battery_request_current_soc` method to fetch the current SOC value or `get_max_charge_power_dyn` +to calculate the dynamic maximum charge power. + access_token=None, + max_charge_power_w=3000 +max_charge_power = battery_interface.get_max_charge_power_dyn() +print(f"Max Charge Power: {max_charge_power}W") +This module provides the `BatteryInterface` class, which serves as an interface for fetching the State of Charge (SOC) data of a battery from various sources such as OpenHAB and Home Assistant. -The `BatteryInterface` class allows users to configure the source of SOC data and retrieve the -current SOC value through its methods. It supports fetching SOC data from OpenHAB using its REST API +The `BatteryInterface` class allows users to configure the source of SOC data and retrieve the +current SOC value through its methods. It supports fetching SOC data from OpenHAB using its REST API and from Home Assistant using its API with authentication. Classes: - BatteryInterface: A class to interact with SOC data sources and retrieve battery SOC. @@ -24,6 +35,7 @@ print(f"Current SOC: {current_soc}%") ``` """ + import logging import threading import time @@ -51,11 +63,12 @@ class BatteryInterface: Fetches the current SOC of the battery based on the configured source. """ - def __init__(self, src, url, soc_sensor, access_token): + def __init__(self, src, url, soc_sensor, access_token, max_charge_power_w): self.src = src self.url = url self.soc_sensor = soc_sensor self.access_token = access_token + self.max_charge_power_fix = max_charge_power_w self.current_soc = 0 self.update_interval = 30 self._update_thread = None @@ -87,7 +100,8 @@ def fetch_soc_data_from_openhab(self): except requests.exceptions.Timeout: logger.error( "[BATTERY-IF] OPENHAB - Request timed out while fetching battery SOC. " - "Using default SOC = %s%%.", soc + "Using default SOC = %s%%.", + soc, ) return soc # Default SOC value in case of timeout except requests.exceptions.RequestException as e: @@ -96,7 +110,8 @@ def fetch_soc_data_from_openhab(self): "[BATTERY-IF] OPENHAB - Request failed while fetching battery SOC: %s. " "Using default SOC = %s%%." ), - e,soc + e, + soc, ) return soc # Default SOC value in case of request failure @@ -129,12 +144,13 @@ def fetch_soc_data_from_homeassistant(self): soc = float(entity_data["state"]) # print(f'State: {state}') logger.debug("[BATTERY-IF] successfully fetched SOC = %s %%", soc) - return round(soc,1) + return round(soc, 1) except requests.exceptions.Timeout: logger.error( ( "[BATTERY-IF] HOMEASSISTANT - Request timed out while fetching battery SOC. " - "Using default SOC = %s%%.", soc + "Using default SOC = %s%%.", + soc, ) ) return soc # Default SOC value in case of timeout @@ -144,7 +160,8 @@ def fetch_soc_data_from_homeassistant(self): "[BATTERY-IF] HOMEASSISTANT - Request failed while fetching battery SOC: %s. " "Using default SOC = %s %%." ), - e,soc + e, + soc, ) return soc # Default SOC value in case of request failure @@ -173,6 +190,32 @@ def get_current_soc(self): """ return self.current_soc + def get_max_charge_power_dyn(self, soc=None): + """ + Calculates the maximum charge power of the battery depending on the SOC. + + - If SOC < 60%, the fixed maximum charge power is used. + - If SOC >= 60% and < 95%, the maximum charge power decreases linearly. + - If SOC >= 95%, the maximum charge power is fixed at 500W. + + Args: + soc (float, optional): The state of charge to use for calculation. + If None, the current SOC is used. + + Returns: + float: The dynamically calculated maximum charge power in watts. + """ + if soc is None: + soc = self.current_soc + + if soc < 70: + return self.max_charge_power_fix + elif soc >= 70 and soc < 95: + return max(500, self.max_charge_power_fix * ((95 - soc) / 35)) + # return max(500, self.max_charge_power_fix * (1 - (soc / 100)**2)) + else: # SOC >= 95 + return 500 + def start_update_service(self): """ Starts the background thread to periodically update the state. @@ -203,7 +246,7 @@ def _update_state_loop(self): self.battery_request_current_soc() except (requests.exceptions.RequestException, ValueError, KeyError) as e: logger.error("[BATTERY-IF] Error while updating state: %s", e) - # Break the sleep interval into smaller chunks to allow immediate shutdown + # Break the sleep interval into smaller chunks to allow immediate shutdown sleep_interval = self.update_interval while sleep_interval > 0: if self._stop_event.is_set(): diff --git a/src/interfaces/price_interface.py b/src/interfaces/price_interface.py new file mode 100644 index 00000000..2b51d2af --- /dev/null +++ b/src/interfaces/price_interface.py @@ -0,0 +1,385 @@ +''' +This module provides the `PriceInterface` class, which is designed to handle electricity price +data retrieval, processing, and management from various sources. It includes methods for fetching +current prices, generating feed-in prices, and updating price data for a specified duration and +start time. The module supports integration with the Akkudoktor API and Tibber API for retrieving +electricity prices. +Classes: + PriceInterface: A class for managing electricity price data, including fetching, processing, + and generating feed-in prices. +Constants: + AKKUDOKTOR_API_PRICES (str): The URL for the Akkudoktor API to fetch electricity prices. + TIBBER_API (str): The URL for the Tibber API to fetch electricity prices. +Dependencies: + - datetime: For handling date and time operations. + - json: For parsing JSON responses from APIs. + - logging: For logging messages and errors. + - requests: For making HTTP requests to APIs. +Usage: + Create an instance of the `PriceInterface` class with the desired configuration, and use its + methods to fetch and process electricity price data. +Example: + price_interface = PriceInterface( + src="tibber", + access_token="your_access_token", + feed_in_tariff_price=5.0, + negative_price_switch=True, + timezone="Europe/Berlin" + price_interface.update_prices(tgt_duration=24, start_time=datetime.now()) + current_prices = price_interface.get_current_prices() + current_feedin_prices = price_interface.get_current_feedin_prices() +''' +from datetime import datetime, timedelta +import json +import logging +import requests + +logger = logging.getLogger("__main__") +logger.info("[PRICE-IF] loading module ") + +AKKUDOKTOR_API_PRICES = "https://api.akkudoktor.net/prices" +TIBBER_API = "https://api.tibber.com/v1-beta/gql" + + +class PriceInterface: + ''' + PriceInterface is a class designed to handle electricity price data retrieval, processing, + and management from various sources. It provides methods to fetch current prices, + generate feed-in prices, and update price data for a specified duration and start time. + Attributes: + src (str): The source of the price data (e.g., 'tibber', 'default'). + access_token (str): The access token for authenticating with the price source. + feed_in_tariff_price (float): The feed-in tariff price in cents per kWh. + negative_price_switch (bool): A flag to determine whether to set feed-in prices to 0 + for negative prices. + timezone (str): The timezone used for date and time operations. + current_prices (list): A list of current prices fetched from the price source. + current_prices_direct (list): A list of current prices without tax. + current_feedin (list): A list of current feed-in prices. + Methods: + update_prices(tgt_duration, start_time): + Updates the current prices and feed-in prices based on the target duration and + start time. + get_current_prices(): + Returns the current prices fetched from the price source. + get_current_feedin_prices(): + Returns the current feed-in prices fetched from the price source. + __create_feedin_prices(): + Creates feed-in prices based on the current prices and the configured feed-in + tariff price. + __retrieve_prices(tgt_duration, start_time=None): + Retrieves prices based on the target duration and optional start time from the + configured source. + __retrieve_prices_from_akkudoktor(tgt_duration, start_time=None): + Fetches and processes electricity prices for today and tomorrow from the + Akkudoktor API. + __retrieve_prices_from_tibber(tgt_duration, start_time=None): + Fetches and processes electricity prices for today and tomorrow from the Tibber API. + ''' + def __init__( + self, + src, + access_token, + feed_in_tariff_price=0.0, + negative_price_switch=False, + timezone="UTC", + ): + self.src = src + self.access_token = access_token + self.feed_in_tariff_price = feed_in_tariff_price + self.negative_price_switch = negative_price_switch + self.time_zone = timezone + self.current_prices = [] + self.current_prices_direct = [] # without tax + self.current_feedin = [] + + def update_prices(self, tgt_duration, start_time): + """ + Updates the current prices and feed-in prices based on the target duration + and start time provided. + + Args: + tgt_duration (int): The target duration for which prices need to be retrieved. + start_time (datetime): The starting time for retrieving prices. + + Updates: + self.current_prices: Updates with the retrieved prices for the given duration + and start time. + self.current_feedin: Updates with the generated feed-in prices. + + Logs: + Logs a debug message indicating that prices have been updated. + """ + self.current_prices = self.__retrieve_prices(tgt_duration, start_time) + self.current_feedin = self.__create_feedin_prices() + logger.debug("[PRICE-IF] Prices updated") + + def get_current_prices(self): + """ + Returns the current prices. + + This function returns the current prices fetched from the price source. + If the source is not supported, it returns an empty list. + + Returns: + list: A list of current prices. + """ + return self.current_prices + + def get_current_feedin_prices(self): + """ + Returns the current feed-in prices. + + This function returns the current feed-in prices fetched from the price source. + If the source is not supported, it returns an empty list. + + Returns: + list: A list of current feed-in prices. + """ + return self.current_feedin + + def __create_feedin_prices(self): + """ + Creates feed-in prices based on the current prices. + + This function generates feed-in prices based on the current prices and the + configured feed-in tariff price. If the negative price switch is enabled, + feed-in prices are set to 0 for negative prices. Otherwise, the feed-in tariff + price is used for all prices. + + Returns: + list: A list of feed-in prices. + """ + if self.negative_price_switch: + self.current_feedin = [ + 0 if price < 0 else round(self.feed_in_tariff_price / 1000, 9) + for price in self.current_prices_direct + ] + else: + self.current_feedin = [ + round(self.feed_in_tariff_price / 1000, 9) + for price in self.current_prices_direct + ] + return self.current_feedin + + def __retrieve_prices(self, tgt_duration, start_time=None): + """ + Retrieve prices based on the target duration and optional start time. + + This function fetches prices from different sources based on the configuration. + It supports fetching prices from 'tibber' and 'default' sources. + + Args: + tgt_duration (int): The target duration for which prices are to be fetched. + start_time (datetime, optional): The start time from which prices are to be fetched. + Defaults to None. + + Returns: + list: A list of prices for the specified duration and start time. Returns an empty list + if the price source is not supported. + """ + if self.src == "tibber": + return self.__retrieve_prices_from_tibber(tgt_duration, start_time) + if self.src == "default": + return self.__retrieve_prices_from_akkudoktor(tgt_duration, start_time) + logger.error("[PRICE-IF] Price source currently not supported.") + return [] + + def __retrieve_prices_from_akkudoktor(self, tgt_duration, start_time=None): + """ + Fetches and processes electricity prices for today and tomorrow. + + This function retrieves electricity prices for today and tomorrow from an API, + processes the prices, and returns a list of prices for the specified duration starting + from the specified start time. If tomorrow's prices are not available, today's prices are + repeated for tomorrow. + + Args: + tgt_duration (int): The target duration in hours for which the prices are needed. + start_time (datetime, optional): The start time for fetching prices. Defaults to None. + + Returns: + list: A list of electricity prices for the specified duration starting + from the specified start time. + """ + if self.src != "default": + logger.error( + "[PRICE-IF] Price source %s currently not supported.", + self.src, + ) + return [] + logger.debug("[PRICE-IF] Fetching prices from akkudoktor ...") + if start_time is None: + start_time = datetime.now(self.time_zone).replace( + minute=0, second=0, microsecond=0 + ) + current_hour = start_time.hour + request_url = ( + AKKUDOKTOR_API_PRICES + + "?start=" + + start_time.strftime("%Y-%m-%d") + + "&end=" + + (start_time + timedelta(days=1)).strftime("%Y-%m-%d") + ) + logger.debug("[PRICE-IF] Requesting prices from akkudoktor: %s", request_url) + try: + response = requests.get(request_url, timeout=10) + response.raise_for_status() + data = response.json() + except requests.exceptions.Timeout: + logger.error( + "[PRICE-IF] Request timed out while fetching prices from akkudoktor." + ) + return [] + except requests.exceptions.RequestException as e: + logger.error( + "[PRICE-IF] Request failed while fetching prices from akkudoktor: %s", e + ) + return [] + + prices = [] + for price in data["values"]: + prices.append(round(price["marketpriceEurocentPerKWh"] / 100000, 9)) + # logger.debug( + # "[Main] day 1 - price for %s -> %s", price["marketpriceEurocentPerKWh"], + # price["start"] + # ) + + if start_time is None: + start_time = datetime.now(self.time_zone).replace( + minute=0, second=0, microsecond=0 + ) + current_hour = start_time.hour + extended_prices = prices[current_hour : current_hour + tgt_duration] + + if len(extended_prices) < tgt_duration: + remaining_hours = tgt_duration - len(extended_prices) + extended_prices.extend(prices[:remaining_hours]) + logger.info("[PRICE-IF] Prices from AKKUDOKTOR fetched successfully.") + self.current_prices_direct = extended_prices.copy() + return extended_prices + + def __retrieve_prices_from_tibber(self, tgt_duration, start_time=None): + """ + Fetches and processes electricity prices for today and tomorrow. + + This function retrieves electricity prices for today and tomorrow from a web service, + processes the prices, and returns a list of prices for the specified duration starting + from the specified start time. If tomorrow's prices are not available, today's prices are + repeated for tomorrow. + + Args: + tgt_duration (int): The target duration in hours for which the prices are needed. + start_time (datetime, optional): The start time for fetching prices. Defaults to None. + + Returns: + list: A list of electricity prices for the specified duration starting + from the specified start time. + """ + logger.debug("[PRICE-IF] Prices fetching from TIBBER started") + if self.src != "tibber": + logger.error("[PRICE-IF] Price source currently not supported.") + return [] + headers = { + "Authorization": self.access_token, + "Content-Type": "application/json", + } + query = """ + { + viewer { + homes { + currentSubscription { + priceInfo { + today { + total + energy + startsAt + } + tomorrow { + total + energy + startsAt + } + } + } + } + } + } + """ + try: + response = requests.post( + TIBBER_API, headers=headers, json={"query": query}, timeout=10 + ) + response.raise_for_status() + except requests.exceptions.Timeout: + logger.error( + "[PRICE-IF] Request timed out while fetching prices from Tibber." + ) + return [] + except requests.exceptions.RequestException as e: + logger.error( + "[PRICE-IF] Request failed while fetching prices from Tibber: %s", e + ) + return [] + + response.raise_for_status() + data = response.json() + if "errors" in data and data["errors"] is not None: + logger.error( + "[PRICE-IF] Error fetching prices - tibber API response: %s", + data["errors"][0]["message"], + ) + return [] + + today_prices = json.dumps( + data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"][ + "today" + ] + ) + tomorrow_prices = json.dumps( + data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"][ + "tomorrow" + ] + ) + + today_prices_json = json.loads(today_prices) + tomorrow_prices_json = json.loads(tomorrow_prices) + prices = [] + prices_direct = [] + + for price in today_prices_json: + prices.append(round(price["total"] / 1000, 9)) + prices_direct.append(round(price["energy"] / 1000, 9)) + # logger.debug( + # "[Main] day 1 - price for %s -> %s", price["startsAt"], price["total"] + # ) + if tomorrow_prices_json: + for price in tomorrow_prices_json: + prices.append(round(price["total"] / 1000, 9)) + prices_direct.append(round(price["energy"] / 1000, 9)) + # logger.debug( + # "[Main] day 2 - price for %s -> %s", price["startsAt"], price["total"] + # ) + else: + prices.extend(prices[:24]) # Repeat today's prices for tomorrow + prices_direct.extend( + prices_direct[:24] + ) # Repeat today's prices for tomorrow + + if start_time is None: + start_time = datetime.now(self.time_zone).replace( + minute=0, second=0, microsecond=0 + ) + current_hour = start_time.hour + extended_prices = prices[current_hour : current_hour + tgt_duration] + extended_prices_direct = prices_direct[ + current_hour : current_hour + tgt_duration + ] + + if len(extended_prices) < tgt_duration: + remaining_hours = tgt_duration - len(extended_prices) + extended_prices.extend(prices[:remaining_hours]) + extended_prices_direct.extend(prices_direct[:remaining_hours]) + self.current_prices_direct = extended_prices_direct.copy() + logger.info("[PRICE-IF] Prices from TIBBER fetched successfully.") + return extended_prices diff --git a/src/web/index.html b/src/web/index.html index 907e97e6..04c6255c 100644 --- a/src/web/index.html +++ b/src/web/index.html @@ -9,6 +9,7 @@ + @@ -72,8 +73,8 @@ - ... - -- + Dynamic Max Charge Power + -- ... @@ -111,7 +112,7 @@
-
Current Power
+
eCar Charging
@@ -120,10 +121,10 @@ - + - - + + - - - - + + + + - - - - + + + + - - - - + + + +
currentCurrent SOC Price-.-- €--.- %
@@ -131,22 +132,22 @@
Grid--- WPrice-.-- €Planned Start--:--Planned End--:--
Inverter--- WPriceNeeded Amount--- kWhPrice-.-- €
Load--- WPrice-.-- €
@@ -438,6 +439,7 @@ document.getElementById('control_overall').innerText = data_controls["current_states"]["inverter_mode"]; document.getElementById('battery_soc').innerText = data_controls["battery_soc"] + " %"; + document.getElementById('current_max_charge_dyn').innerText = data_controls["battery_max_charge_power_dyn"].toFixed(0) + " W"; } function showBatteryCharge(data_request, data_response) { @@ -536,6 +538,10 @@ document.getElementById('waiting_text').innerText = "No data available..."; document.getElementById('waiting_error_text').innerText = "Error: " + data_response["error"]; } + else if(data_response["error"].includes("422 Client Error: Unprocessable Entity")){ + document.getElementById('waiting_text').innerText = "No data available..."; + document.getElementById('waiting_error_text').innerText = "Error: " + data_response["error"]; + } else { document.getElementById('waiting_text').innerText = "No data available..."; document.getElementById('waiting_error_text').innerText = "no further error information available"; @@ -570,6 +576,8 @@ await showCurrentData(); + await showCurrentData(); + var currentHour = new Date().getHours(); var next_charge_time = data_response["ac_charge"].slice(currentHour).findIndex((value) => value > 0);