From 6291d51a2d6a4cbbc3c8805d9291e568c2fd42fc Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Wed, 19 Mar 2025 12:38:01 -0400 Subject: [PATCH] Notify user when the lease is about to expire --- .../jumpstarter_cli_admin/get_test.py | 2 + .../jumpstarter_cli_client/client_shell.py | 3 +- .../jumpstarter/jumpstarter/client/grpc.py | 10 ++++- .../jumpstarter/jumpstarter/client/lease.py | 43 ++++++++++++++++--- 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/packages/jumpstarter-cli-admin/jumpstarter_cli_admin/get_test.py b/packages/jumpstarter-cli-admin/jumpstarter_cli_admin/get_test.py index 51c6c9517..69ebbfc24 100644 --- a/packages/jumpstarter-cli-admin/jumpstarter_cli_admin/get_test.py +++ b/packages/jumpstarter-cli-admin/jumpstarter_cli_admin/get_test.py @@ -764,6 +764,7 @@ async def test_get_exporters(_load_kube_config_mock, list_exporters_mock: AsyncM EXPORTERS_DEVICES_LIST_NAME = EXPORTERS_LIST_NAME + @pytest.mark.anyio @patch.object(ExportersV1Alpha1Api, "list_exporters") @patch.object(ExportersV1Alpha1Api, "_load_kube_config") @@ -1141,6 +1142,7 @@ async def test_get_lease(_load_kube_config_mock, get_lease_mock: AsyncMock): lease.jumpstarter.dev/82a8ac0d-d7ff-4009-8948-18a3c5c607b2 """ + @pytest.mark.anyio @patch.object(LeasesV1Alpha1Api, "list_leases") @patch.object(LeasesV1Alpha1Api, "_load_kube_config") diff --git a/packages/jumpstarter-cli-client/jumpstarter_cli_client/client_shell.py b/packages/jumpstarter-cli-client/jumpstarter_cli_client/client_shell.py index 220df6c99..1ef884920 100644 --- a/packages/jumpstarter-cli-client/jumpstarter_cli_client/client_shell.py +++ b/packages/jumpstarter-cli-client/jumpstarter_cli_client/client_shell.py @@ -31,6 +31,7 @@ def client_shell(name: str, labels, lease_name): exit_code = 0 with config.lease(metadata_filter=MetadataFilter(labels=dict(labels)), lease_name=lease_name) as lease: with lease.serve_unix() as path: - exit_code = launch_shell(path, "remote", config.drivers.allow, config.drivers.unsafe) + with lease.monitor(): + exit_code = launch_shell(path, "remote", config.drivers.allow, config.drivers.unsafe) sys.exit(exit_code) diff --git a/packages/jumpstarter/jumpstarter/client/grpc.py b/packages/jumpstarter/jumpstarter/client/grpc.py index d91a3f200..747b0a662 100644 --- a/packages/jumpstarter/jumpstarter/client/grpc.py +++ b/packages/jumpstarter/jumpstarter/client/grpc.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from datetime import timedelta +from datetime import datetime, timedelta import yaml from google.protobuf import duration_pb2, field_mask_pb2, json_format @@ -52,6 +52,7 @@ class Lease(BaseModel): client: str exporter: str conditions: list[kubernetes_pb2.Condition] + effective_begin_time: datetime | None = None model_config = ConfigDict( arbitrary_types_allowed=True, @@ -72,6 +73,12 @@ def from_protobuf(cls, data: client_pb2.Lease) -> Lease: else: exporter = "" + effective_begin_time = None + if data.effective_begin_time: + effective_begin_time = data.effective_begin_time.ToDatetime( + tzinfo=datetime.now().astimezone().tzinfo, + ) + return cls( namespace=namespace, name=name, @@ -79,6 +86,7 @@ def from_protobuf(cls, data: client_pb2.Lease) -> Lease: duration=data.duration.ToTimedelta(), client=client, exporter=exporter, + effective_begin_time=effective_begin_time, conditions=data.conditions, ) diff --git a/packages/jumpstarter/jumpstarter/client/lease.py b/packages/jumpstarter/jumpstarter/client/lease.py index 6aed62797..b65b3e458 100644 --- a/packages/jumpstarter/jumpstarter/client/lease.py +++ b/packages/jumpstarter/jumpstarter/client/lease.py @@ -7,9 +7,9 @@ contextmanager, ) from dataclasses import dataclass, field -from datetime import timedelta +from datetime import datetime, timedelta -from anyio import fail_after, sleep +from anyio import create_task_group, fail_after, sleep from anyio.from_thread import BlockingPortal from grpc.aio import Channel from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc @@ -62,6 +62,11 @@ async def _create(self): ).name logger.info("Created lease request for selector %s for duration %s", selector, duration) + async def get(self): + with translate_grpc_exceptions(): + svc = ClientService(channel=self.channel, namespace=self.namespace) + return await svc.GetLease(name=self.name) + def request(self): """Request a lease, or verifies a lease which was already created. @@ -96,11 +101,7 @@ async def _acquire(self): with fail_after(300): # TODO: configurable timeout while True: logger.debug("Polling Lease %s", self.name) - with translate_grpc_exceptions(): - result = await self.svc.GetLease( - name=self.name, - ) - + result = await self.get() # lease ready if condition_true(result.conditions, "Ready"): logger.debug("Lease %s acquired", self.name) @@ -156,6 +157,29 @@ async def serve_unix_async(self): async with TemporaryUnixListener(self.handle_async) as path: yield path + @asynccontextmanager + async def monitor_async(self, threshold: timedelta = timedelta(minutes=5)): + async def _monitor(): + while True: + lease = await self.get() + if lease.effective_begin_time: + end_time = lease.effective_begin_time + lease.duration + remain = end_time - datetime.now(tz=datetime.now().astimezone().tzinfo) + if remain < threshold: + logger.info("Lease {} ending soon in {} at {}".format(self.name, remain, end_time)) + await sleep(threshold.total_seconds()) + else: + await sleep(5) + else: + await sleep(1) + + async with create_task_group() as tg: + tg.start_soon(_monitor) + try: + yield + finally: + tg.cancel_scope.cancel() + @asynccontextmanager async def connect_async(self, stack): async with self.serve_unix_async() as path: @@ -172,3 +196,8 @@ def connect(self): def serve_unix(self): with self.portal.wrap_async_context_manager(self.serve_unix_async()) as path: yield path + + @contextmanager + def monitor(self, threshold: timedelta = timedelta(minutes=5)): + with self.portal.wrap_async_context_manager(self.monitor_async(threshold)): + yield