diff --git a/carbonserver/carbonserver/api/infra/repositories/repository_runs.py b/carbonserver/carbonserver/api/infra/repositories/repository_runs.py index 4084aacca..b36976906 100644 --- a/carbonserver/carbonserver/api/infra/repositories/repository_runs.py +++ b/carbonserver/carbonserver/api/infra/repositories/repository_runs.py @@ -1,10 +1,11 @@ import uuid from contextlib import AbstractContextManager +from math import ceil from typing import List, Union from dependency_injector.providers import Callable from fastapi import HTTPException -from sqlalchemy import and_, func +from sqlalchemy import and_, func, literal from carbonserver.api.domain.runs import Runs from carbonserver.api.errors import EmptyResultException @@ -12,7 +13,7 @@ from carbonserver.api.infra.database.sql_models import Experiment as SqlModelExperiment from carbonserver.api.infra.database.sql_models import Project as SqlModelProject from carbonserver.api.infra.database.sql_models import Run as SqlModelRun -from carbonserver.api.schemas import Run, RunCreate, RunReport +from carbonserver.api.schemas import Run, RunBucketReport, RunCreate, RunReport from carbonserver.logger import logger """ @@ -174,6 +175,47 @@ def get_experiment_detailed_sums_by_run( ) return res or [] + def get_experiment_bucketed_sums_by_run( + self, experiment_id, start_date, end_date, max_points + ) -> List[RunBucketReport]: + """Find the runs of an experiment in database between two dates and return + a report containing the sum of their emissions bucketed by time""" + + # Calculate the bucket size in seconds based on the time range and max_points + bucket_seconds = max( + 1, ceil((end_date - start_date).total_seconds() / max_points) + ) + epoch = func.extract("epoch", SqlModelRun.timestamp) + # Create a timestamp for the start of each bucket by flooring the epoch time to the nearest bucket size + bucket_timestamp = func.to_timestamp( + func.floor(epoch / bucket_seconds) * bucket_seconds + ).label("timestamp") + + with self.session_factory() as session: + res = ( + session.query( + bucket_timestamp, + literal(bucket_seconds).label("bucket_seconds"), + func.count(func.distinct(SqlModelRun.id)).label("run_count"), + func.sum(SqlModelEmission.emissions_sum).label("emissions"), + func.sum(SqlModelEmission.energy_consumed).label("energy_consumed"), + func.sum(SqlModelEmission.duration).label("duration"), + ) + .join( + SqlModelEmission, + and_( + SqlModelRun.id == SqlModelEmission.run_id, + SqlModelEmission.timestamp >= start_date, + SqlModelEmission.timestamp <= end_date, + ), + ) + .filter(SqlModelRun.experiment_id == experiment_id) + .group_by(bucket_timestamp) + .order_by(bucket_timestamp) + .all() + ) + return res or [] + def get_project_last_run(self, project_id, start_date, end_date) -> Union[Run]: """Find the last run of a project in database between two dates and return it diff --git a/carbonserver/carbonserver/api/routers/runs.py b/carbonserver/carbonserver/api/routers/runs.py index 6f2ec21ef..1d3a7b783 100644 --- a/carbonserver/carbonserver/api/routers/runs.py +++ b/carbonserver/carbonserver/api/routers/runs.py @@ -3,11 +3,18 @@ import dateutil.relativedelta from dependency_injector.wiring import Provide, inject -from fastapi import APIRouter, Depends, Header +from fastapi import APIRouter, Depends, Header, Query from starlette import status from carbonserver.api.errors import EmptyResultException -from carbonserver.api.schemas import AccessLevel, Empty, Run, RunCreate, RunReport +from carbonserver.api.schemas import ( + AccessLevel, + Empty, + Run, + RunBucketReport, + RunCreate, + RunReport, +) from carbonserver.api.services.project_token_service import ProjectTokenService from carbonserver.api.services.run_service import RunService from carbonserver.api.usecases.run.experiment_sum_by_run import ( @@ -96,10 +103,12 @@ def read_experiment_detailed_sums_by_run( experiment_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, + # Optional so existing callers still receive one row per run. + max_points: Optional[int] = Query(None, ge=1, le=2000), experiment_global_sum_by_run_usecase: ExperimentSumsByRunUsecase = Depends( Provide[ServerContainer.experiment_sums_by_run_usecase] ), -) -> List[RunReport]: +) -> List[Union[RunReport, RunBucketReport]]: start_date = ( start_date if start_date @@ -107,7 +116,7 @@ def read_experiment_detailed_sums_by_run( ) end_date = end_date if end_date else datetime.now() + timedelta(days=1) return experiment_global_sum_by_run_usecase.compute_detailed_sum( - experiment_id, start_date, end_date + experiment_id, start_date, end_date, max_points ) diff --git a/carbonserver/carbonserver/api/schemas.py b/carbonserver/carbonserver/api/schemas.py index a3343929d..463dc33da 100644 --- a/carbonserver/carbonserver/api/schemas.py +++ b/carbonserver/carbonserver/api/schemas.py @@ -198,6 +198,16 @@ class RunReport(RunBase): ram_utilization_percent: Optional[float] = None +class RunBucketReport(BaseModel): + run_id: Optional[UUID] = None + timestamp: datetime + bucket_seconds: Optional[int] = None + run_count: int + emissions: float + energy_consumed: float + duration: float + + class ExperimentBase(BaseModel): model_config = ConfigDict( from_attributes=True, diff --git a/carbonserver/carbonserver/api/usecases/run/experiment_sum_by_run.py b/carbonserver/carbonserver/api/usecases/run/experiment_sum_by_run.py index ee8f6add4..fd599d01a 100644 --- a/carbonserver/carbonserver/api/usecases/run/experiment_sum_by_run.py +++ b/carbonserver/carbonserver/api/usecases/run/experiment_sum_by_run.py @@ -1,7 +1,7 @@ -from typing import List +from typing import List, Optional, Union from carbonserver.api.infra.repositories.repository_runs import SqlAlchemyRepository -from carbonserver.api.schemas import RunReport +from carbonserver.api.schemas import RunBucketReport, RunReport class ExperimentSumsByRunUsecase: @@ -9,8 +9,16 @@ def __init__(self, run_repository: SqlAlchemyRepository) -> None: self._run_repository = run_repository def compute_detailed_sum( - self, experiment_id: str, start_date, end_date - ) -> List[RunReport]: + self, experiment_id: str, start_date, end_date, max_points: Optional[int] = None + ) -> List[Union[RunReport, RunBucketReport]]: + if max_points: + return self._run_repository.get_experiment_bucketed_sums_by_run( + experiment_id, + start_date, + end_date, + max_points, + ) + sums = self._run_repository.get_experiment_detailed_sums_by_run( experiment_id, start_date, diff --git a/carbonserver/tests/api/usecase/run/test_experiment_sum_by_run_usecase.py b/carbonserver/tests/api/usecase/run/test_experiment_sum_by_run_usecase.py index c31b27f56..c995c6f4e 100644 --- a/carbonserver/tests/api/usecase/run/test_experiment_sum_by_run_usecase.py +++ b/carbonserver/tests/api/usecase/run/test_experiment_sum_by_run_usecase.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timedelta from unittest import mock import dateutil.relativedelta @@ -59,6 +59,40 @@ def test_detailed_sum_computes_for_experiment_id(): assert actual_experiment_sum_by_run[0]["emissions"] == expected_emission_sum +def test_detailed_sum_with_max_points_buckets_result(): + repository_mock: SqlAlchemyRepository = mock.Mock(spec=SqlAlchemyRepository) + repository_mock.get_experiment_bucketed_sums_by_run.return_value = [ + {"timestamp": START_DATE, "run_count": 1000} + ] + experiment_sum_by_run_usecase = ExperimentSumsByRunUsecase(repository_mock) + + actual = experiment_sum_by_run_usecase.compute_detailed_sum( + EXPERIMENT_ID, START_DATE, END_DATE, max_points=300 + ) + + assert actual == [{"timestamp": START_DATE, "run_count": 1000}] + repository_mock.get_experiment_detailed_sums_by_run.assert_not_called() + + +def test_detailed_sum_uses_max_points_as_bucket_measure(): + repository_mock: SqlAlchemyRepository = mock.Mock(spec=SqlAlchemyRepository) + experiment_sum_by_run_usecase = ExperimentSumsByRunUsecase(repository_mock) + + experiment_sum_by_run_usecase.compute_detailed_sum( + EXPERIMENT_ID, + datetime(2026, 1, 1), + datetime(2026, 1, 1) + timedelta(seconds=1000), + max_points=300, + ) + + repository_mock.get_experiment_bucketed_sums_by_run.assert_called_once_with( + EXPERIMENT_ID, + datetime(2026, 1, 1), + datetime(2026, 1, 1) + timedelta(seconds=1000), + 300, + ) + + def test_detailed_sum_query_excludes_runs_without_emissions_in_date_range(): # Use the real repository with a mocked SQLAlchemy session context so this test # covers the join built by get_experiment_detailed_sums_by_run. Mocking the