diff --git a/.flake8 b/.flake8 index 13cb9ba1..b32f0349 100644 --- a/.flake8 +++ b/.flake8 @@ -3,3 +3,4 @@ max-line-length = 120 # Ignore E402: module-level import not at top of file # Ignore W503: line break before binary operator (incompatible with W504) ignore = E402,W503 +exclude = .venv diff --git a/.gitignore b/.gitignore index 51130c5b..9807bf14 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,9 @@ *.sln.docstates *.env +# Environment files +.venv/ + # User-specific files (MonoDevelop/Xamarin Studio) *.userprefs diff --git a/simulator/__init__.py b/simulator/__init__.py index e69de29b..263309ff 100644 --- a/simulator/__init__.py +++ b/simulator/__init__.py @@ -0,0 +1,15 @@ +""" +Simulator package — provisioning sweeps, multi-request analysis, and plotting +on top of the model_provisioner allocation policies. + +The allocation policy implementations live in ``streamwise/model_provisioner/``. +""" +import os +import sys + +# Make model_provisioner importable for simulator modules. 
+_STREAMWISE_DIR = os.path.normpath( + os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "streamwise") +) +if _STREAMWISE_DIR not in sys.path: + sys.path.insert(0, _STREAMWISE_DIR) diff --git a/simulator/actions.py b/simulator/actions.py index debea677..69af1618 100644 --- a/simulator/actions.py +++ b/simulator/actions.py @@ -27,7 +27,7 @@ from sim_types import Objective from sim_types import Policy -from policies import STREAMWISE_POLICY +from model_provisioner.policies import STREAMWISE_POLICY from models import get_model_allocation diff --git a/simulator/auto_model_allocator.py b/simulator/auto_model_allocator.py index ea0fda61..3ca86cb7 100644 --- a/simulator/auto_model_allocator.py +++ b/simulator/auto_model_allocator.py @@ -19,7 +19,7 @@ from sim_types import GPUType from sim_types import Result -from policies import STREAMWISE_POLICY +from model_provisioner.policies import STREAMWISE_POLICY from model_allocator import ModelAllocator @@ -47,7 +47,7 @@ def __init__( def _build_allocator(self) -> ModelAllocator: """Create concrete allocator based on configured solver.""" if self.policy.solver == Solver.GREEDY: - from greedy import GreedyAllocator + from model_provisioner.greedy import GreedyAllocator return GreedyAllocator( workflow=self.workflow, latency_data=self.latency_data, @@ -55,7 +55,7 @@ def _build_allocator(self) -> ModelAllocator: policy=self.policy, ) if self.policy.solver == Solver.NAIVE: - from naive_baseline import NaiveAllocator + from model_provisioner.naive_baseline import NaiveAllocator return NaiveAllocator( workflow=self.workflow, latency_data=self.latency_data, @@ -63,7 +63,7 @@ def _build_allocator(self) -> ModelAllocator: policy=self.policy, ) if self.policy.solver in {Solver.GUROBI, Solver.HIGHS}: - from milp import MILPAllocator + from model_provisioner.milp import MILPAllocator return MILPAllocator( workflow=self.workflow, latency_data=self.latency_data, @@ -71,7 +71,7 @@ def _build_allocator(self) -> ModelAllocator: 
policy=self.policy, ) if self.policy.solver == Solver.HEXGEN: - from hexgen import HexGenAllocator + from model_provisioner.hexgen import HexGenAllocator return HexGenAllocator( workflow=self.workflow, latency_data=self.latency_data, @@ -79,7 +79,7 @@ def _build_allocator(self) -> ModelAllocator: policy=self.policy, ) if self.policy.solver == Solver.HELIX: - from helix import HelixAllocator + from model_provisioner.helix import HelixAllocator return HelixAllocator( workflow=self.workflow, latency_data=self.latency_data, diff --git a/simulator/data_loading.py b/simulator/data_loading.py index 6ee59ec5..af37e5b8 100644 --- a/simulator/data_loading.py +++ b/simulator/data_loading.py @@ -28,15 +28,17 @@ from constants import POWER_GPU_IDLE from constants import POWER_GPU_TDP +_DEFAULT_DATA_DIR = Path(__file__).resolve().parent / "data" + def load_latency_data( - data_dir: str = "data/", + data_dir: str | Path = _DEFAULT_DATA_DIR, ) -> LatencyData: """ Load latency and throughput mapping data from CSV files. Args: - data_dir (str): The directory where the CSV files are stored. + data_dir: The directory where the CSV files are stored. Returns: LatencyData: An object containing all loaded latency data. """ @@ -107,13 +109,13 @@ def load_latency_data( def load_power_data( - data_dir: str = "data/" + data_dir: str | Path = _DEFAULT_DATA_DIR ) -> PowerData: """ Load power consumption data from CSV files. Args: - data_dir (str): The directory where the CSV files are stored. + data_dir: The directory where the CSV files are stored. Returns: PowerData: An object containing all loaded power consumption data. 
""" @@ -216,7 +218,7 @@ def load_power_data( def load_adaptive_quality_data( - data_dir: str, + data_dir: str | Path, level: QualityLevel, ) -> LatencyData: """Load latency data for adaptive quality.""" diff --git a/simulator/model_allocator.py b/simulator/model_allocator.py index ab1c7e39..0f773a51 100644 --- a/simulator/model_allocator.py +++ b/simulator/model_allocator.py @@ -27,7 +27,7 @@ from models import UpscalerModelAllocation from models import OthersModelAllocation -from policies import NAIVE_POLICY +from model_provisioner.policies import NAIVE_POLICY class ModelAllocator(ABC): diff --git a/simulator/multirequests.py b/simulator/multirequests.py index 4fee5d55..a8d87a8b 100644 --- a/simulator/multirequests.py +++ b/simulator/multirequests.py @@ -18,7 +18,7 @@ from workflows import PODCAST_WORKFLOW -from policies import STREAMWISE_POLICY +from model_provisioner.policies import STREAMWISE_POLICY from auto_model_allocator import AutoModelAllocator diff --git a/simulator/provisioning.py b/simulator/provisioning.py index 43612b53..dd4f2a89 100644 --- a/simulator/provisioning.py +++ b/simulator/provisioning.py @@ -33,7 +33,7 @@ from auto_model_allocator import AutoModelAllocator -from policies import STREAMWISE_POLICY +from model_provisioner.policies import STREAMWISE_POLICY from constants import SECONDS_IN_HOUR diff --git a/streamwise/allocator_bridge.py b/streamwise/allocator_bridge.py new file mode 100644 index 00000000..b1e610d2 --- /dev/null +++ b/streamwise/allocator_bridge.py @@ -0,0 +1,250 @@ +""" +Bridge between the model provisioner's allocator output and StreamWise pod deployment. + +Translates ModelAllocation results (abstract Model enum + GPU counts) into concrete +container deployment parameters compatible with pod_manager.add_pod(). 
+""" + +from __future__ import annotations + +import os + +import model_provisioner # noqa: F401 — adds simulator/ to sys.path + +from dataclasses import dataclass +from typing import Optional + +from sim_types import GPUType +from sim_types import Model +from sim_types import Result + +from auto_model_allocator import AutoModelAllocator +from data_loading import load_latency_data +from model_provisioner.policies import STREAMWISE_POLICY +from workflows import WORKFLOWS + + +# Mapping from simulator Model enum to concrete container names used by pod_manager. +# Some Model entries map to multiple containers (e.g., OTHERS -> kokoro + yolo). +MODEL_TO_CONTAINERS: dict[Model, list[str]] = { + Model.GEMMA: ["gemma"], + Model.FLUX: ["flux"], + Model.HF: ["hunyuanframepackf1"], + Model.HF_VAE: ["hunyuanframepackvae"], + Model.FT: ["fantasytalking"], + Model.FT_VAE: [], # FT_VAE is handled within fantasytalking container + Model.UPSCALER: ["realesrgan"], + Model.OTHERS: ["kokoro", "yolo"], +} + +# Default CPU/memory/storage for each container when deployed via auto-deploy. 
# Format: (cpu_cores, memory_gib, ephemeral_storage_gib)
CONTAINER_RESOURCES: dict[str, tuple[int, int, int]] = {
    "gemma": (16, 192, 64),
    "flux": (12, 128, 64),
    "hunyuanframepackf1": (24, 128, 64),
    "hunyuanframepackvae": (4, 32, 16),
    "fantasytalking": (12, 192, 64),
    "realesrgan": (4, 32, 16),
    "kokoro": (2, 8, 16),
    "yolo": (4, 8, 16),
}

# Fallback (cpu_cores, memory_gib, ephemeral_storage_gib) for a container that has
# no entry in CONTAINER_RESOURCES.
_DEFAULT_CONTAINER_RESOURCES: tuple[int, int, int] = (4, 16, 16)

# Smallest total GPU budget run_allocator() accepts.
# NOTE(review): presumably one full server's worth of GPUs — confirm against the simulator constants.
_MIN_TOTAL_GPUS = 8

# GPU type string used by pod_manager (lowercase)
GPU_TYPE_TO_POD_STR: dict[GPUType, str] = {
    GPUType.A100: "a100",
    GPUType.H100: "h100",
    GPUType.H200: "h200",
    GPUType.GB200: "gb200",
}

# MIG containers: these use a MIG slice instead of a full GPU
MIG_CONTAINERS: dict[str, str] = {
    "kokoro": "1g.10gb",
    "yolo": "1g.10gb",
    "realesrgan": "1g.10gb",
}

# Mapping from StreamWise app name to simulator workflow key
APP_TO_WORKFLOW: dict[str, str] = {
    "streamcast": "podcast",
    "streampersona": "slide",
    "streamchat": "chat",
    "streamshort": "short",
    "streammovie": "movie",
    "streamanimate": "story",
    "streamlecture": "lecture",
    "streamdub": "dubbing",
    "streamedit": "editing",
}


@dataclass
class DeploymentSpec:
    """A single container deployment specification."""
    # Container name, one of the values listed in MODEL_TO_CONTAINERS.
    container_name: str
    # CPU cores, memory (GiB) and ephemeral storage (GiB) requested for the container.
    cpu: int
    memory_gib: int
    ephemeral_storage_gib: int
    # Number of GPUs per replica; always 1 for containers listed in MIG_CONTAINERS.
    gpu: int
    # Lowercase GPU type string taken from GPU_TYPE_TO_POD_STR.
    gpu_type: Optional[str]
    # MIG profile from MIG_CONTAINERS, or None when the container is not a MIG container.
    mig_profile: Optional[str]


@dataclass
class DeploymentPlan:
    """Complete deployment plan produced by the auto-allocator."""
    # One entry per container replica to deploy.
    specs: list[DeploymentSpec]
    # Raw allocator result the specs were derived from.
    result: Result
    # StreamWise app name the plan was computed for (a key of APP_TO_WORKFLOW).
    workflow_name: str
    # GPU budget exactly as supplied by the caller, keyed by GPU type string.
    gpu_budget: dict[str, int]


def _get_data_dir() -> str:
    """Return the simulator data directory.

    The ``SIMULATOR_DATA_DIR`` environment variable wins when it is set to a
    non-empty value; otherwise the ``simulator/data`` directory next to the
    ``streamwise`` package is used.
    """
    default_path = os.path.normpath(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "simulator", "data")
    )
    # An empty override is treated as "unset" rather than as the current directory.
    return os.getenv("SIMULATOR_DATA_DIR") or default_path


def get_available_workflows() -> list[str]:
    """Return list of available workflow names for the UI."""
    return list(APP_TO_WORKFLOW)


def get_available_gpu_types() -> list[str]:
    """Return list of available GPU type strings for the UI."""
    return [gpu_type.value for gpu_type in GPUType]


def _parse_gpu_budget(gpu_budget: dict[str, int]) -> dict[GPUType, int]:
    """Convert a string-keyed GPU budget into a GPUType-keyed one.

    Entries whose count is zero or negative are dropped.

    Raises:
        ValueError: If a key is not a GPUType value or a count is not an integer.
    """
    num_gpus: dict[GPUType, int] = {}
    for gpu_str, count in gpu_budget.items():
        try:
            gpu_type = GPUType(gpu_str)
        except ValueError:
            # The enum's own message adds nothing to ours, so drop the chained context.
            raise ValueError(
                f"Unknown GPU type '{gpu_str}'. "
                f"Available: {[g.value for g in GPUType]}") from None

        # JSON may deliver integral floats (8.0); accept those, reject everything else
        # non-integer so bad input surfaces as ValueError instead of a TypeError below.
        if isinstance(count, float) and count.is_integer():
            count = int(count)
        if isinstance(count, bool) or not isinstance(count, int):
            raise ValueError(
                f"GPU count for '{gpu_str}' must be an integer, got {count!r}.")

        if count > 0:
            num_gpus[gpu_type] = count
    return num_gpus


def run_allocator(
    gpu_budget: dict[str, int],
    workflow_name: str,
) -> DeploymentPlan:
    """
    Run AutoModelAllocator with STREAMWISE_POLICY and return a deployment plan.

    Args:
        gpu_budget: GPU counts keyed by GPU type string (e.g., {"A100": 8, "H100": 0}).
        workflow_name: StreamWise app name (e.g., "streamcast").

    Returns:
        DeploymentPlan with concrete container deployment specs.

    Raises:
        ValueError: If workflow_name is unknown, a GPU type is unknown, a GPU
            count is not an integer, or the total budget is below 8 GPUs.
    """
    # Validate workflow
    workflow_key = APP_TO_WORKFLOW.get(workflow_name)
    if workflow_key is None:
        raise ValueError(
            f"Unknown workflow '{workflow_name}'. "
            f"Available: {list(APP_TO_WORKFLOW.keys())}")

    workflow = WORKFLOWS[workflow_key]

    # Parse GPU budget into GPUType enum; an empty budget sums to 0 and fails the check.
    num_gpus = _parse_gpu_budget(gpu_budget)
    if sum(num_gpus.values()) < _MIN_TOTAL_GPUS:
        raise ValueError(f"Total GPU budget must be at least {_MIN_TOTAL_GPUS} GPUs.")

    # Load latency data and run allocator
    latency_data = load_latency_data(data_dir=_get_data_dir())

    allocator = AutoModelAllocator(
        workflow=workflow,
        latency_data=latency_data,
        policy=STREAMWISE_POLICY,
    )
    result = allocator.allocate(num_gpus=num_gpus, verbose=False)

    return DeploymentPlan(
        specs=result_to_deployment_specs(result),
        result=result,
        workflow_name=workflow_name,
        gpu_budget=gpu_budget,
    )


def result_to_deployment_specs(result: Result) -> list[DeploymentSpec]:
    """
    Convert an allocator Result into a list of DeploymentSpec objects.

    Each ModelAllocation with replicas > 0 is mapped to one or more container
    deployments: one DeploymentSpec per replica for every container the model
    maps to in MODEL_TO_CONTAINERS. Models that map to no container are skipped.

    Raises:
        KeyError: If the result contains a GPU type missing from GPU_TYPE_TO_POD_STR.
    """
    specs: list[DeploymentSpec] = []

    for gpu_type, model_dict in result.models.items():
        gpu_type_str = GPU_TYPE_TO_POD_STR[gpu_type]

        for model, allocations in model_dict.items():
            containers = MODEL_TO_CONTAINERS.get(model, [])
            if not containers:
                continue

            for allocation in allocations:
                if allocation.replicas <= 0:
                    continue

                for container_name in containers:
                    cpu, memory_gib, ephemeral_storage_gib = CONTAINER_RESOURCES.get(
                        container_name, _DEFAULT_CONTAINER_RESOURCES)

                    # MIG containers get a single slice regardless of allocation.devices.
                    mig_profile = MIG_CONTAINERS.get(container_name)
                    gpu_count = 1 if mig_profile else allocation.devices

                    # A fresh DeploymentSpec per replica: the dataclass is mutable,
                    # so replicas must not share one instance.
                    specs.extend(
                        DeploymentSpec(
                            container_name=container_name,
                            cpu=cpu,
                            memory_gib=memory_gib,
                            ephemeral_storage_gib=ephemeral_storage_gib,
                            gpu=gpu_count,
                            gpu_type=gpu_type_str,
                            mig_profile=mig_profile,
                        )
                        for _ in range(allocation.replicas)
                    )

    return specs


def deployment_plan_to_json(plan: DeploymentPlan) -> dict:
    """Serialize a DeploymentPlan to a JSON-friendly dict.

    Times are rounded to 2 decimals and cost to 4; ``gpus_used`` is re-keyed by
    the GPUType value so every key is a plain string. Each entry of ``specs``
    carries the fields of one DeploymentSpec.
    """
    result = plan.result
    return {
        "workflow_name": plan.workflow_name,
        "gpu_budget": plan.gpu_budget,
        "metrics": {
            "total_time_s": round(result.total_time_s, 2),
            "ttff_s": round(result.ttff_s, 2),
            "cost": round(result.cost, 4),
            "gpus_used": {
                gpu_type.value: count
                for gpu_type, count in result.gpus_used.items()
            },
        },
        "specs": [
            {
                "container_name": spec.container_name,
                "cpu": spec.cpu,
                "memory_gib": spec.memory_gib,
                "ephemeral_storage_gib": spec.ephemeral_storage_gib,
                "gpu": spec.gpu,
                "gpu_type": spec.gpu_type,
                "mig_profile": spec.mig_profile,
            }
            for spec in plan.specs
        ],
    }
+ +Contains greedy, naive, MILP, HexGen, and Helix allocation strategies. +The foundation types (sim_types, constants, models, etc.) live in simulator/. +""" +import os +import sys + +# Add simulator/ to sys.path so policy files can import foundation modules. +_SIMULATOR_DIR = os.path.normpath( + os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "simulator") +) +if _SIMULATOR_DIR not in sys.path: + sys.path.insert(0, _SIMULATOR_DIR) diff --git a/simulator/greedy.py b/streamwise/model_provisioner/greedy.py similarity index 99% rename from simulator/greedy.py rename to streamwise/model_provisioner/greedy.py index 459742e5..8c1a1dd0 100644 --- a/simulator/greedy.py +++ b/streamwise/model_provisioner/greedy.py @@ -33,9 +33,9 @@ from model_allocator import ModelAllocator -from policies import STREAMWISE_POLICY -from policies import MAX_ITERATIONS -from policies import USE_ALL_GPUS +from .policies import STREAMWISE_POLICY +from .policies import MAX_ITERATIONS +from .policies import USE_ALL_GPUS from actions import gen_actions from actions import choose_action diff --git a/simulator/helix.py b/streamwise/model_provisioner/helix.py similarity index 99% rename from simulator/helix.py rename to streamwise/model_provisioner/helix.py index 5891538f..e8fededf 100644 --- a/simulator/helix.py +++ b/streamwise/model_provisioner/helix.py @@ -43,10 +43,10 @@ from evaluator import evaluate_model_allocation -from milp import MILPAllocator +from .milp import MILPAllocator -from policies import HELIX_POLICY -from policies import MAX_DEVICES +from .policies import HELIX_POLICY +from .policies import MAX_DEVICES from constants import DEVICE_OPTIONS diff --git a/simulator/hexgen.py b/streamwise/model_provisioner/hexgen.py similarity index 99% rename from simulator/hexgen.py rename to streamwise/model_provisioner/hexgen.py index 64c64160..4f37768a 100644 --- a/simulator/hexgen.py +++ b/streamwise/model_provisioner/hexgen.py @@ -30,15 +30,15 @@ from evaluator import 
calc_used_gpus from evaluator import evaluate_model_allocation -from greedy import GreedyAllocator +from .greedy import GreedyAllocator from actions import gen_actions from actions import choose_action from actions import apply_action -from policies import HEXGEN_POLICY -from policies import MAX_ITERATIONS -from policies import USE_ALL_GPUS +from .policies import HEXGEN_POLICY +from .policies import MAX_ITERATIONS +from .policies import USE_ALL_GPUS def _get_model_order(workflow: WorkflowConfig) -> list[Model]: diff --git a/simulator/milp.py b/streamwise/model_provisioner/milp.py similarity index 99% rename from simulator/milp.py rename to streamwise/model_provisioner/milp.py index 7a84e754..67749258 100644 --- a/simulator/milp.py +++ b/streamwise/model_provisioner/milp.py @@ -40,7 +40,7 @@ from constants import NUM_GPUS_PER_SERVER from constants import SECONDS_IN_HOUR -from policies import STREAMWISE_MILP_POLICY +from .policies import STREAMWISE_MILP_POLICY MAX_INSTANCES = 16 diff --git a/simulator/naive_baseline.py b/streamwise/model_provisioner/naive_baseline.py similarity index 99% rename from simulator/naive_baseline.py rename to streamwise/model_provisioner/naive_baseline.py index 9f9c550c..ec95904e 100644 --- a/simulator/naive_baseline.py +++ b/streamwise/model_provisioner/naive_baseline.py @@ -31,8 +31,8 @@ from evaluator import evaluate_model_allocation -from policies import NAIVE_POLICY -from policies import MAX_DEVICES +from .policies import NAIVE_POLICY +from .policies import MAX_DEVICES from model_allocator import ModelAllocator diff --git a/simulator/policies.py b/streamwise/model_provisioner/policies.py similarity index 100% rename from simulator/policies.py rename to streamwise/model_provisioner/policies.py diff --git a/streamwise/streamwise.py b/streamwise/streamwise.py index 1c63eacf..0ce24ac5 100644 --- a/streamwise/streamwise.py +++ b/streamwise/streamwise.py @@ -34,6 +34,7 @@ import pod_manager import node_manager import job_manager +import 
@route("/api/auto_deploy", methods=["POST"])
async def api_auto_deploy() -> QuartReturn:
    """Run the model allocator to produce an optimized deployment plan.

    Expects JSON body:
    {
        "gpu_budget": {"A100": 8, "H100": 0, ...},
        "workflow": "streamcast"
    }

    Returns the deployment plan with estimated metrics and per-container specs.
    Responds 400 for a missing/malformed body or invalid fields (including any
    ValueError raised by the allocator bridge) and 500 for any other failure.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None and is reported as
        # 400 below, instead of raising into the generic handler and becoming a 500.
        data = await request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"error": "Request body must be JSON"}), HTTPStatus.BAD_REQUEST

        gpu_budget = data.get("gpu_budget")
        workflow_name = data.get("workflow")

        if not gpu_budget or not isinstance(gpu_budget, dict):
            return jsonify({"error": "Missing or invalid 'gpu_budget' field"}), HTTPStatus.BAD_REQUEST
        if not workflow_name or not isinstance(workflow_name, str):
            return jsonify({"error": "Missing or invalid 'workflow' field"}), HTTPStatus.BAD_REQUEST

        # NOTE(review): run_allocator is a plain synchronous call, so it runs on the
        # event loop for its whole duration — consider offloading if it proves slow.
        plan = allocator_bridge.run_allocator(
            gpu_budget=gpu_budget,
            workflow_name=workflow_name,
        )
        return jsonify(allocator_bridge.deployment_plan_to_json(plan)), HTTPStatus.OK

    except ValueError as ve:
        # Validation failures from the bridge are the caller's fault.
        return jsonify({"error": str(ve)}), HTTPStatus.BAD_REQUEST
    except Exception as ex:
        logging.exception("Error in auto_deploy: %s", ex)
        return jsonify({"error": str(ex)}), HTTPStatus.INTERNAL_SERVER_ERROR


@route("/api/auto_deploy/confirm", methods=["POST"])
async def api_auto_deploy_confirm() -> QuartReturn:
    """Execute a deployment plan produced by /api/auto_deploy.

    Expects JSON body:
    {
        "specs": [
            {
                "container_name": "gemma",
                "cpu": 16,
                "memory_gib": 192,
                "ephemeral_storage_gib": 64,
                "gpu": 2,
                "gpu_type": "a100",
                "mig_profile": null
            },
            ...
        ]
    }

    Deploys all containers in the plan, one pod_manager.add_pod() call per spec.
    A spec that is invalid or fails to deploy is recorded in "errors" and does
    not stop the remaining specs. Responds 200 when every spec deployed and 207
    when at least one error was recorded.
    """
    try:
        # silent=True: malformed JSON becomes the 400 below rather than a 500.
        data = await request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"error": "Request body must be JSON"}), HTTPStatus.BAD_REQUEST

        specs = data.get("specs")
        if not specs or not isinstance(specs, list):
            return jsonify({"error": "Missing or invalid 'specs' field"}), HTTPStatus.BAD_REQUEST

        deployed: List[str] = []
        errors: List[str] = []

        for spec in specs:
            # A non-object entry must not abort the loop: earlier pods may already
            # be deployed, so report it like any other per-spec failure.
            if not isinstance(spec, dict):
                errors.append("Spec must be a JSON object")
                continue

            container_name = spec.get("container_name")
            if not container_name:
                errors.append("Spec missing 'container_name'")
                continue

            try:
                await pod_manager.add_pod(
                    container_name=container_name,
                    cpu=int(spec.get("cpu", 4)),
                    memory_gib=int(spec.get("memory_gib", 16)),
                    ephemeral_storage_gib=int(spec.get("ephemeral_storage_gib", 16)),
                    gpu=int(spec.get("gpu", 0)),
                    gpu_type=spec.get("gpu_type"),
                    mig_profile=spec.get("mig_profile"),
                    namespace=NAMESPACE,
                    k8s_cluster=k8s_cluster,
                )
            except Exception as pod_ex:
                # Best-effort by design: keep deploying the remaining specs.
                msg = f"Failed to deploy '{container_name}': {pod_ex}"
                logging.error(msg)
                errors.append(msg)
            else:
                deployed.append(container_name)

        status = HTTPStatus.OK if not errors else HTTPStatus.MULTI_STATUS
        return jsonify({
            "deployed": deployed,
            "errors": errors,
            "message": f"Deployed {len(deployed)}/{len(specs)} containers.",
        }), status

    except Exception as ex:
        logging.exception("Error in auto_deploy/confirm: %s", ex)
        return jsonify({"error": str(ex)}), HTTPStatus.INTERNAL_SERVER_ERROR


@route("/api/auto_deploy/workflows", methods=["GET"])
async def api_auto_deploy_workflows() -> QuartReturn:
    """Return available workflows and GPU types for the auto-deploy UI."""
    return jsonify({
        "workflows": allocator_bridge.get_available_workflows(),
        "gpu_types": allocator_bridge.get_available_gpu_types(),
    }), HTTPStatus.OK
d61952aa..f5496e10 100644 --- a/streamwise/templates/add_pod.html +++ b/streamwise/templates/add_pod.html @@ -384,6 +384,94 @@

🧩 Applications

{% endif %} + +

🤖 Auto Deploy

+

Specify your GPU budget and the optimizer will determine the best allocation for each component:

+ +
+
+ + 💰 GPU Budget + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+ +
+ + 🎬 Workflow + +
+ + +
+
+ +
+ +
+
+ + + + + +