diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 43ef65efacb..09ac2646d74 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -449,6 +449,16 @@ def _to_job(self, node): # Don't set GPU limits if gpu isn't specified. if k8s_deco.attributes["gpu"] is not None }, + **{ + "aws.amazon.com/neuron": str(k8s_deco.attributes["trainium"]) + for k in [0] + if k8s_deco.attributes.get("trainium") is not None + }, + **{ + "vpc.amazonaws.com/efa": str(k8s_deco.attributes["efa"]) + for k in [0] + if k8s_deco.attributes.get("efa") is not None + }, }, ) @@ -501,6 +511,18 @@ def _to_job(self, node): retry_exponential_backoff=False, # todo : should this be a arg we allow on CLI. not right now - there is an open ticket for this - maybe at some point we will. reattach_on_restart=False, secrets=[], + tolerations=( + [ + { + "key": "aws.amazon.com/neuron", + "operator": "Exists", + "effect": "NoSchedule", + } + ] + if k8s_deco.attributes.get("trainium") is not None + else [] + ) + + (k8s_deco.attributes.get("tolerations") or []), ) k8s_operator_args["in_cluster"] = True if AIRFLOW_KUBERNETES_CONN_ID is not None: diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index b49c0f252f6..075ff49b5f5 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -2636,6 +2636,8 @@ def _container_templates(self): disk=str(resources["disk"]), gpu=resources["gpu"], gpu_vendor=str(resources["gpu_vendor"]), + trainium=resources.get("trainium"), + efa=resources.get("efa"), tolerations=resources["tolerations"], use_tmpfs=use_tmpfs, tmpfs_tempdir=tmpfs_tempdir, @@ -2800,7 +2802,20 @@ def _container_templates(self): # Set node selectors .node_selectors(resources.get("node_selector")) # Set tolerations - .tolerations(resources.get("tolerations")) + .tolerations( + (resources.get("tolerations") or []) + + ( + [ + { + "key": "aws.amazon.com/neuron", + "operator": "Exists", + "effect": "NoSchedule", + } + ] + if resources.get("trainium") is not None + else [] + ) + ) # Set image pull secrets if present. We need to use pod_spec_patch due to Argo not supporting this on a template level. .pod_spec_patch( { @@ -2874,6 +2889,20 @@ def _container_templates(self): for k in [0] if resources["gpu"] is not None }, + **{ + "aws.amazon.com/neuron": str( + resources["trainium"] + ) + for k in [0] + if resources.get("trainium") is not None + }, + **{ + "vpc.amazonaws.com/efa": str( + resources["efa"] + ) + for k in [0] + if resources.get("efa") is not None + }, }, ), # Configure secrets diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index c19b3efe3b9..83fa17767c6 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -181,6 +181,8 @@ def create_jobset( cpu=None, gpu=None, gpu_vendor=None, + trainium=None, + efa=None, disk=None, memory=None, use_tmpfs=None, @@ -215,6 +217,8 @@ def create_jobset( disk=disk, gpu=gpu, gpu_vendor=gpu_vendor, + trainium=trainium, + efa=efa, timeout_in_seconds=run_time_limit, # Retries are handled by Metaflow runtime retries=0, @@ -482,6 +486,8 @@ def create_job_object( cpu=None, gpu=None, gpu_vendor=None, + trainium=None, + efa=None, disk=None, memory=None, use_tmpfs=None, @@ -528,6 +534,8 @@ def create_job_object( disk=disk, gpu=gpu, gpu_vendor=gpu_vendor, + trainium=trainium, + efa=efa, timeout_in_seconds=run_time_limit, # Retries are handled by Metaflow runtime retries=0, diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index e15f7b06cb9..3237fd6d4e3 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -89,6 +89,14 @@ def kubernetes(): @click.option("--memory", help="Memory requirement for Kubernetes pod.") @click.option("--gpu", help="GPU requirement for Kubernetes pod.") @click.option("--gpu-vendor", help="GPU vendor requirement for Kubernetes pod.") +@click.option( + "--trainium", + help="AWS Trainium/Inferentia Neuron device requirement for Kubernetes pod.", +) +@click.option( + "--efa", + help="Number of Elastic Fabric Adapter network interfaces for Kubernetes pod.", +) @click.option("--run-id", help="Passed to the top-level 'step'.") @click.option("--task-id", help="Passed to the top-level 'step'.") @click.option("--input-paths", help="Passed to the top-level 'step'.") @@ -178,6 +186,8 @@ def step( memory=None, gpu=None, gpu_vendor=None, + trainium=None, + efa=None, use_tmpfs=None, tmpfs_tempdir=None, tmpfs_size=None, @@ -323,6 +333,8 @@ def _sync_metadata(): memory=memory, gpu=gpu, gpu_vendor=gpu_vendor, + trainium=trainium, + efa=efa, use_tmpfs=use_tmpfs, tmpfs_tempdir=tmpfs_tempdir, tmpfs_size=tmpfs_size, diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index bd3ae7e12c4..c33feeae8f7 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -96,6 +96,20 @@ class KubernetesDecorator(StepDecorator): the scheduled node should not have GPUs. gpu_vendor : str, default KUBERNETES_GPU_VENDOR The vendor of the GPUs to be used for this step. + trainium : int, optional, default None + Number of AWS Trainium / Inferentia Neuron devices required for this + step. Maps to the `aws.amazon.com/neuron` Kubernetes resource managed + by the AWS Neuron device plugin -- same resource regardless of whether + the underlying chip is Trainium or Inferentia, since they share the + device-plugin / AMI / runtime stack. + inferentia : int, optional, default None + Alias for `trainium`. Use only one of the two. Provided for API + consistency with `@batch(inferentia=...)`. + efa : int, optional, default None + Number of AWS Elastic Fabric Adapter network interfaces required for + this step. Maps to the `vpc.amazonaws.com/efa` Kubernetes resource + managed by the AWS EFA device plugin. Only valid on EFA-capable + instance types where the pool was provisioned with EFA NICs. tolerations : List[Dict[str,str]], default [] The default is extracted from METAFLOW_KUBERNETES_TOLERATIONS. Kubernetes tolerations to use when launching pod in Kubernetes. @@ -152,6 +166,9 @@ class KubernetesDecorator(StepDecorator): "namespace": None, "gpu": None, # value of 0 implies that the scheduled node should not have GPUs "gpu_vendor": None, + "trainium": None, # number of AWS Trainium/Inferentia Neuron devices + "inferentia": None, # alias for trainium; both map to aws.amazon.com/neuron + "efa": None, # number of Elastic Fabric Adapter network interfaces "tolerations": None, # e.g., [{"key": "arch", "operator": "Equal", "value": "amd"}, # {"key": "foo", "operator": "Equal", "value": "bar"}] "labels": None, # e.g. {"test-label": "value", "another-label":"value2"} @@ -382,6 +399,33 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge max(float(my_val or 0), float(v or 0)) ) + # Alias inferentia to trainium and check that both are not in use. + # `trainium` is canonical on @kubernetes (the underlying Neuron device + # plugin advertises a single `aws.amazon.com/neuron` resource for both + # chip families). `inferentia` is provided for API consistency with + # `@batch(inferentia=...)` -- it collapses into `trainium` here. + if ( + self.attributes.get("inferentia") is not None + and self.attributes.get("trainium") is not None + ): + raise KubernetesException( + "only specify a value for 'inferentia' or 'trainium', not both." + ) + if self.attributes.get("inferentia") is not None: + self.attributes["trainium"] = self.attributes["inferentia"] + self.attributes["inferentia"] = None + + # Validate mutually exclusive: gpu and trainium cannot both be set. + if ( + self.attributes["trainium"] is not None + and self.attributes["gpu"] is not None + ): + raise KubernetesException( + "Cannot specify both 'gpu' and 'trainium' for step *{step}*.".format( + step=step + ) + ) + # Check GPU vendor. if self.attributes["gpu_vendor"].lower() not in ("amd", "nvidia"): raise KubernetesException( @@ -412,6 +456,28 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge ) ) + if self.attributes["trainium"] is not None and not ( + isinstance(self.attributes["trainium"], (int, unicode, basestring)) + and float(self.attributes["trainium"]).is_integer() + and int(float(self.attributes["trainium"])) > 0 + ): + raise KubernetesException( + "Invalid trainium value *{}* for step *{step}*; it should be a positive integer".format( + self.attributes["trainium"], step=step + ) + ) + + if self.attributes["efa"] is not None and not ( + isinstance(self.attributes["efa"], (int, unicode, basestring)) + and float(self.attributes["efa"]).is_integer() + and int(float(self.attributes["efa"])) > 0 + ): + raise KubernetesException( + "Invalid efa value *{}* for step *{step}*; it should be a positive integer".format( + self.attributes["efa"], step=step + ) + ) + if self.attributes["tmpfs_size"]: if not ( isinstance(self.attributes["tmpfs_size"], (int, unicode, basestring)) diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index b81777bcc7b..eae406a3985 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -182,6 +182,20 @@ def create_job_spec(self): # Don't set GPU limits if gpu isn't specified. if self._kwargs["gpu"] is not None }, + **{ + "aws.amazon.com/neuron": str( + self._kwargs["trainium"] + ) + for k in [0] + if self._kwargs.get("trainium") is not None + }, + **{ + "vpc.amazonaws.com/efa": str( + self._kwargs["efa"] + ) + for k in [0] + if self._kwargs.get("efa") is not None + }, }, ), volume_mounts=( @@ -236,7 +250,18 @@ def create_job_spec(self): tolerations=[ client.V1Toleration(**toleration) for toleration in self._kwargs.get("tolerations") or [] - ], + ] + + ( + [ + client.V1Toleration( + key="aws.amazon.com/neuron", + operator="Exists", + effect="NoSchedule", + ) + ] + if self._kwargs.get("trainium") is not None + else [] + ), volumes=( [ client.V1Volume( diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index da0f0fc3130..9b814de422b 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -679,6 +679,22 @@ def dump(self): # Don't set GPU limits if gpu isn't specified. if self._kwargs["gpu"] is not None }, + **{ + "aws.amazon.com/neuron": str( + self._kwargs["trainium"] + ) + for k in [0] + if self._kwargs.get("trainium") + is not None + }, + **{ + "vpc.amazonaws.com/efa": str( + self._kwargs["efa"] + ) + for k in [0] + if self._kwargs.get("efa") + is not None + }, }, ), volume_mounts=( @@ -740,7 +756,18 @@ def dump(self): client.V1Toleration(**toleration) for toleration in self._kwargs.get("tolerations") or [] - ], + ] + + ( + [ + client.V1Toleration( + key="aws.amazon.com/neuron", + operator="Exists", + effect="NoSchedule", + ) + ] + if self._kwargs.get("trainium") is not None + else [] + ), volumes=( [ client.V1Volume(