Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import uuid
from contextlib import AbstractContextManager
from math import ceil
from typing import List, Union

from dependency_injector.providers import Callable
from fastapi import HTTPException
from sqlalchemy import and_, func
from sqlalchemy import and_, func, literal

from carbonserver.api.domain.runs import Runs
from carbonserver.api.errors import EmptyResultException
from carbonserver.api.infra.database.sql_models import Emission as SqlModelEmission
from carbonserver.api.infra.database.sql_models import Experiment as SqlModelExperiment
from carbonserver.api.infra.database.sql_models import Project as SqlModelProject
from carbonserver.api.infra.database.sql_models import Run as SqlModelRun
from carbonserver.api.schemas import Run, RunCreate, RunReport
from carbonserver.api.schemas import Run, RunBucketReport, RunCreate, RunReport
from carbonserver.logger import logger

"""
Expand Down Expand Up @@ -174,6 +175,47 @@ def get_experiment_detailed_sums_by_run(
)
return res or []

def get_experiment_bucketed_sums_by_run(
    self, experiment_id, start_date, end_date, max_points
) -> List[RunBucketReport]:
    """Return an experiment's emissions aggregated into fixed-width time buckets.

    The bucket width is the length of the ``[start_date, end_date]`` range
    divided by ``max_points``, rounded up to a whole second and never less
    than one second. Each run is assigned to a bucket by its own timestamp;
    buckets are aligned on multiples of the bucket width counted from the
    Unix epoch, not on ``start_date``. Only emissions whose timestamp lies
    inside the date range (bounds included) are summed, and runs without
    such emissions are left out by the inner join.

    Args:
        experiment_id: Identifier of the experiment whose runs are reported.
        start_date: Inclusive lower bound applied to emission timestamps.
        end_date: Inclusive upper bound applied to emission timestamps.
        max_points: Number of buckets the date range is divided into when
            sizing a bucket; must be at least 1.

    Returns:
        Rows ordered by bucket start, each exposing ``timestamp``,
        ``bucket_seconds``, ``run_count``, ``emissions``,
        ``energy_consumed`` and ``duration``; an empty list when nothing
        matches.

    Raises:
        ValueError: If ``max_points`` is lower than 1.
    """
    # Fail with an explicit message instead of a ZeroDivisionError (0) or a
    # silent fallback to one-second buckets (negative values).
    if max_points < 1:
        raise ValueError("max_points must be a positive integer")

    # Calculate the bucket size in seconds based on the time range and max_points
    bucket_seconds = max(
        1, ceil((end_date - start_date).total_seconds() / max_points)
    )
    # PostgreSQL only accepts EXTRACT with the "field FROM source" keyword
    # syntax; a func.extract(...) call renders the comma form, so use the
    # equivalent plain function date_part instead.
    epoch = func.date_part("epoch", SqlModelRun.timestamp)
    # Create a timestamp for the start of each bucket by flooring the epoch
    # time to the nearest bucket size
    bucket_timestamp = func.to_timestamp(
        func.floor(epoch / bucket_seconds) * bucket_seconds
    ).label("timestamp")

    with self.session_factory() as session:
        res = (
            session.query(
                bucket_timestamp,
                literal(bucket_seconds).label("bucket_seconds"),
                # A run has many emission rows after the join; count it once.
                func.count(func.distinct(SqlModelRun.id)).label("run_count"),
                func.sum(SqlModelEmission.emissions_sum).label("emissions"),
                func.sum(SqlModelEmission.energy_consumed).label("energy_consumed"),
                func.sum(SqlModelEmission.duration).label("duration"),
            )
            .join(
                SqlModelEmission,
                and_(
                    SqlModelRun.id == SqlModelEmission.run_id,
                    SqlModelEmission.timestamp >= start_date,
                    SqlModelEmission.timestamp <= end_date,
                ),
            )
            .filter(SqlModelRun.experiment_id == experiment_id)
            .group_by(bucket_timestamp)
            .order_by(bucket_timestamp)
            .all()
        )
        return res or []

def get_project_last_run(self, project_id, start_date, end_date) -> Union[Run]:
"""Find the last run of a project in database between two dates and return it

Expand Down
17 changes: 13 additions & 4 deletions carbonserver/carbonserver/api/routers/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@

import dateutil.relativedelta
from dependency_injector.wiring import Provide, inject
from fastapi import APIRouter, Depends, Header
from fastapi import APIRouter, Depends, Header, Query
from starlette import status

from carbonserver.api.errors import EmptyResultException
from carbonserver.api.schemas import AccessLevel, Empty, Run, RunCreate, RunReport
from carbonserver.api.schemas import (
AccessLevel,
Empty,
Run,
RunBucketReport,
RunCreate,
RunReport,
)
from carbonserver.api.services.project_token_service import ProjectTokenService
from carbonserver.api.services.run_service import RunService
from carbonserver.api.usecases.run.experiment_sum_by_run import (
Expand Down Expand Up @@ -96,18 +103,20 @@ def read_experiment_detailed_sums_by_run(
experiment_id: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
# Optional so existing callers still receive one row per run.
max_points: Optional[int] = Query(None, ge=1, le=2000),
experiment_global_sum_by_run_usecase: ExperimentSumsByRunUsecase = Depends(
Provide[ServerContainer.experiment_sums_by_run_usecase]
),
) -> List[RunReport]:
) -> List[Union[RunReport, RunBucketReport]]:
start_date = (
start_date
if start_date
else datetime.now() - dateutil.relativedelta.relativedelta(months=3)
)
end_date = end_date if end_date else datetime.now() + timedelta(days=1)
return experiment_global_sum_by_run_usecase.compute_detailed_sum(
experiment_id, start_date, end_date
experiment_id, start_date, end_date, max_points
)


Expand Down
10 changes: 10 additions & 0 deletions carbonserver/carbonserver/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ class RunReport(RunBase):
ram_utilization_percent: Optional[float] = None


class RunBucketReport(BaseModel):
    # Aggregated emissions of the runs that fall into one time bucket, as
    # produced by the bucketed experiment report query.

    # Optional and None by default; the bucketed query does not select a run id.
    run_id: Optional[UUID] = None
    # Start of the bucket.
    timestamp: datetime
    # Width of the bucket in seconds; optional, None by default.
    bucket_seconds: Optional[int] = None
    # Number of distinct runs aggregated into the bucket.
    run_count: int
    # Sum of the emissions recorded for the bucket's runs.
    # NOTE(review): unit is not visible here — confirm against the Emission model.
    emissions: float
    # Sum of the energy consumed by the bucket's runs (unit: confirm as above).
    energy_consumed: float
    # Sum of the emission durations of the bucket's runs (presumably seconds — confirm).
    duration: float


class ExperimentBase(BaseModel):
model_config = ConfigDict(
from_attributes=True,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from typing import List
from typing import List, Optional, Union

from carbonserver.api.infra.repositories.repository_runs import SqlAlchemyRepository
from carbonserver.api.schemas import RunReport
from carbonserver.api.schemas import RunBucketReport, RunReport


class ExperimentSumsByRunUsecase:
def __init__(self, run_repository: SqlAlchemyRepository) -> None:
self._run_repository = run_repository

def compute_detailed_sum(
self, experiment_id: str, start_date, end_date
) -> List[RunReport]:
self, experiment_id: str, start_date, end_date, max_points: Optional[int] = None
) -> List[Union[RunReport, RunBucketReport]]:
if max_points:
return self._run_repository.get_experiment_bucketed_sums_by_run(
experiment_id,
start_date,
end_date,
max_points,
)

sums = self._run_repository.get_experiment_detailed_sums_by_run(
experiment_id,
start_date,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from unittest import mock

import dateutil.relativedelta
Expand Down Expand Up @@ -59,6 +59,40 @@ def test_detailed_sum_computes_for_experiment_id():
assert actual_experiment_sum_by_run[0]["emissions"] == expected_emission_sum


def test_detailed_sum_with_max_points_buckets_result():
    """With max_points set, the bucketed repository rows are returned as-is."""
    bucket_row = {"timestamp": START_DATE, "run_count": 1000}
    run_repository: SqlAlchemyRepository = mock.Mock(spec=SqlAlchemyRepository)
    run_repository.get_experiment_bucketed_sums_by_run.return_value = [
        dict(bucket_row)
    ]
    usecase = ExperimentSumsByRunUsecase(run_repository)

    bucketed = usecase.compute_detailed_sum(
        EXPERIMENT_ID, START_DATE, END_DATE, max_points=300
    )

    assert bucketed == [bucket_row]
    # The per-run query must be bypassed entirely on the bucketed path.
    run_repository.get_experiment_detailed_sums_by_run.assert_not_called()


def test_detailed_sum_uses_max_points_as_bucket_measure():
    """The use case forwards the date range and max_points to the repository unchanged."""
    range_start = datetime(2026, 1, 1)
    range_end = range_start + timedelta(seconds=1000)
    requested_points = 300
    run_repository: SqlAlchemyRepository = mock.Mock(spec=SqlAlchemyRepository)
    usecase = ExperimentSumsByRunUsecase(run_repository)

    usecase.compute_detailed_sum(
        EXPERIMENT_ID, range_start, range_end, max_points=requested_points
    )

    # Exactly one positional call, with the arguments in repository order.
    run_repository.get_experiment_bucketed_sums_by_run.assert_called_once_with(
        EXPERIMENT_ID, range_start, range_end, requested_points
    )


def test_detailed_sum_query_excludes_runs_without_emissions_in_date_range():
# Use the real repository with a mocked SQLAlchemy session context so this test
# covers the join built by get_experiment_detailed_sums_by_run. Mocking the
Expand Down
Loading