Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions lib/collection/src/collection_manager/segments_searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,18 @@ impl SegmentsSearcher {

// Use a block to ensure the `segments` variable is dropped at the end of it
let (locked_segments, searches): (Vec<_>, Vec<_>) = {
// Unfortunately, we have to do `segments.read()` twice, once in blocking task
// and once here, due to `Send` bounds :/
let Some(segments_lock) = segments.try_read_for(timeout) else {
return Err(CollectionError::timeout(timeout, "search"));
let segments: Vec<_> = {
// Unfortunately, we have to do `segments.read()` twice, once in blocking task
// and once here, due to `Send` bounds :/
let Some(segments_lock) = segments.try_read_for(timeout) else {
return Err(CollectionError::timeout(timeout, "search"));
};

// Collect the segments first so we don't lock the segment holder during the operations.
segments_lock
.non_appendable_then_appendable_segments()
.collect()
};
let segments = segments_lock.non_appendable_then_appendable_segments();

// Probabilistic sampling for the `limit` parameter avoids over-fetching points from segments.
// e.g. 10 segments with limit 1000 would fetch 10000 points in total and discard 9000 points.
Expand All @@ -234,10 +240,11 @@ impl SegmentsSearcher {
// - more than 1 segment
// - segments are not empty
let use_sampling = sampling_enabled
&& segments_lock.len() > 1
&& segments.len() > 1
&& query_context_arc.available_point_count() > 0;

segments
.into_iter()
.map(|segment| {
let query_context_arc_segment = query_context_arc.clone();
// update timeout
Expand Down Expand Up @@ -409,15 +416,22 @@ impl SegmentsSearcher {
let filter = filter.cloned();
let points = runtime_handle.spawn_blocking(move || {
let is_stopped = stopping_guard.get_is_stopped();
let segments = match timeout {
None => Ok(segments.read()),
Some(t) => segments
.try_read_for(t)
.ok_or_else(|| CollectionError::timeout(t, "read_filtered")),
}?;

// Collect the segments first so we don't lock the segment holder during the operations.
let segments: Vec<_> = {
match timeout {
None => Ok(segments.read()),
Some(t) => segments
.try_read_for(t)
.ok_or_else(|| CollectionError::timeout(t, "read_filtered")),
}?
.non_appendable_then_appendable_segments()
.collect()
};

let hw_counter = hw_measurement_acc.get_counter_cell();
let all_points: BTreeSet<_> = segments
.non_appendable_then_appendable_segments()
.into_iter()
.flat_map(|segment| {
segment.get().read().read_filtered(
None,
Expand Down Expand Up @@ -446,11 +460,18 @@ impl SegmentsSearcher {
let limit = arc_ctx.limit;

let mut futures = {
let Some(segments_guard) = segments.try_read_for(timeout) else {
return Err(CollectionError::timeout(timeout, "rescore_with_formula"));
let segments: Vec<_> = {
let Some(segments_guard) = segments.try_read_for(timeout) else {
return Err(CollectionError::timeout(timeout, "rescore_with_formula"));
};
// Collect the segments first so we don't lock the segment holder during the operations.
segments_guard
.non_appendable_then_appendable_segments()
.collect()
};
segments_guard
.non_appendable_then_appendable_segments()

segments
.into_iter()
.map(|segment| {
let handle = runtime_handle.spawn_blocking({
let arc_ctx = arc_ctx.clone();
Expand Down
21 changes: 16 additions & 5 deletions lib/collection/src/shards/local_shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,10 +888,16 @@ impl LocalShard {
// clone filter for spawning task
let filter = filter.cloned();
let cardinality = tokio::task::spawn_blocking(move || {
let segments = segments.read(); // blocking sync lock
segments
// Collect the segments first so we don't lock the segment holder during the operations.
let segments = segments
.read()
.iter()
.map(|(_id, segment)| {
.map(|i| i.1.clone())
.collect::<Vec<_>>();

segments
.into_iter()
.map(|segment| {
segment
.get()
.read() // blocking sync lock
Expand Down Expand Up @@ -985,14 +991,19 @@ impl LocalShard {

let segments = self.segments.clone();
let segment_info = tokio::task::spawn_blocking(move || {
let segments = segments.read(); // blocking sync lock
// Collect the segments first so we don't lock the segment holder during the operations.
let segments = segments
.read()
.iter()
.map(|i| i.1.clone())
.collect::<Vec<_>>();

let mut schema: HashMap<PayloadKeyType, PayloadIndexInfo> = Default::default();
let mut indexed_vectors_count = 0;
let mut points_count = 0;
let mut segments_count = 0;

for (_idx, segment) in segments.iter() {
for segment in segments {
segments_count += 1;

let segment_info = segment.get().read().info();
Expand Down
1 change: 0 additions & 1 deletion lib/collection/src/shards/local_shard/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ impl LocalShard {
}
}

#[allow(clippy::too_many_arguments)]
async fn fusion_rescore(
&self,
sources: impl Iterator<Item = Vec<ScoredPoint>>,
Expand Down
2 changes: 1 addition & 1 deletion lib/collection/src/shards/local_shard/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl LocalShard {

let search_request = SegmentsSearcher::search(
self.segments.clone(),
Arc::clone(&core_request),
core_request.clone(),
search_runtime_handle,
true,
query_context,
Expand Down
8 changes: 6 additions & 2 deletions lib/collection/src/shards/replica_set/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1317,8 +1317,12 @@ impl ShardReplicaSet {
let mut total_payload_size = 0;
let mut total_points = 0;

for segment in local.segments.read().iter() {
let size_info = segment.1.get().read().size_info();
// Collect the segments first so we don't have the segment holder locked for the entire duration of the loop.
let segments: Vec<_> =
local.segments.read().iter().map(|i| i.1).cloned().collect();

for segment in segments {
let size_info = segment.get().read().size_info();
total_vector_size += size_info.vectors_size_bytes;
total_payload_size += size_info.payloads_size_bytes;
total_points += size_info.num_points;
Expand Down
7 changes: 5 additions & 2 deletions lib/edge/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,14 @@ impl EdgeShard {

let mut rescored_results = Vec::new();

for segment in self
// Collect the segments first so we don't lock the segment holder during the operations.
let segments = self
.segments
.read()
.non_appendable_then_appendable_segments()
{
.collect::<Vec<_>>();

for segment in segments {
let rescored_result = segment
.get()
.read()
Expand Down
2 changes: 1 addition & 1 deletion lib/segment/src/segment/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl SegmentEntry for Segment {
Ok(records.into_values().collect())
}

fn iter_points(&self) -> Box<dyn Iterator<Item = PointIdType> + '_> {
fn iter_points(&self) -> Box<dyn Iterator<Item = PointIdType>> {
// Sorry about this, but I couldn't find an easier way.
// If you try simply return iterator - it won't work because AtomicRef should exist
// If you try to make callback instead - you won't be able to create <dyn SegmentEntry>
Expand Down
7 changes: 2 additions & 5 deletions lib/shard/src/retrieve/retrieve_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use segment::common::operation_error::{OperationError, OperationResult};
use segment::types::{PointIdType, SeqNumberType, WithPayload, WithVector};

use crate::retrieve::record_internal::RecordInternal;
use crate::segment_holder::SegmentHolder;
use crate::segment_holder::locked::LockedSegmentHolder;

pub fn retrieve_blocking(
Expand All @@ -24,11 +25,7 @@ pub fn retrieve_blocking(

let hw_counter = hw_measurement_acc.get_counter_cell();

let Some(segments_guard) = segments.try_read_for(timeout) else {
return Err(OperationError::timeout(timeout, "retrieve points"));
};

segments_guard.read_points(points, is_stopped, |ids, segment| {
SegmentHolder::read_points_locked(&segments, points, is_stopped, timeout, |ids, segment| {
let mut newer_version_points: Vec<_> = Vec::with_capacity(ids.len());

let mut applied = 0;
Expand Down