From 681760faa80a32adcbc118998380be23e292bb07 Mon Sep 17 00:00:00 2001 From: Yuan Chen Date: Tue, 23 Jun 2026 13:02:54 -0700 Subject: [PATCH] feat(validators): RoCE NET variant for nccl-all-reduce-bw (AICR_NCCL_FABRIC), default EFA MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a ConnectX RoCE path to the nccl-all-reduce-bw-net test, selected by AICR_NCCL_FABRIC=roce (default efa — every existing EFA recipe is unchanged). Fabric-keyed and accelerator-agnostic: the RoCE NET template lives at testdata/roce/{service}/ and is shared across EKS RoCE nodes rather than per-accelerator, so it does not depend on any specific accelerator constant. When fabric=roce, the NET test: - uses NCCL's built-in IB/verbs transport over the ConnectX RoCE devices (NCCL_IB_HCA=rocep, NCCL_NET_PLUGIN=none to bypass the bundled aws-ofi EFA plugin), on a CUDA-13 pytorch image (sm_103-capable; the EFA hpc-cloud image is CUDA 12 + EFA-only); - claims RoCE NICs via a DRA ResourceClaimTemplate (roce.networking.k8s.aws) instead of the vpc.amazonaws.com/efa extended resource; - disables HPC-X UCC/HCOLL and forces the ob1 PML over TCP (their team-create / UCX PML segfault during MPI_Init on this image — MPI is only bootstrap). The transport assertion (verifyTransportFromLogs) is unchanged: it already accepts any non-Socket NET plugin, so 'Using network IB' passes. AICR_NCCL_FABRIC is forwarded to the NET check pod via buildEnv (the test runs in-Job), scoped to nccl-all-reduce-bw-net. Validated end-to-end on a ConnectX-RoCE GB300 cluster: NCCL IB over 8 rocep* RoCE devices (GPUDirect RDMA), ~387 GB/s peak busbw. The env knob is interim; snapshot-based fabric auto-detection is tracked in #1413. Refs #1413, #1410. --- docs/user/validation.md | 14 +- pkg/validator/catalog/catalog_test.go | 22 ++ pkg/validator/v1/job_plan_internal.go | 27 +- pkg/validator/v1/job_plan_test.go | 102 ++++++++ recipes/validators/README.md | 2 +- recipes/validators/catalog.yaml | 2 +- .../nccl_all_reduce_bw_constraint.go | 187 +++++++++++--- .../performance/nccl_roce_apply_test.go | 161 ++++++++++++ validators/performance/nccl_test.go | 86 ++++++- .../testdata/roce/eks/roce-claim.yaml | 34 +++ .../testdata/roce/eks/runtime-net.yaml | 237 ++++++++++++++++++ 11 files changed, 839 insertions(+), 35 deletions(-) create mode 100644 validators/performance/nccl_roce_apply_test.go create mode 100644 validators/performance/testdata/roce/eks/roce-claim.yaml create mode 100644 validators/performance/testdata/roce/eks/runtime-net.yaml diff --git a/docs/user/validation.md b/docs/user/validation.md index 7655adadd..65c05387d 100644 --- a/docs/user/validation.md +++ b/docs/user/validation.md @@ -48,9 +48,21 @@ ones) that match the target fabric: | Check | Transport | When it's selected | |---|---|---| | `nccl-all-reduce-bw` | Auto-detect (whatever NCCL picks) | H100/H200 on EKS, H100 on GKE, and B200/GB200 on self-managed clusters (`service=any`). Preserves the pre-variant behavior. | -| `nccl-all-reduce-bw-net` | NET (EFA on EKS) | GB200 + EKS. Asserts EFA actually carried traffic — catches silent fallback to Socket when the NVIDIA driver is missing `NVreg_GrdmaPciTopoCheckOverride=1`. | +| `nccl-all-reduce-bw-net` | NET (EFA on EKS by default; ConnectX RoCE via `AICR_NCCL_FABRIC=roce`) | GB200 + EKS. Asserts EFA actually carried traffic — catches silent fallback to Socket when the NVIDIA driver is missing `NVreg_GrdmaPciTopoCheckOverride=1`. | | `nccl-all-reduce-bw-nvls` | NVLS (MNNVL across an NVL72 IMEX domain) | GB200 + EKS, and GB200 + OKE. Asserts the NVLS communicator actually initialized — catches silent fallback to EFA (EKS) or Socket (OKE) when the IMEX domain is misconfigured. | +The `-net` check defaults to the AWS EFA fabric. On a ConnectX **RoCE** cluster +(e.g. DGXC GB300 `p6e-gb300r`), set `AICR_NCCL_FABRIC=roce` in the `aicr +validate` environment to run the NET test over NCCL's built-in IB/verbs +transport across `roce.networking.k8s.aws` DRA devices instead. The value is +scoped to the `-net` check only; unset (or `efa`) leaves every existing recipe +on the EFA path unchanged, and any other value is rejected. The RoCE runtime +image installs `openssh-server` at startup, so the GPU nodes need apt egress; +on an air-gapped cluster the RoCE NET test cannot bootstrap. This env override is +interim — snapshot-based fabric auto-detection (and removing the runtime +package install once a CUDA-13 image ships sshd) is tracked in +[NVIDIA/aicr#1413](https://github.com/NVIDIA/aicr/issues/1413). + GB200/EKS recipes (both `training` and `inference` intents) enable `-net` and `-nvls` together rather than the auto-detect variant, because those nodes expose two inter-node fabrics simultaneously and a single auto-detect test diff --git a/pkg/validator/catalog/catalog_test.go b/pkg/validator/catalog/catalog_test.go index 65a65e2d6..478f05223 100644 --- a/pkg/validator/catalog/catalog_test.go +++ b/pkg/validator/catalog/catalog_test.go @@ -1140,6 +1140,28 @@ func TestEmbeddedCatalog_InferenceGatewayEntryExists(t *testing.T) { t.Fatalf("no embedded catalog entry named %q (AICR_REQUIRE_SCOPED_INFERENCE_GATEWAY forwarding would silently no-op)", v1.InferenceGatewayCheckName) } +// TestEmbeddedCatalog_NCCLAllReduceBWNetEntryExists locks the embedded catalog +// entry name to v1.NCCLAllReduceBWNetCheckName, which scopes AICR_NCCL_FABRIC +// forwarding (see buildEnv in pkg/validator/v1). Without this, renaming the +// "nccl-all-reduce-bw-net" catalog entry would silently disable RoCE-fabric +// forwarding — the in-Job validator would never see the env and default to EFA +// — with no other test failing. +func TestEmbeddedCatalog_NCCLAllReduceBWNetEntryExists(t *testing.T) { + cat, err := LoadWithDataProvider(context.Background(), nil, "v0.0.0-next", "") + if err != nil { + t.Fatalf("Load failed: %v", err) + } + for _, v := range cat.Validators { + if v.Name == v1.NCCLAllReduceBWNetCheckName { + if v.Phase != "performance" { + t.Errorf("%q phase = %q, want performance", v1.NCCLAllReduceBWNetCheckName, v.Phase) + } + return + } + } + t.Fatalf("no embedded catalog entry named %q (AICR_NCCL_FABRIC forwarding would silently no-op)", v1.NCCLAllReduceBWNetCheckName) +} + func TestCatalogEmbedding(t *testing.T) { // Simulate embedding in a CR spec type ValidatorCatalogSpec struct { diff --git a/pkg/validator/v1/job_plan_internal.go b/pkg/validator/v1/job_plan_internal.go index 579896812..b86678277 100644 --- a/pkg/validator/v1/job_plan_internal.go +++ b/pkg/validator/v1/job_plan_internal.go @@ -52,6 +52,23 @@ const ( requireScopedInferenceGatewayEnv = "AICR_REQUIRE_SCOPED_INFERENCE_GATEWAY" + // NCCLAllReduceBWNetCheckName is the catalog name of the NCCL all-reduce NET + // check. Used to scope AICR_NCCL_FABRIC forwarding to that validator only. + // Exported (like InferencePerfCheckName / InferenceGatewayCheckName) so the + // catalog package can lock the embedded entry name against it — a rename + // would otherwise silently no-op RoCE forwarding with no test failing. + NCCLAllReduceBWNetCheckName = "nccl-all-reduce-bw-net" + + // ncclFabricEnv selects the NET fabric (efa default | roce). Forwarded to + // the NET check pod so the in-Job validator can observe it. This is the + // orchestrator (forwarding) end; the validator-pod (reading) end defines the + // same literal as ncclFabricEnv in + // validators/performance/nccl_all_reduce_bw_constraint.go — keep the two in + // sync. The split mirrors the other forwarded validator envs (HF_TOKEN, + // AICR_REQUIRE_SCOPED_INFERENCE_GATEWAY, AICR_INFERENCE_PERF_NO_CLEANUP): the + // pod binary is a separate package that does not import this one. + ncclFabricEnv = "AICR_NCCL_FABRIC" + // inferencePerfNoCleanupEnv, when truthy, makes the inference-perf validator // leave its namespace/DGD/workers/frontend/AIPerf Job in place after the run // for post-mortem inspection. Forwarded only to the inference-perf pod. @@ -155,6 +172,14 @@ func buildEnv( env = append(env, corev1.EnvVar{Name: requireScopedInferenceGatewayEnv, Value: v}) } + // Forward the NCCL fabric selector to the nccl-all-reduce-bw-net check pod. + // The NET test runs inside the Job, so it can't observe the CLI environment + // unless forwarded here. Unset (default) leaves the check on EFA; scoped to + // the NET check so unrelated validator pods don't carry it. + if v, ok := os.LookupEnv(ncclFabricEnv); ok && v != "" && entry.Name == NCCLAllReduceBWNetCheckName { + env = append(env, corev1.EnvVar{Name: ncclFabricEnv, Value: v}) + } + // Forward the inference-perf no-cleanup debug toggle into that validator pod. // Cleanup runs inside the Job, so it can't see the CLI process environment // unless the orchestrator carries the value across. Scoped to the @@ -175,7 +200,7 @@ func buildEnv( // forwarded value (k8s takes the last duplicate), breaking that trust // boundary. for _, e := range entry.Env { - if e.Name == hfTokenEnvVar || e.Name == requireScopedInferenceGatewayEnv || e.Name == inferencePerfNoCleanupEnv { + if e.Name == hfTokenEnvVar || e.Name == requireScopedInferenceGatewayEnv || e.Name == inferencePerfNoCleanupEnv || e.Name == ncclFabricEnv { continue } env = append(env, corev1.EnvVar{Name: e.Name, Value: e.Value}) diff --git a/pkg/validator/v1/job_plan_test.go b/pkg/validator/v1/job_plan_test.go index 73ea6f03c..1c8238bf4 100644 --- a/pkg/validator/v1/job_plan_test.go +++ b/pkg/validator/v1/job_plan_test.go @@ -16,6 +16,7 @@ package v1 import ( stderrors "errors" + "os" "strings" "testing" "time" @@ -522,6 +523,107 @@ func TestBuildJobPlan_ForwardsInferencePerfNoCleanupEnv(t *testing.T) { }) } +// TestBuildJobPlan_ForwardsNCCLFabricEnv verifies the NET fabric selector is +// carried from the CLI process into the nccl-all-reduce-bw-net validator Job +// (where ncclFabric() reads it), only that validator, and that a catalog-pinned +// value can never shadow or substitute for the forwarded one. Unlike the +// no-cleanup toggle, the value is forwarded verbatim (the validator validates it). +func TestBuildJobPlan_ForwardsNCCLFabricEnv(t *testing.T) { + build := func(entry ValidatorEntry) map[string]string { + plan, err := BuildJobPlan(entry, "run-1", "ns", "1.0.0", "abc123", "sa", nil, nil, nil, "", "", nil) + if err != nil { + t.Fatalf("BuildJobPlan error: %v", err) + } + m := make(map[string]string) + for _, e := range plan.Env { + m[e.Name] = e.Value + } + return m + } + + netEntry := ValidatorEntry{Name: NCCLAllReduceBWNetCheckName, Phase: "performance", Image: "img:v1", Timeout: time.Minute} + + t.Run("forwarded verbatim to nccl-all-reduce-bw-net", func(t *testing.T) { + t.Setenv(ncclFabricEnv, "roce") + if got := build(netEntry)[ncclFabricEnv]; got != "roce" { + t.Errorf("%s env = %q, want roce", ncclFabricEnv, got) + } + }) + t.Run("empty value omitted", func(t *testing.T) { + t.Setenv(ncclFabricEnv, "") + if _, present := build(netEntry)[ncclFabricEnv]; present { + t.Errorf("%s should not be in Job env when empty", ncclFabricEnv) + } + }) + t.Run("unset omitted", func(t *testing.T) { + // Exercise the LookupEnv ok=false branch (os.Unsetenv), distinct from the + // empty-string ok=true case above. t.Setenv registers cleanup so the + // unset is restored after the test. + t.Setenv(ncclFabricEnv, "") + if err := os.Unsetenv(ncclFabricEnv); err != nil { + t.Fatalf("unsetenv: %v", err) + } + if _, present := build(netEntry)[ncclFabricEnv]; present { + t.Errorf("%s should not be in Job env when unset", ncclFabricEnv) + } + }) + t.Run("not forwarded to other validators", func(t *testing.T) { + t.Setenv(ncclFabricEnv, "roce") + other := ValidatorEntry{Name: InferencePerfCheckName, Phase: "performance", Image: "img:v1", Timeout: time.Minute} + if _, present := build(other)[ncclFabricEnv]; present { + t.Errorf("%s must not be forwarded to a non-NET validator", ncclFabricEnv) + } + }) + t.Run("env-name literal locked", func(t *testing.T) { + // Pin the orchestrator (forwarding) end of the env name. The validator-pod + // (reading) end defines the same literal independently in + // validators/performance/nccl_all_reduce_bw_constraint.go; a fat-finger in + // either redeclaration would silently no-op RoCE forwarding. Both ends + // pin to this canonical string so a typo fails its own package's test. + if ncclFabricEnv != "AICR_NCCL_FABRIC" { + t.Errorf("ncclFabricEnv = %q, want AICR_NCCL_FABRIC (keep in sync with the pod-side const)", ncclFabricEnv) + } + }) + + // values collects every occurrence of the env var (not just the last) so we + // can assert the catalog value is dropped, not merely shadowed. + values := func(entry ValidatorEntry) []string { + plan, err := BuildJobPlan(entry, "run-1", "ns", "1.0.0", "abc123", "sa", nil, nil, nil, "", "", nil) + if err != nil { + t.Fatalf("BuildJobPlan error: %v", err) + } + var got []string + for _, e := range plan.Env { + if e.Name == ncclFabricEnv { + got = append(got, e.Value) + } + } + return got + } + + t.Run("catalog value cannot override forwarded value", func(t *testing.T) { + t.Setenv(ncclFabricEnv, "roce") + entry := ValidatorEntry{ + Name: NCCLAllReduceBWNetCheckName, Phase: "performance", Image: "img:v1", Timeout: time.Minute, + Env: []EnvVar{{Name: ncclFabricEnv, Value: "efa"}}, + } + if got := values(entry); len(got) != 1 || got[0] != "roce" { + t.Errorf("%s env = %v, want exactly [roce] (catalog value must be dropped)", ncclFabricEnv, got) + } + }) + + t.Run("catalog value alone cannot select fabric", func(t *testing.T) { + t.Setenv(ncclFabricEnv, "") + entry := ValidatorEntry{ + Name: NCCLAllReduceBWNetCheckName, Phase: "performance", Image: "img:v1", Timeout: time.Minute, + Env: []EnvVar{{Name: ncclFabricEnv, Value: "roce"}}, + } + if got := values(entry); len(got) != 0 { + t.Errorf("%s env = %v, want none (catalog must not select fabric without shell env)", ncclFabricEnv, got) + } + }) +} + func TestBuildJobPlanWithDefaults(t *testing.T) { // Test with minimal entry (no custom resources, no tolerations, no node selector) entry := ValidatorEntry{ diff --git a/recipes/validators/README.md b/recipes/validators/README.md index 0e3ce55af..3d27a4b08 100644 --- a/recipes/validators/README.md +++ b/recipes/validators/README.md @@ -47,7 +47,7 @@ Applied by `catalog.Load` (`pkg/validator/catalog/catalog.go`) in order: | Name | Description | Timeout | |------|-------------|---------| | `nccl-all-reduce-bw` | Verify NCCL All Reduce Bus Bandwidth meets threshold | 30m | -| `nccl-all-reduce-bw-net` | Verify NCCL All Reduce Bus Bandwidth on the NET transport (EFA on EKS) | 30m | +| `nccl-all-reduce-bw-net` | Verify NCCL All Reduce Bus Bandwidth on the NET transport (EFA on EKS; ConnectX RoCE via `AICR_NCCL_FABRIC=roce`) | 30m | | `nccl-all-reduce-bw-nvls` | Verify NCCL All Reduce Bus Bandwidth on the NVLS transport (MNNVL across an NVL72 IMEX domain) | 30m | ### Conformance Phase diff --git a/recipes/validators/catalog.yaml b/recipes/validators/catalog.yaml index dfc9b8295..372881afe 100644 --- a/recipes/validators/catalog.yaml +++ b/recipes/validators/catalog.yaml @@ -231,7 +231,7 @@ validators: env: [] - name: nccl-all-reduce-bw-net phase: performance - description: "Verify NCCL All Reduce Bus Bandwidth on the NET transport (EFA on EKS)" + description: "Verify NCCL All Reduce Bus Bandwidth on the NET transport (EFA on EKS; ConnectX RoCE via AICR_NCCL_FABRIC=roce)" image: ghcr.io/nvidia/aicr-validators/performance:latest timeout: 30m args: ["nccl-all-reduce-bw-net"] diff --git a/validators/performance/nccl_all_reduce_bw_constraint.go b/validators/performance/nccl_all_reduce_bw_constraint.go index f83185c8f..d450de293 100644 --- a/validators/performance/nccl_all_reduce_bw_constraint.go +++ b/validators/performance/nccl_all_reduce_bw_constraint.go @@ -115,6 +115,58 @@ const ( variantNVLS ncclVariant = "nvls" ) +// ncclFabricType selects the inter-node fabric for the NET variant. Default EFA +// preserves all existing behavior; roce (AICR_NCCL_FABRIC=roce) selects the +// ConnectX RoCE NET path — NCCL's built-in IB/verbs transport over +// roce.networking.k8s.aws DRA devices. Fabric is keyed independently of the +// accelerator: the RoCE NET template is shared across EKS RoCE nodes +// (testdata/roce/{service}/...), not per-accelerator. Snapshot-based fabric +// auto-detection (so this env knob becomes an override, not the selector) is +// tracked in NVIDIA/aicr#1413. +type ncclFabricType string + +const ( + fabricEFA ncclFabricType = "efa" + fabricRoCE ncclFabricType = "roce" + // ncclFabricEnv is the validator-pod (reading) end of the fabric selector. + // The orchestrator (forwarding) end defines the same literal as ncclFabricEnv + // in pkg/validator/v1/job_plan_internal.go — keep the two in sync. The pod + // binary is a separate package and does not import the orchestrator package, + // matching how the other forwarded validator envs are split. + ncclFabricEnv = "AICR_NCCL_FABRIC" + + // ncclRoceClaimName is the RoCE DRA ResourceClaimTemplate name. Must match + // metadata.name in testdata/roce/{service}/roce-claim.yaml; used by cleanup + // to delete the claim (the validator namespace is persistent/reused). + ncclRoceClaimName = "nccl-roce-rct" +) + +// roceNETSupportedServices lists services with a testdata/roce/{service} NET +// template. RoCE NET is accelerator-agnostic, so support is keyed by service. +var roceNETSupportedServices = map[recipe.CriteriaServiceType]bool{ + recipe.CriteriaServiceEKS: true, +} + +// ncclFabric returns the configured NET fabric (default EFA when unset). Read +// from the validator pod's environment, forwarded by the CLI/orchestrator +// (buildEnv). A non-empty but unrecognized value (e.g. a typo "roc") is +// rejected rather than silently falling back to EFA, so an operator who +// intended RoCE never passes the EFA validator by accident. +func ncclFabric() (ncclFabricType, error) { + v := strings.TrimSpace(os.Getenv(ncclFabricEnv)) + switch { + case v == "": + return fabricEFA, nil + case strings.EqualFold(v, string(fabricEFA)): + return fabricEFA, nil + case strings.EqualFold(v, string(fabricRoCE)): + return fabricRoCE, nil + default: + return "", aicrErrors.New(aicrErrors.ErrCodeInvalidRequest, + fmt.Sprintf("unsupported %s=%q (expected %q or %q)", ncclFabricEnv, v, fabricEFA, fabricRoCE)) + } +} + // Transport markers emitted by NCCL when NCCL_DEBUG=INFO. Used by // verifyTransportFromLogs to assert the intended fabric actually carried // traffic. Earlier NCCL releases emitted per-channel "[send] via NET/" @@ -134,12 +186,17 @@ var ( // // variantDefault → testdata/{accelerator}/{service}/{filename} // other variants → testdata/{accelerator}/{service}/{stem}-{variant}{ext} -func templatePath(accelerator recipe.CriteriaAcceleratorType, service recipe.CriteriaServiceType, variant ncclVariant, filename string) string { +func templatePath(accelerator recipe.CriteriaAcceleratorType, service recipe.CriteriaServiceType, variant ncclVariant, fabric ncclFabricType, filename string) string { if variant != variantDefault { ext := filepath.Ext(filename) stem := strings.TrimSuffix(filename, ext) filename = stem + "-" + string(variant) + ext } + // RoCE NET templates are fabric-keyed and accelerator-agnostic: any EKS RoCE + // node uses testdata/roce/{service}/..., not a per-accelerator directory. + if fabric == fabricRoCE { + return filepath.Join("testdata", string(fabricRoCE), string(service), filename) + } return filepath.Join("testdata", string(accelerator), string(service), filename) } @@ -183,8 +240,19 @@ func validateNcclAllReduceBw(ctx *validators.Context, constraint recipe.Constrai service := ctx.ValidationInput.Criteria.Service accelerator := ctx.ValidationInput.Criteria.Accelerator + fabric, err := ncclFabric() + if err != nil { + return "", false, err + } + supported := false - if byService, ok := supportedNCCLCombinations[variant]; ok { + if fabric == fabricRoCE && variant == variantNET { + // RoCE NET is fabric-keyed and accelerator-agnostic — supported on any + // service with a testdata/roce/{service} template. Only NET has a RoCE + // path; NVLS (NVLink/IMEX) is fabric-independent and uses the normal + // accelerator-keyed combinations below. + supported = roceNETSupportedServices[service] + } else if byService, ok := supportedNCCLCombinations[variant]; ok { if supportedAccelerators, ok := byService[service]; ok { for _, a := range supportedAccelerators { if accelerator == a { @@ -197,7 +265,7 @@ func validateNcclAllReduceBw(ctx *validators.Context, constraint recipe.Constrai if !supported { slog.Info("Skipping NCCL All Reduce bandwidth validation: unsupported variant/service/accelerator combination", - "variant", string(variant), "service", service, "accelerator", accelerator) + "variant", string(variant), "service", service, "accelerator", accelerator, "fabric", string(fabric)) return "skipped - requires Service + Accelerator to be implemented", true, nil } @@ -227,7 +295,7 @@ func validateNcclAllReduceBw(ctx *validators.Context, constraint recipe.Constrai // On GB200/EKS the NET variant needs NVreg_GrdmaPciTopoCheckOverride=1 // on the NVIDIA driver; without it, EFA can't attach dma-buf to GPU HBM // and NCCL silently falls back to Socket. - if gb200NetPreflightApplies(variant, accelerator, service) { + if fabric == fabricEFA && gb200NetPreflightApplies(variant, accelerator, service) { if pfErr := preflightGB200NetNVregFlag(ctx, gpuConfig.Nodes); pfErr != nil { return "", false, pfErr } @@ -236,7 +304,7 @@ func validateNcclAllReduceBw(ctx *validators.Context, constraint recipe.Constrai // Run the NCCL all-reduce benchmark using Kubeflow TrainJob + MPI. // Each platform has a per-platform TrainingRuntime with all platform-specific // configuration (image, mpirun args, resources, sidecars). The TrainJob is shared. - logs, err := runNCCLTrainJob(ctx, gpuConfig, accelerator, service, variant) + logs, err := runNCCLTrainJob(ctx, gpuConfig, accelerator, service, variant, fabric) if err != nil { return "", false, err } @@ -274,7 +342,7 @@ func validateNcclAllReduceBw(ctx *validators.Context, constraint recipe.Constrai // It applies the per-platform TrainingRuntime and shared TrainJob, waits for the launcher // pod to complete, and returns the benchmark logs. func runNCCLTrainJob(ctx *validators.Context, gpuConfig *gpuConfiguration, - accelerator recipe.CriteriaAcceleratorType, service recipe.CriteriaServiceType, variant ncclVariant) (string, error) { + accelerator recipe.CriteriaAcceleratorType, service recipe.CriteriaServiceType, variant ncclVariant, fabric ncclFabricType) (string, error) { dynamicClient := ctx.DynamicClient @@ -297,11 +365,21 @@ func runNCCLTrainJob(ctx *validators.Context, gpuConfig *gpuConfiguration, slog.Info("Kubeflow Trainer already installed, proceeding") } + // Clean up NCCL resources on every exit path. Registered after the trainer + // install block but before the apply: defers run LIFO, so this runs *before* + // the conditional deleteTrainer above — the NCCL TrainJob/TrainingRuntime CRs + // are deleted while their CRDs still exist, rather than relying on CRD-delete + // cascade GC. Registering it before applyNCCLResources still guarantees a + // partial-apply failure (e.g. the RoCE claim is created, then the runtime or + // TrainJob apply fails) doesn't leak nccl-roce-rct into the persistent, reused + // validation namespace. cleanupNCCLResources is NotFound-tolerant for every + // resource it deletes, so running it after an early failure is safe. + defer cleanupNCCLResources(dynamicClient, gpuConfig.Namespace) + // Apply runtime and trainjob resources. - if applyErr := applyNCCLResources(ctx, dynamicClient, gpuConfig, accelerator, service, variant); applyErr != nil { + if applyErr := applyNCCLResources(ctx, dynamicClient, gpuConfig, accelerator, service, variant, fabric); applyErr != nil { return "", aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to apply NCCL resources", applyErr) } - defer cleanupNCCLResources(dynamicClient, gpuConfig.Namespace) podHelper := &helper.PodLifecycle{ ClientSet: ctx.Clientset, @@ -550,8 +628,8 @@ func determineGPUConfig(ctx *validators.Context, service recipe.CriteriaServiceT // YAML files with template substitution using the dynamic client. // Runtime: testdata/{accelerator}/{service}/runtime[-{variant}].yaml (per-platform+variant) // TrainJob: testdata/trainjob.yaml (shared, just runtimeRef + numNodes) -func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface, config *gpuConfiguration, accelerator recipe.CriteriaAcceleratorType, service recipe.CriteriaServiceType, variant ncclVariant) error { - slog.Info("Applying NCCL test resources...", "accelerator", accelerator, "service", service, "variant", string(variant)) +func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface, config *gpuConfiguration, accelerator recipe.CriteriaAcceleratorType, service recipe.CriteriaServiceType, variant ncclVariant, fabric ncclFabricType) error { + slog.Info("Applying NCCL test resources...", "accelerator", accelerator, "service", service, "variant", string(variant), "fabric", string(fabric)) templateData := map[string]string{ "NAMESPACE": config.Namespace, @@ -589,16 +667,21 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface return err } instanceType = it - // Indentation matches the resource block position in runtime.yaml. - const efaIndent = " " - templateData["EFA_RESOURCE_LIMITS"] = buildEFAResourceLine(efaCount, efaIndent) - templateData["EFA_RESOURCE_REQUESTS"] = buildEFAResourceLine(efaCount, efaIndent) - if efaCount == 0 { - templateData["MAX_MESSAGE_SIZE"] = maxMessageSizeTCP - slog.Warn("No EFA adapters found — NCCL will use TCP (reduced bandwidth)", - "instanceType", instanceType, "maxMessageSize", maxMessageSizeTCP) - } else { - slog.Info("Discovered EKS node configuration", "instanceType", instanceType, "efaCount", efaCount) + // EFA resource wiring is fabric-specific; the RoCE path claims NICs via a + // DRA ResourceClaimTemplate below (keyed by fabric, not service) and + // leaves these EFA template vars unset. + if fabric != fabricRoCE { + // Indentation matches the resource block position in runtime.yaml. + const efaIndent = " " + templateData["EFA_RESOURCE_LIMITS"] = buildEFAResourceLine(efaCount, efaIndent) + templateData["EFA_RESOURCE_REQUESTS"] = buildEFAResourceLine(efaCount, efaIndent) + if efaCount == 0 { + templateData["MAX_MESSAGE_SIZE"] = maxMessageSizeTCP + slog.Warn("No EFA adapters found — NCCL will use TCP (reduced bandwidth)", + "instanceType", instanceType, "maxMessageSize", maxMessageSizeTCP) + } else { + slog.Info("Discovered EKS node configuration", "instanceType", instanceType, "efaCount", efaCount) + } } } @@ -629,7 +712,30 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface "(e.g., --node-selector nvidia.com/gpu.present=true)") } - runtimeObj, err := parseYAMLTemplate(templatePath(accelerator, service, variant, "runtime.yaml"), templateData) + // RoCE NET: the worker pod references a RoCE DRA ResourceClaimTemplate + // (nccl-roce-rct). parseYAMLTemplate is single-document, so apply the claim + // as a standalone object before the runtime (it must exist when the TrainJob + // later creates the worker pods that reference it). + if fabric == fabricRoCE { + // Claim one ConnectX RoCE device per GPU via DRA (NCCL maps GPU->NIC); + // the per-node device pool (e.g. 8 on p6e-gb300r) is >= GPUs/node. Set + // here — keyed by fabric, not service — so adding a non-EKS RoCE service + // to roceNETSupportedServices still renders ${ROCE_DEVICE_COUNT}. + templateData["ROCE_DEVICE_COUNT"] = strconv.Itoa(config.GPUCountPerNode) + slog.Info("RoCE NET: claiming RoCE DRA devices", "count", config.GPUCountPerNode) + + // Create-or-update (not plain Create) so a stale claim left by a prior + // run that was hard-killed before its deferred cleanup ran is reclaimed + // rather than failing the apply with AlreadyExists. Matches the shared + // resourceClaimTemplateGVR pattern in inference_perf_constraint.go. + claimPath := filepath.Join("testdata", string(fabricRoCE), string(service), "roce-claim.yaml") + if cerr := createOrUpdateFromTemplate(ctx, resourceClaimTemplateGVR, config.Namespace, claimPath, templateData, nil); cerr != nil { + return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to apply RoCE ResourceClaimTemplate", cerr) + } + slog.Info("Applied RoCE ResourceClaimTemplate", "name", ncclRoceClaimName, "count", templateData["ROCE_DEVICE_COUNT"]) + } + + runtimeObj, err := parseYAMLTemplate(templatePath(accelerator, service, variant, fabric, "runtime.yaml"), templateData) if err != nil { return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse training runtime template", err) } @@ -1230,20 +1336,29 @@ func cleanupNCCLResources(dynamicClient dynamic.Interface, namespace string) { cleanupCtx, cancel := context.WithTimeout(context.Background(), defaults.DiagnosticTimeout) defer cancel() - // Delete trainjob + // Delete trainjob. NotFound is expected and logged at debug: this runs as a + // deferred cleanup registered before the apply, so an early/partial-apply + // failure (or the install-trainer path, where deleteTrainer may already have + // cascade-removed the CRs) legitimately leaves no TrainJob to delete. err := dynamicClient.Resource(trainJobGVR).Namespace(namespace).Delete(cleanupCtx, ncclTrainJobName, metav1.DeleteOptions{}) - if err != nil { - slog.Warn("failed to delete TrainJob", "error", err) - } else { + switch { + case err == nil: slog.Info("Deleted TrainJob") + case apierrors.IsNotFound(err): + slog.Debug("TrainJob not present, skipping", "name", ncclTrainJobName) + default: + slog.Warn("failed to delete TrainJob", "error", err) } - // Delete runtime + // Delete runtime. NotFound is expected and logged at debug (see TrainJob above). err = dynamicClient.Resource(trainingRuntimeGVR).Namespace(namespace).Delete(cleanupCtx, ncclTrainingRuntimeName, metav1.DeleteOptions{}) - if err != nil { - slog.Warn("failed to delete TrainingRuntime", "error", err) - } else { + switch { + case err == nil: slog.Info("Deleted TrainingRuntime") + case apierrors.IsNotFound(err): + slog.Debug("TrainingRuntime not present, skipping", "name", ncclTrainingRuntimeName) + default: + slog.Warn("failed to delete TrainingRuntime", "error", err) } // Delete ComputeDomain if this was the NVLS variant. NotFound is the @@ -1258,4 +1373,18 @@ func cleanupNCCLResources(dynamicClient dynamic.Interface, namespace string) { default: slog.Warn("failed to delete ComputeDomain", "error", err, "name", ncclComputeDomainName) } + + // Delete the RoCE ResourceClaimTemplate (RoCE NET variant only). The + // validator namespace is persistent and reused across runs, so leaving it + // behind makes the next RoCE run fail with AlreadyExists when + // applyNCCLResources re-creates it. NotFound is expected for EFA/NVLS runs. + err = dynamicClient.Resource(resourceClaimTemplateGVR).Namespace(namespace).Delete(cleanupCtx, ncclRoceClaimName, metav1.DeleteOptions{}) + switch { + case err == nil: + slog.Info("Deleted RoCE ResourceClaimTemplate", "name", ncclRoceClaimName) + case apierrors.IsNotFound(err): + slog.Debug("RoCE ResourceClaimTemplate not present (non-RoCE variant), skipping", "name", ncclRoceClaimName) + default: + slog.Warn("failed to delete RoCE ResourceClaimTemplate", "error", err, "name", ncclRoceClaimName) + } } diff --git a/validators/performance/nccl_roce_apply_test.go b/validators/performance/nccl_roce_apply_test.go new file mode 100644 index 000000000..ad7d61381 --- /dev/null +++ b/validators/performance/nccl_roce_apply_test.go @@ -0,0 +1,161 @@ +// Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "path/filepath" + "testing" + + "github.com/NVIDIA/aicr/validators" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" +) + +// ncclGVRListKinds maps every GVR cleanupNCCLResources / applyNCCLResources +// touch to a fake list kind, so the dynamic fake client can serve Create/Get/ +// Update/Delete for these CRDs without a real REST mapper. +var ncclGVRListKinds = map[schema.GroupVersionResource]string{ + resourceClaimTemplateGVR: "ResourceClaimTemplateList", + trainJobGVR: "TrainJobList", + trainingRuntimeGVR: "TrainingRuntimeList", + computeDomainGVR: "ComputeDomainList", +} + +func newFakeDynamicClient(objs ...runtime.Object) dynamic.Interface { + return dynamicfake.NewSimpleDynamicClientWithCustomListKinds( + runtime.NewScheme(), ncclGVRListKinds, objs...) +} + +// roceClaimCount walks the RoCE ResourceClaimTemplate to the templated device +// count (spec.spec.devices.requests[0].exactly.count). Fails the test on any +// shape mismatch so a future template restructure is caught here. +func roceClaimCount(t *testing.T, claim *unstructured.Unstructured) int64 { + t.Helper() + requests, found, err := unstructured.NestedSlice(claim.Object, "spec", "spec", "devices", "requests") + if err != nil || !found || len(requests) == 0 { + t.Fatalf("claim has no devices.requests (found=%v err=%v)", found, err) + } + req0, ok := requests[0].(map[string]interface{}) + if !ok { + t.Fatalf("requests[0] is %T, want map", requests[0]) + } + exactly, ok := req0["exactly"].(map[string]interface{}) + if !ok { + t.Fatalf("requests[0].exactly is %T, want map", req0["exactly"]) + } + count, ok := exactly["count"].(int64) + if !ok { + t.Fatalf("requests[0].exactly.count is %T, want int64", exactly["count"]) + } + return count +} + +// TestNCCLFabricEnvNameLocked pins the validator-pod (reading) end of the fabric +// env name. The orchestrator (forwarding) end in pkg/validator/v1 defines the +// same literal independently; a fat-finger in either redeclaration would silently +// no-op RoCE forwarding (the pod would never see the value and default to EFA). +// Both ends pin to this canonical string so a typo fails its own package's test. +func TestNCCLFabricEnvNameLocked(t *testing.T) { + if ncclFabricEnv != "AICR_NCCL_FABRIC" { + t.Errorf("ncclFabricEnv = %q, want AICR_NCCL_FABRIC (keep in sync with pkg/validator/v1)", ncclFabricEnv) + } +} + +// TestCreateOrUpdateFromTemplate_RoCEClaimIdempotent is the regression guard for +// the create-or-update fix: applying the RoCE ResourceClaimTemplate twice (as a +// reused, persistent validation namespace would) must not fail with +// AlreadyExists on the second apply, and the second apply must reflect the new +// templated device count rather than erroring out. +func TestCreateOrUpdateFromTemplate_RoCEClaimIdempotent(t *testing.T) { + const ns = "aicr-validation" + claimPath := filepath.Join("testdata", "roce", "eks", "roce-claim.yaml") + + fakeClient := newFakeDynamicClient() + ctx := &validators.Context{Ctx: context.Background(), DynamicClient: fakeClient} + + // First apply: claim does not exist → plain create. + if err := createOrUpdateFromTemplate(ctx, resourceClaimTemplateGVR, ns, claimPath, + map[string]string{"NAMESPACE": ns, "ROCE_DEVICE_COUNT": "8"}, nil); err != nil { + t.Fatalf("first apply (create) failed: %v", err) + } + + // Second apply with a different count: claim already exists → must + // create-or-update (Get + Update), NOT fail with AlreadyExists. + if err := createOrUpdateFromTemplate(ctx, resourceClaimTemplateGVR, ns, claimPath, + map[string]string{"NAMESPACE": ns, "ROCE_DEVICE_COUNT": "4"}, nil); err != nil { + t.Fatalf("second apply (update) failed — create-or-update regressed to plain create: %v", err) + } + + got, err := fakeClient.Resource(resourceClaimTemplateGVR).Namespace(ns). + Get(context.Background(), ncclRoceClaimName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("claim not found after idempotent re-apply: %v", err) + } + if c := roceClaimCount(t, got); c != 4 { + t.Errorf("device count = %d after second apply, want 4 (update did not take effect)", c) + } +} + +// TestCleanupNCCLResources_ToleratesMissing verifies the deferred cleanup is +// safe to run after an early/partial-apply failure: with no resources present, +// every Delete hits NotFound and the function must complete without panicking. +func TestCleanupNCCLResources_ToleratesMissing(t *testing.T) { + const ns = "aicr-validation" + // No objects seeded — every Delete returns NotFound. + fakeClient := newFakeDynamicClient() + cleanupNCCLResources(fakeClient, ns) + + // Cleanup must tolerate the absence, not resurrect anything. + _, err := fakeClient.Resource(resourceClaimTemplateGVR).Namespace(ns). + Get(context.Background(), ncclRoceClaimName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Fatalf("claim should remain absent after cleanup of empty namespace, got err=%v", err) + } +} + +// TestCleanupNCCLResources_DeletesRoCEClaim verifies the happy path: a RoCE +// claim left in the persistent namespace is deleted by cleanup, so the next run +// does not collide with it. +func TestCleanupNCCLResources_DeletesRoCEClaim(t *testing.T) { + const ns = "aicr-validation" + + claim := &unstructured.Unstructured{} + claim.SetAPIVersion("resource.k8s.io/v1") + claim.SetKind("ResourceClaimTemplate") + claim.SetName(ncclRoceClaimName) + claim.SetNamespace(ns) + + fakeClient := newFakeDynamicClient(claim) + + // Sanity: the claim exists before cleanup. + if _, err := fakeClient.Resource(resourceClaimTemplateGVR).Namespace(ns). + Get(context.Background(), ncclRoceClaimName, metav1.GetOptions{}); err != nil { + t.Fatalf("precondition: claim should exist before cleanup: %v", err) + } + + cleanupNCCLResources(fakeClient, ns) + + _, err := fakeClient.Resource(resourceClaimTemplateGVR).Namespace(ns). + Get(context.Background(), ncclRoceClaimName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Fatalf("claim should be deleted after cleanup, got err=%v", err) + } +} diff --git a/validators/performance/nccl_test.go b/validators/performance/nccl_test.go index 6cbb64db3..d904245af 100644 --- a/validators/performance/nccl_test.go +++ b/validators/performance/nccl_test.go @@ -596,12 +596,52 @@ func TestCommonGKEAccelerator(t *testing.T) { } } +func TestNCCLFabric(t *testing.T) { + tests := []struct { + name string + env string + setEnv bool + want ncclFabricType + wantErr bool + }{ + {name: "unset defaults to efa", setEnv: false, want: fabricEFA}, + {name: "empty defaults to efa", env: "", setEnv: true, want: fabricEFA}, + {name: "efa", env: "efa", setEnv: true, want: fabricEFA}, + {name: "roce", env: "roce", setEnv: true, want: fabricRoCE}, + {name: "case-insensitive roce", env: "RoCE", setEnv: true, want: fabricRoCE}, + {name: "whitespace trimmed", env: " roce ", setEnv: true, want: fabricRoCE}, + {name: "typo rejected", env: "roc", setEnv: true, wantErr: true}, + {name: "unknown value rejected", env: "infiniband", setEnv: true, wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.setEnv { + t.Setenv(ncclFabricEnv, tt.env) + } else { + // t.Setenv requires a value; clear explicitly to assert the unset path. + t.Setenv(ncclFabricEnv, "") + if err := os.Unsetenv(ncclFabricEnv); err != nil { + t.Fatalf("unsetenv: %v", err) + } + } + got, err := ncclFabric() + if (err != nil) != tt.wantErr { + t.Fatalf("ncclFabric() error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr && got != tt.want { + t.Errorf("ncclFabric() = %q, want %q", got, tt.want) + } + }) + } +} + func TestTemplatePath(t *testing.T) { tests := []struct { name string accelerator recipe.CriteriaAcceleratorType service recipe.CriteriaServiceType variant ncclVariant + fabric ncclFabricType filename string expected string }{ @@ -613,6 +653,27 @@ func TestTemplatePath(t *testing.T) { filename: "runtime.yaml", expected: filepath.Join("testdata", "h100", "eks", "runtime.yaml"), }, + { + // RoCE NET is fabric-keyed and accelerator-agnostic: the path drops + // the accelerator dir for testdata/roce/{service}/. The next two + // cases assert two *different* accelerators resolve to the same path. + name: "eks gb200 net roce -> shared roce path", + accelerator: recipe.CriteriaAcceleratorGB200, + service: recipe.CriteriaServiceEKS, + variant: variantNET, + fabric: fabricRoCE, + filename: "runtime.yaml", + expected: filepath.Join("testdata", "roce", "eks", "runtime-net.yaml"), + }, + { + name: "eks h100 net roce -> same shared roce path (accelerator-agnostic)", + accelerator: recipe.CriteriaAcceleratorH100, + service: recipe.CriteriaServiceEKS, + variant: variantNET, + fabric: fabricRoCE, + filename: "runtime.yaml", + expected: filepath.Join("testdata", "roce", "eks", "runtime-net.yaml"), + }, { name: "eks h200 runtime default", accelerator: recipe.CriteriaAcceleratorH200, @@ -680,7 +741,7 @@ func TestTemplatePath(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := templatePath(tt.accelerator, tt.service, tt.variant, tt.filename) + got := templatePath(tt.accelerator, tt.service, tt.variant, tt.fabric, tt.filename) if got != tt.expected { t.Errorf("templatePath() = %q, want %q", got, tt.expected) } @@ -909,6 +970,7 @@ func TestSupportedNCCLCombinationsHaveRuntimeTemplates(t *testing.T) { "MAX_MESSAGE_SIZE": maxMessageSize, "EFA_RESOURCE_LIMITS": buildEFAResourceLine(1, efaIndent), "EFA_RESOURCE_REQUESTS": buildEFAResourceLine(1, efaIndent), + "ROCE_DEVICE_COUNT": "8", "GKE_NETWORK_INTERFACES": buildGKENetworkInterfacesAnnotation([]string{ "gpu-nic-0", "gpu-nic-1", @@ -927,7 +989,7 @@ func TestSupportedNCCLCombinationsHaveRuntimeTemplates(t *testing.T) { for _, accelerator := range accelerators { name := strings.Join([]string{string(variant), string(service), string(accelerator)}, "/") t.Run(name, func(t *testing.T) { - path := templatePath(accelerator, service, variant, "runtime.yaml") + path := templatePath(accelerator, service, variant, fabricEFA, "runtime.yaml") if _, err := parseYAMLTemplate(path, data); err != nil { t.Fatalf("supported NCCL combination has no parseable runtime template %s: %v", path, err) } @@ -935,6 +997,26 @@ func TestSupportedNCCLCombinationsHaveRuntimeTemplates(t *testing.T) { } } } + + // RoCE NET templates are accelerator-agnostic and keyed by fabric, so they + // aren't covered by the accelerator-keyed loop above. Parse both the runtime + // and the standalone RoCE ResourceClaimTemplate explicitly to catch a + // malformed testdata/roce/{service}/{runtime-net,roce-claim}.yaml — the + // claim is applied separately by applyNCCLResources, so it would otherwise + // only be exercised on a live cluster. + for service := range roceNETSupportedServices { + name := strings.Join([]string{string(variantNET), string(service), string(fabricRoCE)}, "/") + t.Run(name, func(t *testing.T) { + runtimePath := templatePath(recipe.CriteriaAcceleratorH100, service, variantNET, fabricRoCE, "runtime.yaml") + if _, err := parseYAMLTemplate(runtimePath, data); err != nil { + t.Fatalf("supported RoCE NET combination has no parseable runtime template %s: %v", runtimePath, err) + } + claimPath := filepath.Join("testdata", string(fabricRoCE), string(service), "roce-claim.yaml") + if _, err := parseYAMLTemplate(claimPath, data); err != nil { + t.Fatalf("supported RoCE NET combination has no parseable claim template %s: %v", claimPath, err) + } + }) + } } // TestH200EKSRuntimeMatchesH100 enforces the "keep in sync with H100" contract diff --git a/validators/performance/testdata/roce/eks/roce-claim.yaml b/validators/performance/testdata/roce/eks/roce-claim.yaml new file mode 100644 index 000000000..55a162415 --- /dev/null +++ b/validators/performance/testdata/roce/eks/roce-claim.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# RoCE DRA ResourceClaimTemplate for the NCCL NET test (AICR_NCCL_FABRIC=roce). +# +# Requests ConnectX RoCE NICs from the AWS networking DRA driver's DeviceClass +# (roce.networking.k8s.aws, surfaced by aws-net-k8s-dra-plugin). The NCCL NET +# worker pod references this via resourceClaims. Applied as a standalone object +# by applyNCCLResources before the TrainingRuntime (parseYAMLTemplate is +# single-document). Count is templated to the per-node GPU count. +apiVersion: resource.k8s.io/v1 +kind: ResourceClaimTemplate +metadata: + name: nccl-roce-rct + namespace: ${NAMESPACE} +spec: + spec: + devices: + requests: + - name: roce + exactly: + deviceClassName: roce.networking.k8s.aws + count: ${ROCE_DEVICE_COUNT} diff --git a/validators/performance/testdata/roce/eks/runtime-net.yaml b/validators/performance/testdata/roce/eks/runtime-net.yaml new file mode 100644 index 000000000..dc4efbf53 --- /dev/null +++ b/validators/performance/testdata/roce/eks/runtime-net.yaml @@ -0,0 +1,237 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NCCL All-Reduce NET TrainingRuntime — AWS ConnectX RoCE fabric. +# +# Fabric-keyed (AICR_NCCL_FABRIC=roce), accelerator-agnostic: any EKS RoCE node +# (ConnectX VFs surfaced as roce.networking.k8s.aws DRA devices, e.g. the DGXC +# GB300 p6e-gb300r clusters) rather than a per-accelerator template. The RoCE +# DRA claim (nccl-roce-rct) is applied separately by applyNCCLResources. +# +# Differences from the EFA NET runtime: +# - Image: nvcr.io/nvidia/pytorch (CUDA 13, sm_103-capable; ships nccl-tests +# _mpi binaries + NCCL built with IB/verbs). The EFA hpc-cloud image is +# CUDA 12 and EFA-only — wrong fabric and wrong CUDA for Blackwell Ultra. +# - Transport: NCCL built-in IB/verbs over the ConnectX RoCE devices +# (NCCL_IB_HCA=rocep, the AWS RoCE kernel naming), NCCL_NET_PLUGIN=none to +# bypass the bundled aws-ofi-nccl (EFA/Libfabric) plugin, no FI_PROVIDER. +# - Resource: RoCE DRA claim (resourceClaims -> nccl-roce-rct) instead of the +# vpc.amazonaws.com/efa extended resource. +# - MPI bootstrap: HPC-X UCC/HCOLL disabled and PML forced to ob1 over TCP — +# UCC team-create and the UCX PML segfault during MPI_Init on this image; +# MPI is only the launcher/bootstrap, NCCL carries the collective data. +# +# Verified end-to-end on aicr-gb300 (8 GPUs, 2x p6e-gb300r): NCCL IB over 8 +# rocep* RoCE devices (GPUDirect RDMA), ~387 GB/s peak busbw. See NVIDIA/aicr#1413. +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: TrainingRuntime +metadata: + name: nccl-all-reduce-runtime + namespace: ${NAMESPACE} + labels: + trainer.kubeflow.org/framework: mpi +spec: + mlPolicy: + mpi: + mpiImplementation: OpenMPI + numProcPerNode: ${GPU_COUNT_PER_NODE} + runLauncherAsNode: false + sshAuthMountPath: /tmp/mpi-keys + template: + spec: + network: + enableDNSHostnames: true + publishNotReadyAddresses: true + replicatedJobs: + - name: launcher + replicas: 1 + template: + spec: + template: + spec: + tolerations: + - operator: Exists + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:26.01-py3 + command: + - /bin/sh + - -c + - | + mkdir -p /root/.ssh + cp /tmp/mpi-keys/id_rsa /root/.ssh/id_rsa + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys + chmod 700 /root/.ssh + chmod 600 /root/.ssh/id_rsa /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:26.01-py3 + env: + - name: LD_LIBRARY_PATH + value: "/usr/local/nvidia/lib64:/usr/local/cuda/lib64" + command: + - /usr/local/mpi/bin/mpirun + args: + - -np + - "${GPU_COUNT}" + - --allow-run-as-root + - --mca + - plm_rsh_args + # ConnectionAttempts is generous because the worker pods install + # openssh-server at runtime (the CUDA-13 pytorch image ships no + # sshd) before starting it — the launcher must keep retrying the + # rsh connection until that apt-get + sshd startup completes. + # Requires the GPU nodes to have apt egress; on an air-gapped / + # no-egress cluster the worker install never finishes and this + # validator fails (tracked for removal once a CUDA-13 image ships + # sshd — see NVIDIA/aicr#1413). + - -o StrictHostKeyChecking=no -o ConnectionAttempts=60 + # HPC-X UCC/HCOLL collective components segfault in MPI_Init + # team-create on this image; disable them (NCCL does the + # collectives — MPI is only launcher/bootstrap). + - --mca + - coll + - ^ucc,hcoll + # UCX PML segfaults here too; force ob1 PML over TCP. + - --mca + - pml + - ob1 + - --mca + - btl + - tcp,self + - --mca + - btl_tcp_if_include + - eth0 + - --mca + - oob_tcp_if_include + - eth0 + - -x + - LD_LIBRARY_PATH + # INFO so the "NCCL INFO Using network IB" banner appears for + # verifyTransportFromLogs (any non-Socket NET plugin passes). + - -x + - NCCL_DEBUG=INFO + - -x + - NCCL_SOCKET_IFNAME=eth0 + # Use NCCL's built-in IB/verbs transport over the ConnectX RoCE + # devices; disable the bundled aws-ofi-nccl (EFA) plugin. + - -x + - NCCL_NET_PLUGIN=none + - -x + - NCCL_IB_HCA=rocep + # Force the NET fabric: disable the NVL72 NVLink paths (NVLS / + # multi-node NVLink) so traffic crosses RoCE. + - -x + - NCCL_NVLS_ENABLE=0 + - -x + - NCCL_MNNVL_ENABLE=0 + - -x + - NCCL_IGNORE_DISABLED_P2P=1 + - /usr/local/bin/${TEST_TYPE}_mpi + - -b + - ${MIN_MESSAGE_SIZE} + - -e + - ${MAX_MESSAGE_SIZE} + - -f + - "2" + - -g + - "1" + resources: + limits: + cpu: "2" + memory: 512Mi + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + volumes: + - name: ssh-config + emptyDir: {} + - name: node + template: + spec: + template: + spec: + tolerations: + - operator: Exists + initContainers: + # Only stage the SSH keys into the shared ssh-config volume. + # openssh-server is installed by the main container (init and + # main containers don't share a root filesystem, so installing + # it here would be wasted) — sshd and /var/run/sshd are set up + # there. Mirrors the launcher's key-only init container. + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:26.01-py3 + command: + - /bin/sh + - -c + - | + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:26.01-py3 + command: ["sh", "-c"] + args: + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/* /root/.ssh/ && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys && + /usr/sbin/sshd -De + resources: + limits: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + requests: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + claims: + - name: roce + securityContext: + capabilities: + add: ["IPC_LOCK"] + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + - name: dshm + mountPath: /dev/shm + resourceClaims: + - name: roce + resourceClaimTemplateName: nccl-roce-rct + volumes: + - name: ssh-config + emptyDir: {} + - name: dshm + emptyDir: + medium: Memory + successPolicy: + operator: All + targetReplicatedJobs: + - launcher