diff --git a/lib/collection/src/collection_manager/segments_searcher.rs b/lib/collection/src/collection_manager/segments_searcher.rs index 77932a95c09..3fc6fdfb110 100644 --- a/lib/collection/src/collection_manager/segments_searcher.rs +++ b/lib/collection/src/collection_manager/segments_searcher.rs @@ -219,12 +219,18 @@ impl SegmentsSearcher { // Using block to ensure `segments` variable is dropped in the end of it let (locked_segments, searches): (Vec<_>, Vec<_>) = { - // Unfortunately, we have to do `segments.read()` twice, once in blocking task - // and once here, due to `Send` bounds :/ - let Some(segments_lock) = segments.try_read_for(timeout) else { - return Err(CollectionError::timeout(timeout, "search")); + let segments: Vec<_> = { + // Unfortunately, we have to do `segments.read()` twice, once in blocking task + // and once here, due to `Send` bounds :/ + let Some(segments_lock) = segments.try_read_for(timeout) else { + return Err(CollectionError::timeout(timeout, "search")); + }; + + // Collect the segments first so we don't lock the segment holder during the operations. + segments_lock + .non_appendable_then_appendable_segments() + .collect() }; - let segments = segments_lock.non_appendable_then_appendable_segments(); // Probabilistic sampling for the `limit` parameter avoids over-fetching points from segments. // e.g. 10 segments with limit 1000 would fetch 10000 points in total and discard 9000 points. @@ -234,10 +240,11 @@ impl SegmentsSearcher { // - more than 1 segment // - segments are not empty let use_sampling = sampling_enabled - && segments_lock.len() > 1 + && segments.len() > 1 && query_context_arc.available_point_count() > 0; segments + .into_iter() .map(|segment| { let query_context_arc_segment = query_context_arc.clone(); // update timeout @@ -409,15 +416,22 @@ impl SegmentsSearcher { let filter = filter.cloned(); let points = runtime_handle.spawn_blocking(move || { let is_stopped = stopping_guard.get_is_stopped(); - let segments = match timeout { - None => Ok(segments.read()), - Some(t) => segments - .try_read_for(t) - .ok_or_else(|| CollectionError::timeout(t, "read_filtered")), - }?; + + // Collect the segments first so we don't lock the segment holder during the operations. + let segments: Vec<_> = { + match timeout { + None => Ok(segments.read()), + Some(t) => segments + .try_read_for(t) + .ok_or_else(|| CollectionError::timeout(t, "read_filtered")), + }? + .non_appendable_then_appendable_segments() + .collect() + }; + let hw_counter = hw_measurement_acc.get_counter_cell(); let all_points: BTreeSet<_> = segments - .non_appendable_then_appendable_segments() + .into_iter() .flat_map(|segment| { segment.get().read().read_filtered( None, @@ -446,11 +460,18 @@ impl SegmentsSearcher { let limit = arc_ctx.limit; let mut futures = { - let Some(segments_guard) = segments.try_read_for(timeout) else { - return Err(CollectionError::timeout(timeout, "rescore_with_formula")); + let segments: Vec<_> = { + let Some(segments_guard) = segments.try_read_for(timeout) else { + return Err(CollectionError::timeout(timeout, "rescore_with_formula")); + }; + // Collect the segments first so we don't lock the segment holder during the operations. + segments_guard + .non_appendable_then_appendable_segments() + .collect() }; - segments_guard - .non_appendable_then_appendable_segments() + + segments + .into_iter() .map(|segment| { let handle = runtime_handle.spawn_blocking({ let arc_ctx = arc_ctx.clone(); diff --git a/lib/collection/src/shards/local_shard/mod.rs b/lib/collection/src/shards/local_shard/mod.rs index 1425b80f2f7..b03a502778a 100644 --- a/lib/collection/src/shards/local_shard/mod.rs +++ b/lib/collection/src/shards/local_shard/mod.rs @@ -888,10 +888,16 @@ impl LocalShard { // clone filter for spawning task let filter = filter.cloned(); let cardinality = tokio::task::spawn_blocking(move || { - let segments = segments.read(); // blocking sync lock - segments + // Collect the segments first so we don't lock the segment holder during the operations. + let segments = segments + .read() .iter() - .map(|(_id, segment)| { + .map(|i| i.1.clone()) + .collect::>(); + + segments + .into_iter() + .map(|segment| { segment .get() .read() // blocking sync lock @@ -985,14 +991,19 @@ impl LocalShard { let segments = self.segments.clone(); let segment_info = tokio::task::spawn_blocking(move || { - let segments = segments.read(); // blocking sync lock + // Collect the segments first so we don't lock the segment holder during the operations. + let segments = segments + .read() + .iter() + .map(|i| i.1.clone()) + .collect::>(); let mut schema: HashMap = Default::default(); let mut indexed_vectors_count = 0; let mut points_count = 0; let mut segments_count = 0; - for (_idx, segment) in segments.iter() { + for segment in segments { segments_count += 1; let segment_info = segment.get().read().info(); diff --git a/lib/collection/src/shards/local_shard/query.rs b/lib/collection/src/shards/local_shard/query.rs index e8dff0bd8bd..c4c22eac800 100644 --- a/lib/collection/src/shards/local_shard/query.rs +++ b/lib/collection/src/shards/local_shard/query.rs @@ -413,7 +413,6 @@ impl LocalShard { } } - #[allow(clippy::too_many_arguments)] async fn fusion_rescore( &self, sources: impl Iterator>, diff --git a/lib/collection/src/shards/local_shard/search.rs b/lib/collection/src/shards/local_shard/search.rs index c1449871a04..1040b903899 100644 --- a/lib/collection/src/shards/local_shard/search.rs +++ b/lib/collection/src/shards/local_shard/search.rs @@ -131,7 +131,7 @@ impl LocalShard { let search_request = SegmentsSearcher::search( self.segments.clone(), - Arc::clone(&core_request), + core_request.clone(), search_runtime_handle, true, query_context, diff --git a/lib/collection/src/shards/replica_set/mod.rs b/lib/collection/src/shards/replica_set/mod.rs index 154f3217882..f8ae0edf5a4 100644 --- a/lib/collection/src/shards/replica_set/mod.rs +++ b/lib/collection/src/shards/replica_set/mod.rs @@ -1317,8 +1317,12 @@ impl ShardReplicaSet { let mut total_payload_size = 0; let mut total_points = 0; - for segment in local.segments.read().iter() { - let size_info = segment.1.get().read().size_info(); + // Collect the segments first so we don't have the segment holder locked for the entire duration of the loop. + let segments: Vec<_> = + local.segments.read().iter().map(|i| i.1).cloned().collect(); + + for segment in segments { + let size_info = segment.get().read().size_info(); total_vector_size += size_info.vectors_size_bytes; total_payload_size += size_info.payloads_size_bytes; total_points += size_info.num_points; diff --git a/lib/edge/src/query.rs b/lib/edge/src/query.rs index e2d3afda0cb..57c2b4a119e 100644 --- a/lib/edge/src/query.rs +++ b/lib/edge/src/query.rs @@ -312,11 +312,14 @@ impl EdgeShard { let mut rescored_results = Vec::new(); - for segment in self + // Collect the segments first so we don't lock the segment holder during the operations. + let segments = self .segments .read() .non_appendable_then_appendable_segments() - { + .collect::>(); + + for segment in segments { let rescored_result = segment .get() .read() diff --git a/lib/segment/src/segment/entry.rs b/lib/segment/src/segment/entry.rs index afb4505c856..f51f4c65743 100644 --- a/lib/segment/src/segment/entry.rs +++ b/lib/segment/src/segment/entry.rs @@ -445,7 +445,7 @@ impl SegmentEntry for Segment { Ok(records.into_values().collect()) } - fn iter_points(&self) -> Box + '_> { + fn iter_points(&self) -> Box> { // Sorry for that, but I didn't find any way easier. // If you try simply return iterator - it won't work because AtomicRef should exist // If you try to make callback instead - you won't be able to create diff --git a/lib/shard/src/retrieve/retrieve_blocking.rs b/lib/shard/src/retrieve/retrieve_blocking.rs index aebf1b40a9f..0bcae8a0824 100644 --- a/lib/shard/src/retrieve/retrieve_blocking.rs +++ b/lib/shard/src/retrieve/retrieve_blocking.rs @@ -8,6 +8,7 @@ use segment::common::operation_error::{OperationError, OperationResult}; use segment::types::{PointIdType, SeqNumberType, WithPayload, WithVector}; use crate::retrieve::record_internal::RecordInternal; +use crate::segment_holder::SegmentHolder; use crate::segment_holder::locked::LockedSegmentHolder; pub fn retrieve_blocking( @@ -24,11 +25,7 @@ pub fn retrieve_blocking( let hw_counter = hw_measurement_acc.get_counter_cell(); - let Some(segments_guard) = segments.try_read_for(timeout) else { - return Err(OperationError::timeout(timeout, "retrieve points")); - }; - - segments_guard.read_points(points, is_stopped, |ids, segment| { + SegmentHolder::read_points_locked(&segments, points, is_stopped, timeout, |ids, segment| { let mut newer_version_points: Vec<_> = Vec::with_capacity(ids.len()); let mut applied = 0;