From a9d8db073777a7fc7c3455a437285c66bfb53504 Mon Sep 17 00:00:00 2001 From: Wouter Devriendt Date: Mon, 9 Mar 2026 01:17:08 -0700 Subject: [PATCH] perf: parallelize EBS disk creation with GitHub keys + EFS setup Split disk creation into start_disk_creation() (non-blocking) and wait_for_disk_ready() (blocking). The create_volume API call returns immediately while AWS creates the volume in the background. We now do GitHub key fetching and EFS setup during that time, then wait for the volume only right before pod creation. Also: - Store ebs_volume_id in DynamoDB immediately after create_volume so cancel/cleanup can always find and clean up orphaned volumes - Add orphan volume cleanup in the outer except block when allocation fails after volume creation - Reduce SSH daemon poll interval from 10s to 3s (60 retries) since default image has openssh-server pre-installed --- .../lambda/reservation_processor/index.py | 352 +++++++++++++++--- 1 file changed, 302 insertions(+), 50 deletions(-) diff --git a/terraform-gpu-devservers/lambda/reservation_processor/index.py b/terraform-gpu-devservers/lambda/reservation_processor/index.py index 0a90f9d0..b1f08ec2 100644 --- a/terraform-gpu-devservers/lambda/reservation_processor/index.py +++ b/terraform-gpu-devservers/lambda/reservation_processor/index.py @@ -2464,47 +2464,35 @@ def progress_callback(progress_message): elif dockerimage: logger.info(f"Custom Docker image specified: {dockerimage}") - record_trace_event(trace_data, "github_keys_fetch_start") - github_public_key = get_github_public_key(github_user, validate=True) - record_trace_event(trace_data, "github_keys_fetch_end") - if not github_public_key: - raise ValueError( - f"Could not fetch GitHub public key for GitHub user '{github_user}'" - ) - - # Check if user should get persistent disk - # Check if user explicitly requested no persistent disk (e.g., confirmed continuing without disk when another reservation has it) + # Determine persistent disk eligibility early so we can start creation ASAP no_persistent_disk_requested = request.get("no_persistent_disk", False) if no_persistent_disk_requested: - # User explicitly requested no persistent disk - skip all persistent disk logic use_persistent_disk = False logger.info( f"User explicitly requested no persistent disk for reservation {reservation_id} - skipping all disk logic") elif is_multinode and node_index > 0: - # For multinode: only node 0 gets persistent disk, others get EFS shared storage - use_persistent_disk = False # Only master node gets persistent disk + use_persistent_disk = False logger.info( f"Multinode node {node_index + 1}/{total_nodes}: using EFS shared storage instead of persistent disk") elif disk_name: - # NEW: If disk_name is specified, ALWAYS use persistent disk (named disk system allows multiple disks) use_persistent_disk = True logger.info( f"Named disk '{disk_name}' requested for reservation {reservation_id} - will use persistent disk") else: - # OLD logic: check if user has other active reservations with persistent disks use_persistent_disk = should_use_persistent_disk( user_id, reservation_id) persistent_volume_id = None device_name = None - target_az = None # Initialize target_az for use in connection info update - is_new_disk = False # Initialize is_new_disk for all code paths + target_az = None + is_new_disk = False + disk_warning = None - # If we're using persistent disk, immediately mark this reservation as having a volume - # to prevent race conditions with concurrent reservations + # === PHASE 1: Start disk creation early (non-blocking) === + # create_volume returns immediately with a volume_id while AWS creates it in the background. + # We do GitHub keys + EFS in parallel, then wait_for_disk_ready before pod creation. if use_persistent_disk: try: - # Reserve the volume ID slot in DynamoDB immediately to prevent race conditions update_reservation_fields( reservation_id, ebs_volume_reserved=True) update_reservation_status( @@ -2517,15 +2505,12 @@ def progress_callback(progress_message): if use_persistent_disk: try: - # NEW snapshot-first workflow (replaces old migration logic below) - # Always recreate volume from latest snapshot or create empty update_reservation_status( reservation_id, "preparing", detailed_status="Setting up persistent disk" + (f" '{disk_name}'" if disk_name else "") ) - # Determine target AZ for this reservation target_az = get_target_az_for_reservation(gpu_type, gpu_count) if not target_az: raise ValueError(f"Could not determine target AZ for {gpu_type} GPUs") @@ -2533,42 +2518,34 @@ def progress_callback(progress_message): logger.info(f"Target AZ for reservation: {target_az}") logger.info(f"Creating persistent disk for user {user_id}, disk_name={disk_name or 'default'}") - # Use new snapshot-first function + # Start volume creation (returns immediately, volume still 'creating') record_trace_event(trace_data, "disk_create_start") - persistent_volume_id, is_new_disk, disk_warning = create_disk_from_snapshot_or_empty( + persistent_volume_id, is_new_disk, disk_warning = start_disk_creation( user_id=user_id, availability_zone=target_az, disk_name=disk_name, reservation_id=reservation_id ) - record_trace_event(trace_data, "disk_create_end") + logger.info(f"Disk creation initiated: {persistent_volume_id} (is_new={is_new_disk})") - logger.info(f"Persistent disk ready: {persistent_volume_id} (is_new={is_new_disk})") - - # Mark disk as in_use in disks table (prevents CLI from showing as available) - # Use "default" as fallback when no explicit disk_name provided + # Store volume_id in DynamoDB immediately so cancel/cleanup can find it effective_disk_name = disk_name or "default" + update_reservation_fields(reservation_id, ebs_volume_id=persistent_volume_id, disk_name=effective_disk_name) + try: mark_disk_in_use(user_id, effective_disk_name, True, reservation_id) logger.info(f"Marked disk '{effective_disk_name}' as in_use for reservation {reservation_id[:8]}") except Exception as mark_error: logger.warning(f"Failed to mark disk as in_use: {mark_error}") - # Store disk_name in DynamoDB for tracking (ALWAYS store, using "default" as fallback) - # This is required for expiry cleanup to know which disk to mark as not in use - update_reservation_fields(reservation_id, disk_name=effective_disk_name) - - # Store warning if any if disk_warning: update_reservation_fields(reservation_id, warning=disk_warning) logger.warning(f"Stored warning for reservation {reservation_id}: {disk_warning}") except Exception as disk_error: - logger.error(f"Failed to set up persistent disk: {disk_error}") + logger.error(f"Failed to start persistent disk creation: {disk_error}") - # Check if this is a "disk in use" error - these should fail the reservation error_msg = str(disk_error) if "is currently in use" in error_msg or "already in use" in error_msg: - # Don't fall back - fail the reservation with clear error update_reservation_status( reservation_id, "failed", @@ -2576,25 +2553,33 @@ def progress_callback(progress_message): ) raise RuntimeError(f"Cannot create reservation: {error_msg}") - # For other errors, continue without persistent disk (backwards compatibility) logger.warning(f"Falling back to non-persistent storage due to disk error: {disk_error}") use_persistent_disk = False - persistent_volume_id = None # Clear any volume that was set before the error - is_new_disk = True # EmptyDir volume will need shell environment setup + persistent_volume_id = None + is_new_disk = True update_reservation_status( reservation_id, "preparing", "Persistent disk setup failed - continuing without persistent storage", ) else: - logger.info( - f"User {user_id} has existing reservations - no persistent disk") - # Non-persistent reservations always need shell environment setup + if not no_persistent_disk_requested and not (is_multinode and node_index > 0): + logger.info( + f"User {user_id} has existing reservations - no persistent disk") is_new_disk = True logger.info( "Non-persistent reservation - will always set up shell environment (CREATE_SH_ENV=true)") - # Set up shared EFS storage for user + # === PHASE 2: Do other work while volume is being created by AWS === + # GitHub keys fetch and EFS setup happen while the EBS volume transitions to 'available' + record_trace_event(trace_data, "github_keys_fetch_start") + github_public_key = get_github_public_key(github_user, validate=True) + record_trace_event(trace_data, "github_keys_fetch_end") + if not github_public_key: + raise ValueError( + f"Could not fetch GitHub public key for GitHub user '{github_user}'" + ) + efs_filesystem_id = None try: if EFS_SECURITY_GROUP_ID and EFS_SUBNET_IDS: @@ -2613,9 +2598,34 @@ def progress_callback(progress_message): "EFS configuration missing - skipping shared storage setup") except Exception as efs_error: logger.error(f"Failed to set up EFS: {efs_error}") - # Continue without EFS rather than failing efs_filesystem_id = None + # === PHASE 3: Wait for volume to be available (likely already done) === + if use_persistent_disk and persistent_volume_id: + try: + record_trace_event(trace_data, "disk_wait_start") + wait_for_disk_ready(persistent_volume_id) + record_trace_event(trace_data, "disk_create_end") + logger.info(f"Persistent disk ready: {persistent_volume_id}") + except Exception as wait_error: + logger.error(f"Volume {persistent_volume_id} failed to become available: {wait_error}") + # Clean up the orphaned volume + try: + ec2_client.delete_volume(VolumeId=persistent_volume_id) + logger.info(f"Cleaned up orphaned volume {persistent_volume_id}") + except Exception as cleanup_err: + logger.warning(f"Failed to clean up orphaned volume {persistent_volume_id}: {cleanup_err}") + + logger.warning("Falling back to non-persistent storage") + use_persistent_disk = False + persistent_volume_id = None + is_new_disk = True + update_reservation_status( + reservation_id, + "preparing", + "Persistent disk setup failed - continuing without persistent storage", + ) + # Update status: Creating Kubernetes resources disk_status = "with persistent disk" if use_persistent_disk else "without persistent disk" shared_status = "and shared storage" if efs_filesystem_id else "" @@ -2736,10 +2746,11 @@ def progress_callback(progress_message): v1 = client.CoreV1Api(k8s_client) # Try multiple times to find SSH daemon in logs (custom images may take longer) - # For minimal images like ubuntu:latest, apt-get install openssh-server + sudo can take 60+ seconds - # 18 retries = up to 180 seconds total (3 minutes) - max_retries = 18 - retry_delay = 10 # seconds between retries + # Default image has openssh-server pre-installed so SSH starts in ~2-5s + # Custom/minimal images may need apt-get install which takes longer + # 60 retries * 3s = 180 seconds total (3 minutes) - same max but much faster detection + max_retries = 60 + retry_delay = 3 # seconds between retries for attempt in range(max_retries): logs = v1.read_namespaced_pod_log( @@ -2921,6 +2932,20 @@ def progress_callback(progress_message): except Exception as e: logger.error(f"Error allocating GPU resources: {str(e)}") + + # Clean up orphaned EBS volume if we created one but pod creation failed + if persistent_volume_id: + try: + vol_response = ec2_client.describe_volumes(VolumeIds=[persistent_volume_id]) + vol_state = vol_response['Volumes'][0]['State'] if vol_response.get('Volumes') else None + if vol_state in ('available', 'creating'): + ec2_client.delete_volume(VolumeId=persistent_volume_id) + logger.info(f"Cleaned up orphaned volume {persistent_volume_id} (was {vol_state})") + else: + logger.warning(f"Orphaned volume {persistent_volume_id} in state '{vol_state}' - skipping delete") + except Exception as cleanup_err: + logger.warning(f"Failed to clean up orphaned volume {persistent_volume_id}: {cleanup_err}") + # Store trace data even on failure if tracing is enabled record_trace_event(trace_data, "allocate_failed") if trace_data: @@ -5235,6 +5260,233 @@ def mark_disk_in_use(user_id: str, disk_name: str, in_use: bool, reservation_id: raise +def start_disk_creation(user_id: str, availability_zone: str, disk_name: str = None, reservation_id: str = None) -> tuple[str, bool, str]: + """ + Non-blocking phase of disk creation: resolves snapshot waits, calls create_volume, + and returns immediately with the volume_id (state will be 'creating' or 'available'). + Returns (volume_id, is_new_disk, warning_message). + The caller must call wait_for_disk_ready(volume_id) before using the volume. + """ + try: + from shared.snapshot_utils import get_latest_snapshot + + logger.info(f"[DISK-START] Starting disk creation for user {user_id} in AZ {availability_zone}" + (f", disk_name={disk_name}" if disk_name else "")) + + # Step 1: Check for in-use volumes (prevent concurrent use) + if disk_name: + filters = [ + {"Name": "tag:gpu-dev-user", "Values": [user_id]}, + {"Name": "tag:disk_name", "Values": [disk_name]}, + {"Name": "status", "Values": ["in-use", "available"]}, + ] + + max_wait_seconds = 120 + check_interval = 10 + waited = 0 + + while waited < max_wait_seconds: + response = ec2_client.describe_volumes(Filters=filters) + in_use_volumes = [v for v in response.get("Volumes", []) if v["State"] == "in-use"] + + if not in_use_volumes: + if waited > 0: + logger.info(f"Disk '{disk_name}' is now available after waiting {waited}s") + break + + volume_id = in_use_volumes[0]["VolumeId"] + + if waited == 0: + logger.info(f"Disk '{disk_name}' (volume {volume_id}) is in use - waiting for cleanup to complete") + if reservation_id: + update_reservation_status( + reservation_id, + "preparing", + detailed_status=f"Waiting for disk '{disk_name}' to be released from previous reservation" + ) + + time.sleep(check_interval) + waited += check_interval + logger.info(f"Still waiting for disk '{disk_name}' to be released... ({waited}s/{max_wait_seconds}s)") + + response = ec2_client.describe_volumes(Filters=filters) + in_use_volumes = [v for v in response.get("Volumes", []) if v["State"] == "in-use"] + + if in_use_volumes: + volume_id = in_use_volumes[0]["VolumeId"] + error_msg = f"Disk '{disk_name}' is still in use after waiting {max_wait_seconds}s (volume {volume_id}). The previous reservation may not have cleaned up properly." + logger.error(error_msg) + raise RuntimeError(error_msg) + + # Step 2: Wait for pending snapshots + pending_filters = [ + {"Name": "tag:gpu-dev-user", "Values": [user_id]}, + {"Name": "status", "Values": ["pending"]}, + ] + if disk_name: + pending_filters.append({"Name": "tag:disk_name", "Values": [disk_name]}) + + pending_response = ec2_client.describe_snapshots( + OwnerIds=["self"], + Filters=pending_filters + ) + + pending_snapshots = pending_response.get('Snapshots', []) + if pending_snapshots: + latest_pending = max(pending_snapshots, key=lambda s: s['StartTime']) + snapshot_id = latest_pending['SnapshotId'] + logger.warning(f"Found pending snapshot {snapshot_id} for disk '{disk_name or 'default'}' - waiting for completion") + + if reservation_id: + update_reservation_status( + reservation_id, + "preparing", + f"Waiting for disk snapshot to complete (from previous session)" + ) + + try: + waiter = ec2_client.get_waiter('snapshot_completed') + waiter.wait( + SnapshotIds=[snapshot_id], + WaiterConfig={ + 'Delay': 15, + 'MaxAttempts': 120 + } + ) + logger.info(f"Pending snapshot {snapshot_id} completed, proceeding with disk creation") + except Exception as wait_error: + logger.error(f"Timeout waiting for snapshot {snapshot_id}: {wait_error}") + raise RuntimeError(f"Disk '{disk_name or 'default'}' snapshot is still being created from previous session. Please wait a few minutes and try again.") + + # Step 3: Find latest completed snapshot + snapshot_filters = [ + {"Name": "tag:gpu-dev-user", "Values": [user_id]}, + {"Name": "status", "Values": ["completed"]}, + ] + if disk_name: + snapshot_filters.append({"Name": "tag:disk_name", "Values": [disk_name]}) + + paginator = ec2_client.get_paginator('describe_snapshots') + page_iterator = paginator.paginate( + OwnerIds=["self"], + Filters=snapshot_filters, + PaginationConfig={'PageSize': 100} + ) + + snapshots = [] + for page in page_iterator: + snapshots.extend(page.get('Snapshots', [])) + + active_snapshots = [] + for snap in snapshots: + tags = {tag['Key']: tag['Value'] for tag in snap.get('Tags', [])} + if 'delete-date' not in tags: + active_snapshots.append(snap) + + latest_snapshot = max(active_snapshots, key=lambda s: s['StartTime']) if active_snapshots else None + + # Check DynamoDB for clone_source_snapshot if no own snapshots + if not latest_snapshot and disk_name: + try: + disks_table_name = os.environ.get('DISKS_TABLE_NAME', 'pytorch-gpu-dev-disks') + disks_table = dynamodb.Table(disks_table_name) + disk_item = disks_table.get_item( + Key={'user_id': user_id, 'disk_name': disk_name} + ).get('Item', {}) + clone_source = disk_item.get('clone_source_snapshot') + if clone_source: + try: + snap_response = ec2_client.describe_snapshots(SnapshotIds=[clone_source]) + snap_list = snap_response.get('Snapshots', []) + if snap_list and snap_list[0]['State'] == 'completed': + latest_snapshot = snap_list[0] + logger.info(f"Using clone source snapshot {clone_source} for disk '{disk_name}'") + except Exception as snap_err: + logger.warning(f"Clone source snapshot {clone_source} not accessible: {snap_err}") + except Exception as db_err: + logger.warning(f"Could not check clone_source_snapshot for disk '{disk_name}': {db_err}") + + # Step 4: Create volume (non-blocking - returns immediately with volume_id) + if latest_snapshot: + snapshot_id = latest_snapshot['SnapshotId'] + snapshot_tags = {tag['Key']: tag['Value'] for tag in latest_snapshot.get('Tags', [])} + snapshot_type = snapshot_tags.get('SnapshotType', '') + is_initial_snapshot = (snapshot_type == 'initial') + + logger.info(f"[DISK-START] Creating volume from snapshot {snapshot_id} (type: {snapshot_type or 'user-data'}) in {availability_zone}") + + create_response = ec2_client.create_volume( + AvailabilityZone=availability_zone, + SnapshotId=snapshot_id, + Size=1024, + VolumeType="gp3", + Iops=3000, + Throughput=125, + TagSpecifications=[{ + "ResourceType": "volume", + "Tags": [ + {"Key": "gpu-dev-user", "Value": user_id}, + {"Key": "Name", "Value": f"gpu-dev-disk-{user_id.split('@')[0]}" + (f"-{disk_name}" if disk_name else "")}, + {"Key": "Project", "Value": "gpu-dev-servers"}, + {"Key": "ManagedBy", "Value": "gpu-dev-cli"}, + {"Key": "disk_name", "Value": disk_name if disk_name else "default"}, + {"Key": "created_at", "Value": str(int(time.time()))}, + {"Key": "last_used", "Value": str(int(time.time()))}, + ], + }] + ) + + volume_id = create_response["VolumeId"] + is_new_disk = is_initial_snapshot + + if is_initial_snapshot: + logger.info(f"Initial snapshot detected - will set up shell environment (CREATE_SH_ENV=true)") + + logger.info(f"[DISK-START] Volume {volume_id} creation initiated from snapshot {snapshot_id} (not yet available)") + return volume_id, is_new_disk, None + + else: + logger.info(f"[DISK-START] No snapshot found for disk '{disk_name or 'default'}' - creating empty 1TB volume") + + create_response = ec2_client.create_volume( + AvailabilityZone=availability_zone, + Size=1024, + VolumeType="gp3", + Iops=3000, + Throughput=125, + TagSpecifications=[{ + "ResourceType": "volume", + "Tags": [ + {"Key": "gpu-dev-user", "Value": user_id}, + {"Key": "Name", "Value": f"gpu-dev-disk-{user_id.split('@')[0]}" + (f"-{disk_name}" if disk_name else "")}, + {"Key": "Project", "Value": "gpu-dev-servers"}, + {"Key": "ManagedBy", "Value": "gpu-dev-cli"}, + {"Key": "disk_name", "Value": disk_name if disk_name else "default"}, + {"Key": "created_at", "Value": str(int(time.time()))}, + {"Key": "last_used", "Value": str(int(time.time()))}, + ], + }] + ) + + volume_id = create_response["VolumeId"] + logger.info(f"[DISK-START] Empty volume {volume_id} creation initiated (not yet available)") + return volume_id, True, None + + except Exception as e: + logger.error(f"Error starting disk creation for user {user_id}, disk_name={disk_name}: {str(e)}") + raise + + +def wait_for_disk_ready(volume_id: str) -> None: + """ + Blocking phase: wait for a previously created volume to reach 'available' state. + Call this after start_disk_creation() and after doing other parallel work. + """ + logger.info(f"[DISK-WAIT] Waiting for volume {volume_id} to become available...") + waiter = ec2_client.get_waiter("volume_available") + waiter.wait(VolumeIds=[volume_id], WaiterConfig={"Delay": 3, "MaxAttempts": 100}) + logger.info(f"[DISK-WAIT] Volume {volume_id} is now available") + + def create_disk_from_snapshot_or_empty(user_id: str, availability_zone: str, disk_name: str = None, reservation_id: str = None) -> tuple[str, bool, str]: """ NEW snapshot-first workflow: Always recreate disk from latest snapshot or create empty.