From 7c66defb1738f25f42868f0011d92b2dbb4562dc Mon Sep 17 00:00:00 2001 From: jayendra13 Date: Wed, 4 Feb 2026 07:17:06 +0530 Subject: [PATCH 1/2] Consolidate schema inference with build_schema_from_store_meta (Phase 3) Extract the duplicated schema building logic from three functions into a single `build_schema_from_store_meta()` helper: - infer_schema_from_zmetadata_json - infer_schema_with_meta - infer_schema_with_meta_async All three had identical logic for: 1. Building Dictionary-encoded coordinate fields (with CF time support) 2. Building regular array fields for data variables The helper centralizes this logic, making future schema changes easier to maintain consistently. Impact: -50 lines (57 additions, 107 deletions) Part of: https://github.com/jayendra13/zarr-datafusion/issues/5 --- src/reader/schema_inference.rs | 164 ++++++++++++--------------------- 1 file changed, 57 insertions(+), 107 deletions(-) diff --git a/src/reader/schema_inference.rs b/src/reader/schema_inference.rs index 92754e4..338bb61 100644 --- a/src/reader/schema_inference.rs +++ b/src/reader/schema_inference.rs @@ -44,6 +44,56 @@ pub enum ZarrVersion { V3, } +// ============================================================================= +// Schema building helper +// ============================================================================= + +/// Build an Arrow schema from Zarr store metadata. +/// +/// This consolidates the identical schema building logic used in: +/// - `infer_schema_from_zmetadata_json` +/// - `infer_schema_with_meta` +/// - `infer_schema_with_meta_async` +/// +/// Coordinates use Dictionary encoding for memory efficiency. +/// CF time coordinates use Dictionary with Timestamp(Microsecond, UTC) values. +pub fn build_schema_from_store_meta(meta: &ZarrStoreMeta) -> Schema { + let mut fields: Vec = Vec::new(); + + // Coordinates use Dictionary encoding for memory efficiency + // CF time coordinates use Dictionary with Timestamp values + for coord in &meta.coords { + let data_type = if coord + .cf_time_attrs + .as_ref() + .is_some_and(|a| a.is_time_coordinate()) + { + // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values + DataType::Dictionary( + Box::new(DataType::Int16), + Box::new(DataType::Timestamp( + TimeUnit::Microsecond, + Some("UTC".into()), + )), + ) + } else { + zarr_dtype_to_arrow_dictionary(&coord.data_type) + }; + fields.push(Field::new(&coord.name, data_type, false)); + } + + // Data variables use regular arrays + for var in &meta.data_vars { + fields.push(Field::new( + &var.name, + zarr_dtype_to_arrow(&var.data_type), + true, + )); + } + + Schema::new(fields) +} + /// Detect Zarr version by checking metadata files pub fn detect_zarr_version( store_path: &str, @@ -146,45 +196,9 @@ pub fn discover_arrays( pub fn infer_schema_from_zmetadata_json( metadata: &serde_json::Value, ) -> Result<(Schema, ZarrStoreMeta), Box> { - use arrow::datatypes::{DataType, Field, TimeUnit}; - let meta = discover_arrays_from_json(metadata)?.ok_or("No arrays found in .zmetadata JSON")?; - - // Build schema from meta (same logic as infer_schema_with_meta) - let mut fields: Vec = Vec::new(); - - // Coordinates use Dictionary encoding for memory efficiency - // CF time coordinates use Dictionary with Timestamp values - for coord in &meta.coords { - let data_type = if coord - .cf_time_attrs - .as_ref() - .is_some_and(|a| a.is_time_coordinate()) - { - // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values - DataType::Dictionary( - Box::new(DataType::Int16), - Box::new(DataType::Timestamp( - TimeUnit::Microsecond, - Some("UTC".into()), - )), - ) - } else { - zarr_dtype_to_arrow_dictionary(&coord.data_type) - }; - fields.push(Field::new(&coord.name, data_type, false)); - } - - // Data variables use regular arrays - for var in &meta.data_vars { - fields.push(Field::new( - &var.name, - zarr_dtype_to_arrow(&var.data_type), - true, - )); - } - - Ok((Schema::new(fields), meta)) + let schema = build_schema_from_store_meta(&meta); + Ok((schema, meta)) } /// Discover arrays from a pre-loaded .zmetadata JSON value @@ -727,44 +741,12 @@ pub fn infer_schema_with_meta( store_path: &str, ) -> Result<(Schema, ZarrStoreMeta), Box> { let meta = discover_arrays(store_path)?; - - let mut fields: Vec = Vec::new(); - - // Coordinates use Dictionary encoding for memory efficiency - // CF time coordinates use Dictionary with Timestamp values - for coord in &meta.coords { - let data_type = if coord - .cf_time_attrs - .as_ref() - .is_some_and(|a| a.is_time_coordinate()) - { - // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values - DataType::Dictionary( - Box::new(DataType::Int16), - Box::new(DataType::Timestamp( - TimeUnit::Microsecond, - Some("UTC".into()), - )), - ) - } else { - zarr_dtype_to_arrow_dictionary(&coord.data_type) - }; - fields.push(Field::new(&coord.name, data_type, false)); - } - - // Data variables use regular arrays - for var in &meta.data_vars { - fields.push(Field::new( - &var.name, - zarr_dtype_to_arrow(&var.data_type), - true, - )); - } + let schema = build_schema_from_store_meta(&meta); // Note: Schema metadata causes issues with DataFusion's optimizer schema comparisons. // Instead of storing metadata in the schema, we return ZarrStoreMeta which contains // all dimension info. The CLI can access this via the ZarrTable struct. - Ok((Schema::new(fields), meta)) + Ok((schema, meta)) } /// Parse CF time attributes from a JSON attributes object @@ -1316,45 +1298,13 @@ pub async fn infer_schema_with_meta_async( ) -> Result<(Schema, ZarrStoreMeta), Box> { debug!("Starting async schema inference"); let meta = discover_arrays_async(store, prefix).await?; - - let mut fields: Vec = Vec::new(); - - // Coordinates use Dictionary encoding for memory efficiency - // CF time coordinates use Dictionary with Timestamp values - for coord in &meta.coords { - let data_type = if coord - .cf_time_attrs - .as_ref() - .is_some_and(|a| a.is_time_coordinate()) - { - // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values - DataType::Dictionary( - Box::new(DataType::Int16), - Box::new(DataType::Timestamp( - TimeUnit::Microsecond, - Some("UTC".into()), - )), - ) - } else { - zarr_dtype_to_arrow_dictionary(&coord.data_type) - }; - fields.push(Field::new(&coord.name, data_type, false)); - } - - // Data variables use regular arrays - for var in &meta.data_vars { - fields.push(Field::new( - &var.name, - zarr_dtype_to_arrow(&var.data_type), - true, - )); - } + let schema = build_schema_from_store_meta(&meta); // Note: Schema metadata causes issues with DataFusion's optimizer schema comparisons. // Instead of storing metadata in the schema, we return ZarrStoreMeta which contains // all dimension info. The CLI can access this via the ZarrTable struct. - info!(num_fields = fields.len(), "Schema inferred"); - Ok((Schema::new(fields), meta)) + info!(num_fields = schema.fields().len(), "Schema inferred"); + Ok((schema, meta)) } #[cfg(test)] From d9a0d48be135ae962994531e6aab8103257d7aa0 Mon Sep 17 00:00:00 2001 From: jayendra13 Date: Wed, 4 Feb 2026 07:27:39 +0530 Subject: [PATCH 2/2] Unify descending range search functions in filter.rs (Phase 4) Consolidate the duplicated descending range search functions into unified helpers: - Add DescendingBoundType enum to parameterize FirstLeq vs FirstLt - Add compact_descending_bound() to unify compact_first_leq_descending and compact_first_lt_descending - Add find_descending_bound() to unify find_first_leq_descending and find_first_lt_descending - Add descending_partition_point() helper for explicit value arrays The original functions (find_first_leq_descending, find_first_lt_descending) are kept as thin wrappers for backward compatibility. Impact: +7 lines net, but significantly improved maintainability through consolidation of repeated match arms and comparison logic. Part of: https://github.com/jayendra13/zarr-datafusion/issues/5 --- src/reader/filter.rs | 289 ++++++++++++++++++++++--------------------- 1 file changed, 148 insertions(+), 141 deletions(-) diff --git a/src/reader/filter.rs b/src/reader/filter.rs index 38fd652..1bfaaa8 100644 --- a/src/reader/filter.rs +++ b/src/reader/filter.rs @@ -774,90 +774,115 @@ fn compact_upper_bound(encoding: &CompactCoord, target_f64: f64) -> usize { } } -/// For compact descending: find first index where value <= target (or < if not inclusive) -fn compact_first_leq_descending( +// ============================================================================= +// Descending range search helpers (unified to reduce duplication) +// ============================================================================= + +/// Type of bound for descending range search. +/// +/// Used to parameterize the unified descending search functions. +#[derive(Debug, Clone, Copy, PartialEq)] +enum DescendingBoundType { + /// Find first index where value <= target (start of range) + FirstLeq, + /// Find first index where value < target (end of range) + FirstLt, +} + +/// Unified function for compact descending range search. +/// +/// Consolidates `compact_first_leq_descending` and `compact_first_lt_descending`. +fn compact_descending_bound( encoding: &CompactCoord, target_f64: f64, + bound_type: DescendingBoundType, inclusive: bool, ) -> usize { - // Descending means step < 0, values go from high to low - // We want first index where value <= target match encoding { CompactCoord::ArithmeticInt { first, step, len } => { if *step == 0 { let first_f64 = *first as f64; - if inclusive { - return if first_f64 <= target_f64 { 0 } else { *len }; - } else { - return if first_f64 < target_f64 { 0 } else { *len }; - } - } - // For descending (step < 0): value[i] = first + i * step - // Want first i where first + i*step <= target (or < target) - // i >= (first - target) / (-step) - let step_f64 = *step as f64; - let first_f64 = *first as f64; - let idx = if inclusive { - ((first_f64 - target_f64) / (-step_f64)).ceil() as i64 - } else { - ((first_f64 - target_f64) / (-step_f64) + f64::EPSILON).ceil() as i64 - }; - idx.clamp(0, *len as i64) as usize - } - CompactCoord::Arithmetic { first, step, len } => { - if step.abs() < f64::EPSILON { - if inclusive { - return if *first <= target_f64 { 0 } else { *len }; - } else { - return if *first < target_f64 { 0 } else { *len }; - } - } - let idx = if inclusive { - ((first - target_f64) / (-step)).ceil() as i64 - } else { - ((first - target_f64) / (-step) + f64::EPSILON).ceil() as i64 - }; - idx.clamp(0, *len as i64) as usize - } - } -} - -/// For compact descending: find first index where value < target (or <= if not inclusive) -fn compact_first_lt_descending(encoding: &CompactCoord, target_f64: f64, inclusive: bool) -> usize { - // For descending, find first index where value < target (exclusive end) - match encoding { - CompactCoord::ArithmeticInt { first, step, len } => { - if *step == 0 { - let first_f64 = *first as f64; - if inclusive { - return if first_f64 >= target_f64 { *len } else { 0 }; - } else { - return if first_f64 > target_f64 { *len } else { 0 }; - } + return match (bound_type, inclusive) { + (DescendingBoundType::FirstLeq, true) => { + if first_f64 <= target_f64 { + 0 + } else { + *len + } + } + (DescendingBoundType::FirstLeq, false) => { + if first_f64 < target_f64 { + 0 + } else { + *len + } + } + (DescendingBoundType::FirstLt, true) => { + if first_f64 >= target_f64 { + *len + } else { + 0 + } + } + (DescendingBoundType::FirstLt, false) => { + if first_f64 > target_f64 { + *len + } else { + 0 + } + } + }; } let step_f64 = *step as f64; let first_f64 = *first as f64; - // For descending: want first i where first + i*step < target - // i > (first - target) / (-step) - let idx = if inclusive { - ((first_f64 - target_f64) / (-step_f64)).floor() as i64 + 1 - } else { - ((first_f64 - target_f64) / (-step_f64)).ceil() as i64 + let diff = (first_f64 - target_f64) / (-step_f64); + let idx = match (bound_type, inclusive) { + (DescendingBoundType::FirstLeq, true) => diff.ceil() as i64, + (DescendingBoundType::FirstLeq, false) => (diff + f64::EPSILON).ceil() as i64, + (DescendingBoundType::FirstLt, true) => diff.floor() as i64 + 1, + (DescendingBoundType::FirstLt, false) => diff.ceil() as i64, }; idx.clamp(0, *len as i64) as usize } CompactCoord::Arithmetic { first, step, len } => { if step.abs() < f64::EPSILON { - if inclusive { - return if *first >= target_f64 { *len } else { 0 }; - } else { - return if *first > target_f64 { *len } else { 0 }; - } + return match (bound_type, inclusive) { + (DescendingBoundType::FirstLeq, true) => { + if *first <= target_f64 { + 0 + } else { + *len + } + } + (DescendingBoundType::FirstLeq, false) => { + if *first < target_f64 { + 0 + } else { + *len + } + } + (DescendingBoundType::FirstLt, true) => { + if *first >= target_f64 { + *len + } else { + 0 + } + } + (DescendingBoundType::FirstLt, false) => { + if *first > target_f64 { + *len + } else { + 0 + } + } + }; } - let idx = if inclusive { - ((first - target_f64) / (-step)).floor() as i64 + 1 - } else { - ((first - target_f64) / (-step)).ceil() as i64 + let diff = (first - target_f64) / (-step); + let idx = match (bound_type, inclusive) { + (DescendingBoundType::FirstLeq, true) => diff.ceil() as i64, + (DescendingBoundType::FirstLeq, false) => (diff + f64::EPSILON).ceil() as i64, + (DescendingBoundType::FirstLt, true) => diff.floor() as i64 + 1, + (DescendingBoundType::FirstLt, false) => diff.ceil() as i64, }; idx.clamp(0, *len as i64) as usize } @@ -1050,101 +1075,83 @@ fn find_filter_range( } } -/// For descending arrays: find first index where value <= target (or < if not inclusive) -fn find_first_leq_descending( +/// Unified function for descending array bound search. +/// +/// Consolidates `find_first_leq_descending` and `find_first_lt_descending`. +fn find_descending_bound( values: &CoordValuesRef<'_>, target: &ScalarValue, + bound_type: DescendingBoundType, inclusive: bool, ) -> Option { let target_f64 = scalar_to_f64(target)?; - // In descending order, values start high and go low - // We want first index where value <= target (or < target if not inclusive) - // This means we skip all values > target let result = match values { - CoordValuesRef::Int64(vals) => { - if inclusive { - vals.partition_point(|&x| (x as f64) > target_f64) - } else { - vals.partition_point(|&x| (x as f64) >= target_f64) - } - } - CoordValuesRef::Float32(vals) => { - if inclusive { - vals.partition_point(|&x| (x as f64) > target_f64) - } else { - vals.partition_point(|&x| (x as f64) >= target_f64) - } - } + CoordValuesRef::Int64(vals) => descending_partition_point( + vals.iter().map(|&x| x as f64), + target_f64, + bound_type, + inclusive, + ), + CoordValuesRef::Float32(vals) => descending_partition_point( + vals.iter().map(|&x| x as f64), + target_f64, + bound_type, + inclusive, + ), CoordValuesRef::Float64(vals) => { - if inclusive { - vals.partition_point(|&x| x > target_f64) - } else { - vals.partition_point(|&x| x >= target_f64) - } - } - CoordValuesRef::TimestampMicros(vals) => { - if inclusive { - vals.partition_point(|&x| (x as f64) > target_f64) - } else { - vals.partition_point(|&x| (x as f64) >= target_f64) - } - } + descending_partition_point(vals.iter().copied(), target_f64, bound_type, inclusive) + } + CoordValuesRef::TimestampMicros(vals) => descending_partition_point( + vals.iter().map(|&x| x as f64), + target_f64, + bound_type, + inclusive, + ), CoordValuesRef::Compact { encoding, .. } => { - compact_first_leq_descending(encoding, target_f64, inclusive) + compact_descending_bound(encoding, target_f64, bound_type, inclusive) } }; Some(result) } +/// Helper to compute partition point for descending arrays. +/// +/// Returns the index based on bound type and inclusivity. +fn descending_partition_point( + values: impl Iterator + Clone, + target: f64, + bound_type: DescendingBoundType, + inclusive: bool, +) -> usize { + let vals: Vec = values.collect(); + match (bound_type, inclusive) { + // FirstLeq: find first index where value <= target + (DescendingBoundType::FirstLeq, true) => vals.partition_point(|&x| x > target), + (DescendingBoundType::FirstLeq, false) => vals.partition_point(|&x| x >= target), + // FirstLt: find first index where value < target + (DescendingBoundType::FirstLt, true) => vals.partition_point(|&x| x >= target), + (DescendingBoundType::FirstLt, false) => vals.partition_point(|&x| x > target), + } +} + +/// For descending arrays: find first index where value <= target (or < if not inclusive) +fn find_first_leq_descending( + values: &CoordValuesRef<'_>, + target: &ScalarValue, + inclusive: bool, +) -> Option { + find_descending_bound(values, target, DescendingBoundType::FirstLeq, inclusive) +} + /// For descending arrays: find first index where value < target (or <= if not inclusive) fn find_first_lt_descending( values: &CoordValuesRef<'_>, target: &ScalarValue, inclusive: bool, ) -> Option { - let target_f64 = scalar_to_f64(target)?; - - // In descending order, we want first index where value < target (or <= if not inclusive) - // This is the exclusive end of our range - let result = match values { - CoordValuesRef::Int64(vals) => { - if inclusive { - // Include values >= target, so end at first value < target - vals.partition_point(|&x| (x as f64) >= target_f64) - } else { - // Exclude target value, so end at first value <= target - vals.partition_point(|&x| (x as f64) > target_f64) - } - } - CoordValuesRef::Float32(vals) => { - if inclusive { - vals.partition_point(|&x| (x as f64) >= target_f64) - } else { - vals.partition_point(|&x| (x as f64) > target_f64) - } - } - CoordValuesRef::Float64(vals) => { - if inclusive { - vals.partition_point(|&x| x >= target_f64) - } else { - vals.partition_point(|&x| x > target_f64) - } - } - CoordValuesRef::TimestampMicros(vals) => { - if inclusive { - vals.partition_point(|&x| (x as f64) >= target_f64) - } else { - vals.partition_point(|&x| (x as f64) > target_f64) - } - } - CoordValuesRef::Compact { encoding, .. } => { - compact_first_lt_descending(encoding, target_f64, inclusive) - } - }; - - Some(result) + find_descending_bound(values, target, DescendingBoundType::FirstLt, inclusive) } /// Calculate the total number of rows after applying coordinate filters