diff --git a/Cargo.lock b/Cargo.lock index 25e1ceac38..56197c07d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4711,15 +4711,21 @@ dependencies = [ "alloy-chains", "alloy-primitives", "clap", + "http", + "http-body-util", + "hyper", + "hyper-util", "kona-genesis", "kona-registry", "libc", "libp2p", + "metrics", "metrics-exporter-prometheus 0.18.1", "metrics-process", "rstest", "serde", "thiserror 2.0.17", + "tokio", "tracing", "tracing-appender", "tracing-subscriber 0.3.22", @@ -6390,17 +6396,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3589659543c04c7dc5526ec858591015b87cd8746583b51b48ef4353f99dbcda" dependencies = [ "base64", - "http-body-util", - "hyper", - "hyper-util", "indexmap 2.12.1", - "ipnet", "metrics", "metrics-util 0.20.1", "quanta", "thiserror 2.0.17", - "tokio", - "tracing", ] [[package]] diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index 4e48d73e3a..9400c95795 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -61,21 +61,19 @@ impl Cli { Commands::Info(ref info) => info.init_logs(&self.global)?, } - // Initialize unified metrics - init_unified_metrics(&self.global.metrics)?; - - // Allow subcommands to initialize cli metrics. - match self.subcommand { - Commands::Node(ref node) => node.init_cli_metrics(&self.global.metrics)?, - _ => { - tracing::debug!(target: "cli", "No CLI metrics initialized for subcommand: {:?}", self.subcommand) - } - } - // Run the subcommand. + // Async commands (node, net) init metrics inside the tokio runtime. + // Sync utility commands (registry, bootstore, info) don't need metrics. 
match self.subcommand { - Commands::Node(node) => Self::run_until_ctrl_c(node.run(&self.global)), - Commands::Net(net) => Self::run_until_ctrl_c(net.run(&self.global)), + Commands::Node(node) => Self::run_until_ctrl_c(async move { + init_unified_metrics(&self.global.metrics).await?; + node.init_cli_metrics(&self.global.metrics)?; + node.run(&self.global).await + }), + Commands::Net(net) => Self::run_until_ctrl_c(async move { + init_unified_metrics(&self.global.metrics).await?; + net.run(&self.global).await + }), Commands::Registry(registry) => registry.run(&self.global), Commands::Bootstore(bootstore) => bootstore.run(&self.global), Commands::Info(info) => info.run(&self.global), diff --git a/bin/node/src/flags/metrics.rs b/bin/node/src/flags/metrics.rs index 16b3edecfb..8e6bf5f27e 100644 --- a/bin/node/src/flags/metrics.rs +++ b/bin/node/src/flags/metrics.rs @@ -6,11 +6,10 @@ use crate::metrics::VersionInfo; use kona_cli::MetricsArgs; /// Initializes metrics for a Kona application, including Prometheus and node-specific metrics. -/// Initialize the tracing stack and Prometheus metrics recorder. /// -/// This function should be called at the beginning of the program. -pub fn init_unified_metrics(args: &MetricsArgs) -> anyhow::Result<()> { - args.init_metrics()?; +/// Must be called from within a tokio runtime. +pub async fn init_unified_metrics(args: &MetricsArgs) -> anyhow::Result<()> { + args.init_metrics().await?; if args.enabled { kona_gossip::Metrics::init(); kona_disc::Metrics::init(); diff --git a/bin/supervisor/src/cli.rs b/bin/supervisor/src/cli.rs index 326d19acb8..25e9d9da02 100644 --- a/bin/supervisor/src/cli.rs +++ b/bin/supervisor/src/cli.rs @@ -27,13 +27,12 @@ pub struct Cli { impl Cli { /// Runs the CLI. 
pub fn run(self) -> Result<()> { - self.metrics.init_metrics()?; - // Register build metrics - VersionInfo::from_build().register_version_metrics(); - self.init_logs(&self.global)?; Self::run_until_ctrl_c(async move { + self.metrics.init_metrics().await?; + VersionInfo::from_build().register_version_metrics(); + let config = self.supervisor.init_config().await?; let mut service = Service::new(config); diff --git a/crates/utilities/cli/Cargo.toml b/crates/utilities/cli/Cargo.toml index d42db96aa7..9c37a549cd 100644 --- a/crates/utilities/cli/Cargo.toml +++ b/crates/utilities/cli/Cargo.toml @@ -25,9 +25,15 @@ serde = { workspace = true, features = ["derive"]} clap = { workspace = true, features = ["derive", "env"] } tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "json", "tracing-log"] } tracing-appender.workspace = true -metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] } +metrics.workspace = true +metrics-exporter-prometheus.workspace = true metrics-process.workspace = true thiserror.workspace = true +tokio = { workspace = true, features = ["net", "time"] } +hyper = { version = "1.6", features = ["server", "http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } +http-body-util.workspace = true +http.workspace = true # `secrets` feature libp2p = { workspace = true, features = ["secp256k1"], optional = true } diff --git a/crates/utilities/cli/src/error.rs b/crates/utilities/cli/src/error.rs index a99705a88d..6830fa655e 100644 --- a/crates/utilities/cli/src/error.rs +++ b/crates/utilities/cli/src/error.rs @@ -1,7 +1,19 @@ //! Error types for CLI utilities. +use std::io; use thiserror::Error; +/// Error type for prometheus server initialization. +#[derive(Debug, Error)] +pub enum PrometheusError { + /// Failed to bind to the specified address. + #[error("failed to bind to address: {0}")] + Bind(#[from] io::Error), + /// Failed to set the global metrics recorder. 
+ #[error("failed to set global metrics recorder: {0}")] + SetRecorder(#[from] metrics::SetRecorderError<metrics_exporter_prometheus::PrometheusRecorder>), +} + /// Errors that can occur in CLI operations. #[derive(Error, Debug)] pub enum CliError { @@ -18,8 +30,8 @@ pub enum CliError { UnsafeBlockSignerNotFound(u64), /// Error initializing metrics. - #[error("Failed to initialize metrics")] - MetricsInitialization(#[from] metrics_exporter_prometheus::BuildError), + #[error("Failed to initialize metrics: {0}")] + MetricsInitialization(#[from] PrometheusError), } /// Type alias for CLI results. diff --git a/crates/utilities/cli/src/flags/metrics.rs b/crates/utilities/cli/src/flags/metrics.rs index 7333270bb2..d5d65efc03 100644 --- a/crates/utilities/cli/src/flags/metrics.rs +++ b/crates/utilities/cli/src/flags/metrics.rs @@ -3,7 +3,7 @@ use crate::{CliResult, init_prometheus_server}; use clap::{Parser, arg}; -use std::net::IpAddr; +use std::net::{IpAddr, SocketAddr}; /// Configuration for Prometheus metrics. #[derive(Debug, Clone, Parser)] @@ -18,7 +18,7 @@ pub struct MetricsArgs { )] pub enabled: bool, - /// The port to serve Prometheus metrics on. + /// The port to serve Prometheus metrics on. Use 0 to let the OS assign a port. #[arg(long = "metrics.port", global = true, default_value = "9090", env = "KONA_METRICS_PORT")] pub port: u16, @@ -39,15 +39,26 @@ impl Default for MetricsArgs { } impl MetricsArgs { - /// Initialize the tracing stack and Prometheus metrics recorder. + /// Initialize the Prometheus metrics recorder and start the metrics server. /// - /// This function should be called at the beginning of the program. - pub fn init_metrics(&self) -> CliResult<()> { + /// Must be called from within a tokio runtime. + pub async fn init_metrics(&self) -> CliResult<()> { + self.init_metrics_with_addr().await.map(drop) + } + + /// Initialize the Prometheus metrics recorder and start the metrics server. + /// + /// Returns the actual address the server is bound to.
This is useful when + /// the port is set to 0, allowing the OS to assign an available port. + /// + /// Must be called from within a tokio runtime. + pub async fn init_metrics_with_addr(&self) -> CliResult<Option<SocketAddr>> { if self.enabled { - init_prometheus_server(self.addr, self.port)?; + let addr = SocketAddr::from((self.addr, self.port)); + Ok(Some(init_prometheus_server(addr).await?)) + } else { + Ok(None) } - - Ok(()) } } diff --git a/crates/utilities/cli/src/lib.rs b/crates/utilities/cli/src/lib.rs index 4c151dec0a..37ca2b31f8 100644 --- a/crates/utilities/cli/src/lib.rs +++ b/crates/utilities/cli/src/lib.rs @@ -6,7 +6,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod error; -pub use error::{CliError, CliResult}; +pub use error::{CliError, CliResult, PrometheusError}; mod flags; pub use flags::{GlobalArgs, LogArgs, MetricsArgs, OverrideArgs}; diff --git a/crates/utilities/cli/src/prometheus.rs b/crates/utilities/cli/src/prometheus.rs index 45eb51dc6c..1222e60f73 100644 --- a/crates/utilities/cli/src/prometheus.rs +++ b/crates/utilities/cli/src/prometheus.rs @@ -1,37 +1,104 @@ //! Utilities for spinning up a prometheus metrics server. +//! +//! # Design +//! +//! We manually serve metrics via `hyper` rather than using the built-in HTTP listener from +//! `metrics-exporter-prometheus`. This is because [`PrometheusBuilder::with_http_listener`] +//! doesn't expose the actual bound address—it only accepts a [`SocketAddr`] and returns +//! `Result<(), BuildError>`. When port 0 is used (letting the OS assign an available port), +//! there's no way to discover what port was actually assigned. +//! +//! We considered pre-binding a `TcpListener` to get the port, dropping it, then passing that +//! port to the builder—but this has a race condition where another process could grab the port +//! in between. +//! +//! Instead, we use [`PrometheusBuilder::build_recorder`] to create the recorder without an HTTP +//!
server, bind our own [`TcpListener`], and serve metrics manually. This approach: +//! - Eliminates the race condition (the same listener is used throughout) +//! - Allows returning the actual bound address to callers +//! - Follows the same pattern used by [reth's metrics infrastructure][reth-metrics] +//! +//! [reth-metrics]: https://github.com/paradigmxyz/reth/blob/main/crates/node/metrics/src/server.rs -use metrics_exporter_prometheus::{BuildError, PrometheusBuilder}; +use crate::PrometheusError; +use http::{Response, header::CONTENT_TYPE}; +use http_body_util::Full; +use hyper::{body::Bytes, server::conn::http1, service::service_fn}; +use hyper_util::rt::TokioIo; +use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use metrics_process::Collector; -use std::{ - net::{IpAddr, SocketAddr}, - thread::{self, sleep}, - time::Duration, -}; -use tracing::info; +use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration}; +use tokio::net::TcpListener; +use tracing::{error, info}; -/// Start a Prometheus metrics server on the given port. -pub fn init_prometheus_server(addr: IpAddr, metrics_port: u16) -> Result<(), BuildError> { - let prometheus_addr = SocketAddr::from((addr, metrics_port)); - let builder = PrometheusBuilder::new().with_http_listener(prometheus_addr); +/// Start a Prometheus metrics server on the given address. +/// +/// If the port is 0, the OS will assign an available port. +/// Returns the actual address the server is bound to. +/// +/// This function must be called from within a tokio runtime. +pub async fn init_prometheus_server(addr: SocketAddr) -> Result<SocketAddr, PrometheusError> { + let listener = TcpListener::bind(addr).await?; + let actual_addr = listener.local_addr()?; - builder.install()?; + // Build recorder without HTTP server - we serve it ourselves + let recorder = PrometheusBuilder::new().build_recorder(); + let handle = Arc::new(recorder.handle()); - // Initialise collector for system metrics e.g. CPU, memory, etc.
-    let collector = Collector::default();
-    collector.describe();
+    // Set as global recorder
+    metrics::set_global_recorder(recorder)?;
 
-    thread::spawn(move || {
+    // The recorder was created with `build_recorder`, so nothing else in this function
+    // runs upkeep for it. Call `run_upkeep` periodically ourselves so histogram data
+    // held by the recorder is drained instead of accumulating between scrapes.
+    let upkeep_handle = Arc::clone(&handle);
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_secs(5)).await;
+            upkeep_handle.run_upkeep();
+        }
+    });
+
+    // Spawn task for periodic system metrics collection
+    tokio::spawn(async move {
+        let collector = Collector::default();
+        collector.describe();
         loop {
             collector.collect();
-            sleep(Duration::from_secs(60));
+            tokio::time::sleep(Duration::from_secs(60)).await;
         }
     });
 
+    // Spawn task to serve metrics endpoint
+    tokio::spawn(serve_metrics(listener, handle));
+
     info!(
         target: "prometheus",
         "Serving metrics at: http://{}",
-        prometheus_addr
+        actual_addr
     );
 
-    Ok(())
+    Ok(actual_addr)
+}
+
+/// Accepts connections on `listener` forever and answers every HTTP/1 request with the
+/// current Prometheus text rendering from `handle`.
+///
+/// The request itself (path, method, headers) is ignored: any request on the listener
+/// receives the metrics payload with a `text/plain; charset=utf-8` content type.
+/// Each accepted connection is served on its own spawned task, so a slow client does
+/// not block the accept loop. This function never returns.
+async fn serve_metrics(listener: TcpListener, handle: Arc<PrometheusHandle>) {
+    loop {
+        let (stream, _) = match listener.accept().await {
+            Ok(conn) => conn,
+            Err(e) => {
+                error!(target: "prometheus", "failed to accept connection: {}", e);
+                // Back off briefly before retrying. If the error is persistent
+                // (e.g. the process is out of file descriptors), retrying
+                // immediately would spin this task and flood the log.
+                tokio::time::sleep(Duration::from_millis(100)).await;
+                continue;
+            }
+        };
+
+        let handle = Arc::clone(&handle);
+        tokio::spawn(async move {
+            let service = service_fn(move |_req| {
+                // Render eagerly so the returned future owns the payload.
+                let metrics = handle.render();
+                async move {
+                    let response = Response::builder()
+                        .header(CONTENT_TYPE, "text/plain; charset=utf-8")
+                        .body(Full::new(Bytes::from(metrics)))
+                        .expect("static header name and value always form a valid response");
+                    Ok::<_, Infallible>(response)
+                }
+            });
+
+            if let Err(e) = http1::Builder::new()
+                .serve_connection(TokioIo::new(stream), service)
+                .await
+            {
+                error!(target: "prometheus", "error serving metrics: {}", e);
+            }
+        });
+    }
 }