diff --git a/gitoxide-core/src/pack/create.rs b/gitoxide-core/src/pack/create.rs index 7e9ebcb285..b631e0b421 100644 --- a/gitoxide-core/src/pack/create.rs +++ b/gitoxide-core/src/pack/create.rs @@ -2,8 +2,8 @@ use std::{ffi::OsStr, io, path::Path, str::FromStr, time::Instant}; use anyhow::anyhow; use gix::{ - hash, hash::ObjectId, interrupt, objs::bstr::ByteVec, odb::pack, parallel::InOrderIter, prelude::Finalize, - progress, traverse, Count, NestedProgress, Progress, + hash, hash::ObjectId, interrupt, objs::bstr::ByteVec, odb::pack, parallel::InOrderIter, progress, traverse, Count, + NestedProgress, Progress, }; use crate::OutputFormat; @@ -284,7 +284,7 @@ where } else { writeln!(out, "{pack_name}")?; } - stats.entries = in_order_entries.inner.finalize()?; + stats.entries = in_order_entries.inner.finalize_boxed()?; write_progress.show_throughput(start); entries_progress.show_throughput(start); diff --git a/gix-hashtable/src/lib.rs b/gix-hashtable/src/lib.rs index 592729ea68..f24e686fd8 100644 --- a/gix-hashtable/src/lib.rs +++ b/gix-hashtable/src/lib.rs @@ -26,6 +26,7 @@ pub use hashbrown::{hash_map, hash_set, hash_table, Equivalent}; /// thread-safe types pub mod sync { /// A map for associating data with object ids in a thread-safe fashion. It should scale well up to 256 threads. + #[derive(Debug)] pub struct ObjectIdMap { /// Sharing is done by the first byte of the incoming object id. shards: [parking_lot::Mutex>; 256], diff --git a/gix-pack/src/cache/delta/traverse/mod.rs b/gix-pack/src/cache/delta/traverse/mod.rs index 1e0026af78..ac898ce333 100644 --- a/gix-pack/src/cache/delta/traverse/mod.rs +++ b/gix-pack/src/cache/delta/traverse/mod.rs @@ -42,7 +42,7 @@ pub enum Error { #[error("Failed to spawn thread when switching to work-stealing mode")] SpawnThread(#[from] std::io::Error), #[error(transparent)] - Delta(#[from] crate::data::delta::apply::Error), + Delta(#[from] crate::data::delta::ApplyError), } /// Additional context passed to the `inspect_object(…)` function of the [`Tree::traverse()`] method. diff --git a/gix-pack/src/data/delta.rs b/gix-pack/src/data/delta.rs index 48f90a9851..e56568a31e 100644 --- a/gix-pack/src/data/delta.rs +++ b/gix-pack/src/data/delta.rs @@ -1,29 +1,42 @@ -/// -pub mod apply { - /// Returned when failing to apply deltas. - #[derive(thiserror::Error, Debug)] - #[allow(missing_docs)] - pub enum Error { - #[error("Corrupt delta data: {message}")] - Corrupt { message: &'static str }, - #[error("Encountered unsupported command code: 0")] - UnsupportedCommandCode, - #[error("Delta copy from base: byte slices must match")] - DeltaCopyBaseSliceMismatch, - #[error("Delta copy data: byte slices must match")] - DeltaCopyDataSliceMismatch, - } +use std::io::Write; + +/// Returned when failing to apply deltas. +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum ApplyError { + #[error("Corrupt delta data: {message}")] + Corrupt { message: &'static str }, + #[error("Encountered unsupported command code: 0")] + UnsupportedCommandCode, + #[error("Delta copy from base: byte slices must match")] + DeltaCopyBaseSliceMismatch, + #[error("Delta copy data: byte slices must match")] + DeltaCopyDataSliceMismatch, +} + +/// Returned when failing to encode deltas. +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum EncodeError { + #[error("Failed to write bytes: {0}")] + IOError(std::io::Error), + #[error("Too large offset in Copy instruction, should <= 0xffffffff, got {0}")] + TooLargeOffset(usize), + #[error("Too large size in Copy instruction, should <= 0x00ffffff, got {0}")] + TooLargeSize(usize), + #[error("Too large data in Add instruction, length should <= 127, got {0}")] + TooLargeData(usize), } /// Given the decompressed pack delta `d`, decode a size in bytes (either the base object size or the result object size) /// Equivalent to [this canonical git function](https://github.com/git/git/blob/311531c9de557d25ac087c1637818bd2aad6eb3a/delta.h#L89) -pub(crate) fn decode_header_size(d: &[u8]) -> Result<(u64, usize), apply::Error> { +pub(crate) fn decode_header_size(d: &[u8]) -> Result<(u64, usize), ApplyError> { let mut shift = 0; let mut size = 0u64; let mut consumed = 0; for cmd in d.iter() { if shift >= u64::BITS { - return Err(apply::Error::Corrupt { + return Err(ApplyError::Corrupt { message: "delta header size uses more bits than fit into u64", }); } @@ -34,14 +47,14 @@ pub(crate) fn decode_header_size(d: &[u8]) -> Result<(u64, usize), apply::Error> return Ok((size, consumed)); } } - Err(apply::Error::Corrupt { + Err(ApplyError::Corrupt { message: "delta header size is truncated", }) } -pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<(), apply::Error> { - fn next_byte(data: &[u8], i: &mut usize) -> Result { - let byte = *data.get(*i).ok_or(apply::Error::Corrupt { +pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<(), ApplyError> { + fn next_byte(data: &[u8], i: &mut usize) -> Result { + let byte = *data.get(*i).ok_or(ApplyError::Corrupt { message: "delta copy instruction is truncated", })?; *i += 1; @@ -52,6 +65,7 @@ pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<( while let Some(cmd) = data.get(i) { i += 1; match cmd { + // Copy cmd if cmd & 0b1000_0000 != 0 => { let (mut ofs, mut size): (u32, u32) = (0, 0); if cmd & 0b0000_0001 != 0 { @@ -79,33 +93,33 @@ pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<( size = 0x10000; // 65536 } let ofs = ofs as usize; - let end = ofs.checked_add(size as usize).ok_or(apply::Error::Corrupt { + let end = ofs.checked_add(size as usize).ok_or(ApplyError::Corrupt { message: "delta copy range overflows", })?; - std::io::Write::write( + std::io::Write::write_all( &mut target, - base.get(ofs..end).ok_or(apply::Error::Corrupt { + base.get(ofs..end).ok_or(ApplyError::Corrupt { message: "delta copy range exceeds base object size", })?, ) - .map_err(|_e| apply::Error::DeltaCopyBaseSliceMismatch)?; + .map_err(|_e| ApplyError::DeltaCopyBaseSliceMismatch)?; } 0 => { - return Err(apply::Error::Corrupt { + return Err(ApplyError::Corrupt { message: "delta command 0 is reserved and invalid", }) } size => { - let end = i.checked_add(*size as usize).ok_or(apply::Error::Corrupt { + let end = i.checked_add(*size as usize).ok_or(ApplyError::Corrupt { message: "delta insert range overflows", })?; - std::io::Write::write( + std::io::Write::write_all( &mut target, - data.get(i..end).ok_or(apply::Error::Corrupt { + data.get(i..end).ok_or(ApplyError::Corrupt { message: "delta insert data is truncated", })?, ) - .map_err(|_e| apply::Error::DeltaCopyDataSliceMismatch)?; + .map_err(|_e| ApplyError::DeltaCopyDataSliceMismatch)?; i = end; } } @@ -116,10 +130,140 @@ pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<( "delta instructions were not consumed completely, should be impossible" ); if !target.is_empty() { - return Err(apply::Error::Corrupt { + return Err(ApplyError::Corrupt { message: "delta instructions produced fewer bytes than promised", }); } Ok(()) } + +/// Delta instruction +#[derive(Debug)] +pub enum Instruction<'a> { + /// Copy data from source + Copy { + /// Start position to copy + offset: usize, + /// Data length in bytes + size: usize, + }, + /// Insert bytes embedded in instruction + Add { + /// Data to add + data: &'a [u8], + }, +} + +impl Instruction<'_> { + /// Encode instruction to bytes. + pub fn encode(self, mut writer: impl Write) -> Result<(), EncodeError> { + match self { + Self::Copy { offset, mut size } => { + let mut header = 0x80u8; + let mut buf = [0u8; 7]; + let mut n = 0; + + if size == 0x10000 { + size = 0; + } else if size > 0x00ffffff { + return Err(EncodeError::TooLargeSize(size)); + } + if offset > 0xffffffff { + return Err(EncodeError::TooLargeOffset(offset)); + } + + for i in 0..4 { + let byte = (offset >> (i * 8)) as u8; + if byte != 0 { + header |= 1 << i; + buf[n] = byte; + n += 1; + } + } + for i in 0..3 { + let byte = (size >> (i * 8)) as u8; + if byte != 0 { + header |= 1 << (4 + i); + buf[n] = byte; + n += 1; + } + } + + writer.write_all(&[header]).map_err(EncodeError::IOError)?; + writer.write_all(&buf[..n]).map_err(EncodeError::IOError)?; + Ok(()) + } + Self::Add { data } => { + if data.len() > 127 { + return Err(EncodeError::TooLargeData(data.len())); + } + + let header = data.len() as u8; + writer.write_all(&[header]).map_err(EncodeError::IOError)?; + writer.write_all(data).map_err(EncodeError::IOError)?; + Ok(()) + } + } + } +} + +/// Calculate delta instructions from `source` to `target`. +pub fn compute_delta<'a>(source: &[u8], target: &'a [u8]) -> Vec> { + // TODO: more efficient + // TODO: more configurable + let mut common_prefix_len: usize = 0; + for (s, t) in source.iter().zip(target) { + if s == t { + common_prefix_len += 1; + } else { + break; + } + } + + let mut insts = Vec::new(); + if common_prefix_len > 0 { + insts.push(Instruction::Copy { + offset: 0, + size: common_prefix_len, + }); + } + for chunk in target[common_prefix_len..].chunks(127) { + insts.push(Instruction::Add { data: chunk }); + } + insts +} + +#[cfg(test)] +mod tests { + use super::*; + + fn apply_delta<'a>(source: &'a [u8], delta: &Vec>) -> Vec { + let mut buf = Vec::new(); + for inst in delta { + match inst { + Instruction::Add { data } => buf.extend_from_slice(data), + Instruction::Copy { offset, size } => buf.extend_from_slice(&source[*offset..*offset + *size]), + } + } + buf + } + + #[test] + fn make_it_right() { + let source = "hello, world".as_bytes(); + let target = "hello, gitoxide".as_bytes(); + let delta = compute_delta(source, target); + let restored = apply_delta(source, &delta); + assert_eq!(target, restored); + + let mut delta_data = Vec::new(); + for inst in delta { + inst.encode(&mut delta_data).unwrap(); + } + + let mut restored_target = vec![0u8; target.len()]; + apply(source, &mut restored_target, &delta_data).unwrap(); + assert_eq!(target, restored_target); + } +} diff --git a/gix-pack/src/data/file/decode/mod.rs b/gix-pack/src/data/file/decode/mod.rs index 71bbf1595c..6867547523 100644 --- a/gix-pack/src/data/file/decode/mod.rs +++ b/gix-pack/src/data/file/decode/mod.rs @@ -20,7 +20,7 @@ pub enum Error { #[error("Entry too large to fit in memory")] OutOfMemory, #[error(transparent)] - Delta(#[from] crate::data::delta::apply::Error), + Delta(#[from] crate::data::delta::ApplyError), } impl From for Error { diff --git a/gix-pack/src/data/output/bytes.rs b/gix-pack/src/data/output/bytes.rs index c6ee8e5f1e..c89ec4cfb1 100644 --- a/gix-pack/src/data/output/bytes.rs +++ b/gix-pack/src/data/output/bytes.rs @@ -109,7 +109,7 @@ where } self.pack_offsets_and_validity.push((self.written, true)); let header = entry.to_entry_header(self.entry_version, |index| { - let (base_offset, is_valid_object) = self.pack_offsets_and_validity[index]; + let (base_offset, is_valid_object) = self.pack_offsets_and_validity.get(index).expect("objects in pack should be sorted"); if !is_valid_object { unreachable!("if you see this the object database is correct as a delta refers to a non-existing object") } diff --git a/gix-pack/src/data/output/entry/iter_from_counts.rs b/gix-pack/src/data/output/entry/iter_from_counts.rs index 5760bbb22f..40f870d464 100644 --- a/gix-pack/src/data/output/entry/iter_from_counts.rs +++ b/gix-pack/src/data/output/entry/iter_from_counts.rs @@ -1,5 +1,5 @@ pub(crate) mod function { - use std::{cmp::Ordering, sync::Arc}; + use std::sync::Arc; use gix_features::{ parallel, @@ -10,7 +10,7 @@ pub(crate) mod function { }, }; - use super::{reduce, util, Error, Mode, Options, Outcome, ProgressId}; + use super::{reduce, util, Error, Mode, Options, Outcome}; use crate::data::output; /// Given a known list of object `counts`, calculate entries ready to be put into a data pack. @@ -53,8 +53,7 @@ pub(crate) mod function { thread_limit, chunk_size, }: Options, - ) -> impl Iterator), Error>> - + parallel::reduce::Finalize> + ) -> Box where Find: crate::Find + Send + Clone + 'static, { @@ -64,89 +63,215 @@ pub(crate) mod function { ); let (chunk_size, thread_limit, _) = parallel::optimize_chunk_size_and_thread_limit(chunk_size, Some(counts.len()), thread_limit, None); - { - let progress = Arc::new(parking_lot::Mutex::new( - progress.add_child_with_id("resolving".into(), ProgressId::ResolveCounts.into()), - )); - progress.lock().init(None, gix_features::progress::count("counts")); - let enough_counts_present = counts.len() > 4_000; - let start = std::time::Instant::now(); - parallel::in_parallel_if( - || enough_counts_present, - counts.chunks_mut(chunk_size), - thread_limit, - |_n| Vec::::new(), - { - let progress = Arc::clone(&progress); - let db = db.clone(); - move |chunk, buf| { - let chunk_size = chunk.len(); - for count in chunk { - use crate::data::output::count::PackLocation::*; - match count.entry_pack_location { - LookedUp(_) => continue, - NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(&count.id, buf)), + util::resolve_counts(counts.as_mut_slice(), &db, &mut progress, thread_limit, chunk_size); + match mode { + Mode::PackCopyAndBaseObjects => { + let counts_range_by_pack_id = util::rearrange_counts_by_pack_id(&mut counts, &mut progress); + let sorted_counts = Arc::new(counts); + let progress = Arc::new(parking_lot::Mutex::new(progress)); + let chunks = util::ChunkRanges::new(chunk_size, sorted_counts.len()); + + Box::new(parallel::reduce::Stepwise::new( + chunks.enumerate(), + thread_limit, + { + let progress = Arc::clone(&progress); + move |n| { + ( + Vec::new(), // object data buffer + progress + .lock() + .add_child_with_id(format!("thread {n}"), gix_features::progress::UNKNOWN), + ) + } + }, + { + let sorted_counts = Arc::clone(&sorted_counts); + move |(chunk_id, chunk_range): (SequenceId, std::ops::Range), (buf, progress)| { + let mut out = Vec::new(); + let chunk = &sorted_counts[chunk_range]; + let mut stats = Outcome::default(); + let mut latest_pack_mapping = None; + progress.init(Some(chunk.len()), gix_features::progress::count("objects")); + + for count in chunk.iter() { + out.push(match count + .entry_pack_location + .as_ref() + .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe))) + { + // Existing in a pack + Some((location, pack_entry)) => { + // Unset latest_pack_offsets_to_id if outside the pack range + if let Some((cached_pack_id, _)) = &latest_pack_mapping { + if *cached_pack_id != location.pack_id { + latest_pack_mapping = None; + } + } + + // Params for pack finding + let (base_index_offset, counts_in_pack) = { + let index = counts_range_by_pack_id + .binary_search_by_key(&location.pack_id, |e| e.0) + .expect("pack-id always present"); + let pack_range = counts_range_by_pack_id[index].1.clone(); + (pack_range.start, &sorted_counts[pack_range]) + }; + + // First try to find existing entry in existing packs + if let Some(entry) = output::Entry::from_pack_entry( + pack_entry, + count, + counts_in_pack, + base_index_offset, + allow_thin_pack.then_some({ + |pack_id, base_offset| { + let (cached_pack_id, offsets_oid_mapping) = latest_pack_mapping + .get_or_insert_with(|| { + db.pack_offsets_and_oid(pack_id) + .map(|mut v| { + v.sort_by_key(|e| e.0); + (pack_id, v) + }) + .expect("pack used for counts is still available") + }); + debug_assert_eq!(*cached_pack_id, pack_id); + + stats.ref_delta_objects += 1; + offsets_oid_mapping + .binary_search_by_key(&base_offset, |e| e.0) + .ok() + .map(|idx| offsets_oid_mapping[idx].1) + } + }), + version, + ) { + stats.objects_copied_from_pack += 1; + entry + } + // Fallback to find in Object Database + // TODO: useless decompress then compress here + else if let Some((obj, _location)) = + db.try_find(&count.id, buf).map_err(Error::Find)? + { + stats.decoded_and_recompressed_objects += 1; + output::Entry::from_base(count, &obj) + } + // If both missing, return Entry::invalid + else { + stats.missing_objects += 1; + Ok(output::Entry::invalid()) + } + } + // Existing as a loose object + None => match db.try_find(&count.id, buf).map_err(Error::Find)? { + Some((obj, _location)) => { + stats.decoded_and_recompressed_objects += 1; + output::Entry::from_base(count, &obj) + } + None => { + stats.missing_objects += 1; + Ok(output::Entry::invalid()) + } + }, + }?); + progress.inc(); } + Ok((chunk_id, out, stats)) } - progress.lock().inc_by(chunk_size); - Ok::<_, ()>(()) - } - }, - parallel::reduce::IdentityWithResult::<(), ()>::default(), - ) - .expect("infallible - we ignore none-existing objects"); - progress.lock().show_throughput(start); + }, + reduce::Statistics::default(), + )) + } + Mode::Customized => unimplemented!("should handle customized mode in other function"), } - let counts_range_by_pack_id = match mode { - Mode::PackCopyAndBaseObjects => { - let mut progress = progress.add_child_with_id("sorting".into(), ProgressId::SortEntries.into()); - progress.init(Some(counts.len()), gix_features::progress::count("counts")); - let start = std::time::Instant::now(); - - use crate::data::output::count::PackLocation::*; - counts.sort_by(|lhs, rhs| match (&lhs.entry_pack_location, &rhs.entry_pack_location) { - (LookedUp(None), LookedUp(None)) => Ordering::Equal, - (LookedUp(Some(_)), LookedUp(None)) => Ordering::Greater, - (LookedUp(None), LookedUp(Some(_))) => Ordering::Less, - (LookedUp(Some(lhs)), LookedUp(Some(rhs))) => lhs - .pack_id - .cmp(&rhs.pack_id) - .then(lhs.pack_offset.cmp(&rhs.pack_offset)), - (_, _) => unreachable!("counts were resolved beforehand"), - }); - - let mut index: Vec<(u32, std::ops::Range)> = Vec::new(); - let mut chunks_pack_start = counts.partition_point(|e| e.entry_pack_location.is_none()); - let mut slice = &counts[chunks_pack_start..]; - while !slice.is_empty() { - let current_pack_id = slice[0].entry_pack_location.as_ref().expect("packed object").pack_id; - let pack_end = slice.partition_point(|e| { - e.entry_pack_location.as_ref().expect("packed object").pack_id == current_pack_id - }); - index.push((current_pack_id, chunks_pack_start..chunks_pack_start + pack_end)); - slice = &slice[pack_end..]; - chunks_pack_start += pack_end; - } + } +} - progress.set(counts.len()); - progress.show_throughput(start); +/// Customized handler for counts. +pub mod customized { + use std::{cmp::Ordering, sync::Arc}; - index - } - }; + use gix_features::{ + parallel, + parallel::SequenceId, + progress::{ + prodash::{Count, DynNestedProgress}, + Progress, + }, + }; + use gix_hash::ObjectId; + + use super::{reduce, util, Error, Options, Outcome}; + use crate::data::output; + + type Topo = std::collections::HashMap; + + /// Like [`super::function::iter_from_counts`], but can determine + /// whether an object is a base or a delta based on topological relationships. + /// + /// Key object refers to delta target, value object refers to delta source. + /// Treat objects missing in keys as base objects. + /// + /// If the required delta does not exist, it will be computed. + #[cfg(feature = "pack-cache-lru-dynamic")] + pub fn iter_from_counts_with_topo( + mut counts: Vec, + db: Find, + progress: Box, + topo: Topo, + cache_capacity: usize, + Options { + version, + mode, + allow_thin_pack, + thread_limit, + chunk_size, + }: Options, + ) -> Box + where + Find: crate::Find + Send + Clone + 'static, + { + if allow_thin_pack { + todo!("support allow_thin_pack"); + } + + assert!( + matches!(mode, super::types::Mode::Customized), + "mode except Customized should be handled by other function" + ); - let counts = Arc::new(counts); + let sorted_counts = { + topo_sort(counts.as_mut_slice(), &topo).expect("no loop in delta topo"); + Arc::new(counts) + }; let progress = Arc::new(parking_lot::Mutex::new(progress)); - let chunks = util::ChunkRanges::new(chunk_size, counts.len()); + let chunks = util::ChunkRanges::new(chunk_size, sorted_counts.len()); - parallel::reduce::Stepwise::new( + // Cache decompressed data for Find::try_find_cached + let object_cache = Arc::new(std::sync::Mutex::new(crate::cache::lru::MemoryCappedHashmap::new( + cache_capacity, + ))); // TODO: use parking_lot::Mutex + let oid_index_mapping = Arc::new( + sorted_counts + .iter() + .enumerate() + .map(|(index, count)| (count.id, index)) + .collect::>(), + ); // TODO: rearrange delta solving order or lru to avoid cache peak + Box::new(parallel::reduce::Stepwise::new( chunks.enumerate(), thread_limit, { let progress = Arc::clone(&progress); move |n| { ( - Vec::new(), // object data buffer + // Cache entries object ID and offset for packs + std::collections::HashMap::>::new(), + // buffer object data for target + Vec::new(), + // buffer object data for source + Vec::new(), progress .lock() .add_child_with_id(format!("thread {n}"), gix_features::progress::UNKNOWN), @@ -154,93 +279,201 @@ pub(crate) mod function { } }, { - let counts = Arc::clone(&counts); - move |(chunk_id, chunk_range): (SequenceId, std::ops::Range), (buf, progress)| { + let sorted_counts = Arc::clone(&sorted_counts); + let oid_index_mapping = Arc::clone(&oid_index_mapping); + let cache = Arc::clone(&object_cache); + move |(chunk_id, chunk_range): (SequenceId, std::ops::Range), + (pack_index_cache, buf_t, buf_s, progress)| { let mut out = Vec::new(); - let chunk = &counts[chunk_range]; + let chunk = &sorted_counts[chunk_range]; let mut stats = Outcome::default(); - let mut pack_offsets_to_id = None; progress.init(Some(chunk.len()), gix_features::progress::count("objects")); + // TODO: refactor needed for count in chunk.iter() { - out.push(match count - .entry_pack_location - .as_ref() - .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe))) - { - Some((location, pack_entry)) => { - if let Some((cached_pack_id, _)) = &pack_offsets_to_id { - if *cached_pack_id != location.pack_id { - pack_offsets_to_id = None; - } - } - let pack_range = counts_range_by_pack_id[counts_range_by_pack_id - .binary_search_by_key(&location.pack_id, |e| e.0) - .expect("pack-id always present")] - .1 - .clone(); - let base_index_offset = pack_range.start; - let counts_in_pack = &counts[pack_range]; - let entry = output::Entry::from_pack_entry( - pack_entry, + let oid = count.id; + let db_find_cached = |oid, buf| { + db.try_find_cached( + oid, + buf, + &mut *cache.lock().expect("other thread should not panic on cache"), + ) + .map_err(Error::Find) + }; + let entry = if let Some(source_oid) = topo.get(&oid) { + let mut find_existing_delta = || -> Option<_> { + let (_location, pack_entry) = count + .entry_pack_location + .as_ref() + .and_then(|l| db.entry_by_location(l).map(|pe| (l, pe)))?; + let delta = find_delta( count, - counts_in_pack, - base_index_offset, - allow_thin_pack.then_some({ - |pack_id, base_offset| { - let (cached_pack_id, cache) = pack_offsets_to_id.get_or_insert_with(|| { + &pack_entry, + source_oid, + |pack_id, base_offset| { + let offsets_oid_mapping = + pack_index_cache.entry(pack_id).or_insert_with(|| { db.pack_offsets_and_oid(pack_id) .map(|mut v| { v.sort_by_key(|e| e.0); - (pack_id, v) + v }) .expect("pack used for counts is still available") }); - debug_assert_eq!(*cached_pack_id, pack_id); - stats.ref_delta_objects += 1; - cache - .binary_search_by_key(&base_offset, |e| e.0) - .ok() - .map(|idx| cache[idx].1) - } - }), - version, - ); - match entry { - Some(entry) => { - stats.objects_copied_from_pack += 1; - entry - } - None => match db.try_find(&count.id, buf).map_err(Error::Find)? { - Some((obj, _location)) => { - stats.decoded_and_recompressed_objects += 1; - output::Entry::from_data(count, &obj) - } - None => { - stats.missing_objects += 1; - Ok(output::Entry::invalid()) - } + offsets_oid_mapping + .binary_search_by_key(&base_offset, |e| e.0) + .ok() + .map(|idx| offsets_oid_mapping[idx].1) }, - } + version, + )?; + Some(output::Entry::from_delta_ref( + count, + delta, + *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in ONE pack"), // TODO: allow ref delta in thin pack + )) + }; + // Find existing delta + if let Some(entry) = find_existing_delta() { + stats.objects_copied_from_pack += 1; + entry } - None => match db.try_find(&count.id, buf).map_err(Error::Find)? { - Some((obj, _location)) => { - stats.decoded_and_recompressed_objects += 1; - output::Entry::from_data(count, &obj) - } - None => { - stats.missing_objects += 1; + // Build delta + else if let Some((target, _)) = db_find_cached(&oid, buf_t)? { + if let Some((source, _)) = db_find_cached(source_oid, buf_s)? { + let delta_insts = crate::data::delta::compute_delta(source.data, target.data); + let mut delta_data_buf = Vec::new(); + for inst in delta_insts { + // Panic here because delta algorithm is incorrect, should fast fail + inst.encode(&mut delta_data_buf) + .expect("delta instruction should valid"); + } + output::Entry::from_delta_ref( + count, + delta_data_buf.as_slice(), + *oid_index_mapping + .get(source_oid) + .expect("all target and source objects should in ONE pack"), // TODO: allow ref delta in thin pack + ) + } else { Ok(output::Entry::invalid()) } - }, - }?); + } else { + Ok(output::Entry::invalid()) + } + } else if let Some((data, _)) = db_find_cached(&oid, buf_t)? { + output::Entry::from_base(count, &data) + } else { + Ok(output::Entry::invalid()) + }?; + out.push(entry); progress.inc(); } Ok((chunk_id, out, stats)) } }, reduce::Statistics::default(), - ) + )) + } + + /// Topological sort `counts` in place, parents first. + /// If there is a loop, returns Err(usize), meaning how many ObjectID are in loops indicated in the `to_parent`. + fn topo_sort( + counts: &mut [output::Count], + to_parent: &std::collections::HashMap, + ) -> Result<(), usize> { + // firstly sort `vertexes` as children first, then reverse `vertexex` + use std::collections::HashMap; + + type CountIndex = usize; + + let n = counts.len(); + if n == 0 { + return Ok(()); + } + + let oid_to_idx: HashMap = counts + .iter() + .enumerate() + .map(|(idx, c)| (c.id.to_owned(), idx)) + .collect(); + + let mut idx_to_child_count: HashMap = (0..n).map(|c| (c, 0)).collect(); + for (child, parent) in to_parent { + let child = oid_to_idx.get(child).unwrap(); + let parent = oid_to_idx.get(parent).unwrap(); + if idx_to_child_count.contains_key(child) { + if let Some(count) = idx_to_child_count.get_mut(parent) { + *count += 1; + } + } + } + + // leaf vertex collection + let mut stack: Vec = idx_to_child_count + .iter() + .filter_map(|(&c, count)| (*count == 0).then_some(c)) + .collect(); + + let mut sorted = Vec::with_capacity(n); + while let Some(curr) = stack.pop() { + if let Some(parent) = to_parent.get(&counts[curr].id) { + let parent = oid_to_idx.get(parent).unwrap(); + if let Some(count) = idx_to_child_count.get_mut(parent) { + *count -= 1; + if *count == 0 { + stack.push(*parent); + } + } + } + sorted.push(curr); + } + + match sorted.len().cmp(&n) { + Ordering::Less => Err(n - sorted.len()), + Ordering::Equal => { + sorted.reverse(); + super::util::apply_permutation(counts, &sorted); + Ok(()) + } + Ordering::Greater => { + unreachable!("sorted counts should less or equal than all counts") + } + } + } + + fn find_delta<'a>( + count: &output::Count, + entry: &'a crate::find::Entry, + source_oid: &ObjectId, + mut pack_offset_to_oid: impl FnMut(u32, u64) -> Option, + target_version: crate::data::Version, + ) -> Option<&'a [u8]> { + if entry.version != target_version { + return None; + } + + let pack_offset_must_be_zero = 0; + let pack_entry = + crate::data::Entry::from_bytes(&entry.data, pack_offset_must_be_zero, count.id.as_slice().len()).ok()?; + + use crate::data::entry::Header::*; + match pack_entry.header { + OfsDelta { base_distance } => { + let pack_location = count.entry_pack_location.as_ref().expect("packed"); + let base_offset = pack_location + .pack_offset + .checked_sub(base_distance) + .expect("pack-offset - distance is firmly within the pack"); + pack_offset_to_oid(pack_location.pack_id, base_offset) + } + RefDelta { base_id } => Some(base_id), + _ => None, + } + .filter(|id| id == source_oid) + .map(|_| &entry.data[pack_entry.data_offset as usize..]) } } @@ -276,6 +509,119 @@ mod util { } } } + + pub fn apply_permutation(data: &mut [T], indices: &[usize]) { + let n = data.len(); + + // inverse transformation: indices[i] = j => indices[j] = i + let mut inv = vec![0; n]; + for (i, &j) in indices.iter().enumerate() { + inv[j] = i; + } + + for i in 0..n { + while inv[i] != i { + let target = inv[i]; + data.swap(i, target); + inv.swap(i, target); + } + } + } + + pub fn resolve_counts( + counts: &mut [crate::data::output::Count], + db: &Find, + progress: &mut Box, + thread_limit: Option, + chunk_size: usize, + ) where + Find: crate::Find + Send + Clone + 'static, + { + use std::sync::Arc; + + use gix_features::{ + parallel, + progress::{Count, Progress}, + }; + + use super::ProgressId; + + let progress = Arc::new(parking_lot::Mutex::new( + progress.add_child_with_id("resolving".into(), ProgressId::ResolveCounts.into()), + )); + progress.lock().init(None, gix_features::progress::count("counts")); + let enough_counts_present = counts.len() > 4_000; + let start = std::time::Instant::now(); + parallel::in_parallel_if( + || enough_counts_present, + counts.chunks_mut(chunk_size), + thread_limit, + |_n| Vec::::new(), + { + let progress = Arc::clone(&progress); + let db = db.clone(); + move |chunk, buf| { + let chunk_size = chunk.len(); + for count in chunk { + use crate::data::output::count::PackLocation::*; + match count.entry_pack_location { + LookedUp(_) => continue, + NotLookedUp => count.entry_pack_location = LookedUp(db.location_by_oid(&count.id, buf)), + } + } + progress.lock().inc_by(chunk_size); + Ok::<_, ()>(()) + } + }, + parallel::reduce::IdentityWithResult::<(), ()>::default(), + ) + .expect("infallible - we ignore none-existing objects"); + progress.lock().show_throughput(start); + } + + pub fn rearrange_counts_by_pack_id( + counts: &mut [crate::data::output::Count], + progress: &mut Box, + ) -> Vec<(u32, std::ops::Range)> { + use std::cmp::Ordering; + + use gix_features::progress::{Count, Progress}; + + use super::ProgressId; + + let mut progress = progress.add_child_with_id("sorting".into(), ProgressId::SortEntries.into()); + progress.init(Some(counts.len()), gix_features::progress::count("counts")); + let start = std::time::Instant::now(); + + use crate::data::output::count::PackLocation::*; + counts.sort_by(|lhs, rhs| match (&lhs.entry_pack_location, &rhs.entry_pack_location) { + (LookedUp(None), LookedUp(None)) => Ordering::Equal, + (LookedUp(Some(_)), LookedUp(None)) => Ordering::Greater, + (LookedUp(None), LookedUp(Some(_))) => Ordering::Less, + (LookedUp(Some(lhs)), LookedUp(Some(rhs))) => lhs + .pack_id + .cmp(&rhs.pack_id) + .then(lhs.pack_offset.cmp(&rhs.pack_offset)), + (_, _) => unreachable!("counts were resolved beforehand"), + }); + + let mut index: Vec<(u32, std::ops::Range)> = Vec::new(); + let mut chunks_pack_start = counts.partition_point(|e| e.entry_pack_location.is_none()); + let mut slice = &counts[chunks_pack_start..]; + while !slice.is_empty() { + let current_pack_id = slice[0].entry_pack_location.as_ref().expect("packed object").pack_id; + let pack_end = slice + .partition_point(|e| e.entry_pack_location.as_ref().expect("packed object").pack_id == current_pack_id); + index.push((current_pack_id, chunks_pack_start..chunks_pack_start + pack_end)); + slice = &slice[pack_end..]; + chunks_pack_start += pack_end; + } + + progress.set(counts.len()); + progress.show_throughput(start); + + index + } } mod reduce { @@ -363,6 +709,8 @@ mod types { /// from existing pack compression and spending the smallest possible time on compressing unpacked objects at /// the cost of bandwidth. PackCopyAndBaseObjects, + /// Other customized process for counts. + Customized, } /// Configuration options for the pack generation functions provided in [`iter_from_counts()`][crate::data::output::entry::iter_from_counts()]. @@ -428,5 +776,23 @@ mod types { } } } + + type Item = Result<(gix_features::parallel::SequenceId, Vec), Error>; + type Stats = super::reduce::Statistics; + type StatsOutput = ::Output; + type StatsError = ::Error; + + pub trait DynFinalizeIterator: Iterator { + fn finalize_boxed(self: Box) -> Result; + } + + impl DynFinalizeIterator for T + where + T: Iterator + gix_features::parallel::reduce::Finalize, + { + fn finalize_boxed(self: Box) -> Result { + self.finalize() + } + } } pub use types::{Error, Mode, Options, Outcome, ProgressId}; diff --git a/gix-pack/src/data/output/entry/mod.rs b/gix-pack/src/data/output/entry/mod.rs index 18d20dd2c0..fb066ad1c2 100644 --- a/gix-pack/src/data/output/entry/mod.rs +++ b/gix-pack/src/data/output/entry/mod.rs @@ -31,7 +31,7 @@ pub enum Kind { }, } -/// The error returned by [`output::Entry::from_data()`]. +/// The error returned by [`output::Entry::from_base()`]. #[allow(missing_docs)] #[derive(Debug, thiserror::Error)] pub enum Error { @@ -61,6 +61,7 @@ impl output::Entry { } /// Create an Entry from a previously counted object which is located in a pack. It's `entry` is provided here. + /// `potential_bases` should be sorted by `Count.entry_pack_location.pack_offset`. /// The `version` specifies what kind of target `Entry` version the caller desires. pub fn from_pack_entry( mut entry: find::Entry, @@ -131,8 +132,8 @@ impl output::Entry { }) } - /// Create a new instance from the given `oid` and its corresponding git object data `obj`. - pub fn from_data(count: &output::Count, obj: &gix_object::Data<'_>) -> Result { + /// Create a new instance with type Base from the given `oid` and its corresponding git object data `obj`. + pub fn from_base(count: &output::Count, obj: &gix_object::Data<'_>) -> Result { Ok(output::Entry { id: count.id.to_owned(), kind: Kind::Base(obj.kind), @@ -151,6 +152,28 @@ impl output::Entry { }) } + /// Like [`output::Entry::from_base()`], but with type OfsDelta. + /// `delta_data` is encoded instructions. Header with encoded size and will be encoded by `output::Entry::to_entry_header` + /// `object_index` is the absolute index to the object. + pub fn from_delta_ref(count: &output::Count, delta_data: &[u8], object_index: usize) -> Result { + Ok(output::Entry { + id: count.id.to_owned(), + kind: Kind::DeltaRef { object_index }, + decompressed_size: delta_data.len(), + compressed_data: { + let mut out = gix_features::zlib::stream::deflate::Write::new(Vec::new()); + if let Err(err) = std::io::copy(&mut &*delta_data, &mut out) { + match err.kind() { + std::io::ErrorKind::Other => return Err(Error::ZlibDeflate(err)), + err => unreachable!("Should never see other errors than zlib, but got {:?}", err), + } + } + out.flush()?; + out.into_inner() + }, + }) + } + /// Transform ourselves into pack entry header of `version` which can be written into a pack. /// /// `index_to_pack(object_index) -> pack_offset` is a function to convert the base object's index into diff --git a/gix-pack/tests/pack/data/output/count_and_entries.rs b/gix-pack/tests/pack/data/output/count_and_entries.rs index af27e468e9..1b4cccc0c8 100644 --- a/gix-pack/tests/pack/data/output/count_and_entries.rs +++ b/gix-pack/tests/pack/data/output/count_and_entries.rs @@ -1,9 +1,6 @@ use std::sync::atomic::AtomicBool; -use gix_features::{ - parallel::{reduce::Finalize, InOrderIter}, - progress, -}; +use gix_features::{parallel::InOrderIter, progress}; use gix_odb::{pack, pack::FindExt}; use gix_pack::data::{ output, @@ -304,7 +301,7 @@ fn traversals() -> crate::Result { }); assert_eq!(actual_count, expected_count); assert_eq!(counts_len, expected_count.total()); - let stats = entries_iter.finalize()?; + let stats = entries_iter.finalize_boxed()?; assert_eq!(stats, expected_entries_outcome); assert_eq!( @@ -422,3 +419,194 @@ fn write_and_verify( Ok(()) } + +#[cfg(feature = "all-features")] +#[test] +fn customized_delta_topo() -> crate::Result { + use gix_pack::data::output::entry::iter_from_counts::{Mode, Options}; + + #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] + struct Count { + trees: usize, + commits: usize, + blobs: usize, + tags: usize, + delta_ref: usize, + delta_oid: usize, + } + impl Count { + fn add(&mut self, kind: output::entry::Kind) { + use gix_object::Kind::*; + use output::entry::Kind::*; + match kind { + Base(Tree) => self.trees += 1, + Base(Commit) => self.commits += 1, + Base(Blob) => self.blobs += 1, + Base(Tag) => self.tags += 1, + DeltaRef { .. } => self.delta_ref += 1, + DeltaOid { .. } => self.delta_oid += 1, + } + } + } + + for db_kind in [ + DbKind::DeterministicGeneratedContent, + DbKind::DeterministicGeneratedContentMultiIndex, + ] { + let db = db(db_kind)?; + + // Get objects for testing + // TODO: delta chain may not stable + let objects: Vec<_> = vec![ + hex_to_id("a63e479f22985d08b5debd6567e15999123d25a4"), // base + hex_to_id("d1ff3f36411c6eead64400062a7c8e30886b94ff"), // delta @ 1 + hex_to_id("37fbc9660088c6afad4b48169e80fe59670190d1"), // delta @ 2 + hex_to_id("dc2da8bbf4d82a654b35a2a43c0d714d4d7afbf9"), // delta @ 3 + ]; + + // Count objects + let (counts, _) = output::count::objects( + db.clone(), + Box::new(objects.clone().into_iter().map(Ok)), + &progress::Discard, + &AtomicBool::new(false), + count::objects::Options { + input_object_expansion: count::objects::ObjectExpansion::AsIs, + thread_limit: Some(1), + ..Default::default() + }, + )?; + + // Empty topo: every object is base + { + let topo = std::collections::HashMap::new(); + + let mut entries_iter = output::entry::iter_from_counts::customized::iter_from_counts_with_topo( + counts, + db.clone(), + Box::new(progress::Discard), + topo, + 1024 * 1024, // 1MB cache + Options { + mode: Mode::Customized, + ..Default::default() + }, + ); + + let entries: Vec<_> = InOrderIter::from(entries_iter.by_ref()) + .collect::, _>>()? + .into_iter() + .flatten() + .collect(); + + let actual_count = entries.iter().fold(Count::default(), |mut c, e| { + c.add(e.kind); + c + }); + + // All should be base objects since topo is empty + assert!(actual_count.delta_ref == 0 && actual_count.delta_oid == 0); + } + + // Test with non-empty topo + { + let objects = objects.to_owned(); + let topo_with_deltas = { + let mut m = std::collections::HashMap::new(); + m.insert(objects[3], objects[2]); + m.insert(objects[1], objects[2]); + m + }; + + let (counts2, _) = output::count::objects( + db.clone(), + Box::new(objects.to_owned().into_iter().map(Ok)), + &progress::Discard, + &AtomicBool::new(false), + count::objects::Options { + input_object_expansion: count::objects::ObjectExpansion::AsIs, + thread_limit: Some(1), + ..Default::default() + }, + )?; + + // Test reuse delta + { + let entries_iter2 = output::entry::iter_from_counts::customized::iter_from_counts_with_topo( + counts2.clone(), + db.clone(), + Box::new(progress::Discard), + topo_with_deltas.to_owned(), + 1024 * 1024, + Options { + mode: Mode::Customized, + ..Default::default() + }, + ); + let stat = entries_iter2.finalize_boxed().unwrap(); + assert_eq!(stat.objects_copied_from_pack, 1); + } + + let mut entries_iter2 = output::entry::iter_from_counts::customized::iter_from_counts_with_topo( + counts2.clone(), + db.clone(), + Box::new(progress::Discard), + topo_with_deltas, + 1024 * 1024, + Options { + mode: Mode::Customized, + ..Default::default() + }, + ); + + let entries2: Vec<_> = InOrderIter::from(entries_iter2.by_ref()) + .collect::, _>>()? + .into_iter() + .flatten() + .collect(); + + assert_eq!(entries2.len(), counts2.len(), "length of input and output should equal"); + let delta_oids = std::collections::HashSet::from([objects[1], objects[3]]); + for entry in entries2.iter() { + if delta_oids.contains(&entry.id) { + assert!(matches!(entry.kind, entry::Kind::DeltaRef { .. })); + } else { + assert!(matches!(entry.kind, entry::Kind::Base(..))); + } + } + + // Directly write to a pack file + let tmp_dir = gix_testtools::tempfile::TempDir::new()?; + let pack_file_path = tmp_dir.path().join("new.pack"); + let mut pack_file = std::fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(&pack_file_path)?; + let (_num_written_bytes, _pack_hash) = { + let num_entries = entries2.len(); + let mut pack_writer = output::bytes::FromEntriesIter::new( + std::iter::once(Ok::<_, entry::iter_from_counts::Error>(entries2)), + &mut pack_file, + num_entries as u32, + pack::data::Version::V2, + gix_hash::Kind::Sha1, + ); + let mut n = pack_writer.next().expect("one entries bundle was written")?; + n += pack_writer.next().expect("the trailer was written")?; + assert!( + pack_writer.next().is_none(), + "there is nothing more to iterate this time" + ); + // verify we can still get the original parts back + let hash = pack_writer.digest().expect("digest is available when iterator is done"); + let _ = pack_writer.input; + let _ = pack_writer.into_write(); + (n, hash) + }; + + // TODO: parse pack file + } + } + + Ok(()) +}