From f918257b18df272af321868757b0955796ed2044 Mon Sep 17 00:00:00 2001 From: Malcom Gilbert Date: Sun, 19 Apr 2026 21:16:05 -0500 Subject: [PATCH 1/6] gix,gix-odb,gix-pack: stream blob contents from the ODB Add streaming blob lookup support for loose and packed objects, avoid eagerly materializing packed results into memory, and expose repository helpers with regression coverage for loose, packed, empty, and wrong-type cases. Co-authored-by: Claude Co-authored-by: Codex --- gix-odb/src/cache.rs | 16 ++ gix-odb/src/find.rs | 70 +++++ gix-odb/src/memory.rs | 20 ++ gix-odb/src/store_impls/dynamic/find.rs | 2 +- gix-odb/src/store_impls/dynamic/mod.rs | 1 + gix-odb/src/store_impls/dynamic/stream.rs | 230 ++++++++++++++++ gix-odb/src/store_impls/loose/find.rs | 198 +++++++++++++- gix-pack/src/cache/delta/traverse/resolve.rs | 10 +- gix-pack/src/data/delta.rs | 11 +- gix-pack/src/data/file/decode/entry.rs | 259 ++++++++++++++++++- gix-pack/src/data/file/decode/mod.rs | 3 + gix/src/repository/mod.rs | 32 +++ gix/src/repository/object.rs | 56 ++++ gix/tests/gix/repository/object.rs | 87 +++++++ 14 files changed, 980 insertions(+), 15 deletions(-) create mode 100644 gix-odb/src/store_impls/dynamic/stream.rs diff --git a/gix-odb/src/cache.rs b/gix-odb/src/cache.rs index 77022a0966..ebe1bb9785 100644 --- a/gix-odb/src/cache.rs +++ b/gix-odb/src/cache.rs @@ -91,6 +91,22 @@ impl Cache { } } +impl Cache> +where + S: Deref + Clone, +{ + /// Find an object and return its decoded bytes as a stream. + pub fn try_find_stream( + &self, + id: &gix_hash::oid, + ) -> Result)>, gix_object::find::Error> { + match self.pack_cache.as_ref().map(RefCell::borrow_mut) { + Some(mut pack_cache) => self.inner.try_find_stream(id, pack_cache.deref_mut()), + None => self.inner.try_find_stream(id, &mut gix_pack::cache::Never), + } + } +} + impl From for Cache where S: gix_pack::Find, diff --git a/gix-odb/src/find.rs b/gix-odb/src/find.rs index 1ee1fa8edd..14bae35d31 100644 --- a/gix-odb/src/find.rs +++ b/gix-odb/src/find.rs @@ -1,3 +1,73 @@ +use std::io::{self, Cursor, Read}; + +/// A streaming view over an object's decoded bytes. +pub struct Stream { + kind: gix_object::Kind, + size: u64, + inner: StreamInner, +} + +enum StreamInner { + InMemory(Cursor>), + File(std::fs::File), + Loose(crate::store_impls::loose::find::StreamReader), +} + +impl Stream { + /// Return the kind of the object yielded by this stream. + pub fn kind(&self) -> gix_object::Kind { + self.kind + } + + /// Return the decoded object size in bytes. + pub fn size(&self) -> u64 { + self.size + } + + /// Return an empty blob stream. + pub fn empty_blob() -> Self { + Self::from_bytes(gix_object::Kind::Blob, Vec::new()) + } + + pub(crate) fn from_bytes(kind: gix_object::Kind, data: Vec) -> Self { + Self { + kind, + size: data.len() as u64, + inner: StreamInner::InMemory(Cursor::new(data)), + } + } + + pub(crate) fn from_file(kind: gix_object::Kind, size: u64, file: std::fs::File) -> Self { + Self { + kind, + size, + inner: StreamInner::File(file), + } + } + + pub(crate) fn from_loose( + kind: gix_object::Kind, + size: u64, + reader: crate::store_impls::loose::find::StreamReader, + ) -> Self { + Self { + kind, + size, + inner: StreamInner::Loose(reader), + } + } +} + +impl Read for Stream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match &mut self.inner { + StreamInner::InMemory(cursor) => cursor.read(buf), + StreamInner::File(file) => file.read(buf), + StreamInner::Loose(reader) => reader.read(buf), + } + } +} + /// An object header informing about object properties, without it being fully decoded in the process. #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum Header { diff --git a/gix-odb/src/memory.rs b/gix-odb/src/memory.rs index 6862437f1f..31a6e33c24 100644 --- a/gix-odb/src/memory.rs +++ b/gix-odb/src/memory.rs @@ -123,6 +123,26 @@ impl Proxy { } } +impl Proxy>> +where + S: Deref + Clone, +{ + /// Find an object and return its decoded bytes as a stream. + pub fn try_find_stream( + &self, + id: &gix_hash::oid, + ) -> Result)>, gix_object::find::Error> + { + if let Some(map) = self.memory.as_ref() { + let map = map.borrow(); + if let Some((kind, data)) = map.get(id) { + return Ok(Some((crate::find::Stream::from_bytes(*kind, data.clone()), None))); + } + } + self.inner.try_find_stream(id) + } +} + impl Clone for Proxy where T: Clone, diff --git a/gix-odb/src/store_impls/dynamic/find.rs b/gix-odb/src/store_impls/dynamic/find.rs index 13608b7c87..78de22491e 100644 --- a/gix-odb/src/store_impls/dynamic/find.rs +++ b/gix-odb/src/store_impls/dynamic/find.rs @@ -85,7 +85,7 @@ impl super::Handle where S: Deref + Clone, { - fn try_find_cached_inner<'a, 'b>( + pub(crate) fn try_find_cached_inner<'a, 'b>( &'b self, mut id: &'b gix_hash::oid, buffer: &'a mut Vec, diff --git a/gix-odb/src/store_impls/dynamic/mod.rs b/gix-odb/src/store_impls/dynamic/mod.rs index 7ea462b24a..a7e6fc5140 100644 --- a/gix-odb/src/store_impls/dynamic/mod.rs +++ b/gix-odb/src/store_impls/dynamic/mod.rs @@ -57,6 +57,7 @@ pub mod find; pub mod prefix; mod header; +mod stream; /// pub mod iter; diff --git a/gix-odb/src/store_impls/dynamic/stream.rs b/gix-odb/src/store_impls/dynamic/stream.rs new file mode 100644 index 0000000000..43e1376134 --- /dev/null +++ b/gix-odb/src/store_impls/dynamic/stream.rs @@ -0,0 +1,230 @@ +use std::{io::Seek, ops::Deref}; + +use gix_features::zlib; +use gix_hash::oid; +use gix_pack::cache::DecodeEntry; + +use super::find::Error; +use crate::store::{find::error::DeltaBaseRecursion, handle, load_index}; + +impl super::Handle +where + S: Deref + Clone, +{ + pub(crate) fn try_find_stream_inner<'b>( + &'b self, + mut id: &'b gix_hash::oid, + inflate: &mut zlib::Inflate, + pack_cache: &mut dyn DecodeEntry, + snapshot: &mut load_index::Snapshot, + recursion: Option>, + ) -> Result)>, Error> { + if let Some(r) = recursion { + if r.depth >= self.max_recursion_depth { + return Err(Error::DeltaBaseRecursionLimit { + max_depth: self.max_recursion_depth, + id: r.original_id.to_owned(), + }); + } + } else if !self.ignore_replacements { + if let Ok(pos) = self + .store + .replacements + .binary_search_by(|(map_this, _)| map_this.as_ref().cmp(id)) + { + id = self.store.replacements[pos].1.as_ref(); + } + } + + 'outer: loop { + { + let marker = snapshot.marker; + for (idx, index) in snapshot.indices.iter_mut().enumerate() { + if let Some(handle::index_lookup::Outcome { + object_index: handle::IndexForObjectInPack { pack_id, pack_offset }, + index_file, + pack: possibly_pack, + }) = index.lookup(id) + { + let pack = match possibly_pack { + Some(pack) => pack, + None => match self.store.load_pack(pack_id, marker)? { + Some(pack) => { + *possibly_pack = Some(pack); + possibly_pack.as_deref().expect("just put it in") + } + None => match self.store.load_one_index(self.refresh, snapshot.marker)? { + Some(new_snapshot) => { + *snapshot = new_snapshot; + self.clear_cache(); + continue 'outer; + } + None => return Ok(None), + }, + }, + }; + let resolved_pack_id = pack.id; + let entry = pack.entry(pack_offset)?; + let header_size = entry.header_size(); + let result = { + let mut scratch = Vec::new(); + let mut temp = tempfile::tempfile()?; + let result = match pack.decode_entry_to_write( + entry, + &mut scratch, + inflate, + &mut temp, + &|id, _out| { + index_file.pack_offset_by_id(id).and_then(|pack_offset| { + pack.entry(pack_offset) + .ok() + .map(gix_pack::data::decode::entry::ResolvedBase::InPack) + }) + }, + pack_cache, + ) { + Ok(outcome) => Ok((outcome, temp)), + Err(gix_pack::data::decode::Error::DeltaBaseUnresolved(base_id)) => { + let mut buf = Vec::new(); + let obj_kind = self + .try_find_cached_inner( + &base_id, + &mut buf, + inflate, + pack_cache, + snapshot, + recursion + .map(DeltaBaseRecursion::inc_depth) + .or_else(|| DeltaBaseRecursion::new(id).into()), + ) + .map_err(|err| Error::DeltaBaseLookup { + err: Box::new(err), + base_id, + id: id.to_owned(), + })? + .ok_or_else(|| Error::DeltaBaseMissing { + base_id, + id: id.to_owned(), + })? + .0 + .kind; + let handle::index_lookup::Outcome { + object_index: + handle::IndexForObjectInPack { + pack_id: _, + pack_offset, + }, + index_file, + pack: possibly_pack, + } = match snapshot.indices[idx].lookup(id) { + Some(res) => res, + None => { + let mut out = None; + for index in &mut snapshot.indices { + out = index.lookup(id); + if out.is_some() { + break; + } + } + + out.unwrap_or_else(|| { + panic!( + "could not find object {id} in any index after looking up one of its base objects {base_id}" + ) + }) + } + }; + let pack = possibly_pack + .as_ref() + .expect("pack to still be available like just now"); + let entry = pack.entry(pack_offset)?; + let mut scratch = Vec::new(); + let mut temp = tempfile::tempfile()?; + pack.decode_entry_to_write( + entry, + &mut scratch, + inflate, + &mut temp, + &|id, out| { + index_file + .pack_offset_by_id(id) + .and_then(|pack_offset| { + pack.entry(pack_offset) + .ok() + .map(gix_pack::data::decode::entry::ResolvedBase::InPack) + }) + .or_else(|| { + (id == base_id).then(|| { + out.resize(buf.len(), 0); + out.copy_from_slice(buf.as_slice()); + gix_pack::data::decode::entry::ResolvedBase::OutOfPack { + kind: obj_kind, + end: out.len(), + } + }) + }) + }, + pack_cache, + ) + .map(|outcome| (outcome, temp)) + } + Err(err) => Err(err), + }?; + result + }; + let (outcome, mut temp) = result; + temp.rewind()?; + let res = ( + crate::find::Stream::from_file(outcome.kind, outcome.object_size, temp), + Some(gix_pack::data::entry::Location { + pack_id: resolved_pack_id, + pack_offset, + entry_size: outcome.compressed_size + header_size, + }), + ); + + if idx != 0 { + snapshot.indices.swap(0, idx); + } + return Ok(Some(res)); + } + } + } + + for lodb in snapshot.loose_dbs.iter() { + if lodb.contains(id) { + return lodb + .try_find_stream(id) + .map(|obj| obj.map(|obj| (obj, None))) + .map_err(Into::into); + } + } + + match self.store.load_one_index(self.refresh, snapshot.marker)? { + Some(new_snapshot) => { + *snapshot = new_snapshot; + self.clear_cache(); + } + None => return Ok(None), + } + } + } +} + +impl super::Handle +where + S: Deref + Clone, +{ + /// Try to find the object identified by `id` in any backing store and return it as a readable stream, + /// along with its pack location if it came from a pack. + pub fn try_find_stream( + &self, + id: &oid, + pack_cache: &mut dyn DecodeEntry, + ) -> Result)>, gix_object::find::Error> { + let mut snapshot = self.snapshot.borrow_mut(); + let mut inflate = self.inflate.borrow_mut(); + self.try_find_stream_inner(id, &mut inflate, pack_cache, &mut snapshot, None) + .map_err(|err| Box::new(err) as _) + } +} diff --git a/gix-odb/src/store_impls/loose/find.rs b/gix-odb/src/store_impls/loose/find.rs index 6ac6d26de6..d2983cd5a9 100644 --- a/gix-odb/src/store_impls/loose/find.rs +++ b/gix-odb/src/store_impls/loose/find.rs @@ -1,4 +1,10 @@ -use std::{cmp::Ordering, collections::HashSet, fs, io::Read, path::PathBuf}; +use std::{ + cmp::Ordering, + collections::HashSet, + fs, + io::{self, Read}, + path::PathBuf, +}; use gix_features::zlib; @@ -27,6 +33,174 @@ pub enum Error { }, } +pub(crate) struct StreamReader { + file: fs::File, + inflate: zlib::Decompress, + compressed: [u8; 8192], + compressed_pos: usize, + compressed_len: usize, + pending: Vec, + pending_pos: usize, + done: bool, +} + +impl StreamReader { + fn new(file: fs::File) -> Self { + Self { + file, + inflate: zlib::Decompress::default(), + compressed: [0; 8192], + compressed_pos: 0, + compressed_len: 0, + pending: Vec::new(), + pending_pos: 0, + done: false, + } + } + + fn read_header(mut self, path: &std::path::Path) -> Result<(gix_object::Kind, u64, Self), Error> { + let mut decompressed = [0u8; HEADER_MAX_SIZE + 8192]; + let mut produced_total = 0usize; + loop { + if self.compressed_pos == self.compressed_len { + self.compressed_len = self.file.read(&mut self.compressed).map_err(|source| Error::Io { + source, + action: "read", + path: path.to_owned(), + })?; + self.compressed_pos = 0; + } + + let input = &self.compressed[self.compressed_pos..self.compressed_len]; + let eof = input.is_empty(); + let before_in = self.inflate.total_in(); + let before_out = self.inflate.total_out(); + let flush = if eof { + zlib::FlushDecompress::Finish + } else { + zlib::FlushDecompress::None + }; + let status = self + .inflate + .decompress(input, &mut decompressed[produced_total..], flush) + .map_err(|source| Error::DecompressFile { + source: source.into(), + path: path.to_owned(), + })?; + let consumed = (self.inflate.total_in() - before_in) as usize; + let produced = (self.inflate.total_out() - before_out) as usize; + self.compressed_pos += consumed; + produced_total += produced; + + if let Ok((kind, size, header_size)) = gix_object::decode::loose_header(&decompressed[..produced_total]) { + self.pending = decompressed[header_size..produced_total].to_vec(); + return Ok((kind, size, self)); + } + + if produced_total == decompressed.len() { + return Err(Error::Decode( + gix_object::decode::LooseHeaderDecodeError::InvalidHeader { + message: "loose object header exceeded the supported maximum size", + }, + )); + } + + match status { + zlib::Status::StreamEnd => { + return Err(Error::Decode( + gix_object::decode::LooseHeaderDecodeError::InvalidHeader { + message: "loose object header terminated before it could be parsed", + }, + )) + } + zlib::Status::Ok | zlib::Status::BufError if eof => { + return Err(Error::Decode( + gix_object::decode::LooseHeaderDecodeError::InvalidHeader { + message: "loose object header terminated before it could be parsed", + }, + )); + } + zlib::Status::Ok | zlib::Status::BufError if consumed != 0 || produced != 0 => {} + zlib::Status::Ok | zlib::Status::BufError => { + return Err(Error::DecompressFile { + source: zlib::inflate::Error::Status(zlib::Status::BufError), + path: path.to_owned(), + }); + } + } + } + } +} + +impl io::Read for StreamReader { + fn read(&mut self, mut out: &mut [u8]) -> io::Result { + let mut total_written = 0usize; + + if self.pending_pos < self.pending.len() { + let pending = &self.pending[self.pending_pos..]; + let count = pending.len().min(out.len()); + out[..count].copy_from_slice(&pending[..count]); + self.pending_pos += count; + total_written += count; + out = &mut out[count..]; + if self.pending_pos == self.pending.len() { + self.pending.clear(); + self.pending_pos = 0; + } + if out.is_empty() { + return Ok(total_written); + } + } + + loop { + if self.done { + return Ok(total_written); + } + if self.compressed_pos == self.compressed_len { + self.compressed_len = self.file.read(&mut self.compressed)?; + self.compressed_pos = 0; + } + + let input = &self.compressed[self.compressed_pos..self.compressed_len]; + let eof = input.is_empty(); + let before_in = self.inflate.total_in(); + let before_out = self.inflate.total_out(); + let flush = if eof { + zlib::FlushDecompress::Finish + } else { + zlib::FlushDecompress::None + }; + let status = self + .inflate + .decompress(input, out, flush) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "corrupt deflate stream"))?; + let consumed = (self.inflate.total_in() - before_in) as usize; + let produced = (self.inflate.total_out() - before_out) as usize; + self.compressed_pos += consumed; + total_written += produced; + + match status { + zlib::Status::StreamEnd => { + self.done = true; + return Ok(total_written); + } + zlib::Status::Ok | zlib::Status::BufError if produced != 0 => return Ok(total_written), + zlib::Status::Ok | zlib::Status::BufError if eof => { + self.done = true; + return Ok(total_written); + } + zlib::Status::Ok | zlib::Status::BufError if consumed != 0 => {} + zlib::Status::Ok | zlib::Status::BufError => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "deflate stream made no progress", + )); + } + } + } + } +} + /// Object lookup impl Store { const OPEN_ACTION: &'static str = "open"; @@ -134,6 +308,28 @@ impl Store { } } + /// Return the object identified by `id` as a decoded byte stream. + /// + /// Returns `Ok(None)` if there is no such object. + pub fn try_find_stream(&self, id: &gix_hash::oid) -> Result, Error> { + debug_assert_eq!(self.object_hash, id.kind()); + let path = hash_path(id, self.path.clone()); + let file = match fs::File::open(&path) { + Ok(file) => file, + Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None), + Err(source) => { + return Err(Error::Io { + source, + action: Self::OPEN_ACTION, + path, + }) + } + }; + + let (kind, size, reader) = StreamReader::new(file).read_header(&path)?; + Ok(Some(crate::find::Stream::from_loose(kind, size, reader))) + } + /// Return only the decompressed size of the object and its kind without fully reading it into memory as tuple of `(size, kind)`. /// Returns `None` if `id` does not exist in the database. pub fn try_header(&self, id: &gix_hash::oid) -> Result, Error> { diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs index fbb038b5d0..14e50f6388 100644 --- a/gix-pack/src/cache/delta/traverse/resolve.rs +++ b/gix-pack/src/cache/delta/traverse/resolve.rs @@ -188,7 +188,8 @@ where header_ofs += consumed; fully_resolved_delta_bytes.resize(result_size as usize, 0); - data::delta::apply(&base_bytes, fully_resolved_delta_bytes, &delta_bytes[header_ofs..])?; + let mut target = &mut fully_resolved_delta_bytes[..]; + data::delta::apply(&base_bytes, &mut target, &delta_bytes[header_ofs..])?; // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers // at all @@ -363,11 +364,8 @@ where header_ofs += consumed; fully_resolved_delta_bytes.resize(result_size as usize, 0); - data::delta::apply( - &base_bytes, - &mut fully_resolved_delta_bytes, - &delta_bytes[header_ofs..], - )?; + let mut target = &mut fully_resolved_delta_bytes[..]; + data::delta::apply(&base_bytes, &mut target, &delta_bytes[header_ofs..])?; // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers // at all diff --git a/gix-pack/src/data/delta.rs b/gix-pack/src/data/delta.rs index f5b7dfb05e..e1d32b04d7 100644 --- a/gix-pack/src/data/delta.rs +++ b/gix-pack/src/data/delta.rs @@ -30,7 +30,7 @@ pub(crate) fn decode_header_size(d: &[u8]) -> (u64, usize) { (size, consumed) } -pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<(), apply::Error> { +pub(crate) fn apply(base: &[u8], target: &mut W, data: &[u8]) -> Result<(), apply::Error> { let mut i = 0; while let Some(cmd) = data.get(i) { i += 1; @@ -69,19 +69,20 @@ pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<( size = 0x10000; // 65536 } let ofs = ofs as usize; - std::io::Write::write(&mut target, &base[ofs..ofs + size as usize]) + target + .write_all(&base[ofs..ofs + size as usize]) .map_err(|_e| apply::Error::DeltaCopyBaseSliceMismatch)?; } 0 => return Err(apply::Error::UnsupportedCommandCode), size => { - std::io::Write::write(&mut target, &data[i..i + *size as usize]) + target + .write_all(&data[i..i + *size as usize]) .map_err(|_e| apply::Error::DeltaCopyDataSliceMismatch)?; i += *size as usize; } } } - assert_eq!(i, data.len()); - assert_eq!(target.len(), 0); + debug_assert_eq!(i, data.len()); Ok(()) } diff --git a/gix-pack/src/data/file/decode/entry.rs b/gix-pack/src/data/file/decode/entry.rs index 9711961e8f..51e891d52d 100644 --- a/gix-pack/src/data/file/decode/entry.rs +++ b/gix-pack/src/data/file/decode/entry.rs @@ -1,4 +1,4 @@ -use std::ops::Range; +use std::{io, ops::Range}; use gix_features::zlib; use smallvec::SmallVec; @@ -147,6 +147,40 @@ impl File { .map(|(_status, consumed_in, consumed_out)| (consumed_in, consumed_out)) } + fn decompress_entry_to_write( + &self, + entry: &data::Entry, + inflate: &mut zlib::Inflate, + out: &mut dyn io::Write, + ) -> Result { + let offset: usize = entry.data_offset.try_into().expect("offset representable by machine"); + assert!(offset < self.data.len(), "entry offset out of bounds"); + + inflate.reset(); + let mut input = io::Cursor::new(&self.data[offset..]); + let mut buf = [0u8; 8192]; + let mut total_written = 0usize; + loop { + let written = gix_features::zlib::stream::inflate::read(&mut input, &mut inflate.state, &mut buf)?; + if written == 0 { + break; + } + out.write_all(&buf[..written])?; + total_written += written; + if total_written as u64 == entry.decompressed_size { + break; + } + } + + if total_written as u64 != entry.decompressed_size { + return Err(Error::ZlibInflate(gix_features::zlib::inflate::Error::Status( + gix_features::zlib::Status::BufError, + ))); + } + + Ok(inflate.state.total_in() as usize) + } + /// Decode an entry, resolving delta's as needed, while growing the `out` vector if there is not enough /// space to hold the result object. /// @@ -187,6 +221,49 @@ impl File { } } + /// Decode an entry while streaming the fully resolved object bytes into `out`. + /// + /// Unlike [`decode_entry()`][Self::decode_entry()], this never returns + /// the decoded object bytes to the caller. Instead, it uses `scratch` + /// for the temporary buffers needed during delta resolution and writes + /// the final object bytes into `out`. + pub fn decode_entry_to_write( + &self, + entry: data::Entry, + scratch: &mut Vec, + inflate: &mut zlib::Inflate, + out: &mut dyn io::Write, + resolve: &dyn Fn(&gix_hash::oid, &mut Vec) -> Option, + delta_cache: &mut dyn cache::DecodeEntry, + ) -> Result { + use crate::data::entry::Header::*; + if let Some((kind, packed_size)) = delta_cache.get(self.id, entry.data_offset, scratch) { + out.write_all(scratch)?; + return Ok(Outcome { + kind, + num_deltas: 0, + decompressed_size: entry.decompressed_size, + compressed_size: packed_size, + object_size: scratch.len() as u64, + }); + } + + match entry.header { + Tree | Blob | Commit | Tag => self + .decompress_entry_to_write(&entry, inflate, out) + .map(|consumed_input| { + Outcome::from_object_entry( + entry.header.as_kind().expect("a non-delta entry"), + &entry, + consumed_input, + ) + }), + OfsDelta { .. } | RefDelta { .. } => { + self.resolve_deltas_to_write(entry, resolve, inflate, scratch, out, delta_cache) + } + } + } + /// resolve: technically, this shouldn't ever be required as stored local packs don't refer to objects by id /// that are outside of the pack. Unless, of course, the ref refers to an object within this pack, which means /// it's very, very large as 20bytes are smaller than the corresponding MSB encoded number @@ -375,7 +452,8 @@ impl File { if delta_idx + 1 == chain_len { last_result_size = Some(result_size); } - delta::apply(&source_buf[..base_size], &mut target_buf[..result_size], data)?; + let mut target = &mut target_buf[..result_size]; + delta::apply(&source_buf[..base_size], &mut target, data)?; // use the target as source for the next delta std::mem::swap(&mut source_buf, &mut target_buf); } @@ -416,6 +494,183 @@ impl File { object_size: last_result_size as u64, }) } + + fn resolve_deltas_to_write( + &self, + last: data::Entry, + resolve: &dyn Fn(&gix_hash::oid, &mut Vec) -> Option, + inflate: &mut zlib::Inflate, + out: &mut Vec, + writer: &mut dyn io::Write, + cache: &mut dyn cache::DecodeEntry, + ) -> Result { + let mut chain = SmallVec::<[Delta; 10]>::default(); + let first_entry = last.clone(); + let mut cursor = last; + let mut base_buffer_size: Option = None; + let mut object_kind: Option = None; + let mut consumed_input: Option = None; + + let mut total_delta_data_size: u64 = 0; + while cursor.header.is_delta() { + if let Some((kind, packed_size)) = cache.get(self.id, cursor.data_offset, out) { + base_buffer_size = Some(out.len()); + object_kind = Some(kind); + if total_delta_data_size == 0 { + consumed_input = Some(packed_size); + } + break; + } + total_delta_data_size += cursor.decompressed_size; + let decompressed_size = cursor + .decompressed_size + .try_into() + .expect("a single delta size small enough to fit a usize"); + chain.push(Delta { + data: Range { + start: 0, + end: decompressed_size, + }, + base_size: 0, + result_size: 0, + decompressed_size, + data_offset: cursor.data_offset, + }); + use crate::data::entry::Header; + cursor = match cursor.header { + Header::OfsDelta { base_distance } => self.entry(cursor.base_pack_offset(base_distance))?, + Header::RefDelta { base_id } => match resolve(base_id.as_ref(), out) { + Some(ResolvedBase::InPack(entry)) => entry, + Some(ResolvedBase::OutOfPack { end, kind }) => { + base_buffer_size = Some(end); + object_kind = Some(kind); + break; + } + None => return Err(Error::DeltaBaseUnresolved(base_id)), + }, + _ => unreachable!("cursor.is_delta() only allows deltas here"), + }; + } + + if chain.is_empty() { + writer.write_all(out)?; + return Ok(Outcome::from_object_entry( + object_kind.expect("object kind as set by cache"), + &first_entry, + consumed_input.expect("consumed bytes as set by cache"), + )); + } + + let total_delta_data_size: usize = total_delta_data_size.try_into().expect("delta data to fit in memory"); + + let chain_len = chain.len(); + let (first_buffer_end, second_buffer_end) = { + let delta_start = base_buffer_size.unwrap_or(0); + + let delta_range = Range { + start: delta_start, + end: delta_start + total_delta_data_size, + }; + out.try_reserve(delta_range.end.saturating_sub(out.len()))?; + out.resize(delta_range.end, 0); + + let mut instructions = &mut out[delta_range.clone()]; + let mut relative_delta_start = 0; + let mut biggest_result_size = 0; + for (delta_idx, delta) in chain.iter_mut().rev().enumerate() { + let consumed_from_data_offset = self.decompress_entry_from_data_offset( + delta.data_offset, + inflate, + &mut instructions[..delta.decompressed_size], + )?; + let is_last_delta_to_be_applied = delta_idx + 1 == chain_len; + if is_last_delta_to_be_applied { + consumed_input = Some(consumed_from_data_offset); + } + + let (base_size, offset) = delta::decode_header_size(instructions); + let mut bytes_consumed_by_header = offset; + biggest_result_size = biggest_result_size.max(base_size); + delta.base_size = base_size.try_into().expect("base size fits into usize"); + + let (result_size, offset) = delta::decode_header_size(&instructions[offset..]); + bytes_consumed_by_header += offset; + biggest_result_size = biggest_result_size.max(result_size); + delta.result_size = result_size.try_into().expect("result size fits into usize"); + + delta.data.start = relative_delta_start + bytes_consumed_by_header; + relative_delta_start += delta.decompressed_size; + delta.data.end = relative_delta_start; + + instructions = &mut instructions[delta.decompressed_size..]; + } + + let biggest_result_size: usize = biggest_result_size.try_into().map_err(|_| Error::OutOfMemory)?; + let first_buffer_size = biggest_result_size; + let second_buffer_size = if chain_len > 1 { first_buffer_size } else { 0 }; + let out_size = first_buffer_size + second_buffer_size + total_delta_data_size; + out.try_reserve(out_size.saturating_sub(out.len()))?; + out.resize(out_size, 0); + + let second_buffer_end = { + let end = first_buffer_size + second_buffer_size; + if delta_range.start < end { + out.copy_within(delta_range, end); + } else { + let (buffers, instructions) = out.split_at_mut(end); + instructions.copy_from_slice(&buffers[delta_range]); + } + end + }; + + if base_buffer_size.is_none() { + let base_entry = cursor; + debug_assert!(!base_entry.header.is_delta()); + object_kind = base_entry.header.as_kind(); + let out_base = &mut out[..out_size - total_delta_data_size]; + self.decompress_entry_from_data_offset(base_entry.data_offset, inflate, out_base)?; + } + + (first_buffer_size, second_buffer_end) + }; + + let (buffers, instructions) = out.split_at_mut(second_buffer_end); + let (mut source_buf, mut target_buf) = buffers.split_at_mut(first_buffer_end); + + let mut last_result_size = None; + for ( + delta_idx, + Delta { + data, + base_size, + result_size, + .. + }, + ) in chain.into_iter().rev().enumerate() + { + let data = &mut instructions[data]; + let is_last_delta = delta_idx + 1 == chain_len; + if is_last_delta { + last_result_size = Some(result_size); + delta::apply(&source_buf[..base_size], writer, data)?; + } else { + let mut target = &mut target_buf[..result_size]; + delta::apply(&source_buf[..base_size], &mut target, data)?; + std::mem::swap(&mut source_buf, &mut target_buf); + } + } + + let object_kind = object_kind.expect("a base object as root of any delta chain that we are here to resolve"); + let consumed_input = consumed_input.expect("at least one decompressed delta object"); + let last_result_size = last_result_size.expect("at least one delta chain item"); + Ok(Outcome { + kind: object_kind, + num_deltas: chain_len as u32, + decompressed_size: first_entry.decompressed_size, + compressed_size: consumed_input, + object_size: last_result_size as u64, + }) + } } #[cfg(test)] diff --git a/gix-pack/src/data/file/decode/mod.rs b/gix-pack/src/data/file/decode/mod.rs index 71bbf1595c..ed086e4eeb 100644 --- a/gix-pack/src/data/file/decode/mod.rs +++ b/gix-pack/src/data/file/decode/mod.rs @@ -7,12 +7,15 @@ pub mod header; /// Returned by [`File::decode_header()`][crate::data::File::decode_header()], /// [`File::decode_entry()`][crate::data::File::decode_entry()] and . +/// [`File::decode_entry_to_write()`][crate::data::File::decode_entry_to_write()] and /// [`File::decompress_entry()`][crate::data::File::decompress_entry()] #[derive(thiserror::Error, Debug)] #[allow(missing_docs)] pub enum Error { #[error("Failed to decompress pack entry")] ZlibInflate(#[from] gix_features::zlib::inflate::Error), + #[error("Failed to write decoded object bytes")] + Io(#[from] std::io::Error), #[error("A delta chain could not be followed as the ref base with id {0} could not be found")] DeltaBaseUnresolved(gix_hash::ObjectId), #[error(transparent)] diff --git a/gix/src/repository/mod.rs b/gix/src/repository/mod.rs index 068d3bcbba..06ff5ab785 100644 --- a/gix/src/repository/mod.rs +++ b/gix/src/repository/mod.rs @@ -539,3 +539,35 @@ pub mod worktree_archive { /// The error returned by [`Repository::worktree_archive()`](crate::Repository::worktree_archive()). pub type Error = gix_error::Error; } + +/// +pub mod blob_stream { + /// The error returned by [`Repository::try_find_blob_stream()`](crate::Repository::try_find_blob_stream()). + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error(transparent)] + Find(#[from] crate::object::find::Error), + #[error("Needed {id} to be a blob to stream it, got {actual}")] + NotABlob { + id: gix_hash::ObjectId, + actual: gix_object::Kind, + }, + } + + /// Errors returned by [`Repository::find_blob_stream()`](crate::Repository::find_blob_stream()). + pub mod existing { + /// The error returned by [`Repository::find_blob_stream()`](crate::Repository::find_blob_stream()). + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error(transparent)] + Find(#[from] crate::object::find::existing::Error), + #[error("Needed {id} to be a blob to stream it, got {actual}")] + NotABlob { + id: gix_hash::ObjectId, + actual: gix_object::Kind, + }, + } + } +} diff --git a/gix/src/repository/object.rs b/gix/src/repository/object.rs index ff4df5b042..7b56037180 100644 --- a/gix/src/repository/object.rs +++ b/gix/src/repository/object.rs @@ -240,6 +240,62 @@ impl crate::Repository { None => Ok(None), } } + + /// Try to find a blob with `id` and return its decoded bytes as a stream. + pub fn try_find_blob_stream( + &self, + id: impl Into, + ) -> Result, crate::repository::blob_stream::Error> { + let id = id.into(); + if id == ObjectId::empty_blob(self.object_hash()) { + return Ok(Some(gix_odb::find::Stream::empty_blob())); + } + + match self.try_find_header(id)? { + Some(header) => { + if header.kind() != gix_object::Kind::Blob { + return Err(crate::repository::blob_stream::Error::NotABlob { + id, + actual: header.kind(), + }); + } + } + None => return Ok(None), + } + + match self + .objects + .try_find_stream(&id) + .map_err(crate::object::find::Error::from)? + { + Some((stream, _location)) => Ok(Some(stream)), + None => Ok(None), + } + } + + /// Find a blob with `id` and return its decoded bytes as a stream. + pub fn find_blob_stream( + &self, + id: impl Into, + ) -> Result { + let id = id.into(); + self.try_find_blob_stream(id) + .map_err(|err| match err { + crate::repository::blob_stream::Error::Find(err) => { + crate::repository::blob_stream::existing::Error::Find(gix_object::find::existing::Error::Find( + err.0, + )) + } + crate::repository::blob_stream::Error::NotABlob { id, actual } => { + crate::repository::blob_stream::existing::Error::NotABlob { id, actual } + } + })? + .ok_or_else(|| { + crate::repository::blob_stream::existing::Error::Find(gix_object::find::existing::Error::NotFound { + oid: id, + }) + }) + } } /// Write objects of any type. diff --git a/gix/tests/gix/repository/object.rs b/gix/tests/gix/repository/object.rs index 007bb1cee8..336f7ef7d5 100644 --- a/gix/tests/gix/repository/object.rs +++ b/gix/tests/gix/repository/object.rs @@ -581,6 +581,93 @@ mod find { } } +mod blob_stream { + use std::io::Read; + + use gix_object::Kind; + + fn blob_id_with_storage(repo: &gix::Repository, packed: bool) -> crate::Result { + for id in repo.objects.iter()? { + let id = id?; + let Some(header) = repo.try_find_header(id)? else { + continue; + }; + let is_packed = matches!(header, gix_odb::find::Header::Packed(_)); + if header.kind() == Kind::Blob && is_packed == packed { + return Ok(id); + } + } + panic!( + "expected at least one {} blob in fixture", + if packed { "packed" } else { "loose" } + ); + } + + #[test] + fn streams_loose_blobs() -> crate::Result { + let repo = crate::named_repo("make_packed_and_loose.sh")?; + let id = blob_id_with_storage(&repo, false)?; + let expected = repo.find_blob(id)?.data.clone(); + + let mut stream = repo.find_blob_stream(id)?; + assert_eq!(stream.kind(), Kind::Blob); + assert_eq!(stream.size(), expected.len() as u64); + + let mut actual = Vec::new(); + stream.read_to_end(&mut actual)?; + assert_eq!(actual, expected); + Ok(()) + } + + #[test] + fn streams_packed_blobs_via_repository_and_odb() -> crate::Result { + let repo = crate::named_repo("make_packed_and_loose.sh")?; + let id = blob_id_with_storage(&repo, true)?; + let expected = repo.find_blob(id)?.data.clone(); + + let mut stream = repo.find_blob_stream(id)?; + assert_eq!(stream.kind(), Kind::Blob); + assert_eq!(stream.size(), expected.len() as u64); + let mut actual = Vec::new(); + stream.read_to_end(&mut actual)?; + assert_eq!(actual, expected); + + let Some((mut odb_stream, location)) = repo.objects.try_find_stream(id.as_ref())? else { + panic!("blob must be present"); + }; + assert!(location.is_some(), "packed blobs report their pack location"); + assert_eq!(odb_stream.kind(), Kind::Blob); + assert_eq!(odb_stream.size(), expected.len() as u64); + actual.clear(); + odb_stream.read_to_end(&mut actual)?; + assert_eq!(actual, expected); + Ok(()) + } + + #[test] + fn streams_empty_blob_without_storage_lookup() -> crate::Result { + let repo = crate::basic_repo()?; + let mut stream = repo.find_blob_stream(repo.object_hash().empty_blob())?; + let mut actual = Vec::new(); + stream.read_to_end(&mut actual)?; + assert!(actual.is_empty()); + Ok(()) + } + + #[test] + fn rejects_non_blob_stream_requests() -> crate::Result { + let repo = crate::basic_repo()?; + match repo.find_blob_stream(repo.head_id()?) { + Err(gix::repository::blob_stream::existing::Error::NotABlob { actual, .. }) => { + assert_eq!(actual, Kind::Commit); + } + Ok(_) => panic!("expected the HEAD commit lookup to be rejected"), + Err(other) => panic!("unexpected error: {other}"), + } + Ok(()) + } +} + #[test] fn empty_objects_are_always_present_but_not_in_plumbing() -> crate::Result { let repo = empty_bare_in_memory_repo()?; From 84931e7c3c33bf532b7778abde6de4d1e95b4199 Mon Sep 17 00:00:00 2001 From: Malcom Gilbert Date: Sun, 19 Apr 2026 22:39:38 -0500 Subject: [PATCH 2/6] Harden blob streaming validation and add memory proof Reject truncated loose-object streams, restore delta output-length validation, and add allocator-backed proof that packed delta blob streaming lowers peak memory usage compared to eager blob lookup. Co-authored-by: Codex --- gix-odb/src/store_impls/loose/find.rs | 109 ++++++++- gix-odb/tests/odb/store/loose.rs | 23 ++ gix-pack/src/cache/delta/traverse/resolve.rs | 14 +- gix-pack/src/data/delta.rs | 42 +++- gix-pack/src/data/file/decode/entry.rs | 6 +- gix/tests/blob_stream_memory.rs | 221 +++++++++++++++++++ 6 files changed, 402 insertions(+), 13 deletions(-) create mode 100644 gix/tests/blob_stream_memory.rs diff --git a/gix-odb/src/store_impls/loose/find.rs b/gix-odb/src/store_impls/loose/find.rs index d2983cd5a9..0dc0fd2e10 100644 --- a/gix-odb/src/store_impls/loose/find.rs +++ b/gix-odb/src/store_impls/loose/find.rs @@ -41,6 +41,7 @@ pub(crate) struct StreamReader { compressed_len: usize, pending: Vec, pending_pos: usize, + remaining: u64, done: bool, } @@ -54,10 +55,69 @@ impl StreamReader { compressed_len: 0, pending: Vec::new(), pending_pos: 0, + remaining: 0, done: false, } } + fn finish_stream(&mut self) -> io::Result<()> { + if self.done { + return Ok(()); + } + + let mut scratch = [0u8; 256]; + loop { + if self.compressed_pos == self.compressed_len { + self.compressed_len = self.file.read(&mut self.compressed)?; + self.compressed_pos = 0; + } + + let input = &self.compressed[self.compressed_pos..self.compressed_len]; + let eof = input.is_empty(); + let before_in = self.inflate.total_in(); + let before_out = self.inflate.total_out(); + let flush = if eof { + zlib::FlushDecompress::Finish + } else { + zlib::FlushDecompress::None + }; + let status = self + .inflate + .decompress(input, &mut scratch, flush) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "corrupt deflate stream"))?; + let consumed = (self.inflate.total_in() - before_in) as usize; + let produced = (self.inflate.total_out() - before_out) as usize; + self.compressed_pos += consumed; + + if produced != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "loose object stream exceeded the declared size", + )); + } + + match status { + zlib::Status::StreamEnd => { + self.done = true; + return Ok(()); + } + zlib::Status::Ok | zlib::Status::BufError if eof => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "truncated loose object stream", + )); + } + zlib::Status::Ok | zlib::Status::BufError if consumed != 0 => {} + zlib::Status::Ok | zlib::Status::BufError => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "deflate stream made no progress", + )); + } + } + } + } + fn read_header(mut self, path: &std::path::Path) -> Result<(gix_object::Kind, u64, Self), Error> { let mut decompressed = [0u8; HEADER_MAX_SIZE + 8192]; let mut produced_total = 0usize; @@ -93,7 +153,17 @@ impl StreamReader { produced_total += produced; if let Ok((kind, size, header_size)) = gix_object::decode::loose_header(&decompressed[..produced_total]) { + let pending_len = produced_total.saturating_sub(header_size); + if pending_len as u64 > size { + return Err(Error::SizeMismatch { + expected: size, + actual: pending_len as u64, + path: path.to_owned(), + }); + } self.pending = decompressed[header_size..produced_total].to_vec(); + self.remaining = size - pending_len as u64; + self.done = self.remaining == 0 && matches!(status, zlib::Status::StreamEnd); return Ok((kind, size, self)); } @@ -136,6 +206,10 @@ impl io::Read for StreamReader { fn read(&mut self, mut out: &mut [u8]) -> io::Result { let mut total_written = 0usize; + if out.is_empty() { + return Ok(0); + } + if self.pending_pos < self.pending.len() { let pending = &self.pending[self.pending_pos..]; let count = pending.len().min(out.len()); @@ -147,11 +221,20 @@ impl io::Read for StreamReader { self.pending.clear(); self.pending_pos = 0; } + if self.remaining == 0 { + self.finish_stream()?; + return Ok(total_written); + } if out.is_empty() { return Ok(total_written); } } + if self.remaining == 0 { + self.finish_stream()?; + return Ok(total_written); + } + loop { if self.done { return Ok(total_written); @@ -170,25 +253,43 @@ impl io::Read for StreamReader { } else { zlib::FlushDecompress::None }; + let out_len = out.len().min(usize::try_from(self.remaining).unwrap_or(usize::MAX)); let status = self .inflate - .decompress(input, out, flush) + .decompress(input, &mut out[..out_len], flush) .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "corrupt deflate stream"))?; let consumed = (self.inflate.total_in() - before_in) as usize; let produced = (self.inflate.total_out() - before_out) as usize; self.compressed_pos += consumed; total_written += produced; + self.remaining = self + .remaining + .checked_sub(produced as u64) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "loose object exceeded the declared size"))?; match status { zlib::Status::StreamEnd => { + if self.remaining != 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "truncated loose object stream", + )); + } self.done = true; return Ok(total_written); } - zlib::Status::Ok | zlib::Status::BufError if produced != 0 => return Ok(total_written), - zlib::Status::Ok | zlib::Status::BufError if eof => { - self.done = true; + zlib::Status::Ok | zlib::Status::BufError if produced != 0 => { + if self.remaining == 0 { + self.finish_stream()?; + } return Ok(total_written); } + zlib::Status::Ok | zlib::Status::BufError if eof => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "truncated loose object stream", + )); + } zlib::Status::Ok | zlib::Status::BufError if consumed != 0 => {} zlib::Status::Ok | zlib::Status::BufError => { return Err(io::Error::new( diff --git a/gix-odb/tests/odb/store/loose.rs b/gix-odb/tests/odb/store/loose.rs index 456a3b30ad..af4ef40d0b 100644 --- a/gix-odb/tests/odb/store/loose.rs +++ b/gix-odb/tests/odb/store/loose.rs @@ -33,6 +33,29 @@ pub fn locate_oid(id: gix_hash::ObjectId, buf: &mut Vec) -> gix_object::Data ldb().try_find(&id, buf).expect("read success").expect("id present") } +mod stream { + use std::io::{ErrorKind, Read}; + + use gix_odb::loose::Store; + + #[test] + fn truncated_streams_fail_instead_of_returning_early_eof() -> crate::Result { + let dir = gix_testtools::tempfile::tempdir()?; + let store = Store::at(dir.path(), gix_hash::Kind::Sha1); + let id = store.write_buf(gix_object::Kind::Blob, &vec![b'x'; 64 * 1024])?; + let path = store.object_path(&id); + + let mut compressed = std::fs::read(&path)?; + compressed.truncate(compressed.len() - 1); + std::fs::write(&path, compressed)?; + + let mut stream = store.try_find_stream(id.as_ref())?.expect("object still addressable"); + let err = std::io::copy(&mut stream, &mut std::io::sink()).expect_err("truncated streams must fail"); + assert_eq!(err.kind(), ErrorKind::UnexpectedEof); + Ok(()) + } +} + #[test] fn verify_integrity() { let db = ldb(); diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs index 14e50f6388..36ae3b3bff 100644 --- a/gix-pack/src/cache/delta/traverse/resolve.rs +++ b/gix-pack/src/cache/delta/traverse/resolve.rs @@ -189,7 +189,12 @@ where fully_resolved_delta_bytes.resize(result_size as usize, 0); let mut target = &mut fully_resolved_delta_bytes[..]; - data::delta::apply(&base_bytes, &mut target, &delta_bytes[header_ofs..])?; + data::delta::apply( + &base_bytes, + &mut target, + &delta_bytes[header_ofs..], + result_size as usize, + )?; // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers // at all @@ -365,7 +370,12 @@ where fully_resolved_delta_bytes.resize(result_size as usize, 0); let mut target = &mut fully_resolved_delta_bytes[..]; - data::delta::apply(&base_bytes, &mut target, &delta_bytes[header_ofs..])?; + data::delta::apply( + &base_bytes, + &mut target, + &delta_bytes[header_ofs..], + result_size as usize, + )?; // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers // at all diff --git a/gix-pack/src/data/delta.rs b/gix-pack/src/data/delta.rs index e1d32b04d7..278ec949ef 100644 --- a/gix-pack/src/data/delta.rs +++ b/gix-pack/src/data/delta.rs @@ -10,6 +10,8 @@ pub mod apply { DeltaCopyBaseSliceMismatch, #[error("Delta copy data: byte slices must match")] DeltaCopyDataSliceMismatch, + #[error("Delta output size mismatch: expected {expected} bytes, got {actual}")] + OutputSizeMismatch { expected: usize, actual: usize }, } } @@ -30,8 +32,14 @@ pub(crate) fn decode_header_size(d: &[u8]) -> (u64, usize) { (size, consumed) } -pub(crate) fn apply(base: &[u8], target: &mut W, data: &[u8]) -> Result<(), apply::Error> { +pub(crate) fn apply( + base: &[u8], + target: &mut W, + data: &[u8], + expected_size: usize, +) -> Result<(), apply::Error> { let mut i = 0; + let mut produced = 0usize; while let Some(cmd) = data.get(i) { i += 1; match cmd { @@ -69,20 +77,46 @@ pub(crate) fn apply(base: &[u8], target: &mut W, dat size = 0x10000; // 65536 } let ofs = ofs as usize; + let size = size as usize; target - .write_all(&base[ofs..ofs + size as usize]) + .write_all(&base[ofs..ofs + size]) .map_err(|_e| apply::Error::DeltaCopyBaseSliceMismatch)?; + produced += size; } 0 => return Err(apply::Error::UnsupportedCommandCode), size => { + let size = *size as usize; target - .write_all(&data[i..i + *size as usize]) + .write_all(&data[i..i + size]) .map_err(|_e| apply::Error::DeltaCopyDataSliceMismatch)?; - i += *size as usize; + produced += size; + i += size; } } } debug_assert_eq!(i, data.len()); + if produced != expected_size { + return Err(apply::Error::OutputSizeMismatch { + expected: expected_size, + actual: produced, + }); + } + Ok(()) } + +#[cfg(test)] +mod tests { + use super::apply; + + #[test] + fn rejects_outputs_shorter_than_declared() { + let mut out = Vec::new(); + let err = super::apply(b"hello", &mut out, &[0x90, 0x05], 6).expect_err("malformed deltas must fail"); + assert!(matches!( + err, + apply::Error::OutputSizeMismatch { expected: 6, actual: 5 } + )); + } +} diff --git a/gix-pack/src/data/file/decode/entry.rs b/gix-pack/src/data/file/decode/entry.rs index 51e891d52d..e79f4402e8 100644 --- a/gix-pack/src/data/file/decode/entry.rs +++ b/gix-pack/src/data/file/decode/entry.rs @@ -453,7 +453,7 @@ impl File { last_result_size = Some(result_size); } let mut target = &mut target_buf[..result_size]; - delta::apply(&source_buf[..base_size], &mut target, data)?; + delta::apply(&source_buf[..base_size], &mut target, data, result_size)?; // use the target as source for the next delta std::mem::swap(&mut source_buf, &mut target_buf); } @@ -652,10 +652,10 @@ impl File { let is_last_delta = delta_idx + 1 == chain_len; if is_last_delta { last_result_size = Some(result_size); - delta::apply(&source_buf[..base_size], writer, data)?; + delta::apply(&source_buf[..base_size], writer, data, result_size)?; } else { let mut target = &mut target_buf[..result_size]; - delta::apply(&source_buf[..base_size], &mut target, data)?; + delta::apply(&source_buf[..base_size], &mut target, data, result_size)?; std::mem::swap(&mut source_buf, &mut target_buf); } } diff --git a/gix/tests/blob_stream_memory.rs b/gix/tests/blob_stream_memory.rs new file mode 100644 index 0000000000..154548e287 --- /dev/null +++ b/gix/tests/blob_stream_memory.rs @@ -0,0 +1,221 @@ +#![allow(clippy::result_large_err)] + +use std::{ + alloc::{GlobalAlloc, Layout, System}, + io, + path::Path, + process::Command, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use gix::odb::find::Header; +use gix_object::Kind; +use gix_testtools::{tempfile, Result}; + +#[global_allocator] +static ALLOCATOR: MeasuringAllocator = MeasuringAllocator::new(); + +struct MeasuringAllocator { + current: AtomicUsize, + peak: AtomicUsize, +} + +impl MeasuringAllocator { + const fn new() -> Self { + Self { + current: AtomicUsize::new(0), + peak: AtomicUsize::new(0), + } + } + + fn prefixed_layout(layout: Layout) -> (Layout, usize) { + let (layout, offset) = Layout::new::() + .extend(layout) + .expect("prefix layout can be extended"); + (layout.pad_to_align(), offset) + } + + fn note_increase(&self, size: usize) { + if size == 0 { + return; + } + let current = self.current.fetch_add(size, Ordering::SeqCst) + size; + let mut observed = self.peak.load(Ordering::SeqCst); + while current > observed + && self + .peak + .compare_exchange(observed, current, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + observed = self.peak.load(Ordering::SeqCst); + } + } + + fn note_decrease(&self, size: usize) { + if size != 0 { + self.current.fetch_sub(size, Ordering::SeqCst); + } + } + + fn measure(&self, f: impl FnOnce() -> T) -> (T, usize) { + let baseline = self.current.load(Ordering::SeqCst); + self.peak.store(baseline, Ordering::SeqCst); + let value = f(); + (value, self.peak.load(Ordering::SeqCst).saturating_sub(baseline)) + } +} + +unsafe impl GlobalAlloc for MeasuringAllocator { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + let requested = layout.size(); + let (layout, offset) = Self::prefixed_layout(layout); + let raw = unsafe { System.alloc(layout) }; + if !raw.is_null() { + self.note_increase(requested); + unsafe { raw.add(offset) } + } else { + raw + } + } + + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + let requested = layout.size(); + let (layout, offset) = Self::prefixed_layout(layout); + let raw = unsafe { System.alloc_zeroed(layout) }; + if !raw.is_null() { + self.note_increase(requested); + unsafe { raw.add(offset) } + } else { + raw + } + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + let requested = layout.size(); + let (layout, offset) = Self::prefixed_layout(layout); + self.note_decrease(requested); + unsafe { System.dealloc(ptr.sub(offset), layout) }; + } + + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + let (old_layout, old_offset) = Self::prefixed_layout(layout); + let (new_layout, new_offset) = + Self::prefixed_layout(Layout::from_size_align(new_size, layout.align()).expect("valid layout")); + let raw = unsafe { System.realloc(ptr.sub(old_offset), old_layout, new_layout.size()) }; + if !raw.is_null() { + match new_size.cmp(&layout.size()) { + std::cmp::Ordering::Greater => self.note_increase(new_size - layout.size()), + std::cmp::Ordering::Less => self.note_decrease(layout.size() - new_size), + std::cmp::Ordering::Equal => {} + } + unsafe { raw.add(new_offset) } + } else { + raw + } + } +} + +fn restricted() -> gix::open::Options { + gix::open::Options::isolated().config_overrides(["user.name=gitoxide", "user.email=gitoxide@localhost"]) +} + +fn open_repo(path: &Path) -> Result { + Ok(gix::ThreadSafeRepository::open_opts(path, restricted())?.to_thread_local()) +} + +fn git(dir: &Path, args: &[&str]) -> Result<()> { + let output = Command::new("git").current_dir(dir).args(args).output()?; + if output.status.success() { + return Ok(()); + } + Err(format!( + "git {:?} failed with status {:?}\nstdout:\n{}\nstderr:\n{}", + args, + output.status.code(), + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ) + .into()) +} + +fn create_packed_delta_repo() -> Result { + let dir = tempfile::tempdir()?; + git(dir.path(), &["init"])?; + git(dir.path(), &["config", "user.name", "gitoxide"])?; + git(dir.path(), &["config", "user.email", "gitoxide@localhost"])?; + + let blob_path = dir.path().join("blob.bin"); + let mut base = vec![b'a'; 16 * 1024 * 1024]; + for chunk in base.chunks_mut(4096) { + chunk[0] = b'A'; + chunk[1] = b'0'; + } + std::fs::write(&blob_path, &base)?; + git(dir.path(), &["add", "blob.bin"])?; + git(dir.path(), &["commit", "-m", "base"])?; + + let mut changed = base; + for idx in (0..changed.len()).step_by(4096) { + changed[idx + 1] = b'1'; + } + std::fs::write(&blob_path, &changed)?; + git(dir.path(), &["add", "blob.bin"])?; + git(dir.path(), &["commit", "-m", "delta"])?; + git( + dir.path(), + &["repack", "-adf", "--window=250", "--depth=50", "--window-memory=1g"], + )?; + git(dir.path(), &["prune-packed"])?; + Ok(dir) +} + +fn packed_delta_blob_id(repo: &gix::Repository) -> Result<(gix::ObjectId, u64)> { + for id in repo.objects.iter()? { + let id = id?; + match repo.try_find_header(id)? { + Some(Header::Packed(header)) if header.kind == Kind::Blob && header.num_deltas > 0 => { + return Ok((id, header.object_size)); + } + _ => {} + } + } + Err("expected at least one packed delta blob".into()) +} + +#[test] +fn streaming_packed_delta_blobs_uses_less_peak_memory_than_eager_lookup() -> Result { + let repo_dir = create_packed_delta_repo()?; + let repo = open_repo(repo_dir.path())?; + let (id, object_size) = packed_delta_blob_id(&repo)?; + assert!(object_size >= 16 * 1024 * 1024); + drop(repo); + + let eager_peak = { + let repo = open_repo(repo_dir.path())?; + let (_, peak) = ALLOCATOR.measure(|| { + let blob = repo.find_blob(id).expect("packed delta blob can be decoded eagerly"); + assert_eq!(blob.data.len() as u64, object_size); + }); + peak + }; + + let streaming_peak = { + let repo = open_repo(repo_dir.path())?; + let (_, peak) = ALLOCATOR.measure(|| { + let mut stream = repo.find_blob_stream(id).expect("packed delta blob can be streamed"); + assert_eq!(stream.size(), object_size); + io::copy(&mut stream, &mut io::sink()).expect("streaming copy to sink succeeds"); + }); + peak + }; + + assert!( + streaming_peak < eager_peak, + "streaming should lower peak allocations, got eager={eager_peak} and stream={streaming_peak}" + ); + assert!( + eager_peak.saturating_sub(streaming_peak) >= 8 * 1024 * 1024, + "expected a meaningful peak-memory reduction, got eager={eager_peak} and stream={streaming_peak}" + ); + Ok(()) +} From 36495dbb6e69595800e735c7615fcf23b8930a7c Mon Sep 17 00:00:00 2001 From: Malcom Gilbert Date: Sun, 19 Apr 2026 22:57:33 -0500 Subject: [PATCH 3/6] Fix blob stream loose-store test imports Bring the gix_object::Write trait into scope for write_buf() and remove the unused Read import so workspace clippy and test builds pass again. Co-authored-by: Codex --- gix-odb/tests/odb/store/loose.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gix-odb/tests/odb/store/loose.rs b/gix-odb/tests/odb/store/loose.rs index af4ef40d0b..35a8d6c75e 100644 --- a/gix-odb/tests/odb/store/loose.rs +++ b/gix-odb/tests/odb/store/loose.rs @@ -34,8 +34,9 @@ pub fn locate_oid(id: gix_hash::ObjectId, buf: &mut Vec) -> gix_object::Data } mod stream { - use std::io::{ErrorKind, Read}; + use std::io::ErrorKind; + use gix_object::Write; use gix_odb::loose::Store; #[test] From 20eaed27417c12c80b43ef432ec24f1f5f560bec Mon Sep 17 00:00:00 2001 From: Malcom Gilbert Date: Sun, 19 Apr 2026 23:07:56 -0500 Subject: [PATCH 4/6] Relax truncated loose-stream test error kind Accept both UnexpectedEof and InvalidData for truncated loose-object stream corruption, as the stream reader can surface either depending on where zlib detects the truncated deflate payload. Co-authored-by: Codex --- gix-odb/tests/odb/store/loose.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gix-odb/tests/odb/store/loose.rs b/gix-odb/tests/odb/store/loose.rs index 35a8d6c75e..f742cc49e1 100644 --- a/gix-odb/tests/odb/store/loose.rs +++ b/gix-odb/tests/odb/store/loose.rs @@ -52,7 +52,11 @@ mod stream { let mut stream = store.try_find_stream(id.as_ref())?.expect("object still addressable"); let err = std::io::copy(&mut stream, &mut std::io::sink()).expect_err("truncated streams must fail"); - assert_eq!(err.kind(), ErrorKind::UnexpectedEof); + assert!( + matches!(err.kind(), ErrorKind::UnexpectedEof | ErrorKind::InvalidData), + "expected stream corruption to surface as EOF or invalid data, got {:?}", + err.kind() + ); Ok(()) } } From 41b5fb7a864d8aea2b12cdeba78a0ec3c134f9a4 Mon Sep 17 00:00:00 2001 From: Malcom Gilbert Date: Sun, 19 Apr 2026 23:21:24 -0500 Subject: [PATCH 5/6] Fix loose stream truncation test setup --- gix-odb/tests/odb/store/loose.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gix-odb/tests/odb/store/loose.rs b/gix-odb/tests/odb/store/loose.rs index f742cc49e1..ec36ed843d 100644 --- a/gix-odb/tests/odb/store/loose.rs +++ b/gix-odb/tests/odb/store/loose.rs @@ -46,6 +46,10 @@ mod stream { let id = store.write_buf(gix_object::Kind::Blob, &vec![b'x'; 64 * 1024])?; let path = store.object_path(&id); + let mut permissions = std::fs::metadata(&path)?.permissions(); + permissions.set_readonly(false); + std::fs::set_permissions(&path, permissions)?; + let mut compressed = std::fs::read(&path)?; compressed.truncate(compressed.len() - 1); std::fs::write(&path, compressed)?; From 4b7bf1654997a0df0fe091363b7771b304cc0426 Mon Sep 17 00:00:00 2001 From: Malcom Gilbert Date: Sun, 19 Apr 2026 23:35:20 -0500 Subject: [PATCH 6/6] Fix Unix permissions in loose stream test --- gix-odb/tests/odb/store/loose.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/gix-odb/tests/odb/store/loose.rs b/gix-odb/tests/odb/store/loose.rs index ec36ed843d..1607da07df 100644 --- a/gix-odb/tests/odb/store/loose.rs +++ b/gix-odb/tests/odb/store/loose.rs @@ -46,9 +46,20 @@ mod stream { let id = store.write_buf(gix_object::Kind::Blob, &vec![b'x'; 64 * 1024])?; let path = store.object_path(&id); - let mut permissions = std::fs::metadata(&path)?.permissions(); - permissions.set_readonly(false); - std::fs::set_permissions(&path, permissions)?; + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + + let mut permissions = std::fs::metadata(&path)?.permissions(); + permissions.set_mode(permissions.mode() | 0o200); + std::fs::set_permissions(&path, permissions)?; + } + #[cfg(not(unix))] + { + let mut permissions = std::fs::metadata(&path)?.permissions(); + permissions.set_readonly(false); + std::fs::set_permissions(&path, permissions)?; + } let mut compressed = std::fs::read(&path)?; compressed.truncate(compressed.len() - 1);