diff --git a/terraform-gpu-devservers/lambda/reservation_processor/index.py b/terraform-gpu-devservers/lambda/reservation_processor/index.py index 0a90f9d0..b1f08ec2 100644 --- a/terraform-gpu-devservers/lambda/reservation_processor/index.py +++ b/terraform-gpu-devservers/lambda/reservation_processor/index.py @@ -2464,47 +2464,35 @@ def progress_callback(progress_message): elif dockerimage: logger.info(f"Custom Docker image specified: {dockerimage}") - record_trace_event(trace_data, "github_keys_fetch_start") - github_public_key = get_github_public_key(github_user, validate=True) - record_trace_event(trace_data, "github_keys_fetch_end") - if not github_public_key: - raise ValueError( - f"Could not fetch GitHub public key for GitHub user '{github_user}'" - ) - - # Check if user should get persistent disk - # Check if user explicitly requested no persistent disk (e.g., confirmed continuing without disk when another reservation has it) + # Determine persistent disk eligibility early so we can start creation ASAP no_persistent_disk_requested = request.get("no_persistent_disk", False) if no_persistent_disk_requested: - # User explicitly requested no persistent disk - skip all persistent disk logic use_persistent_disk = False logger.info( f"User explicitly requested no persistent disk for reservation {reservation_id} - skipping all disk logic") elif is_multinode and node_index > 0: - # For multinode: only node 0 gets persistent disk, others get EFS shared storage - use_persistent_disk = False # Only master node gets persistent disk + use_persistent_disk = False logger.info( f"Multinode node {node_index + 1}/{total_nodes}: using EFS shared storage instead of persistent disk") elif disk_name: - # NEW: If disk_name is specified, ALWAYS use persistent disk (named disk system allows multiple disks) use_persistent_disk = True logger.info( f"Named disk '{disk_name}' requested for reservation {reservation_id} - will use persistent disk") else: - # OLD logic: check if user has 
other active reservations with persistent disks use_persistent_disk = should_use_persistent_disk( user_id, reservation_id) persistent_volume_id = None device_name = None - target_az = None # Initialize target_az for use in connection info update - is_new_disk = False # Initialize is_new_disk for all code paths + target_az = None + is_new_disk = False + disk_warning = None - # If we're using persistent disk, immediately mark this reservation as having a volume - # to prevent race conditions with concurrent reservations + # === PHASE 1: Start disk creation early (non-blocking) === + # create_volume returns immediately with a volume_id while AWS creates it in the background. + # We do GitHub keys + EFS in parallel, then wait_for_disk_ready before pod creation. if use_persistent_disk: try: - # Reserve the volume ID slot in DynamoDB immediately to prevent race conditions update_reservation_fields( reservation_id, ebs_volume_reserved=True) update_reservation_status( @@ -2517,15 +2505,12 @@ def progress_callback(progress_message): if use_persistent_disk: try: - # NEW snapshot-first workflow (replaces old migration logic below) - # Always recreate volume from latest snapshot or create empty update_reservation_status( reservation_id, "preparing", detailed_status="Setting up persistent disk" + (f" '{disk_name}'" if disk_name else "") ) - # Determine target AZ for this reservation target_az = get_target_az_for_reservation(gpu_type, gpu_count) if not target_az: raise ValueError(f"Could not determine target AZ for {gpu_type} GPUs") @@ -2533,42 +2518,34 @@ def progress_callback(progress_message): logger.info(f"Target AZ for reservation: {target_az}") logger.info(f"Creating persistent disk for user {user_id}, disk_name={disk_name or 'default'}") - # Use new snapshot-first function + # Start volume creation (returns immediately, volume still 'creating') record_trace_event(trace_data, "disk_create_start") - persistent_volume_id, is_new_disk, disk_warning = 
create_disk_from_snapshot_or_empty( + persistent_volume_id, is_new_disk, disk_warning = start_disk_creation( user_id=user_id, availability_zone=target_az, disk_name=disk_name, reservation_id=reservation_id ) - record_trace_event(trace_data, "disk_create_end") + logger.info(f"Disk creation initiated: {persistent_volume_id} (is_new={is_new_disk})") - logger.info(f"Persistent disk ready: {persistent_volume_id} (is_new={is_new_disk})") - - # Mark disk as in_use in disks table (prevents CLI from showing as available) - # Use "default" as fallback when no explicit disk_name provided + # Store volume_id in DynamoDB immediately so cancel/cleanup can find it effective_disk_name = disk_name or "default" + update_reservation_fields(reservation_id, ebs_volume_id=persistent_volume_id, disk_name=effective_disk_name) + try: mark_disk_in_use(user_id, effective_disk_name, True, reservation_id) logger.info(f"Marked disk '{effective_disk_name}' as in_use for reservation {reservation_id[:8]}") except Exception as mark_error: logger.warning(f"Failed to mark disk as in_use: {mark_error}") - # Store disk_name in DynamoDB for tracking (ALWAYS store, using "default" as fallback) - # This is required for expiry cleanup to know which disk to mark as not in use - update_reservation_fields(reservation_id, disk_name=effective_disk_name) - - # Store warning if any if disk_warning: update_reservation_fields(reservation_id, warning=disk_warning) logger.warning(f"Stored warning for reservation {reservation_id}: {disk_warning}") except Exception as disk_error: - logger.error(f"Failed to set up persistent disk: {disk_error}") + logger.error(f"Failed to start persistent disk creation: {disk_error}") - # Check if this is a "disk in use" error - these should fail the reservation error_msg = str(disk_error) if "is currently in use" in error_msg or "already in use" in error_msg: - # Don't fall back - fail the reservation with clear error update_reservation_status( reservation_id, "failed", @@ -2576,25 
+2553,33 @@ def progress_callback(progress_message): ) raise RuntimeError(f"Cannot create reservation: {error_msg}") - # For other errors, continue without persistent disk (backwards compatibility) logger.warning(f"Falling back to non-persistent storage due to disk error: {disk_error}") use_persistent_disk = False - persistent_volume_id = None # Clear any volume that was set before the error - is_new_disk = True # EmptyDir volume will need shell environment setup + persistent_volume_id = None + is_new_disk = True update_reservation_status( reservation_id, "preparing", "Persistent disk setup failed - continuing without persistent storage", ) else: - logger.info( - f"User {user_id} has existing reservations - no persistent disk") - # Non-persistent reservations always need shell environment setup + if not no_persistent_disk_requested and not (is_multinode and node_index > 0): + logger.info( + f"User {user_id} has existing reservations - no persistent disk") is_new_disk = True logger.info( "Non-persistent reservation - will always set up shell environment (CREATE_SH_ENV=true)") - # Set up shared EFS storage for user + # === PHASE 2: Do other work while volume is being created by AWS === + # GitHub keys fetch and EFS setup happen while the EBS volume transitions to 'available' + record_trace_event(trace_data, "github_keys_fetch_start") + github_public_key = get_github_public_key(github_user, validate=True) + record_trace_event(trace_data, "github_keys_fetch_end") + if not github_public_key: + raise ValueError( + f"Could not fetch GitHub public key for GitHub user '{github_user}'" + ) + efs_filesystem_id = None try: if EFS_SECURITY_GROUP_ID and EFS_SUBNET_IDS: @@ -2613,9 +2598,34 @@ def progress_callback(progress_message): "EFS configuration missing - skipping shared storage setup") except Exception as efs_error: logger.error(f"Failed to set up EFS: {efs_error}") - # Continue without EFS rather than failing efs_filesystem_id = None + # === PHASE 3: Wait for volume to be 
available (likely already done) === + if use_persistent_disk and persistent_volume_id: + try: + record_trace_event(trace_data, "disk_wait_start") + wait_for_disk_ready(persistent_volume_id) + record_trace_event(trace_data, "disk_create_end") + logger.info(f"Persistent disk ready: {persistent_volume_id}") + except Exception as wait_error: + logger.error(f"Volume {persistent_volume_id} failed to become available: {wait_error}") + # Clean up the orphaned volume + try: + ec2_client.delete_volume(VolumeId=persistent_volume_id) + logger.info(f"Cleaned up orphaned volume {persistent_volume_id}") + except Exception as cleanup_err: + logger.warning(f"Failed to clean up orphaned volume {persistent_volume_id}: {cleanup_err}") + + logger.warning("Falling back to non-persistent storage") + use_persistent_disk = False + persistent_volume_id = None + is_new_disk = True + update_reservation_status( + reservation_id, + "preparing", + "Persistent disk setup failed - continuing without persistent storage", + ) + # Update status: Creating Kubernetes resources disk_status = "with persistent disk" if use_persistent_disk else "without persistent disk" shared_status = "and shared storage" if efs_filesystem_id else "" @@ -2736,10 +2746,11 @@ def progress_callback(progress_message): v1 = client.CoreV1Api(k8s_client) # Try multiple times to find SSH daemon in logs (custom images may take longer) - # For minimal images like ubuntu:latest, apt-get install openssh-server + sudo can take 60+ seconds - # 18 retries = up to 180 seconds total (3 minutes) - max_retries = 18 - retry_delay = 10 # seconds between retries + # Default image has openssh-server pre-installed so SSH starts in ~2-5s + # Custom/minimal images may need apt-get install which takes longer + # 60 retries * 3s = 180 seconds total (3 minutes) - same max but much faster detection + max_retries = 60 + retry_delay = 3 # seconds between retries for attempt in range(max_retries): logs = v1.read_namespaced_pod_log( @@ -2921,6 +2932,20 @@ 
def start_disk_creation(user_id: str, availability_zone: str, disk_name: str = None, reservation_id: str = None) -> tuple[str, bool, str]:
    """
    Kick off creation of the user's persistent EBS volume without waiting for it
    to become usable.

    The volume is restored from the user's most recent completed snapshot (or the
    disk's ``clone_source_snapshot`` recorded in the disks table) when one exists,
    otherwise an empty 1 TB gp3 volume is created. ``create_volume`` returns while
    the volume may still be in the ``creating`` state, so the caller must call
    ``wait_for_disk_ready(volume_id)`` before attaching or mounting it.

    Although the volume creation itself is not awaited, this function CAN block
    before it gets that far:
      * up to 120 seconds while a named disk is still attached elsewhere, and
      * up to ~30 minutes (120 polls x 15 s) while a pending snapshot completes.

    Args:
        user_id: Value of the ``gpu-dev-user`` tag used to find volumes/snapshots.
        availability_zone: AZ in which the new volume is created.
        disk_name: Optional named disk. When omitted, snapshot lookups are not
            filtered by ``disk_name`` and the new volume is tagged ``default``.
        reservation_id: Optional reservation whose status is updated while waiting.

    Returns:
        ``(volume_id, is_new_disk, warning_message)``. ``is_new_disk`` is True for
        an empty volume or one restored from a snapshot tagged
        ``SnapshotType=initial``. ``warning_message`` is always ``None`` here; the
        slot is kept so the tuple shape matches the annotated return type.

    Raises:
        RuntimeError: The named disk is still in use after the wait, or the
            pending snapshot did not complete.
        Exception: Any other error (for example from the EC2 API) is logged and
            re-raised unchanged.
    """
    try:
        logger.info(f"[DISK-START] Starting disk creation for user {user_id} in AZ {availability_zone}" + (f", disk_name={disk_name}" if disk_name else ""))

        # Step 1: Check for in-use volumes (prevent concurrent use of a named disk)
        if disk_name:
            filters = [
                {"Name": "tag:gpu-dev-user", "Values": [user_id]},
                {"Name": "tag:disk_name", "Values": [disk_name]},
                {"Name": "status", "Values": ["in-use", "available"]},
            ]

            max_wait_seconds = 120
            check_interval = 10
            waited = 0

            while waited < max_wait_seconds:
                response = ec2_client.describe_volumes(Filters=filters)
                in_use_volumes = [v for v in response.get("Volumes", []) if v["State"] == "in-use"]

                if not in_use_volumes:
                    if waited > 0:
                        logger.info(f"Disk '{disk_name}' is now available after waiting {waited}s")
                    break

                volume_id = in_use_volumes[0]["VolumeId"]

                # Only announce the wait (log + reservation status) on the first poll
                if waited == 0:
                    logger.info(f"Disk '{disk_name}' (volume {volume_id}) is in use - waiting for cleanup to complete")
                    if reservation_id:
                        update_reservation_status(
                            reservation_id,
                            "preparing",
                            detailed_status=f"Waiting for disk '{disk_name}' to be released from previous reservation"
                        )

                time.sleep(check_interval)
                waited += check_interval
                logger.info(f"Still waiting for disk '{disk_name}' to be released... ({waited}s/{max_wait_seconds}s)")

            # Final check after the loop: covers the case where the wait budget ran out
            response = ec2_client.describe_volumes(Filters=filters)
            in_use_volumes = [v for v in response.get("Volumes", []) if v["State"] == "in-use"]

            if in_use_volumes:
                volume_id = in_use_volumes[0]["VolumeId"]
                # NOTE(review): the reservation flow only treats messages containing
                # "is currently in use" or "already in use" as fatal. This message
                # contains neither, so that caller falls back to non-persistent
                # storage instead of failing the reservation - confirm this is intended.
                error_msg = f"Disk '{disk_name}' is still in use after waiting {max_wait_seconds}s (volume {volume_id}). The previous reservation may not have cleaned up properly."
                logger.error(error_msg)
                raise RuntimeError(error_msg)

        # Step 2: Wait for pending snapshots so the restore uses the newest data
        pending_filters = [
            {"Name": "tag:gpu-dev-user", "Values": [user_id]},
            {"Name": "status", "Values": ["pending"]},
        ]
        if disk_name:
            pending_filters.append({"Name": "tag:disk_name", "Values": [disk_name]})

        pending_response = ec2_client.describe_snapshots(
            OwnerIds=["self"],
            Filters=pending_filters
        )

        pending_snapshots = pending_response.get('Snapshots', [])
        if pending_snapshots:
            latest_pending = max(pending_snapshots, key=lambda s: s['StartTime'])
            snapshot_id = latest_pending['SnapshotId']
            logger.warning(f"Found pending snapshot {snapshot_id} for disk '{disk_name or 'default'}' - waiting for completion")

            if reservation_id:
                update_reservation_status(
                    reservation_id,
                    "preparing",
                    "Waiting for disk snapshot to complete (from previous session)"
                )

            try:
                # 120 attempts x 15 s = up to 30 minutes.
                # NOTE(review): if this runs inside AWS Lambda the invocation would
                # time out long before the waiter gives up - confirm the intended budget.
                waiter = ec2_client.get_waiter('snapshot_completed')
                waiter.wait(
                    SnapshotIds=[snapshot_id],
                    WaiterConfig={
                        'Delay': 15,
                        'MaxAttempts': 120
                    }
                )
            except Exception as wait_error:
                logger.error(f"Timeout waiting for snapshot {snapshot_id}: {wait_error}")
                # Chain the cause so the underlying waiter failure stays in the traceback
                raise RuntimeError(f"Disk '{disk_name or 'default'}' snapshot is still being created from previous session. Please wait a few minutes and try again.") from wait_error
            else:
                logger.info(f"Pending snapshot {snapshot_id} completed, proceeding with disk creation")

        # Step 3: Find latest completed snapshot
        snapshot_filters = [
            {"Name": "tag:gpu-dev-user", "Values": [user_id]},
            {"Name": "status", "Values": ["completed"]},
        ]
        if disk_name:
            snapshot_filters.append({"Name": "tag:disk_name", "Values": [disk_name]})

        paginator = ec2_client.get_paginator('describe_snapshots')
        page_iterator = paginator.paginate(
            OwnerIds=["self"],
            Filters=snapshot_filters,
            PaginationConfig={'PageSize': 100}
        )

        snapshots = []
        for page in page_iterator:
            snapshots.extend(page.get('Snapshots', []))

        # Snapshots carrying a 'delete-date' tag are excluded from restore candidates
        active_snapshots = [
            snap for snap in snapshots
            if not any(tag['Key'] == 'delete-date' for tag in snap.get('Tags', []))
        ]

        latest_snapshot = max(active_snapshots, key=lambda s: s['StartTime'], default=None)

        # Check DynamoDB for clone_source_snapshot if the disk has no own snapshots
        if not latest_snapshot and disk_name:
            try:
                disks_table_name = os.environ.get('DISKS_TABLE_NAME', 'pytorch-gpu-dev-disks')
                disks_table = dynamodb.Table(disks_table_name)
                disk_item = disks_table.get_item(
                    Key={'user_id': user_id, 'disk_name': disk_name}
                ).get('Item', {})
                clone_source = disk_item.get('clone_source_snapshot')
                if clone_source:
                    try:
                        snap_response = ec2_client.describe_snapshots(SnapshotIds=[clone_source])
                        snap_list = snap_response.get('Snapshots', [])
                        if snap_list and snap_list[0]['State'] == 'completed':
                            latest_snapshot = snap_list[0]
                            logger.info(f"Using clone source snapshot {clone_source} for disk '{disk_name}'")
                    except Exception as snap_err:
                        # Best effort: an unreadable clone source just means an empty volume
                        logger.warning(f"Clone source snapshot {clone_source} not accessible: {snap_err}")
            except Exception as db_err:
                logger.warning(f"Could not check clone_source_snapshot for disk '{disk_name}': {db_err}")

        # Step 4: Create volume (create_volume is not awaited - the volume may still be 'creating')
        # Both branches share identical volume settings and tags; only SnapshotId differs.
        timestamp = str(int(time.time()))  # single reading so created_at == last_used
        volume_params = {
            "AvailabilityZone": availability_zone,
            "Size": 1024,
            "VolumeType": "gp3",
            "Iops": 3000,
            "Throughput": 125,
            "TagSpecifications": [{
                "ResourceType": "volume",
                "Tags": [
                    {"Key": "gpu-dev-user", "Value": user_id},
                    {"Key": "Name", "Value": f"gpu-dev-disk-{user_id.split('@')[0]}" + (f"-{disk_name}" if disk_name else "")},
                    {"Key": "Project", "Value": "gpu-dev-servers"},
                    {"Key": "ManagedBy", "Value": "gpu-dev-cli"},
                    {"Key": "disk_name", "Value": disk_name if disk_name else "default"},
                    {"Key": "created_at", "Value": timestamp},
                    {"Key": "last_used", "Value": timestamp},
                ],
            }],
        }

        if latest_snapshot:
            snapshot_id = latest_snapshot['SnapshotId']
            snapshot_tags = {tag['Key']: tag['Value'] for tag in latest_snapshot.get('Tags', [])}
            snapshot_type = snapshot_tags.get('SnapshotType', '')
            is_initial_snapshot = (snapshot_type == 'initial')

            logger.info(f"[DISK-START] Creating volume from snapshot {snapshot_id} (type: {snapshot_type or 'user-data'}) in {availability_zone}")

            create_response = ec2_client.create_volume(SnapshotId=snapshot_id, **volume_params)
            volume_id = create_response["VolumeId"]

            if is_initial_snapshot:
                logger.info("Initial snapshot detected - will set up shell environment (CREATE_SH_ENV=true)")

            logger.info(f"[DISK-START] Volume {volume_id} creation initiated from snapshot {snapshot_id} (not yet available)")
            # A disk restored from the 'initial' snapshot is reported as new
            return volume_id, is_initial_snapshot, None

        logger.info(f"[DISK-START] No snapshot found for disk '{disk_name or 'default'}' - creating empty 1TB volume")

        create_response = ec2_client.create_volume(**volume_params)
        volume_id = create_response["VolumeId"]
        logger.info(f"[DISK-START] Empty volume {volume_id} creation initiated (not yet available)")
        return volume_id, True, None

    except Exception as e:
        logger.error(f"Error starting disk creation for user {user_id}, disk_name={disk_name}: {str(e)}")
        raise


def wait_for_disk_ready(volume_id: str, delay_seconds: int = 3, max_attempts: int = 100) -> None:
    """
    Block until a previously created volume reaches the 'available' state.

    Call this after start_disk_creation() and after any work that was overlapped
    with the volume's creation.

    Args:
        volume_id: ID of the EBS volume to wait for.
        delay_seconds: Seconds between polls of the volume state (default 3).
        max_attempts: Maximum number of polls before the waiter gives up
            (default 100, i.e. about 300 seconds with the default delay).

    Raises:
        Exception: Whatever the ``volume_available`` waiter raises when the volume
            does not become available in time; it is not caught here.
    """
    logger.info(f"[DISK-WAIT] Waiting for volume {volume_id} to become available...")
    waiter = ec2_client.get_waiter("volume_available")
    waiter.wait(
        VolumeIds=[volume_id],
        WaiterConfig={"Delay": delay_seconds, "MaxAttempts": max_attempts},
    )
    logger.info(f"[DISK-WAIT] Volume {volume_id} is now available")