From 0cfdca4ed2dfa77f430433741504e8cda638db72 Mon Sep 17 00:00:00 2001 From: Wouter Devriendt Date: Wed, 4 Feb 2026 13:07:05 -0800 Subject: [PATCH] feat: add orphaned disk cleanup and lambda improvements - Add cleanup_orphaned_disks() to expiry lambda to handle cases where nodes went offline unexpectedly and pods were never properly cleaned - Update availability and expiry terraform configs - Add development dependencies to pyproject.toml Co-Authored-By: Claude Opus 4.5 --- pyproject.toml | 37 +++++ terraform-gpu-devservers/availability.tf | 6 +- terraform-gpu-devservers/expiry.tf | 6 +- terraform-gpu-devservers/lambda.tf | 7 +- .../lambda/reservation_expiry/index.py | 129 +++++++++++++++++- .../lambda/reservation_processor/index.py | 14 +- terraform-gpu-devservers/main.tf | 5 +- 7 files changed, 187 insertions(+), 17 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2a964bd1..0f1f3261 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,3 +33,40 @@ package-dir = {"" = "cli-tools/gpu-dev-cli"} [tool.setuptools.package-data] gpu_dev_cli = ["py.typed"] + +[project.optional-dependencies] +test = [ + "pytest>=8.0.0", + "pytest-cov>=4.1.0", + "pytest-asyncio>=0.23.0", + "pytest-timeout>=2.2.0", + "moto[all]>=5.0.0", + "freezegun>=1.2.0", + "responses>=0.25.0", +] + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +addopts = "-v --tb=short" +markers = [ + "unit: Unit tests (fast, mocked)", + "e2e: End-to-end tests (require AWS dev cluster)", + "slow: Slow tests that can be skipped", +] +filterwarnings = [ + "ignore::DeprecationWarning", +] + +[tool.coverage.run] +source = ["cli-tools/gpu-dev-cli/gpu_dev_cli", "terraform-gpu-devservers/lambda"] +omit = ["*/tests/*", "*/__pycache__/*"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "if TYPE_CHECKING:", + "raise NotImplementedError", +] diff --git a/terraform-gpu-devservers/availability.tf b/terraform-gpu-devservers/availability.tf index daf87fec..0510349b 100644 --- a/terraform-gpu-devservers/availability.tf +++ b/terraform-gpu-devservers/availability.tf @@ -224,10 +224,10 @@ resource "null_resource" "availability_updater_build" { rm -rf package *.zip mkdir -p package - # Install dependencies if requirements.txt exists + # Install dependencies using Docker for Linux x86_64 compatibility if [ -f requirements.txt ]; then - python3 -m pip install --upgrade pip - python3 -m pip install -r requirements.txt --target package/ --force-reinstall + docker run --rm --platform linux/amd64 --entrypoint pip -v "$(pwd):/var/task" -w /var/task public.ecr.aws/lambda/python:3.13 \ + install -r requirements.txt --target package/ --upgrade fi # Copy source code and shared modules diff --git a/terraform-gpu-devservers/expiry.tf b/terraform-gpu-devservers/expiry.tf index 9e21252d..ea3e0893 100644 --- a/terraform-gpu-devservers/expiry.tf +++ b/terraform-gpu-devservers/expiry.tf @@ -172,9 +172,9 @@ resource "null_resource" "reservation_expiry_build" { rm -rf package *.zip mkdir -p package - # Install dependencies with specific Python version - python3 -m pip install --upgrade pip - python3 -m pip install -r requirements.txt --target package/ --force-reinstall + # Install dependencies using Docker for Linux x86_64 compatibility + docker run --rm --platform linux/amd64 --entrypoint pip -v "$(pwd):/var/task" -w /var/task public.ecr.aws/lambda/python:3.13 \ + install -r requirements.txt --target package/ --upgrade # Copy source code and shared modules cp index.py package/ diff --git a/terraform-gpu-devservers/lambda.tf b/terraform-gpu-devservers/lambda.tf index de79723e..71ba28f9 100644 --- a/terraform-gpu-devservers/lambda.tf +++ b/terraform-gpu-devservers/lambda.tf @@ -228,9 +228,10 @@ resource "null_resource" "reservation_processor_build" { rm -rf package *.zip mkdir -p package - # Install dependencies with specific Python version - python3 -m pip install --upgrade pip - python3 -m pip install -r requirements.txt --target package/ --force-reinstall + # Install dependencies using Docker for Linux x86_64 compatibility + # This ensures native extensions (cryptography) are built for Lambda's Linux environment + docker run --rm --platform linux/amd64 --entrypoint pip -v "$(pwd):/var/task" -w /var/task public.ecr.aws/lambda/python:3.13 \ + install -r requirements.txt --target package/ --upgrade # Copy source code and shared modules cp index.py package/ diff --git a/terraform-gpu-devservers/lambda/reservation_expiry/index.py b/terraform-gpu-devservers/lambda/reservation_expiry/index.py index a2a04f1d..d397d342 100644 --- a/terraform-gpu-devservers/lambda/reservation_expiry/index.py +++ b/terraform-gpu-devservers/lambda/reservation_expiry/index.py @@ -861,6 +861,15 @@ def handler(event, context): logger.error(f"Error cleaning up soft-deleted snapshots: {e}") deleted_snapshot_count = 0 + # Clean up orphaned disks (in_use=True but pod/reservation gone) + # This handles cases where nodes went offline unexpectedly + try: + orphaned_disks_cleaned = cleanup_orphaned_disks() + logger.info(f"Cleaned up {orphaned_disks_cleaned} orphaned disks") + except Exception as e: + logger.error(f"Error cleaning up orphaned disks: {e}") + orphaned_disks_cleaned = 0 + return { "statusCode": 200, "body": json.dumps( @@ -873,6 +882,7 @@ def handler(event, context): "deleted_snapshots": deleted_snapshot_count, "tagged_snapshots": tagged_snapshot_count, "synced_disks": synced_disk_count, + "orphaned_disks_cleaned": orphaned_disks_cleaned, } ), } @@ -1017,6 +1027,108 @@ def find_disk_by_reservation(user_id: str, reservation_id: str) -> str | None: return None +def cleanup_orphaned_disks() -> int: + """ + Find and clean up disks marked as in_use where the associated pod/reservation is gone. + This handles cases where a node went offline and the pod was never properly cleaned up. + Returns count of disks cleaned up. + """ + cleaned_count = 0 + + try: + disks_table = dynamodb.Table(DISKS_TABLE) + reservations_table = dynamodb.Table(RESERVATIONS_TABLE) + + # Scan for disks marked as in_use + response = disks_table.scan( + FilterExpression="in_use = :true", + ExpressionAttributeValues={":true": True} + ) + + in_use_disks = response.get('Items', []) + + # Handle pagination + while 'LastEvaluatedKey' in response: + response = disks_table.scan( + FilterExpression="in_use = :true", + ExpressionAttributeValues={":true": True}, + ExclusiveStartKey=response['LastEvaluatedKey'] + ) + in_use_disks.extend(response.get('Items', [])) + + if not in_use_disks: + logger.debug("No disks marked as in_use found") + return 0 + + logger.info(f"Found {len(in_use_disks)} disks marked as in_use, checking for orphans") + + for disk in in_use_disks: + user_id = disk.get('user_id') + disk_name = disk.get('disk_name') + attached_reservation = disk.get('attached_to_reservation') + + if not user_id or not disk_name: + continue + + # If no reservation attached, it's orphaned - clean it up + if not attached_reservation: + logger.info(f"Disk '{disk_name}' for user {user_id} has no attached reservation - cleaning up") + try: + mark_disk_not_in_use(user_id, disk_name) + cleaned_count += 1 + except Exception as e: + logger.warning(f"Failed to clean up orphaned disk '{disk_name}': {e}") + continue + + # Check if the attached reservation still exists and is active + try: + res_response = reservations_table.get_item( + Key={'reservation_id': attached_reservation} + ) + + if 'Item' not in res_response: + # Reservation doesn't exist - disk is orphaned + logger.info(f"Disk '{disk_name}' attached to non-existent reservation {attached_reservation[:8]} - cleaning up") + mark_disk_not_in_use(user_id, disk_name) + cleaned_count += 1 + continue + + reservation = res_response['Item'] + status = reservation.get('status', '') + + # If reservation is in a terminal state, clean up the disk + if status in ['expired', 'cancelled', 'failed']: + logger.info(f"Disk '{disk_name}' attached to {status} reservation {attached_reservation[:8]} - cleaning up") + mark_disk_not_in_use(user_id, disk_name) + cleaned_count += 1 + continue + + # If reservation is active/preparing, check if pod actually exists + if status in ['active', 'preparing']: + pod_name = reservation.get('pod_name') + if pod_name and not check_pod_exists(pod_name): + # Pod is gone but reservation shows active - node likely went offline + logger.info(f"Disk '{disk_name}' attached to reservation {attached_reservation[:8]} with missing pod {pod_name} - cleaning up") + mark_disk_not_in_use(user_id, disk_name) + cleaned_count += 1 + + # Also expire the reservation since pod is gone + try: + expire_reservation_due_to_missing_pod(reservation) + logger.info(f"Also expired reservation {attached_reservation[:8]} due to missing pod") + except Exception as expire_error: + logger.warning(f"Failed to expire reservation {attached_reservation[:8]}: {expire_error}") + + except Exception as res_error: + logger.warning(f"Error checking reservation for disk '{disk_name}': {res_error}") + + return cleaned_count + + except Exception as e: + logger.error(f"Error in cleanup_orphaned_disks: {e}") + return cleaned_count + + def handle_oom_event(reservation: dict, oom_info: dict) -> bool: """ Handle an OOM event for a reservation. @@ -1193,9 +1305,10 @@ def warn_user_expiring(reservation: dict[str, Any], warning_minutes: int) -> Non def expire_reservation_due_to_missing_pod(reservation: dict[str, Any]) -> None: - """Mark reservation as expired when pod is missing (likely manually deleted)""" + """Mark reservation as expired when pod is missing (likely manually deleted or node went offline)""" try: reservation_id = reservation["reservation_id"] + user_id = reservation.get("user_id") logger.info( f"Marking reservation {reservation_id} as expired due to missing pod" @@ -1220,6 +1333,20 @@ def expire_reservation_due_to_missing_pod(reservation: dict[str, Any]) -> None: f"Successfully marked reservation {reservation_id} as expired due to missing pod" ) + # Clean up disk in_use flag immediately (don't wait for next expiry run) + disk_name = reservation.get("disk_name") + + # Fallback: if disk_name not in reservation, look it up from disks table + if user_id and not disk_name: + disk_name = find_disk_by_reservation(user_id, reservation_id) + + if user_id and disk_name: + try: + mark_disk_not_in_use(user_id, disk_name) + logger.info(f"Cleared disk '{disk_name}' in_use flag for missing-pod reservation {reservation_id[:8]}") + except Exception as disk_error: + logger.warning(f"Failed to clear disk in_use flag for {reservation_id[:8]}: {disk_error}") + except Exception as e: logger.error( f"Error marking reservation {reservation.get('reservation_id')} as expired: {str(e)}" diff --git a/terraform-gpu-devservers/lambda/reservation_processor/index.py b/terraform-gpu-devservers/lambda/reservation_processor/index.py index 40e3c586..f8f132d6 100644 --- a/terraform-gpu-devservers/lambda/reservation_processor/index.py +++ b/terraform-gpu-devservers/lambda/reservation_processor/index.py @@ -3417,12 +3417,18 @@ def get_pod_resource_requests(gpu_count: int, gpu_type: str, is_multinode: bool def _pod_uses_efa(gpu_count: int, gpu_type: str, is_multinode: bool = False) -> bool: - """Check if pod will use EFA based on configuration""" + """Check if pod will use EFA based on configuration. + + EFA is enabled for full-node allocations on high-end GPU types (H100, H200, B200, A100) + that have EFA hardware available. This enables RDMA/high-bandwidth networking for + both single-node and multi-node workloads. + """ config = GPU_CONFIG.get(gpu_type, GPU_CONFIG_DEFAULT) + # GPU types that support EFA (have EFA hardware on their instance types) + efa_supported_types = {"h100", "h200", "b200", "a100"} return ( - gpu_type != "t4-small" and - is_multinode and - gpu_count == config["max_gpus"] + gpu_type in efa_supported_types and + gpu_count == config["max_gpus"] # Full node allocation only ) diff --git a/terraform-gpu-devservers/main.tf b/terraform-gpu-devservers/main.tf index ac0eea9b..b67f75db 100644 --- a/terraform-gpu-devservers/main.tf +++ b/terraform-gpu-devservers/main.tf @@ -259,8 +259,7 @@ locals { { id = null, instance_count = 1 } # A100 on-demand (1 instance) ] h100 = [ - { id = "cr-0a7caa7414866615a", instance_count = 4 }, # H100 reservation us-east-2c (p5.48xlarge) - { id = null, instance_count = 2 } # H100 on-demand (2 instances) + { id = "cr-0a0a39a4c51068e30", instance_count = 4 }, # H100 reservation us-east-2c (p5.48xlarge) ] h200 = [ { id = "cr-0f6d0766f5d3339e6", instance_count = 2 }, # H200 reservation us-east-2c (p5e.48xlarge) @@ -319,7 +318,7 @@ locals { "cr-0f6d0766f5d3339e6" = "tertiary" # us-east-2c (p5e.48xlarge) "cr-06c9c978dea756a26" = "tertiary" # us-east-2c # H100 capacity reservation - "cr-0a7caa7414866615a" = "tertiary" # us-east-2c (p5.48xlarge) + "cr-0a0a39a4c51068e30" = "tertiary" # us-east-2c (p5.48xlarge) # A100 capacity reservation "cr-01cc0f00f28b095af" = "primary" # us-east-2a }