diff --git a/snix/Cargo.lock b/snix/Cargo.lock index 8394e5cd9..e28d041b0 100644 --- a/snix/Cargo.lock +++ b/snix/Cargo.lock @@ -4095,6 +4095,7 @@ name = "snix-build" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "blake3", "bstr", "bytes", diff --git a/snix/build/Cargo.toml b/snix/build/Cargo.toml index 9b5c1d96f..3e99bffed 100644 --- a/snix/build/Cargo.toml +++ b/snix/build/Cargo.toml @@ -21,6 +21,7 @@ mimalloc.workspace = true tonic-reflection = { workspace = true, optional = true } anyhow = "1.0.79" +async-stream = "0.3" blake3 = "1.5.0" bstr = "1.6.0" data-encoding = "2.5.0" diff --git a/snix/build/protos/build.proto b/snix/build/protos/build.proto index 516d776a6..22deed86e 100644 --- a/snix/build/protos/build.proto +++ b/snix/build/protos/build.proto @@ -155,8 +155,47 @@ message BuildRequest { // TODO: allow describing something like "preferLocal", to influence composition? } -// A BuildResponse is (one possible) outcome of executing a [BuildRequest]. -message BuildResponse { +// BuildEvent represents a single event during a build process. +// The stream of events will always end with either BuildCompleted or BuildFailed. +message BuildEvent { + oneof event { + BuildStarted started = 1; + LogOutput log = 2; + RefscanResult refscan = 3; + BuildCompleted completed = 4; + BuildFailed failed = 5; + } +} + +// Emitted at the start of a build. +message BuildStarted { + // A unique identifier for this build. + string build_id = 1; +} + +// A line of log output from the build process. +message LogOutput { + enum Stream { + STREAM_UNSPECIFIED = 0; + STREAM_STDOUT = 1; + STREAM_STDERR = 2; + } + // Which output stream this line came from. + Stream stream = 1; + // The log line data (including newline). + bytes data = 2; +} + +// Reference scanning result for a single output. +message RefscanResult { + // Index of the output in the original BuildRequest.outputs. + uint32 output_index = 1; + // Indexes into [BuildRequest::refscan_needles] found in this output. + repeated uint64 found_needles = 2; +} + +// Emitted when a build completes successfully. +message BuildCompleted { // The outputs that were produced after successfully building. // They are provided in the same order as specified in the [BuildRequest]. repeated Output outputs = 1; @@ -170,8 +209,14 @@ message BuildResponse { // Indexes into the found [BuildRequest::refscan_needles] in this output. repeated uint64 needles = 2; } +} - // TODO: where did this run, how long, logs, … +// Emitted when a build fails. +message BuildFailed { + // Human-readable error message. + string message = 1; + // Exit code of the build process, if available. + optional int32 exit_code = 2; } /// TODO: check remarkable notes on constraints again diff --git a/snix/build/protos/rpc_build.proto b/snix/build/protos/rpc_build.proto index 969e1d51c..9d41f1cf6 100644 --- a/snix/build/protos/rpc_build.proto +++ b/snix/build/protos/rpc_build.proto @@ -10,5 +10,7 @@ import "snix/build/protos/build.proto"; option go_package = "snix.dev/build/proto;buildv1"; service BuildService { - rpc DoBuild(BuildRequest) returns (BuildResponse); + // Execute a build and stream events back. + // Dropping the stream signals cancellation. + rpc DoBuild(BuildRequest) returns (stream BuildEvent); } diff --git a/snix/build/src/buildservice/build_request.rs b/snix/build/src/buildservice/build_request.rs index 2f54c5700..7d45b96c0 100644 --- a/snix/build/src/buildservice/build_request.rs +++ b/snix/build/src/buildservice/build_request.rs @@ -146,3 +146,59 @@ pub struct BuildOutput { /// Indexes into the found [BuildRequest::refscan_needles] in that output. pub output_needles: BTreeSet, } + +/// An event emitted during a build process. +#[derive(Debug, Clone)] +pub enum BuildEvent { + /// Emitted at the start of a build. + Started(BuildStarted), + /// A line of log output from the build process. + Log(LogOutput), + /// Reference scanning result for an output. + RefscanResult(RefscanResultEvent), + /// The build completed successfully. + Completed(BuildResult), + /// The build failed. + Failed(BuildError), +} + +/// Emitted at the start of a build. +#[derive(Debug, Clone)] +pub struct BuildStarted { + /// A unique identifier for this build. + pub build_id: String, +} + +/// Which output stream a log line came from. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogStream { + Stdout, + Stderr, +} + +/// A line of log output from the build process. +#[derive(Debug, Clone)] +pub struct LogOutput { + /// Which output stream this line came from. + pub stream: LogStream, + /// The log line data (including newline). + pub data: Bytes, +} + +/// Reference scanning result for a single output. +#[derive(Debug, Clone)] +pub struct RefscanResultEvent { + /// Index of the output in the original BuildRequest.outputs. + pub output_index: usize, + /// Indexes into [BuildRequest::refscan_needles] found in this output. + pub found_needles: Vec, +} + +/// Describes a build failure. +#[derive(Debug, Clone)] +pub struct BuildError { + /// Human-readable error message. + pub message: String, + /// Exit code of the build process, if available. + pub exit_code: Option, +} diff --git a/snix/build/src/buildservice/dummy.rs b/snix/build/src/buildservice/dummy.rs index 2d3864e7b..9c151c50a 100644 --- a/snix/build/src/buildservice/dummy.rs +++ b/snix/build/src/buildservice/dummy.rs @@ -1,18 +1,66 @@ -use tonic::async_trait; +use futures::stream; use tracing::instrument; -use super::BuildService; -use crate::buildservice::{BuildRequest, BuildResult}; +use super::{BuildEventStream, BuildService}; +use crate::buildservice::BuildRequest; #[derive(Default)] pub struct DummyBuildService {} -#[async_trait] impl BuildService for DummyBuildService { - #[instrument(skip(self), ret, err)] - async fn do_build(&self, _request: BuildRequest) -> std::io::Result { - Err(std::io::Error::other( - "builds are not supported with DummyBuildService", - )) + #[instrument(skip(self))] + fn do_build(&self, _request: BuildRequest) -> BuildEventStream { + Box::pin(stream::once(async { + Err(std::io::Error::other( + "builds are not supported with DummyBuildService", + )) + })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use std::collections::BTreeMap; + use std::path::PathBuf; + + fn make_dummy_request() -> BuildRequest { + BuildRequest { + inputs: BTreeMap::new(), + command_args: vec!["echo".to_string(), "hello".to_string()], + working_dir: PathBuf::from("build"), + scratch_paths: vec![PathBuf::from("build")], + inputs_dir: PathBuf::from("nix/store"), + outputs: vec![PathBuf::from("build/out")], + environment_vars: vec![], + constraints: Default::default(), + additional_files: vec![], + refscan_needles: vec![], + } + } + + #[tokio::test] + async fn test_dummy_build_service_returns_error_stream() { + let service = DummyBuildService::default(); + let mut stream = service.do_build(make_dummy_request()); + + // First item should be an error + let first = stream.next().await; + assert!(first.is_some(), "stream should yield at least one item"); + + let result = first.unwrap(); + assert!(result.is_err(), "should be an error"); + + let err = result.unwrap_err(); + assert!( + err.to_string() + .contains("builds are not supported with DummyBuildService"), + "error message should explain builds are not supported" + ); + + // Stream should end after the error + let second = stream.next().await; + assert!(second.is_none(), "stream should end after the error"); } } diff --git a/snix/build/src/buildservice/grpc.rs b/snix/build/src/buildservice/grpc.rs index ca735866e..3b656b700 100644 --- a/snix/build/src/buildservice/grpc.rs +++ b/snix/build/src/buildservice/grpc.rs @@ -1,9 +1,9 @@ -use tonic::{async_trait, transport::Channel}; +use tonic::transport::Channel; -use crate::buildservice::BuildRequest; +use crate::buildservice::{BuildEvent, BuildRequest}; use crate::proto::{self, build_service_client::BuildServiceClient}; -use super::{BuildResult, BuildService}; +use super::{BuildEventStream, BuildService}; pub struct GRPCBuildService { client: BuildServiceClient, @@ -16,16 +16,25 @@ impl GRPCBuildService { } } -#[async_trait] impl BuildService for GRPCBuildService { - async fn do_build(&self, request: BuildRequest) -> std::io::Result { + fn do_build(&self, request: BuildRequest) -> BuildEventStream { let mut client = self.client.clone(); - let resp = client - .do_build(Into::::into(request)) - .await - .map_err(std::io::Error::other)? - .into_inner(); + let proto_request: proto::BuildRequest = request.into(); - Ok::(resp.try_into().map_err(std::io::Error::other)?) + let stream = async_stream::try_stream! { + let response = client + .do_build(proto_request) + .await + .map_err(std::io::Error::other)?; + + let mut stream = response.into_inner(); + + while let Some(event) = stream.message().await.map_err(std::io::Error::other)? { + let event: BuildEvent = event.try_into().map_err(std::io::Error::other)?; + yield event; + } + }; + + Box::pin(stream) } } diff --git a/snix/build/src/buildservice/mod.rs b/snix/build/src/buildservice/mod.rs index dfb480076..9b5132b06 100644 --- a/snix/build/src/buildservice/mod.rs +++ b/snix/build/src/buildservice/mod.rs @@ -1,4 +1,4 @@ -use tonic::async_trait; +use futures::stream::BoxStream; pub mod build_request; pub use crate::buildservice::build_request::*; @@ -12,8 +12,19 @@ mod oci; pub use dummy::DummyBuildService; pub use from_addr::from_addr; -#[async_trait] +/// A stream of build events. +pub type BuildEventStream = BoxStream<'static, Result>; + +/// Service for executing builds. pub trait BuildService: Send + Sync { - /// TODO: document - async fn do_build(&self, request: BuildRequest) -> std::io::Result; + /// Execute a build and return a stream of events. + /// + /// The stream will emit events as the build progresses: + /// - `BuildStarted` at the beginning + /// - `Log` events for stdout/stderr output (line by line) + /// - `RefscanResult` events for each output after ingestion + /// - Either `Completed` or `Failed` at the end + /// + /// Dropping the stream signals cancellation - the build will be aborted. + fn do_build(&self, request: BuildRequest) -> BuildEventStream; } diff --git a/snix/build/src/buildservice/oci.rs b/snix/build/src/buildservice/oci.rs index ae004fa1c..9131d23cc 100644 --- a/snix/build/src/buildservice/oci.rs +++ b/snix/build/src/buildservice/oci.rs @@ -1,5 +1,7 @@ +use std::sync::Arc; + use anyhow::Context; -use bstr::BStr; +use bytes::Bytes; use snix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -7,16 +9,19 @@ use snix_castore::{ import::fs::ingest_path, refscan::{ReferencePattern, ReferenceScanner}, }; +use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; -use tonic::async_trait; -use tracing::{Span, debug, instrument, warn}; +use tracing::{Span, debug, instrument}; use uuid::Uuid; -use crate::buildservice::{BuildOutput, BuildRequest, BuildResult}; +use crate::buildservice::{ + BuildError, BuildEvent, BuildOutput, BuildRequest, BuildResult, BuildStarted, LogOutput, + LogStream, RefscanResultEvent, +}; use crate::oci::{get_host_output_paths, make_bundle, make_spec}; use std::{ffi::OsStr, path::PathBuf, process::Stdio}; -use super::BuildService; +use super::{BuildEventStream, BuildService}; const SANDBOX_SHELL: &str = env!("SNIX_BUILD_SANDBOX_SHELL"); const MAX_CONCURRENT_BUILDS: usize = 2; // TODO: make configurable @@ -32,7 +37,7 @@ pub struct OCIBuildService { // semaphore to track number of concurrently running builds. // this is necessary, as otherwise we very quickly run out of open file handles. - concurrent_builds: tokio::sync::Semaphore, + concurrent_builds: Arc, } impl OCIBuildService { @@ -45,137 +50,210 @@ impl OCIBuildService { bundle_root, blob_service, directory_service, - concurrent_builds: tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS), + concurrent_builds: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS)), } } } -#[async_trait] impl BuildService for OCIBuildService where BS: BlobService + Clone + 'static, DS: DirectoryService + Clone + 'static, { - #[instrument(skip_all, err)] - async fn do_build(&self, request: BuildRequest) -> std::io::Result { - let _permit = self.concurrent_builds.acquire().await.unwrap(); + #[instrument(skip_all)] + fn do_build(&self, request: BuildRequest) -> BuildEventStream { + let bundle_root = self.bundle_root.clone(); + let blob_service = self.blob_service.clone(); + let directory_service = self.directory_service.clone(); + let concurrent_builds = self.concurrent_builds.clone(); + + let stream = async_stream::try_stream! { + let _permit = concurrent_builds.acquire().await.unwrap(); + + let bundle_name = Uuid::new_v4(); + let bundle_path = bundle_root.join(bundle_name.to_string()); + + let span = Span::current(); + span.record("bundle_name", bundle_name.to_string()); + + // Yield BuildStarted event + yield BuildEvent::Started(BuildStarted { + build_id: bundle_name.to_string(), + }); + + let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL) + .context("failed to create spec") + .map_err(std::io::Error::other)?; + + let linux = runtime_spec.linux().clone().unwrap(); + + runtime_spec.set_linux(Some(linux)); + + make_bundle(&request, &runtime_spec, &bundle_path) + .context("failed to produce bundle") + .map_err(std::io::Error::other)?; + + // pre-calculate the locations we want to later ingest, in the order of + // the original outputs. + // If we can't find calculate that path, don't start the build in first place. + let host_output_paths = get_host_output_paths(&request, &bundle_path) + .context("failed to calculate host output paths") + .map_err(std::io::Error::other)?; + + // assemble a BTreeMap of Nodes to pass into SnixStoreFs. + let patterns = ReferencePattern::new(request.refscan_needles); + // NOTE: impl Drop for FuseDaemon unmounts, so if the call is cancelled, umount. + let _fuse_daemon = tokio::task::spawn_blocking({ + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + + let dest = bundle_path.join("inputs"); + + let root_nodes = Box::new(request.inputs); + move || { + let fs = snix_castore::fs::SnixStoreFs::new( + blob_service, + directory_service, + root_nodes, + true, + false, + ); + // mount the filesystem and wait for it to be unmounted. + // FUTUREWORK: make fuse daemon threads configurable? + FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon") + } + }) + .await? + .context("mounting") + .map_err(std::io::Error::other)?; - let bundle_name = Uuid::new_v4(); - let bundle_path = self.bundle_root.join(bundle_name.to_string()); + debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle"); - let span = Span::current(); - span.record("bundle_name", bundle_name.to_string()); + // start the bundle as another process. + let mut child = spawn_bundle(&bundle_path, &bundle_name.to_string())?; - let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL) - .context("failed to create spec") - .map_err(std::io::Error::other)?; + // Take stdout/stderr for streaming + let stdout = child.stdout.take().expect("stdout should be piped"); + let stderr = child.stderr.take().expect("stderr should be piped"); - let linux = runtime_spec.linux().clone().unwrap(); + let mut stdout_reader = BufReader::new(stdout).lines(); + let mut stderr_reader = BufReader::new(stderr).lines(); - runtime_spec.set_linux(Some(linux)); + let mut stdout_done = false; + let mut stderr_done = false; + let mut io_error: Option = None; - make_bundle(&request, &runtime_spec, &bundle_path) - .context("failed to produce bundle") - .map_err(std::io::Error::other)?; - - // pre-calculate the locations we want to later ingest, in the order of - // the original outputs. - // If we can't find calculate that path, don't start the build in first place. - let host_output_paths = get_host_output_paths(&request, &bundle_path) - .context("failed to calculate host output paths") - .map_err(std::io::Error::other)?; + // Stream logs line-by-line using select + loop { + if stdout_done && stderr_done { + break; + } - // assemble a BTreeMap of Nodes to pass into SnixStoreFs. - let patterns = ReferencePattern::new(request.refscan_needles); - // NOTE: impl Drop for FuseDaemon unmounts, so if the call is cancelled, umount. - let _fuse_daemon = tokio::task::spawn_blocking({ - let blob_service = self.blob_service.clone(); - let directory_service = self.directory_service.clone(); - - let dest = bundle_path.join("inputs"); - - let root_nodes = Box::new(request.inputs); - move || { - let fs = snix_castore::fs::SnixStoreFs::new( - blob_service, - directory_service, - root_nodes, - true, - false, - ); - // mount the filesystem and wait for it to be unmounted. - // FUTUREWORK: make fuse daemon threads configurable? - FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon") + tokio::select! { + result = stdout_reader.next_line(), if !stdout_done => { + match result { + Ok(Some(line)) => { + yield BuildEvent::Log(LogOutput { + stream: LogStream::Stdout, + data: Bytes::from(line + "\n"), + }); + } + Ok(None) => { + stdout_done = true; + } + Err(e) => { + io_error = Some(e); + break; + } + } + } + result = stderr_reader.next_line(), if !stderr_done => { + match result { + Ok(Some(line)) => { + yield BuildEvent::Log(LogOutput { + stream: LogStream::Stderr, + data: Bytes::from(line + "\n"), + }); + } + Ok(None) => { + stderr_done = true; + } + Err(e) => { + io_error = Some(e); + break; + } + } + } + } } - }) - .await? - .context("mounting") - .map_err(std::io::Error::other)?; - - debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle"); - - // start the bundle as another process. - let child = spawn_bundle(bundle_path, &bundle_name.to_string())?; - - // wait for the process to exit - // FUTUREWORK: change the trait to allow reporting progress / logs… - let child_output = child - .wait_with_output() - .await - .context("failed to run process") - .map_err(std::io::Error::other)?; - // Check the exit code - if !child_output.status.success() { - let stdout = BStr::new(&child_output.stdout); - let stderr = BStr::new(&child_output.stderr); + // Check for IO errors during log streaming + if let Some(e) = io_error { + Err(e)?; + } - warn!(stdout=%stdout, stderr=%stderr, exit_code=%child_output.status, "build failed"); + // Wait for the process to exit + let status = child.wait().await + .context("failed to wait for process") + .map_err(std::io::Error::other)?; + + // Check the exit code + if !status.success() { + let exit_code = status.code(); + yield BuildEvent::Failed(BuildError { + message: "build process exited with non-zero status".to_string(), + exit_code, + }); + return; + } - return Err(std::io::Error::other("nonzero exit code".to_string())); - } + // Ingest build outputs into the castore. + let mut outputs = Vec::with_capacity(host_output_paths.len()); - // Ingest build outputs into the castore. - // We use try_join_all here. No need to spawn new tasks, as this is - // mostly IO bound. - let outputs = futures::future::try_join_all(host_output_paths.into_iter().enumerate().map( - |(i, host_output_path)| { + for (i, host_output_path) in host_output_paths.into_iter().enumerate() { let output_path = &request.outputs[i]; - let patterns = patterns.clone(); - async move { - debug!(host.path=?host_output_path, output.path=?output_path, "ingesting path"); - - let scanner = ReferenceScanner::new(patterns); - - Ok::<_, std::io::Error>(BuildOutput { - node: ingest_path( - self.blob_service.clone(), - &self.directory_service, - host_output_path, - Some(&scanner), - ) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Unable to ingest output: {e}"), - ) - })?, - - output_needles: scanner - .matches() - .into_iter() - .enumerate() - .filter(|(_, val)| *val) - .map(|(idx, _)| idx as u64) - .collect(), - }) - } - }, - )) - .await?; + debug!(host.path=?host_output_path, output.path=?output_path, "ingesting path"); + + let scanner = ReferenceScanner::new(patterns.clone()); + + let node = ingest_path( + blob_service.clone(), + &directory_service, + host_output_path, + Some(&scanner), + ) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Unable to ingest output: {e}"), + ) + })?; + + let found_needles: Vec = scanner + .matches() + .into_iter() + .enumerate() + .filter(|(_, val)| *val) + .map(|(idx, _)| idx as u64) + .collect(); + + // Yield RefscanResult event + yield BuildEvent::RefscanResult(RefscanResultEvent { + output_index: i, + found_needles: found_needles.clone(), + }); + + outputs.push(BuildOutput { + node, + output_needles: found_needles.into_iter().collect(), + }); + } + + yield BuildEvent::Completed(BuildResult { outputs }); + }; - Ok(BuildResult { outputs }) + Box::pin(stream) } } diff --git a/snix/build/src/proto/grpc_buildservice_wrapper.rs b/snix/build/src/proto/grpc_buildservice_wrapper.rs index abc31f328..1a5f88128 100644 --- a/snix/build/src/proto/grpc_buildservice_wrapper.rs +++ b/snix/build/src/proto/grpc_buildservice_wrapper.rs @@ -1,8 +1,10 @@ use crate::buildservice::BuildService; +use futures::stream::BoxStream; +use futures::StreamExt; use std::ops::Deref; use tonic::async_trait; -use super::{BuildRequest, BuildResponse}; +use super::{BuildEvent, BuildRequest}; /// Implements the gRPC server trait ([crate::proto::build_service_server::BuildService] /// for anything implementing [BuildService]. @@ -23,15 +25,24 @@ impl crate::proto::build_service_server::BuildService for GRPCBuildServic where BUILD: Deref + Send + Sync + 'static, { + type DoBuildStream = BoxStream<'static, Result>; + async fn do_build( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { let request = TryInto::::try_into(request.into_inner()) .map_err(|err| tonic::Status::new(tonic::Code::InvalidArgument, err.to_string()))?; - match self.inner.do_build(request).await { - Ok(resp) => Ok(tonic::Response::new(resp.into())), - Err(e) => Err(tonic::Status::internal(e.to_string())), - } + + let stream = self.inner.do_build(request); + + // Map the stream to convert BuildEvent to proto BuildEvent + let proto_stream = stream.map(|result| { + result + .map(|event| event.into()) + .map_err(|e| tonic::Status::internal(e.to_string())) + }); + + Ok(tonic::Response::new(Box::pin(proto_stream))) } } diff --git a/snix/build/src/proto/mod.rs b/snix/build/src/proto/mod.rs index becdc9b22..aea5d0ccd 100644 --- a/snix/build/src/proto/mod.rs +++ b/snix/build/src/proto/mod.rs @@ -61,19 +61,6 @@ pub enum ValidateBuildRequestError { AdditionalFilesNotSorted, } -/// Errors that occur during the validation of [BuildResult] messages. -#[derive(Debug, thiserror::Error)] -pub enum ValidateBuildResultError { - #[error("request field is unpopulated")] - MissingRequestField, - #[error("request is invalid")] - InvalidBuildRequest(ValidateBuildRequestError), - #[error("output entry {0} missing")] - MissingOutputEntry(usize), - #[error("output entry {0} invalid")] - InvalidOutputEntry(usize), -} - /// Checks a path to be without any '..' components, and clean (no superfluous /// slashes). fn is_clean_path>(p: P) -> bool { @@ -282,50 +269,6 @@ impl TryFrom for crate::buildservice::BuildRequest { } } -impl From for BuildResponse { - fn from(value: BuildResult) -> Self { - Self { - outputs: value - .outputs - .into_iter() - .map(|output| build_response::Output { - output: Some(snix_castore::proto::Entry::from_name_and_node( - "".into(), - output.node, - )), - needles: output.output_needles.into_iter().collect(), - }) - .collect(), - } - } -} - -impl TryFrom for BuildResult { - type Error = ValidateBuildResultError; - - fn try_from(value: BuildResponse) -> Result { - Ok(Self { - outputs: value - .outputs - .into_iter() - .enumerate() - .map(|(i, output)| { - let node = output - .output - .ok_or(ValidateBuildResultError::MissingOutputEntry(i))? - .try_into_anonymous_node() - .map_err(|_| ValidateBuildResultError::InvalidOutputEntry(i))?; - - Ok::<_, ValidateBuildResultError>(crate::buildservice::BuildOutput { - node, - output_needles: BTreeSet::from_iter(output.needles), - }) - }) - .try_collect()?, - }) - } -} - /// Errors that occur during the validation of /// [build_request::BuildConstraints] messages. #[derive(Debug, thiserror::Error)] @@ -418,6 +361,226 @@ impl TryFrom for HashSet for build_event::Event { + fn from(value: crate::buildservice::BuildEvent) -> Self { + use crate::buildservice::BuildEvent as BE; + match value { + BE::Started(started) => build_event::Event::Started(started.into()), + BE::Log(log) => build_event::Event::Log(log.into()), + BE::RefscanResult(result) => build_event::Event::Refscan(result.into()), + BE::Completed(result) => build_event::Event::Completed(result.into()), + BE::Failed(error) => build_event::Event::Failed(error.into()), + } + } +} + +impl From for BuildEvent { + fn from(value: crate::buildservice::BuildEvent) -> Self { + Self { + event: Some(value.into()), + } + } +} + +impl TryFrom for crate::buildservice::BuildEvent { + type Error = ValidateBuildEventError; + + fn try_from(value: BuildEvent) -> Result { + let event = value.event.ok_or(ValidateBuildEventError::MissingEventField)?; + event.try_into() + } +} + +impl TryFrom for crate::buildservice::BuildEvent { + type Error = ValidateBuildEventError; + + fn try_from(value: build_event::Event) -> Result { + use crate::buildservice::BuildEvent as BE; + Ok(match value { + build_event::Event::Started(started) => BE::Started(started.into()), + build_event::Event::Log(log) => BE::Log(log.try_into()?), + build_event::Event::Refscan(result) => BE::RefscanResult(result.into()), + build_event::Event::Completed(completed) => { + BE::Completed(completed.try_into().map_err(ValidateBuildEventError::InvalidBuildCompleted)?) + } + build_event::Event::Failed(failed) => BE::Failed(failed.into()), + }) + } +} + +// === BuildStarted conversions === + +impl From for BuildStarted { + fn from(value: crate::buildservice::BuildStarted) -> Self { + Self { + build_id: value.build_id, + } + } +} + +impl From for crate::buildservice::BuildStarted { + fn from(value: BuildStarted) -> Self { + Self { + build_id: value.build_id, + } + } +} + +// === LogOutput conversions === + +impl From for log_output::Stream { + fn from(value: crate::buildservice::LogStream) -> Self { + use crate::buildservice::LogStream as LS; + match value { + LS::Stdout => log_output::Stream::Stdout, + LS::Stderr => log_output::Stream::Stderr, + } + } +} + +impl TryFrom for crate::buildservice::LogStream { + type Error = ValidateBuildEventError; + + fn try_from(value: log_output::Stream) -> Result { + use crate::buildservice::LogStream as LS; + match value { + log_output::Stream::Unspecified => Err(ValidateBuildEventError::InvalidLogStream), + log_output::Stream::Stdout => Ok(LS::Stdout), + log_output::Stream::Stderr => Ok(LS::Stderr), + } + } +} + +impl From for LogOutput { + fn from(value: crate::buildservice::LogOutput) -> Self { + Self { + stream: log_output::Stream::from(value.stream).into(), + data: value.data, + } + } +} + +impl TryFrom for crate::buildservice::LogOutput { + type Error = ValidateBuildEventError; + + fn try_from(value: LogOutput) -> Result { + let stream = log_output::Stream::try_from(value.stream) + .map_err(|_| ValidateBuildEventError::InvalidLogStream)?; + Ok(Self { + stream: stream.try_into()?, + data: value.data, + }) + } +} + +// === RefscanResultEvent conversions === + +impl From for RefscanResult { + fn from(value: crate::buildservice::RefscanResultEvent) -> Self { + Self { + output_index: value.output_index as u32, + found_needles: value.found_needles, + } + } +} + +impl From for crate::buildservice::RefscanResultEvent { + fn from(value: RefscanResult) -> Self { + Self { + output_index: value.output_index as usize, + found_needles: value.found_needles, + } + } +} + +// === BuildCompleted conversions === + +impl From for BuildCompleted { + fn from(value: BuildResult) -> Self { + Self { + outputs: value + .outputs + .into_iter() + .map(|output| build_completed::Output { + output: Some(snix_castore::proto::Entry::from_name_and_node( + "".into(), + output.node, + )), + needles: output.output_needles.into_iter().collect(), + }) + .collect(), + } + } +} + +impl TryFrom for BuildResult { + type Error = ValidateBuildCompletedError; + + fn try_from(value: BuildCompleted) -> Result { + Ok(Self { + outputs: value + .outputs + .into_iter() + .enumerate() + .map(|(i, output)| { + let node = output + .output + .ok_or(ValidateBuildCompletedError::MissingOutputEntry(i))? + .try_into_anonymous_node() + .map_err(|_| ValidateBuildCompletedError::InvalidOutputEntry(i))?; + + Ok::<_, ValidateBuildCompletedError>(crate::buildservice::BuildOutput { + node, + output_needles: BTreeSet::from_iter(output.needles), + }) + }) + .try_collect()?, + }) + } +} + +// === BuildError conversions === + +impl From for BuildFailed { + fn from(value: crate::buildservice::BuildError) -> Self { + Self { + message: value.message, + exit_code: value.exit_code, + } + } +} + +impl From for crate::buildservice::BuildError { + fn from(value: BuildFailed) -> Self { + Self { + message: value.message, + exit_code: value.exit_code, + } + } +} + #[cfg(test)] // TODO: add testcases for constraints special cases. The default cases in the protos // should result in the constraints not being added. For example min_memory 0 can be omitted. @@ -450,4 +613,177 @@ mod tests { } // TODO: add tests for BuildRequest validation itself + + mod build_event_conversions { + use super::super::*; + use crate::buildservice::{ + BuildError, BuildEvent as BE, BuildOutput, BuildResult, BuildStarted, LogOutput, + LogStream, RefscanResultEvent, + }; + use bytes::Bytes; + use std::collections::BTreeSet; + + #[test] + fn test_build_started_roundtrip() { + let started = BE::Started(BuildStarted { + build_id: "test-build-123".to_string(), + }); + + let proto: BuildEvent = started.clone().into(); + let back: BE = proto.try_into().expect("conversion should succeed"); + + match back { + BE::Started(s) => assert_eq!(s.build_id, "test-build-123"), + _ => panic!("expected Started variant"), + } + } + + #[test] + fn test_log_output_stdout_roundtrip() { + let log = BE::Log(LogOutput { + stream: LogStream::Stdout, + data: Bytes::from("hello stdout\n"), + }); + + let proto: BuildEvent = log.into(); + let back: BE = proto.try_into().expect("conversion should succeed"); + + match back { + BE::Log(l) => { + assert!(matches!(l.stream, LogStream::Stdout)); + assert_eq!(l.data, Bytes::from("hello stdout\n")); + } + _ => panic!("expected Log variant"), + } + } + + #[test] + fn test_log_output_stderr_roundtrip() { + let log = BE::Log(LogOutput { + stream: LogStream::Stderr, + data: Bytes::from("error message\n"), + }); + + let proto: BuildEvent = log.into(); + let back: BE = proto.try_into().expect("conversion should succeed"); + + match back { + BE::Log(l) => { + assert!(matches!(l.stream, LogStream::Stderr)); + assert_eq!(l.data, Bytes::from("error message\n")); + } + _ => panic!("expected Log variant"), + } + } + + #[test] + fn test_log_output_unspecified_fails() { + let proto = BuildEvent { + event: Some(build_event::Event::Log(super::super::LogOutput { + stream: log_output::Stream::Unspecified.into(), + data: Bytes::from("test"), + })), + }; + + let result: Result = proto.try_into(); + assert!(result.is_err()); + } + + #[test] + fn test_refscan_result_roundtrip() { + let refscan = BE::RefscanResult(RefscanResultEvent { + output_index: 2, + found_needles: vec![0, 3, 5], + }); + + let proto: BuildEvent = refscan.into(); + let back: BE = proto.try_into().expect("conversion should succeed"); + + match back { + BE::RefscanResult(r) => { + assert_eq!(r.output_index, 2); + assert_eq!(r.found_needles, vec![0, 3, 5]); + } + _ => panic!("expected RefscanResult variant"), + } + } + + #[test] + fn test_build_completed_roundtrip() { + let digest = snix_castore::B3Digest::from(&[0u8; 32]); + let result = BuildResult { + outputs: vec![BuildOutput { + node: snix_castore::Node::File { + digest: digest.clone(), + size: 100, + executable: false, + }, + output_needles: BTreeSet::from([1, 2, 3]), + }], + }; + + let proto: BuildCompleted = result.clone().into(); + let back: BuildResult = proto.try_into().expect("conversion should succeed"); + + assert_eq!(back.outputs.len(), 1); + assert_eq!(back.outputs[0].output_needles, BTreeSet::from([1, 2, 3])); + match &back.outputs[0].node { + snix_castore::Node::File { + digest: d, + size, + executable, + } => { + assert_eq!(d, &digest); + assert_eq!(*size, 100); + assert!(!executable); + } + _ => panic!("expected File node"), + } + } + + #[test] + fn test_build_failed_roundtrip() { + let failed = BE::Failed(BuildError { + message: "build failed with error".to_string(), + exit_code: Some(1), + }); + + let proto: BuildEvent = failed.into(); + let back: BE = proto.try_into().expect("conversion should succeed"); + + match back { + BE::Failed(f) => { + assert_eq!(f.message, "build failed with error"); + assert_eq!(f.exit_code, Some(1)); + } + _ => panic!("expected Failed variant"), + } + } + + #[test] + fn test_build_failed_no_exit_code() { + let failed = BE::Failed(BuildError { + message: "signal terminated".to_string(), + exit_code: None, + }); + + let proto: BuildEvent = failed.into(); + let back: BE = proto.try_into().expect("conversion should succeed"); + + match back { + BE::Failed(f) => { + assert_eq!(f.message, "signal terminated"); + assert_eq!(f.exit_code, None); + } + _ => panic!("expected Failed variant"), + } + } + + #[test] + fn test_missing_event_field_fails() { + let proto = BuildEvent { event: None }; + let result: Result = proto.try_into(); + assert!(result.is_err()); + } + } } diff --git a/snix/glue/src/snix_store_io.rs b/snix/glue/src/snix_store_io.rs index 2ee06e3df..6ae8c2391 100644 --- a/snix/glue/src/snix_store_io.rs +++ b/snix/glue/src/snix_store_io.rs @@ -219,13 +219,35 @@ impl SnixStoreIO { }) .collect(); - // create a build - let build_result = self - .build_service - .as_ref() - .do_build(build_request) - .await - .map_err(std::io::Error::other)?; + // create a build and consume the event stream + use futures::StreamExt; + use snix_build::buildservice::BuildEvent; + + let mut stream = self.build_service.as_ref().do_build(build_request); + + let build_result = loop { + match stream.next().await { + Some(Ok(BuildEvent::Completed(result))) => break result, + Some(Ok(BuildEvent::Failed(err))) => { + return Err(std::io::Error::other(format!( + "build failed: {}", + err.message + ))); + } + Some(Ok(_event)) => { + // Log events are currently ignored here + // FUTUREWORK: could be surfaced to the user + } + Some(Err(e)) => { + return Err(std::io::Error::other(e)); + } + None => { + return Err(std::io::Error::other( + "build stream ended without result", + )); + } + } + }; let mut out_path_info: Option = None;