Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ async def test_get_exporters(_load_kube_config_mock, list_exporters_mock: AsyncM

EXPORTERS_DEVICES_LIST_NAME = EXPORTERS_LIST_NAME


@pytest.mark.anyio
@patch.object(ExportersV1Alpha1Api, "list_exporters")
@patch.object(ExportersV1Alpha1Api, "_load_kube_config")
Expand Down Expand Up @@ -1141,6 +1142,7 @@ async def test_get_lease(_load_kube_config_mock, get_lease_mock: AsyncMock):
lease.jumpstarter.dev/82a8ac0d-d7ff-4009-8948-18a3c5c607b2
"""


@pytest.mark.anyio
@patch.object(LeasesV1Alpha1Api, "list_leases")
@patch.object(LeasesV1Alpha1Api, "_load_kube_config")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def client_shell(name: str, labels, lease_name):
exit_code = 0
with config.lease(metadata_filter=MetadataFilter(labels=dict(labels)), lease_name=lease_name) as lease:
with lease.serve_unix() as path:
exit_code = launch_shell(path, "remote", config.drivers.allow, config.drivers.unsafe)
with lease.monitor():
exit_code = launch_shell(path, "remote", config.drivers.allow, config.drivers.unsafe)

sys.exit(exit_code)
10 changes: 9 additions & 1 deletion packages/jumpstarter/jumpstarter/client/grpc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from datetime import datetime, timedelta

import yaml
from google.protobuf import duration_pb2, field_mask_pb2, json_format
Expand Down Expand Up @@ -52,6 +52,7 @@ class Lease(BaseModel):
client: str
exporter: str
conditions: list[kubernetes_pb2.Condition]
effective_begin_time: datetime | None = None

model_config = ConfigDict(
arbitrary_types_allowed=True,
Expand All @@ -72,13 +73,20 @@ def from_protobuf(cls, data: client_pb2.Lease) -> Lease:
else:
exporter = ""

effective_begin_time = None
if data.effective_begin_time:
effective_begin_time = data.effective_begin_time.ToDatetime(
tzinfo=datetime.now().astimezone().tzinfo,
)

return cls(
namespace=namespace,
name=name,
selector=data.selector,
duration=data.duration.ToTimedelta(),
client=client,
exporter=exporter,
effective_begin_time=effective_begin_time,
conditions=data.conditions,
)

Expand Down
43 changes: 36 additions & 7 deletions packages/jumpstarter/jumpstarter/client/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
contextmanager,
)
from dataclasses import dataclass, field
from datetime import timedelta
from datetime import datetime, timedelta

from anyio import fail_after, sleep
from anyio import create_task_group, fail_after, sleep
from anyio.from_thread import BlockingPortal
from grpc.aio import Channel
from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc
Expand Down Expand Up @@ -62,6 +62,11 @@ async def _create(self):
).name
logger.info("Created lease request for selector %s for duration %s", selector, duration)

async def get(self):
with translate_grpc_exceptions():
svc = ClientService(channel=self.channel, namespace=self.namespace)
return await svc.GetLease(name=self.name)

def request(self):
"""Request a lease, or verifies a lease which was already created.

Expand Down Expand Up @@ -96,11 +101,7 @@ async def _acquire(self):
with fail_after(300): # TODO: configurable timeout
while True:
logger.debug("Polling Lease %s", self.name)
with translate_grpc_exceptions():
result = await self.svc.GetLease(
name=self.name,
)

result = await self.get()
# lease ready
if condition_true(result.conditions, "Ready"):
logger.debug("Lease %s acquired", self.name)
Expand Down Expand Up @@ -156,6 +157,29 @@ async def serve_unix_async(self):
async with TemporaryUnixListener(self.handle_async) as path:
yield path

@asynccontextmanager
async def monitor_async(self, threshold: timedelta = timedelta(minutes=5)):
async def _monitor():
while True:
lease = await self.get()
if lease.effective_begin_time:
end_time = lease.effective_begin_time + lease.duration
remain = end_time - datetime.now(tz=datetime.now().astimezone().tzinfo)
if remain < threshold:
logger.info("Lease {} ending soon in {} at {}".format(self.name, remain, end_time))
await sleep(threshold.total_seconds())
else:
await sleep(5)
else:
await sleep(1)

async with create_task_group() as tg:
tg.start_soon(_monitor)
try:
yield
finally:
tg.cancel_scope.cancel()

@asynccontextmanager
async def connect_async(self, stack):
async with self.serve_unix_async() as path:
Expand All @@ -172,3 +196,8 @@ def connect(self):
def serve_unix(self):
with self.portal.wrap_async_context_manager(self.serve_unix_async()) as path:
yield path

@contextmanager
def monitor(self, threshold: timedelta = timedelta(minutes=5)):
with self.portal.wrap_async_context_manager(self.monitor_async(threshold)):
yield