Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,16 @@ def _to_job(self, node):
# Don't set GPU limits if gpu isn't specified.
if k8s_deco.attributes["gpu"] is not None
},
**{
"aws.amazon.com/neuron": str(k8s_deco.attributes["trainium"])
for k in [0]
if k8s_deco.attributes.get("trainium") is not None
},
Comment thread
emattia marked this conversation as resolved.
**{
"vpc.amazonaws.com/efa": str(k8s_deco.attributes["efa"])
for k in [0]
if k8s_deco.attributes.get("efa") is not None
},
},
)

Expand Down Expand Up @@ -501,6 +511,18 @@ def _to_job(self, node):
retry_exponential_backoff=False,  # TODO: should this be an arg we allow on the CLI? Not right now — there is an open ticket for this; maybe at some point we will.
reattach_on_restart=False,
secrets=[],
tolerations=(
[
{
"key": "aws.amazon.com/neuron",
"operator": "Exists",
"effect": "NoSchedule",
}
]
if k8s_deco.attributes.get("trainium") is not None
else []
)
+ (k8s_deco.attributes.get("tolerations") or []),
)
k8s_operator_args["in_cluster"] = True
if AIRFLOW_KUBERNETES_CONN_ID is not None:
Expand Down
31 changes: 30 additions & 1 deletion metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2636,6 +2636,8 @@ def _container_templates(self):
disk=str(resources["disk"]),
gpu=resources["gpu"],
gpu_vendor=str(resources["gpu_vendor"]),
trainium=resources.get("trainium"),
efa=resources.get("efa"),
tolerations=resources["tolerations"],
use_tmpfs=use_tmpfs,
tmpfs_tempdir=tmpfs_tempdir,
Expand Down Expand Up @@ -2800,7 +2802,20 @@ def _container_templates(self):
# Set node selectors
.node_selectors(resources.get("node_selector"))
# Set tolerations
.tolerations(resources.get("tolerations"))
.tolerations(
(resources.get("tolerations") or [])
+ (
[
{
"key": "aws.amazon.com/neuron",
"operator": "Exists",
"effect": "NoSchedule",
}
]
if resources.get("trainium") is not None
else []
)
)
# Set image pull secrets if present. We need to use pod_spec_patch due to Argo not supporting this on a template level.
.pod_spec_patch(
{
Expand Down Expand Up @@ -2874,6 +2889,20 @@ def _container_templates(self):
for k in [0]
if resources["gpu"] is not None
},
**{
"aws.amazon.com/neuron": str(
resources["trainium"]
)
for k in [0]
if resources.get("trainium") is not None
},
**{
"vpc.amazonaws.com/efa": str(
resources["efa"]
)
for k in [0]
if resources.get("efa") is not None
},
},
),
# Configure secrets
Expand Down
8 changes: 8 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ def create_jobset(
cpu=None,
gpu=None,
gpu_vendor=None,
trainium=None,
efa=None,
disk=None,
memory=None,
use_tmpfs=None,
Expand Down Expand Up @@ -215,6 +217,8 @@ def create_jobset(
disk=disk,
gpu=gpu,
gpu_vendor=gpu_vendor,
trainium=trainium,
efa=efa,
timeout_in_seconds=run_time_limit,
# Retries are handled by Metaflow runtime
retries=0,
Expand Down Expand Up @@ -482,6 +486,8 @@ def create_job_object(
cpu=None,
gpu=None,
gpu_vendor=None,
trainium=None,
efa=None,
disk=None,
memory=None,
use_tmpfs=None,
Expand Down Expand Up @@ -528,6 +534,8 @@ def create_job_object(
disk=disk,
gpu=gpu,
gpu_vendor=gpu_vendor,
trainium=trainium,
efa=efa,
timeout_in_seconds=run_time_limit,
# Retries are handled by Metaflow runtime
retries=0,
Expand Down
12 changes: 12 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ def kubernetes():
@click.option("--memory", help="Memory requirement for Kubernetes pod.")
@click.option("--gpu", help="GPU requirement for Kubernetes pod.")
@click.option("--gpu-vendor", help="GPU vendor requirement for Kubernetes pod.")
@click.option(
"--trainium",
help="AWS Trainium/Inferentia Neuron device requirement for Kubernetes pod.",
)
@click.option(
"--efa",
help="Number of Elastic Fabric Adapter network interfaces for Kubernetes pod.",
)
@click.option("--run-id", help="Passed to the top-level 'step'.")
@click.option("--task-id", help="Passed to the top-level 'step'.")
@click.option("--input-paths", help="Passed to the top-level 'step'.")
Expand Down Expand Up @@ -178,6 +186,8 @@ def step(
memory=None,
gpu=None,
gpu_vendor=None,
trainium=None,
efa=None,
use_tmpfs=None,
tmpfs_tempdir=None,
tmpfs_size=None,
Expand Down Expand Up @@ -323,6 +333,8 @@ def _sync_metadata():
memory=memory,
gpu=gpu,
gpu_vendor=gpu_vendor,
trainium=trainium,
efa=efa,
use_tmpfs=use_tmpfs,
tmpfs_tempdir=tmpfs_tempdir,
tmpfs_size=tmpfs_size,
Expand Down
66 changes: 66 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ class KubernetesDecorator(StepDecorator):
the scheduled node should not have GPUs.
gpu_vendor : str, default KUBERNETES_GPU_VENDOR
The vendor of the GPUs to be used for this step.
trainium : int, optional, default None
Number of AWS Trainium / Inferentia Neuron devices required for this
step. Maps to the `aws.amazon.com/neuron` Kubernetes resource managed
by the AWS Neuron device plugin -- same resource regardless of whether
the underlying chip is Trainium or Inferentia, since they share the
device-plugin / AMI / runtime stack.
inferentia : int, optional, default None
Alias for `trainium`. Use only one of the two. Provided for API
consistency with `@batch(inferentia=...)`.
efa : int, optional, default None
Number of AWS Elastic Fabric Adapter network interfaces required for
this step. Maps to the `vpc.amazonaws.com/efa` Kubernetes resource
managed by the AWS EFA device plugin. Only valid on EFA-capable
instance types where the pool was provisioned with EFA NICs.
tolerations : List[Dict[str,str]], default []
The default is extracted from METAFLOW_KUBERNETES_TOLERATIONS.
Kubernetes tolerations to use when launching pod in Kubernetes.
Expand Down Expand Up @@ -152,6 +166,9 @@ class KubernetesDecorator(StepDecorator):
"namespace": None,
"gpu": None, # value of 0 implies that the scheduled node should not have GPUs
"gpu_vendor": None,
"trainium": None, # number of AWS Trainium/Inferentia Neuron devices
"inferentia": None, # alias for trainium; both map to aws.amazon.com/neuron
"efa": None, # number of Elastic Fabric Adapter network interfaces
"tolerations": None, # e.g., [{"key": "arch", "operator": "Equal", "value": "amd"},
# {"key": "foo", "operator": "Equal", "value": "bar"}]
"labels": None, # e.g. {"test-label": "value", "another-label":"value2"}
Expand Down Expand Up @@ -382,6 +399,33 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
max(float(my_val or 0), float(v or 0))
)

# Alias inferentia to trainium and check that both are not in use.
# `trainium` is canonical on @kubernetes (the underlying Neuron device
# plugin advertises a single `aws.amazon.com/neuron` resource for both
# chip families). `inferentia` is provided for API consistency with
# `@batch(inferentia=...)` -- it collapses into `trainium` here.
if (
self.attributes.get("inferentia") is not None
and self.attributes.get("trainium") is not None
):
raise KubernetesException(
"only specify a value for 'inferentia' or 'trainium', not both."
)
if self.attributes.get("inferentia") is not None:
self.attributes["trainium"] = self.attributes["inferentia"]
self.attributes["inferentia"] = None

# Validate mutually exclusive: gpu and trainium cannot both be set.
if (
self.attributes["trainium"] is not None
and self.attributes["gpu"] is not None
):
raise KubernetesException(
"Cannot specify both 'gpu' and 'trainium' for step *{step}*.".format(
step=step
)
)

# Check GPU vendor.
if self.attributes["gpu_vendor"].lower() not in ("amd", "nvidia"):
raise KubernetesException(
Expand Down Expand Up @@ -412,6 +456,28 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
)
)

if self.attributes["trainium"] is not None and not (
isinstance(self.attributes["trainium"], (int, unicode, basestring))
and float(self.attributes["trainium"]).is_integer()
and int(float(self.attributes["trainium"])) > 0
):
raise KubernetesException(
"Invalid trainium value *{}* for step *{step}*; it should be a positive integer".format(
self.attributes["trainium"], step=step
)
)

if self.attributes["efa"] is not None and not (
isinstance(self.attributes["efa"], (int, unicode, basestring))
and float(self.attributes["efa"]).is_integer()
and int(float(self.attributes["efa"])) > 0
):
raise KubernetesException(
"Invalid efa value *{}* for step *{step}*; it should be a positive integer".format(
self.attributes["efa"], step=step
)
)
Comment thread
emattia marked this conversation as resolved.

if self.attributes["tmpfs_size"]:
if not (
isinstance(self.attributes["tmpfs_size"], (int, unicode, basestring))
Expand Down
27 changes: 26 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ def create_job_spec(self):
# Don't set GPU limits if gpu isn't specified.
if self._kwargs["gpu"] is not None
},
**{
"aws.amazon.com/neuron": str(
self._kwargs["trainium"]
)
for k in [0]
if self._kwargs.get("trainium") is not None
},
**{
"vpc.amazonaws.com/efa": str(
self._kwargs["efa"]
)
for k in [0]
if self._kwargs.get("efa") is not None
},
},
),
volume_mounts=(
Expand Down Expand Up @@ -236,7 +250,18 @@ def create_job_spec(self):
tolerations=[
client.V1Toleration(**toleration)
for toleration in self._kwargs.get("tolerations") or []
],
]
+ (
[
client.V1Toleration(
key="aws.amazon.com/neuron",
operator="Exists",
effect="NoSchedule",
)
]
if self._kwargs.get("trainium") is not None
else []
),
volumes=(
[
client.V1Volume(
Expand Down
29 changes: 28 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes_jobsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,22 @@ def dump(self):
# Don't set GPU limits if gpu isn't specified.
if self._kwargs["gpu"] is not None
},
**{
"aws.amazon.com/neuron": str(
self._kwargs["trainium"]
)
for k in [0]
if self._kwargs.get("trainium")
is not None
},
**{
"vpc.amazonaws.com/efa": str(
self._kwargs["efa"]
)
for k in [0]
if self._kwargs.get("efa")
is not None
},
},
),
volume_mounts=(
Expand Down Expand Up @@ -740,7 +756,18 @@ def dump(self):
client.V1Toleration(**toleration)
for toleration in self._kwargs.get("tolerations")
or []
],
]
+ (
[
client.V1Toleration(
key="aws.amazon.com/neuron",
operator="Exists",
effect="NoSchedule",
)
]
if self._kwargs.get("trainium") is not None
else []
),
volumes=(
[
client.V1Volume(
Expand Down
Loading