From 1019ae87bb4bf6cfab7ea1011a012586fed26da7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Domen=20Ko=C5=BEar?= Date: Mon, 1 Dec 2025 16:26:03 +0100 Subject: [PATCH] feat(build): refactor BuildService to streaming RPC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the unary DoBuild RPC with a server-side streaming RPC that yields incremental BuildEvent messages. This enables real-time log streaming and progress updates during builds. Event types: - BuildStarted: emitted when build begins with build_id - LogOutput: stdout/stderr log lines streamed as they occur - RefscanResult: reference scanning results per output - BuildCompleted: final outputs with nodes and needles - BuildFailed: error message and optional exit code Build failures are reported as BuildFailed events rather than stream errors, allowing clean stream termination. Cancellation is handled via stream drop (FuseDaemon cleanup on Drop). Also adds comprehensive tests for proto conversions and DummyBuildService streaming behavior. 
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Change-Id: I158cb2c14828def4db6a929610219e195303896d --- snix/Cargo.lock | 1 + snix/build/Cargo.toml | 1 + snix/build/protos/build.proto | 51 +- snix/build/protos/rpc_build.proto | 4 +- snix/build/src/buildservice/build_request.rs | 56 +++ snix/build/src/buildservice/dummy.rs | 66 ++- snix/build/src/buildservice/grpc.rs | 31 +- snix/build/src/buildservice/mod.rs | 19 +- snix/build/src/buildservice/oci.rs | 306 +++++++----- .../src/proto/grpc_buildservice_wrapper.rs | 23 +- snix/build/src/proto/mod.rs | 450 +++++++++++++++--- snix/glue/src/snix_store_io.rs | 36 +- 12 files changed, 832 insertions(+), 212 deletions(-) diff --git a/snix/Cargo.lock b/snix/Cargo.lock index 8394e5cd9..e28d041b0 100644 --- a/snix/Cargo.lock +++ b/snix/Cargo.lock @@ -4095,6 +4095,7 @@ name = "snix-build" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "blake3", "bstr", "bytes", diff --git a/snix/build/Cargo.toml b/snix/build/Cargo.toml index 9b5c1d96f..3e99bffed 100644 --- a/snix/build/Cargo.toml +++ b/snix/build/Cargo.toml @@ -21,6 +21,7 @@ mimalloc.workspace = true tonic-reflection = { workspace = true, optional = true } anyhow = "1.0.79" +async-stream = "0.3" blake3 = "1.5.0" bstr = "1.6.0" data-encoding = "2.5.0" diff --git a/snix/build/protos/build.proto b/snix/build/protos/build.proto index 516d776a6..22deed86e 100644 --- a/snix/build/protos/build.proto +++ b/snix/build/protos/build.proto @@ -155,8 +155,47 @@ message BuildRequest { // TODO: allow describing something like "preferLocal", to influence composition? } -// A BuildResponse is (one possible) outcome of executing a [BuildRequest]. -message BuildResponse { +// BuildEvent represents a single event during a build process. +// Unless the stream itself fails with an error status (e.g. the build could not be set up), it ends with either BuildCompleted or BuildFailed. 
+message BuildEvent { + oneof event { + BuildStarted started = 1; + LogOutput log = 2; + RefscanResult refscan = 3; + BuildCompleted completed = 4; + BuildFailed failed = 5; + } +} + +// Emitted at the start of a build. +message BuildStarted { + // A unique identifier for this build. + string build_id = 1; +} + +// A line of log output from the build process. +message LogOutput { + enum Stream { + STREAM_UNSPECIFIED = 0; + STREAM_STDOUT = 1; + STREAM_STDERR = 2; + } + // Which output stream this line came from. + Stream stream = 1; + // The log line data (including newline). + bytes data = 2; +} + +// Reference scanning result for a single output. +message RefscanResult { + // Index of the output in the original BuildRequest.outputs. + uint32 output_index = 1; + // Indexes into [BuildRequest::refscan_needles] found in this output. + repeated uint64 found_needles = 2; +} + +// Emitted when a build completes successfully. +message BuildCompleted { // The outputs that were produced after successfully building. // They are provided in the same order as specified in the [BuildRequest]. repeated Output outputs = 1; @@ -170,8 +209,14 @@ message BuildResponse { // Indexes into the found [BuildRequest::refscan_needles] in this output. repeated uint64 needles = 2; } +} - // TODO: where did this run, how long, logs, … +// Emitted when a build fails. +message BuildFailed { + // Human-readable error message. + string message = 1; + // Exit code of the build process, if available. 
+ optional int32 exit_code = 2; } /// TODO: check remarkable notes on constraints again diff --git a/snix/build/protos/rpc_build.proto b/snix/build/protos/rpc_build.proto index 969e1d51c..9d41f1cf6 100644 --- a/snix/build/protos/rpc_build.proto +++ b/snix/build/protos/rpc_build.proto @@ -10,5 +10,7 @@ import "snix/build/protos/build.proto"; option go_package = "snix.dev/build/proto;buildv1"; service BuildService { - rpc DoBuild(BuildRequest) returns (BuildResponse); + // Execute a build and stream events back. + // Dropping the stream signals cancellation. + rpc DoBuild(BuildRequest) returns (stream BuildEvent); } diff --git a/snix/build/src/buildservice/build_request.rs b/snix/build/src/buildservice/build_request.rs index 2f54c5700..7d45b96c0 100644 --- a/snix/build/src/buildservice/build_request.rs +++ b/snix/build/src/buildservice/build_request.rs @@ -146,3 +146,59 @@ pub struct BuildOutput { /// Indexes into the found [BuildRequest::refscan_needles] in that output. pub output_needles: BTreeSet, } + +/// An event emitted during a build process. +#[derive(Debug, Clone)] +pub enum BuildEvent { + /// Emitted at the start of a build. + Started(BuildStarted), + /// A line of log output from the build process. + Log(LogOutput), + /// Reference scanning result for an output. + RefscanResult(RefscanResultEvent), + /// The build completed successfully. + Completed(BuildResult), + /// The build failed. + Failed(BuildError), +} + +/// Emitted at the start of a build. +#[derive(Debug, Clone)] +pub struct BuildStarted { + /// A unique identifier for this build. + pub build_id: String, +} + +/// Which output stream a log line came from. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogStream { + Stdout, + Stderr, +} + +/// A line of log output from the build process. +#[derive(Debug, Clone)] +pub struct LogOutput { + /// Which output stream this line came from. + pub stream: LogStream, + /// The log line data (including newline). 
+ pub data: Bytes, +} + +/// Reference scanning result for a single output. +#[derive(Debug, Clone)] +pub struct RefscanResultEvent { + /// Index of the output in the original BuildRequest.outputs. + pub output_index: usize, + /// Indexes into [BuildRequest::refscan_needles] found in this output. + pub found_needles: Vec, +} + +/// Describes a build failure. +#[derive(Debug, Clone)] +pub struct BuildError { + /// Human-readable error message. + pub message: String, + /// Exit code of the build process, if available. + pub exit_code: Option, +} diff --git a/snix/build/src/buildservice/dummy.rs b/snix/build/src/buildservice/dummy.rs index 2d3864e7b..9c151c50a 100644 --- a/snix/build/src/buildservice/dummy.rs +++ b/snix/build/src/buildservice/dummy.rs @@ -1,18 +1,66 @@ -use tonic::async_trait; +use futures::stream; use tracing::instrument; -use super::BuildService; -use crate::buildservice::{BuildRequest, BuildResult}; +use super::{BuildEventStream, BuildService}; +use crate::buildservice::BuildRequest; #[derive(Default)] pub struct DummyBuildService {} -#[async_trait] impl BuildService for DummyBuildService { - #[instrument(skip(self), ret, err)] - async fn do_build(&self, _request: BuildRequest) -> std::io::Result { - Err(std::io::Error::other( - "builds are not supported with DummyBuildService", - )) + #[instrument(skip(self))] + fn do_build(&self, _request: BuildRequest) -> BuildEventStream { + Box::pin(stream::once(async { + Err(std::io::Error::other( + "builds are not supported with DummyBuildService", + )) + })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use std::collections::BTreeMap; + use std::path::PathBuf; + + fn make_dummy_request() -> BuildRequest { + BuildRequest { + inputs: BTreeMap::new(), + command_args: vec!["echo".to_string(), "hello".to_string()], + working_dir: PathBuf::from("build"), + scratch_paths: vec![PathBuf::from("build")], + inputs_dir: PathBuf::from("nix/store"), + outputs: 
vec![PathBuf::from("build/out")], + environment_vars: vec![], + constraints: Default::default(), + additional_files: vec![], + refscan_needles: vec![], + } + } + + #[tokio::test] + async fn test_dummy_build_service_returns_error_stream() { + let service = DummyBuildService::default(); + let mut stream = service.do_build(make_dummy_request()); + + // First item should be an error + let first = stream.next().await; + assert!(first.is_some(), "stream should yield at least one item"); + + let result = first.unwrap(); + assert!(result.is_err(), "should be an error"); + + let err = result.unwrap_err(); + assert!( + err.to_string() + .contains("builds are not supported with DummyBuildService"), + "error message should explain builds are not supported" + ); + + // Stream should end after the error + let second = stream.next().await; + assert!(second.is_none(), "stream should end after the error"); } } diff --git a/snix/build/src/buildservice/grpc.rs b/snix/build/src/buildservice/grpc.rs index ca735866e..3b656b700 100644 --- a/snix/build/src/buildservice/grpc.rs +++ b/snix/build/src/buildservice/grpc.rs @@ -1,9 +1,9 @@ -use tonic::{async_trait, transport::Channel}; +use tonic::transport::Channel; -use crate::buildservice::BuildRequest; +use crate::buildservice::{BuildEvent, BuildRequest}; use crate::proto::{self, build_service_client::BuildServiceClient}; -use super::{BuildResult, BuildService}; +use super::{BuildEventStream, BuildService}; pub struct GRPCBuildService { client: BuildServiceClient, @@ -16,16 +16,25 @@ impl GRPCBuildService { } } -#[async_trait] impl BuildService for GRPCBuildService { - async fn do_build(&self, request: BuildRequest) -> std::io::Result { + fn do_build(&self, request: BuildRequest) -> BuildEventStream { let mut client = self.client.clone(); - let resp = client - .do_build(Into::::into(request)) - .await - .map_err(std::io::Error::other)? 
- .into_inner(); + let proto_request: proto::BuildRequest = request.into(); - Ok::(resp.try_into().map_err(std::io::Error::other)?) + let stream = async_stream::try_stream! { + let response = client + .do_build(proto_request) + .await + .map_err(std::io::Error::other)?; + + let mut stream = response.into_inner(); + + while let Some(event) = stream.message().await.map_err(std::io::Error::other)? { + let event: BuildEvent = event.try_into().map_err(std::io::Error::other)?; + yield event; + } + }; + + Box::pin(stream) } } diff --git a/snix/build/src/buildservice/mod.rs b/snix/build/src/buildservice/mod.rs index dfb480076..9b5132b06 100644 --- a/snix/build/src/buildservice/mod.rs +++ b/snix/build/src/buildservice/mod.rs @@ -1,4 +1,4 @@ -use tonic::async_trait; +use futures::stream::BoxStream; pub mod build_request; pub use crate::buildservice::build_request::*; @@ -12,8 +12,19 @@ mod oci; pub use dummy::DummyBuildService; pub use from_addr::from_addr; -#[async_trait] +/// A stream of build events. +pub type BuildEventStream = BoxStream<'static, Result>; + +/// Service for executing builds. pub trait BuildService: Send + Sync { - /// TODO: document - async fn do_build(&self, request: BuildRequest) -> std::io::Result; + /// Execute a build and return a stream of events. + /// + /// The stream will emit events as the build progresses: + /// - `Started` at the beginning + /// - `Log` events for stdout/stderr output (line by line) + /// - `RefscanResult` events for each output after ingestion + /// - Either `Completed` or `Failed` at the end + /// + /// Dropping the stream signals cancellation - the build will be aborted. 
+ fn do_build(&self, request: BuildRequest) -> BuildEventStream; } diff --git a/snix/build/src/buildservice/oci.rs b/snix/build/src/buildservice/oci.rs index ae004fa1c..9131d23cc 100644 --- a/snix/build/src/buildservice/oci.rs +++ b/snix/build/src/buildservice/oci.rs @@ -1,5 +1,7 @@ +use std::sync::Arc; + use anyhow::Context; -use bstr::BStr; +use bytes::Bytes; use snix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -7,16 +9,19 @@ use snix_castore::{ import::fs::ingest_path, refscan::{ReferencePattern, ReferenceScanner}, }; +use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; -use tonic::async_trait; -use tracing::{Span, debug, instrument, warn}; +use tracing::{Span, debug, instrument}; use uuid::Uuid; -use crate::buildservice::{BuildOutput, BuildRequest, BuildResult}; +use crate::buildservice::{ + BuildError, BuildEvent, BuildOutput, BuildRequest, BuildResult, BuildStarted, LogOutput, + LogStream, RefscanResultEvent, +}; use crate::oci::{get_host_output_paths, make_bundle, make_spec}; use std::{ffi::OsStr, path::PathBuf, process::Stdio}; -use super::BuildService; +use super::{BuildEventStream, BuildService}; const SANDBOX_SHELL: &str = env!("SNIX_BUILD_SANDBOX_SHELL"); const MAX_CONCURRENT_BUILDS: usize = 2; // TODO: make configurable @@ -32,7 +37,7 @@ pub struct OCIBuildService { // semaphore to track number of concurrently running builds. // this is necessary, as otherwise we very quickly run out of open file handles. 
- concurrent_builds: tokio::sync::Semaphore, + concurrent_builds: Arc, } impl OCIBuildService { @@ -45,137 +50,210 @@ impl OCIBuildService { bundle_root, blob_service, directory_service, - concurrent_builds: tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS), + concurrent_builds: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS)), } } } -#[async_trait] impl BuildService for OCIBuildService where BS: BlobService + Clone + 'static, DS: DirectoryService + Clone + 'static, { - #[instrument(skip_all, err)] - async fn do_build(&self, request: BuildRequest) -> std::io::Result { - let _permit = self.concurrent_builds.acquire().await.unwrap(); + #[instrument(skip_all)] + fn do_build(&self, request: BuildRequest) -> BuildEventStream { + let bundle_root = self.bundle_root.clone(); + let blob_service = self.blob_service.clone(); + let directory_service = self.directory_service.clone(); + let concurrent_builds = self.concurrent_builds.clone(); + + let stream = async_stream::try_stream! { + let _permit = concurrent_builds.acquire().await.unwrap(); + + let bundle_name = Uuid::new_v4(); + let bundle_path = bundle_root.join(bundle_name.to_string()); + + let span = Span::current(); + span.record("bundle_name", bundle_name.to_string()); + + // Yield BuildStarted event + yield BuildEvent::Started(BuildStarted { + build_id: bundle_name.to_string(), + }); + + let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL) + .context("failed to create spec") + .map_err(std::io::Error::other)?; + + let linux = runtime_spec.linux().clone().unwrap(); + + runtime_spec.set_linux(Some(linux)); + + make_bundle(&request, &runtime_spec, &bundle_path) + .context("failed to produce bundle") + .map_err(std::io::Error::other)?; + + // pre-calculate the locations we want to later ingest, in the order of + // the original outputs. + // If we can't calculate that path, don't start the build in the first place. 
+ let host_output_paths = get_host_output_paths(&request, &bundle_path) + .context("failed to calculate host output paths") + .map_err(std::io::Error::other)?; + + // assemble a BTreeMap of Nodes to pass into SnixStoreFs. + let patterns = ReferencePattern::new(request.refscan_needles); + // NOTE: impl Drop for FuseDaemon unmounts, so if the call is cancelled, umount. + let _fuse_daemon = tokio::task::spawn_blocking({ + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + + let dest = bundle_path.join("inputs"); + + let root_nodes = Box::new(request.inputs); + move || { + let fs = snix_castore::fs::SnixStoreFs::new( + blob_service, + directory_service, + root_nodes, + true, + false, + ); + // mount the filesystem and wait for it to be unmounted. + // FUTUREWORK: make fuse daemon threads configurable? + FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon") + } + }) + .await? + .context("mounting") + .map_err(std::io::Error::other)?; - let bundle_name = Uuid::new_v4(); - let bundle_path = self.bundle_root.join(bundle_name.to_string()); + debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle"); - let span = Span::current(); - span.record("bundle_name", bundle_name.to_string()); + // start the bundle as another process. 
+ let mut child = spawn_bundle(&bundle_path, &bundle_name.to_string())?; - let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL) - .context("failed to create spec") - .map_err(std::io::Error::other)?; + // Take stdout/stderr for streaming + let stdout = child.stdout.take().expect("stdout should be piped"); + let stderr = child.stderr.take().expect("stderr should be piped"); - let linux = runtime_spec.linux().clone().unwrap(); + let mut stdout_reader = BufReader::new(stdout).lines(); + let mut stderr_reader = BufReader::new(stderr).lines(); - runtime_spec.set_linux(Some(linux)); + let mut stdout_done = false; + let mut stderr_done = false; + let mut io_error: Option = None; - make_bundle(&request, &runtime_spec, &bundle_path) - .context("failed to produce bundle") - .map_err(std::io::Error::other)?; - - // pre-calculate the locations we want to later ingest, in the order of - // the original outputs. - // If we can't find calculate that path, don't start the build in first place. - let host_output_paths = get_host_output_paths(&request, &bundle_path) - .context("failed to calculate host output paths") - .map_err(std::io::Error::other)?; + // Stream logs line-by-line using select + loop { + if stdout_done && stderr_done { + break; + } - // assemble a BTreeMap of Nodes to pass into SnixStoreFs. - let patterns = ReferencePattern::new(request.refscan_needles); - // NOTE: impl Drop for FuseDaemon unmounts, so if the call is cancelled, umount. - let _fuse_daemon = tokio::task::spawn_blocking({ - let blob_service = self.blob_service.clone(); - let directory_service = self.directory_service.clone(); - - let dest = bundle_path.join("inputs"); - - let root_nodes = Box::new(request.inputs); - move || { - let fs = snix_castore::fs::SnixStoreFs::new( - blob_service, - directory_service, - root_nodes, - true, - false, - ); - // mount the filesystem and wait for it to be unmounted. - // FUTUREWORK: make fuse daemon threads configurable? 
- FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon") + tokio::select! { + result = stdout_reader.next_line(), if !stdout_done => { + match result { + Ok(Some(line)) => { + yield BuildEvent::Log(LogOutput { + stream: LogStream::Stdout, + data: Bytes::from(line + "\n"), + }); + } + Ok(None) => { + stdout_done = true; + } + Err(e) => { + io_error = Some(e); + break; + } + } + } + result = stderr_reader.next_line(), if !stderr_done => { + match result { + Ok(Some(line)) => { + yield BuildEvent::Log(LogOutput { + stream: LogStream::Stderr, + data: Bytes::from(line + "\n"), + }); + } + Ok(None) => { + stderr_done = true; + } + Err(e) => { + io_error = Some(e); + break; + } + } + } + } } - }) - .await? - .context("mounting") - .map_err(std::io::Error::other)?; - - debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle"); - - // start the bundle as another process. - let child = spawn_bundle(bundle_path, &bundle_name.to_string())?; - - // wait for the process to exit - // FUTUREWORK: change the trait to allow reporting progress / logs… - let child_output = child - .wait_with_output() - .await - .context("failed to run process") - .map_err(std::io::Error::other)?; - // Check the exit code - if !child_output.status.success() { - let stdout = BStr::new(&child_output.stdout); - let stderr = BStr::new(&child_output.stderr); + // Check for IO errors during log streaming + if let Some(e) = io_error { + Err(e)?; + } - warn!(stdout=%stdout, stderr=%stderr, exit_code=%child_output.status, "build failed"); + // Wait for the process to exit + let status = child.wait().await + .context("failed to wait for process") + .map_err(std::io::Error::other)?; + + // Check the exit code + if !status.success() { + let exit_code = status.code(); + yield BuildEvent::Failed(BuildError { + message: "build process exited with non-zero status".to_string(), + exit_code, + }); + return; + } - return Err(std::io::Error::other("nonzero exit 
code".to_string())); - } + // Ingest build outputs into the castore. + let mut outputs = Vec::with_capacity(host_output_paths.len()); - // Ingest build outputs into the castore. - // We use try_join_all here. No need to spawn new tasks, as this is - // mostly IO bound. - let outputs = futures::future::try_join_all(host_output_paths.into_iter().enumerate().map( - |(i, host_output_path)| { + for (i, host_output_path) in host_output_paths.into_iter().enumerate() { let output_path = &request.outputs[i]; - let patterns = patterns.clone(); - async move { - debug!(host.path=?host_output_path, output.path=?output_path, "ingesting path"); - - let scanner = ReferenceScanner::new(patterns); - - Ok::<_, std::io::Error>(BuildOutput { - node: ingest_path( - self.blob_service.clone(), - &self.directory_service, - host_output_path, - Some(&scanner), - ) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Unable to ingest output: {e}"), - ) - })?, - - output_needles: scanner - .matches() - .into_iter() - .enumerate() - .filter(|(_, val)| *val) - .map(|(idx, _)| idx as u64) - .collect(), - }) - } - }, - )) - .await?; + debug!(host.path=?host_output_path, output.path=?output_path, "ingesting path"); + + let scanner = ReferenceScanner::new(patterns.clone()); + + let node = ingest_path( + blob_service.clone(), + &directory_service, + host_output_path, + Some(&scanner), + ) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Unable to ingest output: {e}"), + ) + })?; + + let found_needles: Vec = scanner + .matches() + .into_iter() + .enumerate() + .filter(|(_, val)| *val) + .map(|(idx, _)| idx as u64) + .collect(); + + // Yield RefscanResult event + yield BuildEvent::RefscanResult(RefscanResultEvent { + output_index: i, + found_needles: found_needles.clone(), + }); + + outputs.push(BuildOutput { + node, + output_needles: found_needles.into_iter().collect(), + }); + } + + yield 
BuildEvent::Completed(BuildResult { outputs }); + }; - Ok(BuildResult { outputs }) + Box::pin(stream) } } diff --git a/snix/build/src/proto/grpc_buildservice_wrapper.rs b/snix/build/src/proto/grpc_buildservice_wrapper.rs index abc31f328..1a5f88128 100644 --- a/snix/build/src/proto/grpc_buildservice_wrapper.rs +++ b/snix/build/src/proto/grpc_buildservice_wrapper.rs @@ -1,8 +1,10 @@ use crate::buildservice::BuildService; +use futures::stream::BoxStream; +use futures::StreamExt; use std::ops::Deref; use tonic::async_trait; -use super::{BuildRequest, BuildResponse}; +use super::{BuildEvent, BuildRequest}; /// Implements the gRPC server trait ([crate::proto::build_service_server::BuildService] /// for anything implementing [BuildService]. @@ -23,15 +25,24 @@ impl crate::proto::build_service_server::BuildService for GRPCBuildServic where BUILD: Deref + Send + Sync + 'static, { + type DoBuildStream = BoxStream<'static, Result>; + async fn do_build( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { let request = TryInto::::try_into(request.into_inner()) .map_err(|err| tonic::Status::new(tonic::Code::InvalidArgument, err.to_string()))?; - match self.inner.do_build(request).await { - Ok(resp) => Ok(tonic::Response::new(resp.into())), - Err(e) => Err(tonic::Status::internal(e.to_string())), - } + + let stream = self.inner.do_build(request); + + // Map the stream to convert BuildEvent to proto BuildEvent + let proto_stream = stream.map(|result| { + result + .map(|event| event.into()) + .map_err(|e| tonic::Status::internal(e.to_string())) + }); + + Ok(tonic::Response::new(Box::pin(proto_stream))) } } diff --git a/snix/build/src/proto/mod.rs b/snix/build/src/proto/mod.rs index becdc9b22..aea5d0ccd 100644 --- a/snix/build/src/proto/mod.rs +++ b/snix/build/src/proto/mod.rs @@ -61,19 +61,6 @@ pub enum ValidateBuildRequestError { AdditionalFilesNotSorted, } -/// Errors that occur during the validation of [BuildResult] messages. 
-#[derive(Debug, thiserror::Error)] -pub enum ValidateBuildResultError { - #[error("request field is unpopulated")] - MissingRequestField, - #[error("request is invalid")] - InvalidBuildRequest(ValidateBuildRequestError), - #[error("output entry {0} missing")] - MissingOutputEntry(usize), - #[error("output entry {0} invalid")] - InvalidOutputEntry(usize), -} - /// Checks a path to be without any '..' components, and clean (no superfluous /// slashes). fn is_clean_path>(p: P) -> bool { @@ -282,50 +269,6 @@ impl TryFrom for crate::buildservice::BuildRequest { } } -impl From for BuildResponse { - fn from(value: BuildResult) -> Self { - Self { - outputs: value - .outputs - .into_iter() - .map(|output| build_response::Output { - output: Some(snix_castore::proto::Entry::from_name_and_node( - "".into(), - output.node, - )), - needles: output.output_needles.into_iter().collect(), - }) - .collect(), - } - } -} - -impl TryFrom for BuildResult { - type Error = ValidateBuildResultError; - - fn try_from(value: BuildResponse) -> Result { - Ok(Self { - outputs: value - .outputs - .into_iter() - .enumerate() - .map(|(i, output)| { - let node = output - .output - .ok_or(ValidateBuildResultError::MissingOutputEntry(i))? - .try_into_anonymous_node() - .map_err(|_| ValidateBuildResultError::InvalidOutputEntry(i))?; - - Ok::<_, ValidateBuildResultError>(crate::buildservice::BuildOutput { - node, - output_needles: BTreeSet::from_iter(output.needles), - }) - }) - .try_collect()?, - }) - } -} - /// Errors that occur during the validation of /// [build_request::BuildConstraints] messages. 
#[derive(Debug, thiserror::Error)] @@ -418,6 +361,226 @@ impl TryFrom for HashSet for build_event::Event { + fn from(value: crate::buildservice::BuildEvent) -> Self { + use crate::buildservice::BuildEvent as BE; + match value { + BE::Started(started) => build_event::Event::Started(started.into()), + BE::Log(log) => build_event::Event::Log(log.into()), + BE::RefscanResult(result) => build_event::Event::Refscan(result.into()), + BE::Completed(result) => build_event::Event::Completed(result.into()), + BE::Failed(error) => build_event::Event::Failed(error.into()), + } + } +} + +impl From for BuildEvent { + fn from(value: crate::buildservice::BuildEvent) -> Self { + Self { + event: Some(value.into()), + } + } +} + +impl TryFrom for crate::buildservice::BuildEvent { + type Error = ValidateBuildEventError; + + fn try_from(value: BuildEvent) -> Result { + let event = value.event.ok_or(ValidateBuildEventError::MissingEventField)?; + event.try_into() + } +} + +impl TryFrom for crate::buildservice::BuildEvent { + type Error = ValidateBuildEventError; + + fn try_from(value: build_event::Event) -> Result { + use crate::buildservice::BuildEvent as BE; + Ok(match value { + build_event::Event::Started(started) => BE::Started(started.into()), + build_event::Event::Log(log) => BE::Log(log.try_into()?), + build_event::Event::Refscan(result) => BE::RefscanResult(result.into()), + build_event::Event::Completed(completed) => { + BE::Completed(completed.try_into().map_err(ValidateBuildEventError::InvalidBuildCompleted)?) 
+ } + build_event::Event::Failed(failed) => BE::Failed(failed.into()), + }) + } +} + +// === BuildStarted conversions === + +impl From for BuildStarted { + fn from(value: crate::buildservice::BuildStarted) -> Self { + Self { + build_id: value.build_id, + } + } +} + +impl From for crate::buildservice::BuildStarted { + fn from(value: BuildStarted) -> Self { + Self { + build_id: value.build_id, + } + } +} + +// === LogOutput conversions === + +impl From for log_output::Stream { + fn from(value: crate::buildservice::LogStream) -> Self { + use crate::buildservice::LogStream as LS; + match value { + LS::Stdout => log_output::Stream::Stdout, + LS::Stderr => log_output::Stream::Stderr, + } + } +} + +impl TryFrom for crate::buildservice::LogStream { + type Error = ValidateBuildEventError; + + fn try_from(value: log_output::Stream) -> Result { + use crate::buildservice::LogStream as LS; + match value { + log_output::Stream::Unspecified => Err(ValidateBuildEventError::InvalidLogStream), + log_output::Stream::Stdout => Ok(LS::Stdout), + log_output::Stream::Stderr => Ok(LS::Stderr), + } + } +} + +impl From for LogOutput { + fn from(value: crate::buildservice::LogOutput) -> Self { + Self { + stream: log_output::Stream::from(value.stream).into(), + data: value.data, + } + } +} + +impl TryFrom for crate::buildservice::LogOutput { + type Error = ValidateBuildEventError; + + fn try_from(value: LogOutput) -> Result { + let stream = log_output::Stream::try_from(value.stream) + .map_err(|_| ValidateBuildEventError::InvalidLogStream)?; + Ok(Self { + stream: stream.try_into()?, + data: value.data, + }) + } +} + +// === RefscanResultEvent conversions === + +impl From for RefscanResult { + fn from(value: crate::buildservice::RefscanResultEvent) -> Self { + Self { + output_index: value.output_index as u32, + found_needles: value.found_needles, + } + } +} + +impl From for crate::buildservice::RefscanResultEvent { + fn from(value: RefscanResult) -> Self { + Self { + output_index: 
value.output_index as usize, + found_needles: value.found_needles, + } + } +} + +// === BuildCompleted conversions === + +impl From for BuildCompleted { + fn from(value: BuildResult) -> Self { + Self { + outputs: value + .outputs + .into_iter() + .map(|output| build_completed::Output { + output: Some(snix_castore::proto::Entry::from_name_and_node( + "".into(), + output.node, + )), + needles: output.output_needles.into_iter().collect(), + }) + .collect(), + } + } +} + +impl TryFrom for BuildResult { + type Error = ValidateBuildCompletedError; + + fn try_from(value: BuildCompleted) -> Result { + Ok(Self { + outputs: value + .outputs + .into_iter() + .enumerate() + .map(|(i, output)| { + let node = output + .output + .ok_or(ValidateBuildCompletedError::MissingOutputEntry(i))? + .try_into_anonymous_node() + .map_err(|_| ValidateBuildCompletedError::InvalidOutputEntry(i))?; + + Ok::<_, ValidateBuildCompletedError>(crate::buildservice::BuildOutput { + node, + output_needles: BTreeSet::from_iter(output.needles), + }) + }) + .try_collect()?, + }) + } +} + +// === BuildError conversions === + +impl From for BuildFailed { + fn from(value: crate::buildservice::BuildError) -> Self { + Self { + message: value.message, + exit_code: value.exit_code, + } + } +} + +impl From for crate::buildservice::BuildError { + fn from(value: BuildFailed) -> Self { + Self { + message: value.message, + exit_code: value.exit_code, + } + } +} + #[cfg(test)] // TODO: add testcases for constraints special cases. The default cases in the protos // should result in the constraints not being added. For example min_memory 0 can be omitted. 
@@ -450,4 +613,177 @@ mod tests {
     }
 
     // TODO: add tests for BuildRequest validation itself
+
+    mod build_event_conversions {
+        use super::super::*;
+        use crate::buildservice::{
+            BuildError, BuildEvent as BE, BuildOutput, BuildResult, BuildStarted, LogOutput,
+            LogStream, RefscanResultEvent,
+        };
+        use bytes::Bytes;
+        use std::collections::BTreeSet;
+
+        #[test]
+        fn test_build_started_roundtrip() {
+            let started = BE::Started(BuildStarted {
+                build_id: "test-build-123".to_string(),
+            });
+
+            let proto: BuildEvent = started.clone().into();
+            let back: BE = proto.try_into().expect("conversion should succeed");
+
+            match back {
+                BE::Started(s) => assert_eq!(s.build_id, "test-build-123"),
+                _ => panic!("expected Started variant"),
+            }
+        }
+
+        #[test]
+        fn test_log_output_stdout_roundtrip() {
+            let log = BE::Log(LogOutput {
+                stream: LogStream::Stdout,
+                data: Bytes::from("hello stdout\n"),
+            });
+
+            let proto: BuildEvent = log.into();
+            let back: BE = proto.try_into().expect("conversion should succeed");
+
+            match back {
+                BE::Log(l) => {
+                    assert!(matches!(l.stream, LogStream::Stdout));
+                    assert_eq!(l.data, Bytes::from("hello stdout\n"));
+                }
+                _ => panic!("expected Log variant"),
+            }
+        }
+
+        #[test]
+        fn test_log_output_stderr_roundtrip() {
+            let log = BE::Log(LogOutput {
+                stream: LogStream::Stderr,
+                data: Bytes::from("error message\n"),
+            });
+
+            let proto: BuildEvent = log.into();
+            let back: BE = proto.try_into().expect("conversion should succeed");
+
+            match back {
+                BE::Log(l) => {
+                    assert!(matches!(l.stream, LogStream::Stderr));
+                    assert_eq!(l.data, Bytes::from("error message\n"));
+                }
+                _ => panic!("expected Log variant"),
+            }
+        }
+
+        #[test]
+        fn test_log_output_unspecified_fails() {
+            let proto = BuildEvent {
+                event: Some(build_event::Event::Log(super::super::LogOutput {
+                    stream: log_output::Stream::Unspecified.into(),
+                    data: Bytes::from("test"),
+                })),
+            };
+
+            let result: Result<BE, _> = proto.try_into();
+            assert!(result.is_err());
+        }
+
+        #[test]
+        fn test_refscan_result_roundtrip() {
+            let refscan = BE::RefscanResult(RefscanResultEvent {
+                output_index: 2,
+                found_needles: vec![0, 3, 5],
+            });
+
+            let proto: BuildEvent = refscan.into();
+            let back: BE = proto.try_into().expect("conversion should succeed");
+
+            match back {
+                BE::RefscanResult(r) => {
+                    assert_eq!(r.output_index, 2);
+                    assert_eq!(r.found_needles, vec![0, 3, 5]);
+                }
+                _ => panic!("expected RefscanResult variant"),
+            }
+        }
+
+        #[test]
+        fn test_build_completed_roundtrip() {
+            let digest = snix_castore::B3Digest::from(&[0u8; 32]);
+            let result = BuildResult {
+                outputs: vec![BuildOutput {
+                    node: snix_castore::Node::File {
+                        digest: digest.clone(),
+                        size: 100,
+                        executable: false,
+                    },
+                    output_needles: BTreeSet::from([1, 2, 3]),
+                }],
+            };
+
+            let proto: BuildCompleted = result.clone().into();
+            let back: BuildResult = proto.try_into().expect("conversion should succeed");
+
+            assert_eq!(back.outputs.len(), 1);
+            assert_eq!(back.outputs[0].output_needles, BTreeSet::from([1, 2, 3]));
+            match &back.outputs[0].node {
+                snix_castore::Node::File {
+                    digest: d,
+                    size,
+                    executable,
+                } => {
+                    assert_eq!(d, &digest);
+                    assert_eq!(*size, 100);
+                    assert!(!executable);
+                }
+                _ => panic!("expected File node"),
+            }
+        }
+
+        #[test]
+        fn test_build_failed_roundtrip() {
+            let failed = BE::Failed(BuildError {
+                message: "build failed with error".to_string(),
+                exit_code: Some(1),
+            });
+
+            let proto: BuildEvent = failed.into();
+            let back: BE = proto.try_into().expect("conversion should succeed");
+
+            match back {
+                BE::Failed(f) => {
+                    assert_eq!(f.message, "build failed with error");
+                    assert_eq!(f.exit_code, Some(1));
+                }
+                _ => panic!("expected Failed variant"),
+            }
+        }
+
+        #[test]
+        fn test_build_failed_no_exit_code() {
+            let failed = BE::Failed(BuildError {
+                message: "signal terminated".to_string(),
+                exit_code: None,
+            });
+
+            let proto: BuildEvent = failed.into();
+            let back: BE = proto.try_into().expect("conversion should succeed");
+
+            match back {
+                BE::Failed(f) => {
+                    assert_eq!(f.message, "signal terminated");
+                    assert_eq!(f.exit_code, None);
+                }
+                _ => panic!("expected Failed variant"),
+            }
+        }
+
+        #[test]
+        fn test_missing_event_field_fails() {
+            let proto = BuildEvent { event: None };
+            let result: Result<BE, _> = proto.try_into();
+            assert!(result.is_err());
+        }
+    }
 }
diff --git a/snix/glue/src/snix_store_io.rs b/snix/glue/src/snix_store_io.rs
index 2ee06e3df..6ae8c2391 100644
--- a/snix/glue/src/snix_store_io.rs
+++ b/snix/glue/src/snix_store_io.rs
@@ -219,13 +219,35 @@ impl SnixStoreIO {
                             })
                             .collect();
 
-                        // create a build
-                        let build_result = self
-                            .build_service
-                            .as_ref()
-                            .do_build(build_request)
-                            .await
-                            .map_err(std::io::Error::other)?;
+                        // create a build and consume the event stream
+                        use futures::StreamExt;
+                        use snix_build::buildservice::BuildEvent;
+
+                        let mut stream = self.build_service.as_ref().do_build(build_request);
+
+                        let build_result = loop {
+                            match stream.next().await {
+                                Some(Ok(BuildEvent::Completed(result))) => break result,
+                                Some(Ok(BuildEvent::Failed(err))) => {
+                                    return Err(std::io::Error::other(format!(
+                                        "build failed: {}",
+                                        err.message
+                                    )));
+                                }
+                                Some(Ok(_event)) => {
+                                    // Started/Log/RefscanResult events are currently ignored here
+                                    // FUTUREWORK: could be surfaced to the user
+                                }
+                                Some(Err(e)) => {
+                                    return Err(std::io::Error::other(e));
+                                }
+                                None => {
+                                    return Err(std::io::Error::other(
+                                        "build stream ended without result",
+                                    ));
+                                }
+                            }
+                        };
 
                         let mut out_path_info: Option<PathInfo> = None;
 