From 13330948ece159e10a93c6feab28366c7f42a9b0 Mon Sep 17 00:00:00 2001 From: aadvani Date: Wed, 27 May 2026 03:46:48 +0000 Subject: [PATCH] Support for creating DPF objects when first switching to DPF via reprovisioning Signed-off-by: aadvani Modifications to DPF reprovisioning fix based on review comments Signed-off-by: aadvani Add fixes to the reprovisioning tests Signed-off-by: aadvani lint fixes Signed-off-by: aadvani --- crates/api-model/src/machine/mod.rs | 18 +++- crates/api/src/tests/dpf/reprovisioning.rs | 37 ++++++++ crates/machine-controller/src/dpf.rs | 44 +++++++++ crates/machine-controller/src/handler/dpf.rs | 98 +++++++++++++------- 4 files changed, 158 insertions(+), 39 deletions(-) diff --git a/crates/api-model/src/machine/mod.rs b/crates/api-model/src/machine/mod.rs index d87d80cfea..86fe907ef8 100644 --- a/crates/api-model/src/machine/mod.rs +++ b/crates/api-model/src/machine/mod.rs @@ -2685,7 +2685,7 @@ pub enum HardwareHealthReportsConfig { pub fn dpf_based_dpu_provisioning_possible( state: &ManagedHostStateSnapshot, dpf_enabled_at_site: bool, - reprovisioing_case: bool, + reprovisioning_case: bool, ) -> bool { // DPF is disabled at site. if !dpf_enabled_at_site { @@ -2701,11 +2701,19 @@ pub fn dpf_based_dpu_provisioning_possible( return false; } - // if it is reprovisioing case, initial ingestion should be done with dpf to continue - // reprovision. - if reprovisioing_case && !state.host_snapshot.dpf.used_for_ingestion { + // if it is reprovisioning case, initial ingestion should be done with dpf + // to continue or we should be trying to reprovision all the dpus (switching + // to DPF). Reprovisioning only a subset of DPUs cannot flip the host to DPF. + if reprovisioning_case + && !state.host_snapshot.dpf.used_for_ingestion + && !state + .dpu_snapshots + .iter() + .all(|dpu| dpu.reprovision_requested.is_some()) + { tracing::info!( - "DPF based DPU reprovisioning is not possible because initial ingestion is not done with DPF - host {}.", + "DPF based DPU reprovisioning is not possible for host {} because initial ingestion is not done with DPF \ + and not all DPUs are being reprovisioned.", state.host_snapshot.id ); return false; diff --git a/crates/api/src/tests/dpf/reprovisioning.rs b/crates/api/src/tests/dpf/reprovisioning.rs index 4bfdcf63e4..9297f6ac05 100644 --- a/crates/api/src/tests/dpf/reprovisioning.rs +++ b/crates/api/src/tests/dpf/reprovisioning.rs @@ -26,6 +26,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use carbide_dpf::DpuPhase; +use carbide_dpf::types::{DpuDeviceSummary, DpuNodeSummary, HostDpfSnapshot}; use carbide_machine_controller::dpf::{DpfOperations, MockDpfOperations}; use carbide_uuid::machine::MachineId; use model::machine::{ @@ -40,16 +41,46 @@ use crate::tests::common::api_fixtures::{ const TEST_TIMEOUT: Duration = Duration::from_secs(30); +fn snapshot_with_crs_present(dpu_count: usize) -> HostDpfSnapshot { + HostDpfSnapshot { + dpu_node: Some(DpuNodeSummary { + name: "node-mock".to_string(), + labels: Default::default(), + annotations: Default::default(), + dpu_device_refs: (0..dpu_count).map(|i| format!("device-{i}")).collect(), + }), + dpu_devices: (0..dpu_count) + .map(|i| DpuDeviceSummary { + name: format!("device-{i}"), + labels: Default::default(), + bmc_ip: None, + bmc_port: None, + serial_number: String::new(), + }) + .collect(), + dpus: vec![], + } +} + /// Build a `MockDpfOperations` with only the expectations needed for the /// initial provisioning flow triggered by `create_managed_host_with_dpf`. /// `dpu_ready` controls whether `get_dpu_phase` returns `Ready` or `Provisioning`. fn provisioning_mock(dpu_ready: Arc) -> MockDpfOperations { + provisioning_mock_with_dpu_count(dpu_ready, 1) +} + +fn provisioning_mock_with_dpu_count( + dpu_ready: Arc, + dpu_count: usize, +) -> MockDpfOperations { let mut mock = MockDpfOperations::new(); mock.expect_register_dpu_device().returning(|_| Ok(())); mock.expect_register_dpu_node().returning(|_| Ok(())); mock.expect_release_maintenance_hold().returning(|_| Ok(())); mock.expect_is_reboot_required().returning(|_| Ok(false)); mock.expect_verify_node_labels().returning(|_| Ok(true)); + mock.expect_snapshot_host() + .returning(move |_| Ok(snapshot_with_crs_present(dpu_count))); mock.expect_get_dpu_phase().returning(move |_, _| { if dpu_ready.load(Ordering::SeqCst) { Ok(DpuPhase::Ready) @@ -333,6 +364,7 @@ fn capturing_mock( dpu_ready: Arc, registered_devices: Arc>>, reprovisioned_devices: Arc>>, + dpu_count: usize, ) -> MockDpfOperations { let mut mock = MockDpfOperations::new(); @@ -345,6 +377,8 @@ fn capturing_mock( mock.expect_release_maintenance_hold().returning(|_| Ok(())); mock.expect_is_reboot_required().returning(|_| Ok(false)); mock.expect_verify_node_labels().returning(|_| Ok(true)); + mock.expect_snapshot_host() + .returning(move |_| Ok(snapshot_with_crs_present(dpu_count))); let reprovisioned_for_ready = reprovisioned_devices.clone(); mock.expect_get_dpu_phase() @@ -386,6 +420,7 @@ async fn test_multi_dpu_provisioning_registers_all_devices(pool: sqlx::PgPool) { device_ready.clone(), registered_devices.clone(), reprovisioned_devices.clone(), + 2, )); let mut config = get_config(); config.dpf = dpf_config(); @@ -440,6 +475,7 @@ async fn test_multi_dpu_reprovisioning_calls_all_dpus(pool: sqlx::PgPool) { device_ready.clone(), registered_devices, reprovisioned_devices.clone(), + 2, )); let mut config = get_config(); config.dpf = dpf_config(); @@ -635,6 +671,7 @@ async fn test_multi_dpu_reprovisioning_per_dpu(pool: sqlx::PgPool) { device_ready.clone(), registered_devices, reprovisioned_devices.clone(), + 2, )); let mut config = get_config(); config.dpf = dpf_config(); diff --git a/crates/machine-controller/src/dpf.rs b/crates/machine-controller/src/dpf.rs index 93b955c2be..9bd54e1819 100644 --- a/crates/machine-controller/src/dpf.rs +++ b/crates/machine-controller/src/dpf.rs @@ -28,6 +28,7 @@ use carbide_dpf::{ }; use carbide_uuid::machine::MachineId; use model::dpu_machine_update::OutdatedDpfDpu; +use model::machine::ManagedHostStateSnapshot; use sqlx::PgPool; use state_controller::controller::Enqueuer; use tokio::task::JoinSet; @@ -106,6 +107,49 @@ pub trait DpfOperations: Send + Sync + std::fmt::Debug { async fn find_outdated_dpus_dpf(&self) -> Result, DpfError>; } +/// Check whether the DPUNode and DPUDevice CRs are missing for the given host. +/// Registration is all-or-nothing: `create_and_register_dpudevices_and_dpunode` +/// creates every DPUDevice followed by the DPUNode in one pass, and we never +/// delete a subset. A partial CR set (node without all devices, or devices +/// without a node) therefore indicates external tampering or a half-completed +/// force-delete and requires operator intervention -- it is reported as +/// `InvalidState` rather than silently re-registered. +/// - `Ok(true)` : neither the node nor any devices exist -- safe to register. +/// - `Ok(false)` : node exists and device count matches DPU count -- nothing to do. +/// - `Err` : host has no DPF id, OR the CR set is partial/mismatched. +pub async fn dpf_dpudevices_and_dpunode_crs_noexist( + managed_host_state: &ManagedHostStateSnapshot, + dpf_sdk: &dyn DpfOperations, +) -> Result { + let managed_host = &managed_host_state.host_snapshot; + let Some(dpf_id) = managed_host.dpf_id() else { + return Err(DpfError::InvalidState(format!( + "Host {} is missing a DPF id", + managed_host.id + ))); + }; + + let dpu_count = managed_host_state.dpu_snapshots.len(); + + let node_name = carbide_dpf::dpu_node_cr_name(&dpf_id); + let dpf_sdk_host_snapshot = dpf_sdk.snapshot_host(&node_name).await?; + let dpunode_cr_exists = dpf_sdk_host_snapshot.dpu_node.is_some(); + let dpfdevice_cr_count = dpf_sdk_host_snapshot.dpu_devices.len(); + + if !dpunode_cr_exists && dpfdevice_cr_count == 0 { + return Ok(true); + } + + if dpunode_cr_exists && dpfdevice_cr_count == dpu_count { + return Ok(false); + } + + Err(DpfError::InvalidState(format!( + "Host {} has inconsistent DPF CRs for {} DPU(s): dpu_node_present={}, dpu_device_count={}", + managed_host.id, dpu_count, dpunode_cr_exists, dpfdevice_cr_count, + ))) +} + /// Applies carbide-specific labels to DPF resources. /// /// Label inheritance in DPF: diff --git a/crates/machine-controller/src/handler/dpf.rs b/crates/machine-controller/src/handler/dpf.rs index c16afbd4a4..93bffe368d 100644 --- a/crates/machine-controller/src/handler/dpf.rs +++ b/crates/machine-controller/src/handler/dpf.rs @@ -172,12 +172,10 @@ fn waiting_for_ready_exit_state( } } -/// Handle DpfState::Provisioning: register all DPU devices and the node, then -/// transition all DPUs to WaitingForReady. -async fn handle_dpf_provisioning( +async fn create_and_register_dpudevices_and_dpunode( state: &ManagedHostStateSnapshot, dpf_sdk: &dyn DpfOperations, -) -> Result, StateHandlerError> { +) -> Result<(), StateHandlerError> { let primary_dpu_id = state .host_snapshot .interfaces @@ -204,21 +202,10 @@ async fn handle_dpf_provisioning( dpu_machine_id: dpu.id.to_string(), is_primary: dpu.id == primary_dpu_id, }; - if let Err(err) = dpf_sdk.register_dpu_device(device_info).await { - return Ok(StateHandlerOutcome::transition(ManagedHostState::Failed { - details: FailureDetails { - cause: FailureCause::DpfProvisioning { - err: format!( - "DPUDevice creation failed. Force-delete again to clean old values. Wait until DPU CR are deleted. {err}" - ), - }, - failed_at: chrono::Utc::now(), - source: FailureSource::StateMachineArea(StateMachineArea::MainFlow), - }, - machine_id: dpu.id, - retry_count: 0, - })); - } + dpf_sdk + .register_dpu_device(device_info) + .await + .map_err(dpf_error)?; } let device_ids: Vec = state @@ -231,20 +218,41 @@ async fn handle_dpf_provisioning( host_bmc_ip: bmc_ip(&state.host_snapshot)?.to_string(), device_ids, }; - if let Err(err) = dpf_sdk.register_dpu_node(node_info).await { - return Ok(StateHandlerOutcome::transition(ManagedHostState::Failed { - details: FailureDetails { - cause: FailureCause::DpfProvisioning { - err: format!( - "DPUNode creation failed. Force-delete again to clean old values. Wait until DPU CR are deleted. {err}" - ), - }, - failed_at: chrono::Utc::now(), - source: FailureSource::StateMachineArea(StateMachineArea::MainFlow), + dpf_sdk + .register_dpu_node(node_info) + .await + .map_err(dpf_error)?; + + Ok(()) +} + +fn dpf_cr_creation_failed( + machine_id: MachineId, + err: &StateHandlerError, +) -> StateHandlerOutcome { + StateHandlerOutcome::transition(ManagedHostState::Failed { + details: FailureDetails { + cause: FailureCause::DpfProvisioning { + err: format!( + "DPUDevice/DPUNode creation failed. Force-delete again to clean old values. Wait until DPU CR are deleted. {err}" + ), }, - machine_id: state.host_snapshot.id, - retry_count: 0, - })); + failed_at: chrono::Utc::now(), + source: FailureSource::StateMachineArea(StateMachineArea::MainFlow), + }, + machine_id, + retry_count: 0, + }) +} + +/// Handle DpfState::Provisioning: register all DPU devices and the node, then +/// transition all DPUs to WaitingForReady. +async fn handle_dpf_provisioning( + state: &ManagedHostStateSnapshot, + dpf_sdk: &dyn DpfOperations, +) -> Result, StateHandlerError> { + if let Err(err) = create_and_register_dpudevices_and_dpunode(state, dpf_sdk).await { + return Ok(dpf_cr_creation_failed(state.host_snapshot.id, &err)); } let next = @@ -385,14 +393,36 @@ fn handle_dpf_device_ready( Ok(StateHandlerOutcome::transition(next)) } -/// Handle DpfState::Reprovisioning for a single DPU: call reprovision_dpu, -/// then transition that DPU to WaitingForReady. +/// Handle DpfState::Reprovisioning +/// If the DPUNode and DPUDevice CRs do not exist, then create them +/// and transition to the next state to reprovision all DPUs to DPF. +/// Else handle the reprovisioning of a single DPU async fn handle_dpf_reprovisioning( state: &ManagedHostStateSnapshot, dpu_snapshot: &Machine, dpf_sdk: &dyn DpfOperations, ) -> Result, StateHandlerError> { let node_name = dpu_node_cr_name(&dpf_id(&state.host_snapshot)?); + let dpf_dpudevices_and_dpunode_crs_noexist = + crate::dpf::dpf_dpudevices_and_dpunode_crs_noexist(state, dpf_sdk) + .await + .map_err(dpf_error)?; + if dpf_dpudevices_and_dpunode_crs_noexist { + tracing::info!( + host = %state.host_snapshot.id, + "DPUDevice/DPUNode CRs do not exist, creating them before reprovisioning" + ); + if let Err(err) = create_and_register_dpudevices_and_dpunode(state, dpf_sdk).await { + return Ok(dpf_cr_creation_failed(state.host_snapshot.id, &err)); + } + let next = transition_all_dpus_to_dpf_state( + DpfState::WaitingForReady { phase_detail: None }, + state, + )?; + return Ok(StateHandlerOutcome::transition(next)); + } + + tracing::info!("DPF initiate reprovision of DPU {}", dpu_snapshot.id); dpf_sdk .reprovision_dpu(&dpf_id(dpu_snapshot)?, &node_name) .await