Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions crates/api-model/src/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2685,7 +2685,7 @@ pub enum HardwareHealthReportsConfig {
pub fn dpf_based_dpu_provisioning_possible(
state: &ManagedHostStateSnapshot,
dpf_enabled_at_site: bool,
reprovisioing_case: bool,
reprovisioning_case: bool,
) -> bool {
// DPF is disabled at site.
if !dpf_enabled_at_site {
Expand All @@ -2701,11 +2701,19 @@ pub fn dpf_based_dpu_provisioning_possible(
return false;
}

// if it is reprovisioing case, initial ingestion should be done with dpf to continue
// reprovision.
if reprovisioing_case && !state.host_snapshot.dpf.used_for_ingestion {
// if it is reprovisioning case, initial ingestion should be done with dpf
// to continue or we should be trying to reprovision all the dpus (switching
// to DPF). Reprovisioning only a subset of DPUs cannot flip the host to DPF.
if reprovisioning_case
&& !state.host_snapshot.dpf.used_for_ingestion
&& !state
.dpu_snapshots
.iter()
.all(|dpu| dpu.reprovision_requested.is_some())
{
tracing::info!(
"DPF based DPU reprovisioning is not possible because initial ingestion is not done with DPF - host {}.",
"DPF based DPU reprovisioning is not possible for host {} because initial ingestion is not done with DPF \
and not all DPUs are being reprovisioned.",
state.host_snapshot.id
);
return false;
Expand Down
37 changes: 37 additions & 0 deletions crates/api/src/tests/dpf/reprovisioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use carbide_dpf::DpuPhase;
use carbide_dpf::types::{DpuDeviceSummary, DpuNodeSummary, HostDpfSnapshot};
use carbide_machine_controller::dpf::{DpfOperations, MockDpfOperations};
use carbide_uuid::machine::MachineId;
use model::machine::{
Expand All @@ -40,16 +41,46 @@ use crate::tests::common::api_fixtures::{

const TEST_TIMEOUT: Duration = Duration::from_secs(30);

/// Build a `HostDpfSnapshot` in which the DPUNode CR and `dpu_count`
/// DPUDevice CRs are all present.
///
/// The node is named `node-mock` and references devices `device-0` through
/// `device-{dpu_count - 1}`; each device summary carries the matching name,
/// default labels, no BMC address/port and an empty serial number. The
/// `dpus` list is left empty.
fn snapshot_with_crs_present(dpu_count: usize) -> HostDpfSnapshot {
    // Single source of truth for device naming so the node's refs and the
    // device summaries cannot drift apart.
    let device_name = |index: usize| format!("device-{index}");

    let dpu_node = DpuNodeSummary {
        name: "node-mock".to_string(),
        labels: Default::default(),
        annotations: Default::default(),
        dpu_device_refs: (0..dpu_count).map(device_name).collect(),
    };

    let dpu_devices = (0..dpu_count)
        .map(|index| DpuDeviceSummary {
            name: device_name(index),
            labels: Default::default(),
            bmc_ip: None,
            bmc_port: None,
            serial_number: String::new(),
        })
        .collect();

    HostDpfSnapshot {
        dpu_node: Some(dpu_node),
        dpu_devices,
        dpus: Vec::new(),
    }
}

/// Single-DPU convenience wrapper around `provisioning_mock_with_dpu_count`.
///
/// Produces a `MockDpfOperations` carrying only the expectations required by
/// the initial provisioning flow that `create_managed_host_with_dpf` triggers.
/// The `dpu_ready` flag selects whether `get_dpu_phase` reports `Ready` or
/// `Provisioning`.
fn provisioning_mock(dpu_ready: Arc<AtomicBool>) -> MockDpfOperations {
    // Hosts built through this helper have exactly one DPU.
    let single_dpu = 1;
    provisioning_mock_with_dpu_count(dpu_ready, single_dpu)
}

fn provisioning_mock_with_dpu_count(
dpu_ready: Arc<AtomicBool>,
dpu_count: usize,
) -> MockDpfOperations {
let mut mock = MockDpfOperations::new();
mock.expect_register_dpu_device().returning(|_| Ok(()));
mock.expect_register_dpu_node().returning(|_| Ok(()));
mock.expect_release_maintenance_hold().returning(|_| Ok(()));
mock.expect_is_reboot_required().returning(|_| Ok(false));
mock.expect_verify_node_labels().returning(|_| Ok(true));
mock.expect_snapshot_host()
.returning(move |_| Ok(snapshot_with_crs_present(dpu_count)));
mock.expect_get_dpu_phase().returning(move |_, _| {
if dpu_ready.load(Ordering::SeqCst) {
Ok(DpuPhase::Ready)
Expand Down Expand Up @@ -333,6 +364,7 @@ fn capturing_mock(
dpu_ready: Arc<AtomicBool>,
registered_devices: Arc<Mutex<Vec<String>>>,
reprovisioned_devices: Arc<Mutex<Vec<String>>>,
dpu_count: usize,
) -> MockDpfOperations {
let mut mock = MockDpfOperations::new();

Expand All @@ -345,6 +377,8 @@ fn capturing_mock(
mock.expect_release_maintenance_hold().returning(|_| Ok(()));
mock.expect_is_reboot_required().returning(|_| Ok(false));
mock.expect_verify_node_labels().returning(|_| Ok(true));
mock.expect_snapshot_host()
.returning(move |_| Ok(snapshot_with_crs_present(dpu_count)));

let reprovisioned_for_ready = reprovisioned_devices.clone();
mock.expect_get_dpu_phase()
Expand Down Expand Up @@ -386,6 +420,7 @@ async fn test_multi_dpu_provisioning_registers_all_devices(pool: sqlx::PgPool) {
device_ready.clone(),
registered_devices.clone(),
reprovisioned_devices.clone(),
2,
));
let mut config = get_config();
config.dpf = dpf_config();
Expand Down Expand Up @@ -440,6 +475,7 @@ async fn test_multi_dpu_reprovisioning_calls_all_dpus(pool: sqlx::PgPool) {
device_ready.clone(),
registered_devices,
reprovisioned_devices.clone(),
2,
));
let mut config = get_config();
config.dpf = dpf_config();
Expand Down Expand Up @@ -635,6 +671,7 @@ async fn test_multi_dpu_reprovisioning_per_dpu(pool: sqlx::PgPool) {
device_ready.clone(),
registered_devices,
reprovisioned_devices.clone(),
2,
));
let mut config = get_config();
config.dpf = dpf_config();
Expand Down
44 changes: 44 additions & 0 deletions crates/machine-controller/src/dpf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use carbide_dpf::{
};
use carbide_uuid::machine::MachineId;
use model::dpu_machine_update::OutdatedDpfDpu;
use model::machine::ManagedHostStateSnapshot;
use sqlx::PgPool;
use state_controller::controller::Enqueuer;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -106,6 +107,49 @@ pub trait DpfOperations: Send + Sync + std::fmt::Debug {
async fn find_outdated_dpus_dpf(&self) -> Result<Vec<OutdatedDpfDpu>, DpfError>;
}

/// Report whether the host's DPUNode and DPUDevice CRs are entirely absent.
///
/// Registration is all-or-nothing: `create_and_register_dpudevices_and_dpunode`
/// creates every DPUDevice followed by the DPUNode in one pass, and a subset
/// is never deleted. A partial CR set (node without all devices, or devices
/// without a node) therefore points at external tampering or a half-completed
/// force-delete and needs operator intervention -- it is surfaced as
/// `InvalidState` instead of being silently re-registered.
///
/// # Returns
/// - `Ok(true)`  : neither the node nor any device CR exists -- safe to register.
/// - `Ok(false)` : the node exists and the device CR count equals the number of
///   DPU snapshots -- nothing to do.
///
/// # Errors
/// - `DpfError::InvalidState` when the host has no DPF id, or when the CR set
///   is partial/mismatched.
/// - Any error returned by `DpfOperations::snapshot_host` is propagated as-is.
pub async fn dpf_dpudevices_and_dpunode_crs_noexist(
    managed_host_state: &ManagedHostStateSnapshot,
    dpf_sdk: &dyn DpfOperations,
) -> Result<bool, DpfError> {
    let host = &managed_host_state.host_snapshot;
    let dpf_id = host.dpf_id().ok_or_else(|| {
        DpfError::InvalidState(format!("Host {} is missing a DPF id", host.id))
    })?;

    let expected_devices = managed_host_state.dpu_snapshots.len();

    let node_cr_name = carbide_dpf::dpu_node_cr_name(&dpf_id);
    let cr_snapshot = dpf_sdk.snapshot_host(&node_cr_name).await?;
    let node_present = cr_snapshot.dpu_node.is_some();
    let device_crs = cr_snapshot.dpu_devices.len();

    match (node_present, device_crs) {
        // Nothing registered yet.
        (false, 0) => Ok(true),
        // Fully registered: node plus one device CR per DPU.
        (true, found) if found == expected_devices => Ok(false),
        // Anything in between is a partial CR set.
        _ => Err(DpfError::InvalidState(format!(
            "Host {} has inconsistent DPF CRs for {} DPU(s): dpu_node_present={}, dpu_device_count={}",
            host.id, expected_devices, node_present, device_crs,
        ))),
    }
}

/// Applies carbide-specific labels to DPF resources.
///
/// Label inheritance in DPF:
Expand Down
98 changes: 64 additions & 34 deletions crates/machine-controller/src/handler/dpf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,10 @@ fn waiting_for_ready_exit_state(
}
}

/// Handle DpfState::Provisioning: register all DPU devices and the node, then
/// transition all DPUs to WaitingForReady.
async fn handle_dpf_provisioning(
async fn create_and_register_dpudevices_and_dpunode(
state: &ManagedHostStateSnapshot,
dpf_sdk: &dyn DpfOperations,
) -> Result<StateHandlerOutcome<ManagedHostState>, StateHandlerError> {
) -> Result<(), StateHandlerError> {
let primary_dpu_id = state
.host_snapshot
.interfaces
Expand All @@ -204,21 +202,10 @@ async fn handle_dpf_provisioning(
dpu_machine_id: dpu.id.to_string(),
is_primary: dpu.id == primary_dpu_id,
};
if let Err(err) = dpf_sdk.register_dpu_device(device_info).await {
return Ok(StateHandlerOutcome::transition(ManagedHostState::Failed {
details: FailureDetails {
cause: FailureCause::DpfProvisioning {
err: format!(
"DPUDevice creation failed. Force-delete again to clean old values. Wait until DPU CR are deleted. {err}"
),
},
failed_at: chrono::Utc::now(),
source: FailureSource::StateMachineArea(StateMachineArea::MainFlow),
},
machine_id: dpu.id,
retry_count: 0,
}));
}
dpf_sdk
.register_dpu_device(device_info)
.await
.map_err(dpf_error)?;
}

let device_ids: Vec<String> = state
Expand All @@ -231,20 +218,41 @@ async fn handle_dpf_provisioning(
host_bmc_ip: bmc_ip(&state.host_snapshot)?.to_string(),
device_ids,
};
if let Err(err) = dpf_sdk.register_dpu_node(node_info).await {
return Ok(StateHandlerOutcome::transition(ManagedHostState::Failed {
details: FailureDetails {
cause: FailureCause::DpfProvisioning {
err: format!(
"DPUNode creation failed. Force-delete again to clean old values. Wait until DPU CR are deleted. {err}"
),
},
failed_at: chrono::Utc::now(),
source: FailureSource::StateMachineArea(StateMachineArea::MainFlow),
dpf_sdk
.register_dpu_node(node_info)
.await
.map_err(dpf_error)?;

Ok(())
}

fn dpf_cr_creation_failed(
machine_id: MachineId,
err: &StateHandlerError,
) -> StateHandlerOutcome<ManagedHostState> {
StateHandlerOutcome::transition(ManagedHostState::Failed {
details: FailureDetails {
cause: FailureCause::DpfProvisioning {
err: format!(
"DPUDevice/DPUNode creation failed. Force-delete again to clean old values. Wait until DPU CR are deleted. {err}"
),
},
machine_id: state.host_snapshot.id,
retry_count: 0,
}));
failed_at: chrono::Utc::now(),
source: FailureSource::StateMachineArea(StateMachineArea::MainFlow),
},
machine_id,
retry_count: 0,
})
}

/// Handle DpfState::Provisioning: register all DPU devices and the node, then
/// transition all DPUs to WaitingForReady.
async fn handle_dpf_provisioning(
state: &ManagedHostStateSnapshot,
dpf_sdk: &dyn DpfOperations,
) -> Result<StateHandlerOutcome<ManagedHostState>, StateHandlerError> {
if let Err(err) = create_and_register_dpudevices_and_dpunode(state, dpf_sdk).await {
return Ok(dpf_cr_creation_failed(state.host_snapshot.id, &err));
}

let next =
Expand Down Expand Up @@ -385,14 +393,36 @@ fn handle_dpf_device_ready(
Ok(StateHandlerOutcome::transition(next))
}

/// Handle DpfState::Reprovisioning for a single DPU: call reprovision_dpu,
/// then transition that DPU to WaitingForReady.
/// Handle DpfState::Reprovisioning.
///
/// If the DPUNode and DPUDevice CRs do not exist, create them and transition
/// all DPUs to WaitingForReady so that every DPU is reprovisioned via DPF.
/// Otherwise, handle the reprovisioning of a single DPU.
async fn handle_dpf_reprovisioning(
state: &ManagedHostStateSnapshot,
dpu_snapshot: &Machine,
dpf_sdk: &dyn DpfOperations,
) -> Result<StateHandlerOutcome<ManagedHostState>, StateHandlerError> {
let node_name = dpu_node_cr_name(&dpf_id(&state.host_snapshot)?);
let dpf_dpudevices_and_dpunode_crs_noexist =
crate::dpf::dpf_dpudevices_and_dpunode_crs_noexist(state, dpf_sdk)
.await
.map_err(dpf_error)?;
if dpf_dpudevices_and_dpunode_crs_noexist {
tracing::info!(
host = %state.host_snapshot.id,
"DPUDevice/DPUNode CRs do not exist, creating them before reprovisioning"
);
if let Err(err) = create_and_register_dpudevices_and_dpunode(state, dpf_sdk).await {
return Ok(dpf_cr_creation_failed(state.host_snapshot.id, &err));
}
let next = transition_all_dpus_to_dpf_state(
DpfState::WaitingForReady { phase_detail: None },
state,
)?;
return Ok(StateHandlerOutcome::transition(next));
}

tracing::info!("DPF initiate reprovision of DPU {}", dpu_snapshot.id);
dpf_sdk
Comment thread
aadvani-nvidia marked this conversation as resolved.
.reprovision_dpu(&dpf_id(dpu_snapshot)?, &node_name)
.await
Expand Down
Loading