Skip to content
Open
421 changes: 317 additions & 104 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ mg-common = { path = "mg-common" }
rdb-types = { path = "rdb-types" }
chrono = { version = "0.4.42", features = ["serde"] }
oxide-tokio-rt = "0.1.2"
oximeter = { git = "https://github.com/oxidecomputer/omicron", branch = "main"}
oximeter-producer = { git = "https://github.com/oxidecomputer/omicron", branch = "main"}
oximeter = { git = "https://github.com/oxidecomputer/omicron", branch = "main" }
oximeter-producer = { git = "https://github.com/oxidecomputer/omicron", branch = "main" }
oxnet = { version = "0.1.4", default-features = false, features = ["schemars", "serde"] }
omicron-common = { git = "https://github.com/oxidecomputer/omicron", branch = "main"}
omicron-common = { git = "https://github.com/oxidecomputer/omicron", branch = "main" }
poptrie = { git = "https://github.com/oxidecomputer/poptrie", branch = "main" }
uuid = { version = "1.8", features = ["serde", "v4"] }
smf = { git = "https://github.com/illumos/smf-rs", branch = "main" }
libc = "0.2"
Expand Down
162 changes: 161 additions & 1 deletion mg-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use dropshot::{
use dropshot_api_manager_types::api_versions;
use rdb::{
BfdPeerConfig, Path as RdbPath, Prefix, Prefix4, Prefix6, StaticRouteKey,
types::{AddressFamily, ProtocolFilter},
types::{AddressFamily, MulticastRoute, MulticastRouteKey, ProtocolFilter},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand All @@ -40,6 +40,7 @@ api_versions!([
// | example for the next person.
// v
// (next_int, IDENT),
(4, MULTICAST_SUPPORT),
(3, SWITCH_IDENTIFIERS),
(2, IPV6_BASIC),
(1, INITIAL),
Expand Down Expand Up @@ -377,6 +378,72 @@ pub trait MgAdminApi {
async fn switch_identifiers(
ctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<SwitchIdentifiers>, HttpError>;

// ========================= MRIB: Multicast ==============================
//
// Multicast routing is API-driven with Omicron as the source of truth.
// Static route endpoints (add/delete) are intended for Nexus RPW use.
// Direct operator configuration should go through the Oxide API to
// maintain consistency with the control plane's view of group membership.

/// Get imported multicast routes (`mrib_in`).
///
/// When `group` is provided, returns a specific route.
/// When `group` is omitted, returns all routes (with optional filters).
#[endpoint { method = GET, path = "/mrib/status/imported", versions = VERSION_MULTICAST_SUPPORT.. }]
async fn get_mrib_imported(
rqctx: RequestContext<Self::Context>,
query: Query<MribQuery>,
) -> Result<HttpResponseOk<Vec<MulticastRoute>>, HttpError>;

/// Get selected multicast routes (`mrib_loc`, RPF-validated).
///
/// When `group` is provided, returns a specific route.
/// When `group` is omitted, returns all routes (with optional filters).
#[endpoint { method = GET, path = "/mrib/status/selected", versions = VERSION_MULTICAST_SUPPORT.. }]
async fn get_mrib_selected(
rqctx: RequestContext<Self::Context>,
query: Query<MribQuery>,
) -> Result<HttpResponseOk<Vec<MulticastRoute>>, HttpError>;

/// Add static multicast routes.
///
/// This endpoint is intended for Nexus RPW use. Operators should
/// configure multicast group membership through the Oxide API.
#[endpoint { method = PUT, path = "/static/mroute", versions = VERSION_MULTICAST_SUPPORT.. }]
async fn static_add_mcast_route(
rqctx: RequestContext<Self::Context>,
request: TypedBody<MribAddStaticRequest>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;

/// Remove static multicast routes.
///
/// This endpoint is intended for Nexus RPW use. Operators should
/// configure multicast group membership through the Oxide API.
#[endpoint { method = DELETE, path = "/static/mroute", versions = VERSION_MULTICAST_SUPPORT.. }]
async fn static_remove_mcast_route(
rqctx: RequestContext<Self::Context>,
request: TypedBody<MribDeleteStaticRequest>,
) -> Result<HttpResponseDeleted, HttpError>;

/// List all static multicast routes.
#[endpoint { method = GET, path = "/static/mroute", versions = VERSION_MULTICAST_SUPPORT.. }]
async fn static_list_mcast_routes(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Vec<MulticastRoute>>, HttpError>;

/// Get the current RPF rebuild interval.
#[endpoint { method = GET, path = "/mrib/config/rpf/rebuild-interval", versions = VERSION_MULTICAST_SUPPORT.. }]
async fn read_mrib_rpf_rebuild_interval(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<MribRpfRebuildIntervalResponse>, HttpError>;

/// Set the RPF rebuild interval.
#[endpoint { method = POST, path = "/mrib/config/rpf/rebuild-interval", versions = VERSION_MULTICAST_SUPPORT.. }]
async fn update_mrib_rpf_rebuild_interval(
rqctx: RequestContext<Self::Context>,
request: TypedBody<MribRpfRebuildIntervalRequest>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
}

/// Identifiers for a switch.
Expand Down Expand Up @@ -577,6 +644,99 @@ impl From<StaticRoute6> for StaticRouteKey {
}
}

// ========================= MRIB Types ==============================

/// Input for adding static multicast routes.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct StaticMulticastRouteInput {
/// The multicast route key (S,G) or (*,G).
pub key: MulticastRouteKey,
/// Underlay unicast nexthops for multicast replication.
///
/// Unicast IPv6 addresses where encapsulated overlay multicast traffic
/// is forwarded. These are sled underlay addresses hosting VMs subscribed
/// to the multicast group. Forms the outgoing interface list (OIL).
pub underlay_nexthops: Vec<Ipv6Addr>,
/// Underlay multicast group address (ff04::X).
///
/// Admin-local scoped IPv6 multicast address corresponding to the overlay
/// multicast group. 1:1 mapped and always derived from the overlay
/// multicast group in Omicron.
pub underlay_group: Ipv6Addr,
}

/// Request body for adding static multicast routes to the MRIB.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct MribAddStaticRequest {
/// List of static multicast routes to add.
pub routes: Vec<StaticMulticastRouteInput>,
}

/// Request body for deleting static multicast routes from the MRIB.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct MribDeleteStaticRequest {
/// List of route keys to delete.
pub keys: Vec<MulticastRouteKey>,
}

/// Response containing the current RPF rebuild interval.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct MribRpfRebuildIntervalResponse {
/// Minimum interval between RPF cache rebuilds in milliseconds.
pub interval_ms: u64,
}

/// Request body for setting the RPF rebuild interval.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct MribRpfRebuildIntervalRequest {
/// Minimum interval between RPF cache rebuilds in milliseconds.
pub interval_ms: u64,
}

/// Filter for multicast route origin.
#[derive(
Debug, Clone, Copy, Deserialize, Serialize, JsonSchema, PartialEq, Eq,
)]
#[serde(rename_all = "snake_case")]
pub enum RouteOriginFilter {
/// Static routes only (operator configured).
Static,
/// Dynamic routes only (learned via IGMP, MLD, etc.).
Dynamic,
}

/// Query parameters for MRIB routes.
///
/// When `group` is provided, looks up a specific route.
/// When `group` is omitted, lists all routes (with optional filters).
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct MribQuery {
/// Multicast group address. If provided, returns a specific route.
/// If omitted, returns all routes matching the filters.
#[serde(default)]
pub group: Option<IpAddr>,
/// Source address (`None` for (*,G) routes). Only used when `group` is set.
#[serde(default)]
pub source: Option<IpAddr>,
/// VNI (defaults to 77 for fleet-scoped multicast).
/// Only used when `group` is set.
#[serde(default = "default_multicast_vni")]
pub vni: u32,
/// Filter by address family. Only used when listing all routes.
#[serde(default)]
pub address_family: Option<AddressFamily>,
/// Filter by route origin ("static" or "dynamic").
/// Only used when listing all routes.
#[serde(default)]
pub route_origin: Option<RouteOriginFilter>,
}

fn default_multicast_vni() -> u32 {
rdb::DEFAULT_MULTICAST_VNI
}

// ========================= RIB Types ==============================

#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct RibQuery {
/// Filter by address family (None means all families)
Expand Down
85 changes: 57 additions & 28 deletions mg-common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ use std::os::unix::io::AsRawFd;
use std::process::Command;
use std::sync::{Arc, Mutex};

pub const DEFAULT_INTERVAL: u64 = 1;
pub const DEFAULT_ITERATIONS: u64 = 30;
/// Default polling interval in milliseconds for wait_for macros.
pub const DEFAULT_INTERVAL_MS: u64 = 10;
/// Default number of iterations for wait_for macros (30 seconds total).
pub const DEFAULT_ITERATIONS: u64 = 3000;

// Note: get_test_db has been moved to rdb::test::get_test_db
// to break the circular dependency between mg-common and rdb.
Expand All @@ -37,79 +39,109 @@ impl FileLockExt for File {
}
}

/// Wait for two expressions to be equal, polling at the given interval.
///
/// # Arguments
/// - `$lhs`, `$rhs`: Expressions to compare
/// - `$interval_ms`: Polling interval in milliseconds
/// - `$count`: Maximum number of iterations
/// - `$msg`: Optional panic message
#[macro_export]
macro_rules! wait_for_eq {
($lhs:expr, $rhs:expr, $period:expr, $count:expr, $msg:tt) => {
wait_for!($lhs == $rhs, $period, $count, $msg);
($lhs:expr, $rhs:expr, $interval_ms:expr, $count:expr, $msg:tt) => {
wait_for!($lhs == $rhs, $interval_ms, $count, $msg);
};
($lhs:expr, $rhs:expr, $period:expr, $count:expr) => {
wait_for!($lhs == $rhs, $period, $count);
($lhs:expr, $rhs:expr, $interval_ms:expr, $count:expr) => {
wait_for!($lhs == $rhs, $interval_ms, $count);
};
($lhs:expr, $rhs:expr, $msg:tt) => {
wait_for!(
$lhs == $rhs,
mg_common::test::DEFAULT_INTERVAL,
mg_common::test::DEFAULT_INTERVAL_MS,
mg_common::test::DEFAULT_ITERATIONS,
$msg
);
};
($lhs:expr, $rhs:expr) => {
wait_for!(
$lhs == $rhs,
mg_common::test::DEFAULT_INTERVAL,
mg_common::test::DEFAULT_INTERVAL_MS,
mg_common::test::DEFAULT_ITERATIONS
);
};
}

/// Wait for two expressions to be not equal, polling at the given interval.
///
/// # Arguments
/// - `$lhs`, `$rhs`: Expressions to compare
/// - `$interval_ms`: Polling interval in milliseconds
/// - `$count`: Maximum number of iterations
/// - `$msg`: Optional panic message
#[macro_export]
macro_rules! wait_for_neq {
($lhs:expr, $rhs:expr, $period:expr, $count:expr, $msg:tt) => {
wait_for!($lhs != $rhs, $period, $count, $msg);
($lhs:expr, $rhs:expr, $interval_ms:expr, $count:expr, $msg:tt) => {
wait_for!($lhs != $rhs, $interval_ms, $count, $msg);
};
($lhs:expr, $rhs:expr, $period:expr, $count:expr) => {
wait_for!($lhs != $rhs, $period, $count);
($lhs:expr, $rhs:expr, $interval_ms:expr, $count:expr) => {
wait_for!($lhs != $rhs, $interval_ms, $count);
};
($lhs:expr, $rhs:expr, $msg:tt) => {
wait_for!(
$lhs != $rhs,
mg_common::test::DEFAULT_INTERVAL,
mg_common::test::DEFAULT_INTERVAL_MS,
mg_common::test::DEFAULT_ITERATIONS,
$msg
);
};
($lhs:expr, $rhs:expr) => {
wait_for!(
$lhs != $rhs,
mg_common::test::DEFAULT_INTERVAL,
mg_common::test::DEFAULT_INTERVAL_MS,
mg_common::test::DEFAULT_ITERATIONS
);
};
}

/// Wait for a condition to become true, polling at the given interval.
///
/// # Arguments
/// - `$cond`: Condition expression to poll
/// - `$interval_ms`: Polling interval in milliseconds
/// - `$count`: Maximum number of iterations
/// - `$msg`: Optional panic message
///
/// # Example
/// ```ignore
/// // Wait up to 5 seconds (10ms × 500) for condition
/// wait_for!(some_condition(), 10, 500, "condition not met");
///
/// // Use defaults (10ms × 3000 = 30 seconds)
/// wait_for!(some_condition());
/// ```
#[macro_export]
macro_rules! wait_for {
($cond:expr, $period:expr, $count:expr, $msg:tt) => {
($cond:expr, $interval_ms:expr, $count:expr, $msg:tt) => {
let mut ok = false;
for _ in 0..$count {
if $cond {
ok = true;
break;
}
std::thread::sleep(std::time::Duration::from_secs($period));
std::thread::sleep(std::time::Duration::from_millis($interval_ms));
}
if !ok {
assert!($cond, $msg);
}
};
($cond:expr, $period:expr, $count:expr) => {
($cond:expr, $interval_ms:expr, $count:expr) => {
let mut ok = false;
for _ in 0..$count {
if $cond {
ok = true;
break;
}
std::thread::sleep(std::time::Duration::from_secs($period));
std::thread::sleep(std::time::Duration::from_millis($interval_ms));
}
if !ok {
assert!($cond);
Expand All @@ -118,15 +150,15 @@ macro_rules! wait_for {
($cond:expr, $msg:tt) => {
wait_for!(
$cond,
mg_common::test::DEFAULT_INTERVAL,
mg_common::test::DEFAULT_INTERVAL_MS,
mg_common::test::DEFAULT_ITERATIONS,
$msg
);
};
($cond:expr) => {
wait_for!(
$cond,
mg_common::test::DEFAULT_INTERVAL,
mg_common::test::DEFAULT_INTERVAL_MS,
mg_common::test::DEFAULT_ITERATIONS
);
};
Expand Down Expand Up @@ -703,7 +735,7 @@ pub fn dump_thread_stacks() -> Result<String, std::io::Error> {
{
// Use gdb to attach and dump thread backtraces
let output = Command::new("gdb")
.args(&[
.args([
"-batch",
"-ex",
"thread apply all bt",
Expand All @@ -713,13 +745,10 @@ pub fn dump_thread_stacks() -> Result<String, std::io::Error> {
.output()?;

if !output.status.success() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"gdb failed: {}",
String::from_utf8_lossy(&output.stderr)
),
));
return Err(std::io::Error::other(format!(
"gdb failed: {}",
String::from_utf8_lossy(&output.stderr)
)));
}

Ok(String::from_utf8_lossy(&output.stdout).to_string())
Expand Down
Loading