Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ huggingface = ["huggingface-hub>=0.20.0"]
otel = ["opentelemetry-api>=1.0.0"]
redis = ["redis>=4.0.0"]
autogen = ["pyautogen>=0.2.0,<0.3.0"]
all = ["openai>=1.0.0", "anthropic>=0.7.0", "langfuse>=2.0.0", "litellm>=1.0.0", "google-genai>=1.0.0", "huggingface-hub>=0.20.0", "opentelemetry-api>=1.0.0", "redis>=4.0.0", "pyautogen>=0.2.0,<0.3.0"]
k8s = ["kubernetes>=28.0"]
all = ["openai>=1.0.0", "anthropic>=0.7.0", "langfuse>=2.0.0", "litellm>=1.0.0", "google-genai>=1.0.0", "huggingface-hub>=0.20.0", "opentelemetry-api>=1.0.0", "redis>=4.0.0", "pyautogen>=0.2.0,<0.3.0", "kubernetes>=28.0"]
all-models = ["openai>=1.0.0", "anthropic>=0.7.0", "langfuse>=2.0.0", "tokencost>=0.1.0"]
cli = ["click>=8.0.0"]
dev = [
Expand Down Expand Up @@ -215,6 +216,10 @@ ignore_missing_imports = true
module = ["agents", "agents.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["kubernetes", "kubernetes.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["_pytest", "_pytest.*"]
follow_imports = "skip"
Expand Down
2 changes: 2 additions & 0 deletions shekel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
AgentLoopError,
BudgetConfigMismatchError,
BudgetExceededError,
BudgetPausedError,
ChainBudgetExceededError,
NodeBudgetExceededError,
SessionBudgetExceededError,
Expand All @@ -25,6 +26,7 @@
"TemporalBudget",
"with_budget",
"BudgetExceededError",
"BudgetPausedError",
"BudgetConfigMismatchError",
"ToolBudgetExceededError",
"NodeBudgetExceededError",
Expand Down
128 changes: 128 additions & 0 deletions shekel/_budget.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,36 @@ def __init__(
self._chain_budgets: dict[str, ComponentBudget] = {}
self._runtime: Any = None

# --- Kubernetes auto-discovery (SHEK-16) / spend reporting (SHEK-17) ---
self._paused_externally: bool = False
self._k8s_poller: Any = None
self._k8s_reporter: Any = None
self._per_pod_cap_usd: float | None = None
self._k8s_redis_backend: Any = None
self._k8s_redis_name: str | None = None
self._k8s_redis_window_seconds: float = 86400.0
self._k8s_flush_every_usd: float | None = None
self._k8s_flush_every_seconds: float | None = None
self._k8s_scope_mode: str | None = None
self._k8s_scope_group_by: str | None = None
self._k8s_group_value: str = ""
self._k8s_budget_name: str | None = None
self._k8s_namespace: str | None = None
self._k8s_poll_interval: float = 10.0
try:
from shekel.integrations.kubernetes import apply_k8s_config # noqa: PLC0415

apply_k8s_config(self)
except ImportError:
pass # kubernetes optional dependency not installed
except Exception:
import logging # noqa: PLC0415

logging.getLogger(__name__).warning(
"shekel[k8s]: Failed to apply Kubernetes config; K8s features disabled.",
exc_info=True,
)

# ------------------------------------------------------------------
# Internal state reset
# ------------------------------------------------------------------
Expand Down Expand Up @@ -416,6 +446,7 @@ def __enter__(self) -> Budget:

self._runtime = ShekelRuntime(self)
self._runtime.probe()
self._restart_k8s_threads()
return self

def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
Expand Down Expand Up @@ -470,6 +501,10 @@ def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
if self._runtime is not None:
self._runtime.release()
self._runtime = None
if self._k8s_reporter is not None:
self._k8s_reporter.flush_and_stop()
if self._k8s_poller is not None:
self._k8s_poller.stop()
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
_patch.remove_patches()
# returning None (not False) — never suppress exceptions

Expand Down Expand Up @@ -557,6 +592,7 @@ async def __aenter__(self) -> Budget:

self._runtime = ShekelRuntime(self)
self._runtime.probe()
self._restart_k8s_threads()
return self

async def __aexit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
Expand Down Expand Up @@ -610,6 +646,10 @@ async def __aexit__(self, exc_type: object, exc_val: object, exc_tb: object) ->
if self._runtime is not None:
self._runtime.release()
self._runtime = None
if self._k8s_reporter is not None:
self._k8s_reporter.flush_and_stop()
if self._k8s_poller is not None:
self._k8s_poller.stop()
_patch.remove_patches()

# ------------------------------------------------------------------
Expand All @@ -634,6 +674,16 @@ def reset(self) -> None:
# ------------------------------------------------------------------

def _record_spend(self, cost: float, model: str, tokens: dict[str, int]) -> None:
if self._paused_externally:
from shekel.exceptions import BudgetPausedError # noqa: PLC0415

raise BudgetPausedError(
spent=self._spent,
limit=self.max_usd or 0.0,
model=model,
tokens=tokens,
)
Comment thread
qodo-code-review[bot] marked this conversation as resolved.

# Parent locking: cannot record spend while a child budget is active
if self.active_child is not None:
raise RuntimeError(
Expand All @@ -659,11 +709,15 @@ def _record_spend(self, cost: float, model: str, tokens: dict[str, int]) -> None
)
self._calls_made += 1
self._append_velocity_entry(cost)
if self._k8s_reporter is not None:
self._k8s_reporter.on_spend(cost)
self._check_velocity_warn()
self._check_velocity_limit()
self._check_warn()
self._check_limit()
self._check_call_limit()
self._check_per_pod_limit()
self._check_redis_limit(cost)

def _check_warn(self) -> None:
effective_limit = self._effective_limit
Expand Down Expand Up @@ -772,7 +826,81 @@ def _check_call_limit(self) -> None:
self._last_tokens,
)

def _check_per_pod_limit(self) -> None:
"""Enforce the per-pod USD cap set via ConfigMap per_pod_cap (SHEK-26)."""
if self._per_pod_cap_usd is None:
return
if self._spent > self._per_pod_cap_usd:
self._emit_budget_exceeded_event()
if self.warn_only:
self._check_warn()
return
from shekel.exceptions import BudgetExceededError # noqa: PLC0415

raise BudgetExceededError(
self._spent, self._per_pod_cap_usd, self._last_model, self._last_tokens
)
Comment thread
qodo-code-review[bot] marked this conversation as resolved.

def _check_redis_limit(self, cost: float) -> None:
"""Distributed enforcement via Redis backend (SHEK-30)."""
if self._k8s_redis_backend is None or self._k8s_redis_name is None:
return
allowed, exceeded = self._k8s_redis_backend.check_and_add(
self._k8s_redis_name,
{"usd": cost},
{"usd": self.max_usd},
{"usd": self._k8s_redis_window_seconds},
)
if not allowed:
self._emit_budget_exceeded_event()
if self.warn_only:
self._check_warn()
return
from shekel.exceptions import BudgetExceededError # noqa: PLC0415

raise BudgetExceededError(
self._spent,
self.max_usd or 0.0,
self._last_model,
self._last_tokens,
exceeded_counter=exceeded,
)

# ------------------------------------------------------------------
def _restart_k8s_threads(self) -> None:
"""Restart stopped K8s poller/reporter on Budget re-entry (SHEK-27).

Called by __enter__ / __aenter__. No-op when not in K8s mode or when
threads are still alive (idempotent on nested/repeated enters).
"""
if self._k8s_budget_name is None:
return

if self._k8s_poller is not None and not self._k8s_poller.is_alive():
from shekel.integrations.kubernetes import KubernetesPoller # noqa: PLC0415

poller = KubernetesPoller(
self,
self._k8s_budget_name,
self._k8s_namespace or "",
self._k8s_poll_interval,
)
poller.start()
self._k8s_poller = poller

if self._k8s_reporter is not None and not self._k8s_reporter.is_alive():
from shekel.integrations.kubernetes import KubernetesSpendReporter # noqa: PLC0415

reporter = KubernetesSpendReporter(
budget_name=self._k8s_budget_name,
namespace=self._k8s_namespace or "",
flush_every_seconds=self._k8s_flush_every_seconds or 60.0,
flush_every_usd=self._k8s_flush_every_usd,
group_value=self._k8s_group_value,
)
reporter.start()
self._k8s_reporter = reporter

# Loop guard enforcement (v1.1.0)
# ------------------------------------------------------------------

Expand Down
14 changes: 14 additions & 0 deletions shekel/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ def __str__(self) -> str:
)


class BudgetPausedError(BudgetExceededError):
"""Raised when an operator has paused the budget via Kubernetes ConfigMap kill-switch.

Subclasses BudgetExceededError so existing except-clauses catch it without changes.
"""

def __str__(self) -> str:
return (
f"Budget paused by operator (${self.spent:.4f} spent)\n"
f" Last call: {self.model}\n"
f" Tip: Set paused=false in the shekel-budget ConfigMap to resume."
)


class NodeBudgetExceededError(BudgetExceededError):
"""Raised when a LangGraph node exceeds its budget cap.

Expand Down
Loading
Loading