Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 148 additions & 141 deletions src/reader/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,90 +774,115 @@ fn compact_upper_bound(encoding: &CompactCoord, target_f64: f64) -> usize {
}
}

/// For compact descending: find first index where value <= target (or < if not inclusive)
fn compact_first_leq_descending(
// =============================================================================
// Descending range search helpers (unified to reduce duplication)
// =============================================================================

/// Type of bound for descending range search.
///
/// Used to parameterize the unified descending search functions.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DescendingBoundType {
/// Find first index where value <= target (start of range)
FirstLeq,
/// Find first index where value < target (end of range)
FirstLt,
}

/// Unified function for compact descending range search.
///
/// Consolidates `compact_first_leq_descending` and `compact_first_lt_descending`.
fn compact_descending_bound(
encoding: &CompactCoord,
target_f64: f64,
bound_type: DescendingBoundType,
inclusive: bool,
) -> usize {
// Descending means step < 0, values go from high to low
// We want first index where value <= target
match encoding {
CompactCoord::ArithmeticInt { first, step, len } => {
if *step == 0 {
let first_f64 = *first as f64;
if inclusive {
return if first_f64 <= target_f64 { 0 } else { *len };
} else {
return if first_f64 < target_f64 { 0 } else { *len };
}
}
// For descending (step < 0): value[i] = first + i * step
// Want first i where first + i*step <= target (or < target)
// i >= (first - target) / (-step)
let step_f64 = *step as f64;
let first_f64 = *first as f64;
let idx = if inclusive {
((first_f64 - target_f64) / (-step_f64)).ceil() as i64
} else {
((first_f64 - target_f64) / (-step_f64) + f64::EPSILON).ceil() as i64
};
idx.clamp(0, *len as i64) as usize
}
CompactCoord::Arithmetic { first, step, len } => {
if step.abs() < f64::EPSILON {
if inclusive {
return if *first <= target_f64 { 0 } else { *len };
} else {
return if *first < target_f64 { 0 } else { *len };
}
}
let idx = if inclusive {
((first - target_f64) / (-step)).ceil() as i64
} else {
((first - target_f64) / (-step) + f64::EPSILON).ceil() as i64
};
idx.clamp(0, *len as i64) as usize
}
}
}

/// For compact descending: find first index where value < target (or <= if not inclusive)
fn compact_first_lt_descending(encoding: &CompactCoord, target_f64: f64, inclusive: bool) -> usize {
// For descending, find first index where value < target (exclusive end)
match encoding {
CompactCoord::ArithmeticInt { first, step, len } => {
if *step == 0 {
let first_f64 = *first as f64;
if inclusive {
return if first_f64 >= target_f64 { *len } else { 0 };
} else {
return if first_f64 > target_f64 { *len } else { 0 };
}
return match (bound_type, inclusive) {
(DescendingBoundType::FirstLeq, true) => {
if first_f64 <= target_f64 {
0
} else {
*len
}
}
(DescendingBoundType::FirstLeq, false) => {
if first_f64 < target_f64 {
0
} else {
*len
}
}
(DescendingBoundType::FirstLt, true) => {
if first_f64 >= target_f64 {
*len
} else {
0
}
}
(DescendingBoundType::FirstLt, false) => {
if first_f64 > target_f64 {
*len
} else {
0
}
}
};
}
let step_f64 = *step as f64;
let first_f64 = *first as f64;
// For descending: want first i where first + i*step < target
// i > (first - target) / (-step)
let idx = if inclusive {
((first_f64 - target_f64) / (-step_f64)).floor() as i64 + 1
} else {
((first_f64 - target_f64) / (-step_f64)).ceil() as i64
let diff = (first_f64 - target_f64) / (-step_f64);
let idx = match (bound_type, inclusive) {
(DescendingBoundType::FirstLeq, true) => diff.ceil() as i64,
(DescendingBoundType::FirstLeq, false) => (diff + f64::EPSILON).ceil() as i64,
(DescendingBoundType::FirstLt, true) => diff.floor() as i64 + 1,
(DescendingBoundType::FirstLt, false) => diff.ceil() as i64,
};
idx.clamp(0, *len as i64) as usize
}
CompactCoord::Arithmetic { first, step, len } => {
if step.abs() < f64::EPSILON {
if inclusive {
return if *first >= target_f64 { *len } else { 0 };
} else {
return if *first > target_f64 { *len } else { 0 };
}
return match (bound_type, inclusive) {
(DescendingBoundType::FirstLeq, true) => {
if *first <= target_f64 {
0
} else {
*len
}
}
(DescendingBoundType::FirstLeq, false) => {
if *first < target_f64 {
0
} else {
*len
}
}
(DescendingBoundType::FirstLt, true) => {
if *first >= target_f64 {
*len
} else {
0
}
}
(DescendingBoundType::FirstLt, false) => {
if *first > target_f64 {
*len
} else {
0
}
}
};
}
let idx = if inclusive {
((first - target_f64) / (-step)).floor() as i64 + 1
} else {
((first - target_f64) / (-step)).ceil() as i64
let diff = (first - target_f64) / (-step);
let idx = match (bound_type, inclusive) {
(DescendingBoundType::FirstLeq, true) => diff.ceil() as i64,
(DescendingBoundType::FirstLeq, false) => (diff + f64::EPSILON).ceil() as i64,
(DescendingBoundType::FirstLt, true) => diff.floor() as i64 + 1,
(DescendingBoundType::FirstLt, false) => diff.ceil() as i64,
};
idx.clamp(0, *len as i64) as usize
}
Expand Down Expand Up @@ -1050,101 +1075,83 @@ fn find_filter_range(
}
}

/// For descending arrays: find first index where value <= target (or < if not inclusive)
fn find_first_leq_descending(
/// Unified function for descending array bound search.
///
/// Consolidates `find_first_leq_descending` and `find_first_lt_descending`.
fn find_descending_bound(
values: &CoordValuesRef<'_>,
target: &ScalarValue,
bound_type: DescendingBoundType,
inclusive: bool,
) -> Option<usize> {
let target_f64 = scalar_to_f64(target)?;

// In descending order, values start high and go low
// We want first index where value <= target (or < target if not inclusive)
// This means we skip all values > target
let result = match values {
CoordValuesRef::Int64(vals) => {
if inclusive {
vals.partition_point(|&x| (x as f64) > target_f64)
} else {
vals.partition_point(|&x| (x as f64) >= target_f64)
}
}
CoordValuesRef::Float32(vals) => {
if inclusive {
vals.partition_point(|&x| (x as f64) > target_f64)
} else {
vals.partition_point(|&x| (x as f64) >= target_f64)
}
}
CoordValuesRef::Int64(vals) => descending_partition_point(
vals.iter().map(|&x| x as f64),
target_f64,
bound_type,
inclusive,
),
CoordValuesRef::Float32(vals) => descending_partition_point(
vals.iter().map(|&x| x as f64),
target_f64,
bound_type,
inclusive,
),
CoordValuesRef::Float64(vals) => {
if inclusive {
vals.partition_point(|&x| x > target_f64)
} else {
vals.partition_point(|&x| x >= target_f64)
}
}
CoordValuesRef::TimestampMicros(vals) => {
if inclusive {
vals.partition_point(|&x| (x as f64) > target_f64)
} else {
vals.partition_point(|&x| (x as f64) >= target_f64)
}
}
descending_partition_point(vals.iter().copied(), target_f64, bound_type, inclusive)
}
CoordValuesRef::TimestampMicros(vals) => descending_partition_point(
vals.iter().map(|&x| x as f64),
target_f64,
bound_type,
inclusive,
),
CoordValuesRef::Compact { encoding, .. } => {
compact_first_leq_descending(encoding, target_f64, inclusive)
compact_descending_bound(encoding, target_f64, bound_type, inclusive)
}
};

Some(result)
}

/// Helper to compute partition point for descending arrays.
///
/// Returns the index based on bound type and inclusivity.
fn descending_partition_point(
values: impl Iterator<Item = f64> + Clone,
target: f64,
bound_type: DescendingBoundType,
inclusive: bool,
) -> usize {
let vals: Vec<f64> = values.collect();
match (bound_type, inclusive) {
// FirstLeq: find first index where value <= target
(DescendingBoundType::FirstLeq, true) => vals.partition_point(|&x| x > target),
(DescendingBoundType::FirstLeq, false) => vals.partition_point(|&x| x >= target),
// FirstLt: find first index where value < target
(DescendingBoundType::FirstLt, true) => vals.partition_point(|&x| x >= target),
(DescendingBoundType::FirstLt, false) => vals.partition_point(|&x| x > target),
}
}

/// For descending arrays: find first index where value <= target (or < if not inclusive)
fn find_first_leq_descending(
values: &CoordValuesRef<'_>,
target: &ScalarValue,
inclusive: bool,
) -> Option<usize> {
find_descending_bound(values, target, DescendingBoundType::FirstLeq, inclusive)
}

/// For descending arrays: find first index where value < target (or <= if not inclusive)
fn find_first_lt_descending(
values: &CoordValuesRef<'_>,
target: &ScalarValue,
inclusive: bool,
) -> Option<usize> {
let target_f64 = scalar_to_f64(target)?;

// In descending order, we want first index where value < target (or <= if not inclusive)
// This is the exclusive end of our range
let result = match values {
CoordValuesRef::Int64(vals) => {
if inclusive {
// Include values >= target, so end at first value < target
vals.partition_point(|&x| (x as f64) >= target_f64)
} else {
// Exclude target value, so end at first value <= target
vals.partition_point(|&x| (x as f64) > target_f64)
}
}
CoordValuesRef::Float32(vals) => {
if inclusive {
vals.partition_point(|&x| (x as f64) >= target_f64)
} else {
vals.partition_point(|&x| (x as f64) > target_f64)
}
}
CoordValuesRef::Float64(vals) => {
if inclusive {
vals.partition_point(|&x| x >= target_f64)
} else {
vals.partition_point(|&x| x > target_f64)
}
}
CoordValuesRef::TimestampMicros(vals) => {
if inclusive {
vals.partition_point(|&x| (x as f64) >= target_f64)
} else {
vals.partition_point(|&x| (x as f64) > target_f64)
}
}
CoordValuesRef::Compact { encoding, .. } => {
compact_first_lt_descending(encoding, target_f64, inclusive)
}
};

Some(result)
find_descending_bound(values, target, DescendingBoundType::FirstLt, inclusive)
}

/// Calculate the total number of rows after applying coordinate filters
Expand Down
Loading
Loading