-
Notifications
You must be signed in to change notification settings - Fork 18
close the session on lease expire #757
Changes from all commits
fea8bea
5fec1e0
63368c3
a8d0e9a
85bcca1
17de08c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| import logging | ||
| import os | ||
| import sys | ||
| from collections.abc import AsyncGenerator, Generator | ||
| from collections.abc import AsyncGenerator, Callable, Generator | ||
| from contextlib import ( | ||
| ExitStack, | ||
| asynccontextmanager, | ||
|
|
@@ -54,6 +54,9 @@ class Lease(ContextManagerMixin, AsyncContextManagerMixin): | |
| grpc_options: dict[str, Any] = field(default_factory=dict) | ||
| acquisition_timeout: int = field(default=7200) # Timeout in seconds for lease acquisition, polled in 5s intervals | ||
| exporter_name: str = field(default="remote", init=False) # Populated during acquisition | ||
| lease_ending_callback: Callable[[Self, timedelta], None] | None = field( | ||
| default=None, init=False | ||
| ) # Called when lease is ending | ||
|
|
||
| def __post_init__(self): | ||
| if hasattr(super(), "__post_init__"): | ||
|
|
@@ -208,11 +211,14 @@ async def __asynccontextmanager__(self) -> AsyncGenerator[Self]: | |
| yield value | ||
| finally: | ||
| if self.release and self.name: | ||
| logger.info("Releasing Lease %s", self.name) | ||
| # Shield cleanup from cancellation to ensure it completes | ||
| with CancelScope(shield=True): | ||
| try: | ||
| with fail_after(30): | ||
| # skip the message if the lease is already expired | ||
| lease = await self.get() | ||
| if not lease.effective_end_time: | ||
| logger.info("Releasing Lease %s", self.name) | ||
| await self.svc.DeleteLease( | ||
| name=self.name, | ||
| ) | ||
|
|
@@ -280,6 +286,8 @@ async def _monitor(): | |
| if remain < timedelta(0): | ||
| # lease already expired, stopping monitor | ||
| logger.info("Lease {} ended at {}".format(self.name, end_time)) | ||
| if self.lease_ending_callback is not None: | ||
| self.lease_ending_callback(self, timedelta(0)) | ||
| break | ||
| # Log once when entering the threshold window | ||
| if threshold - timedelta(seconds=check_interval) <= remain < threshold: | ||
|
|
@@ -288,6 +296,9 @@ async def _monitor(): | |
| self.name, int((remain.total_seconds() + 30) // 60), end_time | ||
| ) | ||
| ) | ||
| # Notify callback about approaching expiration | ||
| if self.lease_ending_callback is not None: | ||
| self.lease_ending_callback(self, remain) | ||
|
Comment on lines
+299
to
+301
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Add error handling around callback invocation.

Similar to the expired case, if the callback raises an exception during the warning phase, it will crash the monitor task before the lease actually expires, preventing proper expiration handling.

Apply this diff to add error handling:

```diff
 # Notify callback about approaching expiration
 if self.lease_ending_callback is not None:
-    self.lease_ending_callback(self, remain)
+    try:
+        self.lease_ending_callback(self, remain)
+    except Exception as e:
+        logger.error("Error in lease ending callback for lease %s: %s", self.name, e)
```

🤖 Prompt for AI Agents |
||
| await sleep(min(remain.total_seconds(), check_interval)) | ||
| else: | ||
| await sleep(1) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,9 @@ | ||
| import os | ||
| import signal | ||
| import sys | ||
| from contextlib import ExitStack, asynccontextmanager, contextmanager | ||
| from datetime import timedelta | ||
| from functools import partial | ||
| from subprocess import Popen | ||
|
|
||
| from anyio.from_thread import BlockingPortal, start_blocking_portal | ||
|
|
@@ -46,6 +49,34 @@ def serve(root_device: Driver): | |
| PROMPT_CWD = "\\W" | ||
|
|
||
|
|
||
def lease_ending_handler(process: Popen, lease, remaining_time: timedelta) -> None:
    """Hang up a process once its lease has run out.

    Calls made while time still remains on the lease are ignored; only a
    zero or negative ``remaining_time`` triggers the signal.

    Args:
        process: The process to terminate.
        lease: The lease instance (part of the callback signature; not used
            by this handler).
        remaining_time: Time remaining until lease expiration. Zero or a
            negative value means the lease has already expired.
    """
    if remaining_time > timedelta(0):
        # Early warning only: the lease is still valid, leave the process alone.
        return

    # SIGHUP only exists on POSIX platforms. Looking it up unconditionally
    # would raise AttributeError elsewhere, which the OSError handler below
    # does not cover, so fall back to SIGTERM when the constant is missing.
    hangup = getattr(signal, "SIGHUP", signal.SIGTERM)
    try:
        process.send_signal(hangup)
    except OSError:
        # Best effort: covers ProcessLookupError (an OSError subclass), i.e.
        # the process has already terminated.
        pass
|
|
||
|
|
||
def _run_process(
    cmd: list[str],
    env: dict[str, str],
    lease=None,
) -> int:
    """Spawn ``cmd`` on the current stdio and block until it exits.

    When a lease is supplied, its ``lease_ending_callback`` is pointed at
    ``lease_ending_handler`` with the spawned process pre-bound.

    Args:
        cmd: Argument vector of the program to launch.
        env: Environment passed to the child process.
        lease: Optional lease object that receives the lease ending callback.

    Returns:
        The exit code of the child process.
    """
    child = Popen(
        cmd,
        stdin=sys.stdin,
        stdout=sys.stdout,
        stderr=sys.stderr,
        env=env,
    )
    if lease is None:
        # No lease to watch: just wait for the child to finish.
        return child.wait()

    lease.lease_ending_callback = partial(lease_ending_handler, child)
    return child.wait()
|
Comment on lines
+74
to
+77
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

```shell
# Search for SIGHUP, process group, and related signal handling
rg -n "SIGHUP|process.?group|setpgrp|setsid|signal|Popen" --type py -C 2
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 198

🏁 Script executed:

```shell
# Read the full context of the file to understand the function and its purpose
wc -l packages/jumpstarter/jumpstarter/common/utils.py
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 198

🏁 Script executed:

```shell
# Read the relevant section with more context
sed -n '60,90p' packages/jumpstarter/jumpstarter/common/utils.py
```

Repository: jumpstarter-dev/jumpstarter
Length of output: 198

🌐 Web query:
💡 Result: Short answer
Recommended options
Caveats
Key references If you want, I can paste a minimal code example for (a) setsid + killpg or (b) prctl-based pdeathsig. 🌐 Web query:
💡 Result: Short answer: Zombies happen when a child has exited but its parent never reaped its exit status. With subprocess.Popen this occurs if you never call wait()/communicate() (or otherwise call waitpid()) on the child, or if your parent is blocked/ignores reaping. Fixes: explicitly reap children or let the kernel auto-reap them. Sources: [1][2][3][4]. What causes zombies (quick):
Practical fixes (pick what's appropriate):
Caveats and gotchas
Selected references
If you want, tell me your Python version and a short snippet of how you spawn processes and I’ll give a specific fix you can apply.

Reconsider reliance on shell cleanup; explicit signal handling recommended.

SIGHUP is not automatically propagated to grandchildren spawned by the shell—this depends on shell session configuration and process group setup. While […] For reliable cleanup on lease expiry, consider using […]
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

I tried the suggested solution; however, using it introduced a bug where sending a SIGINT (with Ctrl+C, for example) causes a crash and leaves the terminal in a misbehaving state.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

@evakhoni, thank you for testing the process group approach and reporting the issue. The SIGINT problem you encountered is expected behavior—when you use […]

Given your testing shows SIGHUP correctly propagates to subprocesses in zsh (and likely bash), and since this implementation is specifically for handling lease expiry rather than being a general-purpose process manager, the current approach appears reasonable. The key constraints here are: […]

Your local testing is the right validation approach. For production confidence, you might want to verify behavior across the shell types you support (bash, zsh, fish) and document any known edge cases where subprocesses might not receive SIGHUP (like when users explicitly use […]).

The current implementation balances the need for lease-based cleanup with maintaining a working interactive shell, which seems appropriate for this use case.

✏️ Learnings added

🧠 Learnings used |
🧠 Learnings used |
||
|
|
||
|
|
||
| def launch_shell( | ||
| host: str, | ||
| context: str, | ||
|
|
@@ -54,6 +85,7 @@ def launch_shell( | |
| use_profiles: bool, | ||
| *, | ||
| command: tuple[str, ...] | None = None, | ||
| lease=None, | ||
| ) -> int: | ||
| """Launch a shell with a custom prompt indicating the exporter type. | ||
|
|
||
|
|
@@ -62,6 +94,12 @@ def launch_shell( | |
| context: The context of the shell (e.g. "local" or exporter name) | ||
| allow: List of allowed drivers | ||
| unsafe: Whether to allow drivers outside of the allow list | ||
| use_profiles: Whether to load shell profile files | ||
| command: Optional command to run instead of launching an interactive shell | ||
| lease: Optional Lease object to set up lease ending callback | ||
|
|
||
| Returns: | ||
| The exit code of the shell or command process | ||
| """ | ||
|
|
||
| shell = os.environ.get("SHELL", "bash") | ||
|
|
@@ -73,19 +111,16 @@ def launch_shell( | |
| } | ||
|
|
||
| if command: | ||
| process = Popen(command, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr, env=common_env) | ||
| return process.wait() | ||
| return _run_process(list(command), common_env, lease) | ||
|
|
||
| if shell_name.endswith("bash"): | ||
| env = common_env | { | ||
| "PS1": f"{ANSI_GRAY}{PROMPT_CWD} {ANSI_YELLOW}⚡{ANSI_WHITE}{context} {ANSI_YELLOW}➤{ANSI_RESET} ", | ||
| } | ||
|
|
||
| cmd = [shell] | ||
| if not use_profiles: | ||
| cmd.extend(["--norc", "--noprofile"]) | ||
| process = Popen(cmd, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr, env=env) | ||
| return process.wait() | ||
| return _run_process(cmd, env, lease) | ||
|
|
||
| elif shell_name == "fish": | ||
| fish_fn = ( | ||
|
|
@@ -102,26 +137,20 @@ def launch_shell( | |
| "end" | ||
| ) | ||
| cmd = [shell, "--init-command", fish_fn] | ||
| process = Popen(cmd, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr, env=common_env) | ||
| return process.wait() | ||
| return _run_process(cmd, common_env, lease) | ||
|
|
||
| elif shell_name == "zsh": | ||
| env = common_env | { | ||
| "PS1": f"%F{{8}}%1~ %F{{yellow}}⚡%F{{white}}{context} %F{{yellow}}➤%f ", | ||
| } | ||
|
|
||
| if "HISTFILE" not in env: | ||
| env["HISTFILE"] = os.path.join(os.path.expanduser("~"), ".zsh_history") | ||
|
|
||
| cmd = [shell] | ||
| if not use_profiles: | ||
| cmd.append("--no-rcs") | ||
|
|
||
| cmd.extend(["-o", "inc_append_history", "-o", "share_history"]) | ||
|
|
||
| process = Popen(cmd, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr, env=env) | ||
| return process.wait() | ||
| return _run_process(cmd, env, lease) | ||
|
|
||
| else: | ||
| process = Popen([shell], stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr, env=common_env) | ||
| return process.wait() | ||
| return _run_process([shell], common_env, lease) | ||
Uh oh!
There was an error while loading. Please reload this page.