Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 132 additions & 149 deletions src/physical_plan/zarr_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,70 +240,88 @@ impl ExecutionPlan for ZarrExec {
}
}

/// Execute read from remote object store
fn execute_remote(
path: String,
// =============================================================================
// Helper functions to reduce duplication across execute_* functions
// =============================================================================

/// Derive the schema a stream will emit from the full schema and an optional projection.
///
/// * `schema` - the full, unprojected schema.
/// * `projection` - column indices to keep, in output order; `None` keeps every column.
///
/// With `Some(indices)`, a fresh schema is built whose fields are clones of
/// `schema.field(i)` for each listed index, in the order given. With `None`, the
/// incoming `SchemaRef` is cloned unchanged.
///
/// NOTE(review): `schema.field(i)` is presumably a panicking accessor for an
/// out-of-range index — confirm that callers only pass validated projections.
fn build_exec_projected_schema(schema: &SchemaRef, projection: Option<&Vec<usize>>) -> SchemaRef {
    match projection {
        // No projection requested: hand back the full schema.
        None => schema.clone(),
        Some(indices) => {
            // Gather the selected fields in projection order.
            let mut selected = Vec::with_capacity(indices.len());
            for &idx in indices {
                selected.push(schema.field(idx).clone());
            }
            Arc::new(Schema::new(selected))
        }
    }
}

/// Parameters for async read execution.
///
/// Encapsulates the common parameters passed to all async execute functions.
struct AsyncReadParams {
schema: SchemaRef,
projection: Option<Vec<usize>>,
limit: Option<usize>,
stats: SharedIoStats,
cached_remote: CachedRemoteStore,
coord_filters: Option<CoordFilters>,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
use crate::reader::storage::create_async_store;
}

/// Execute an async read with the given store setup function.
///
/// This consolidates the common stream creation pattern used across all async execute functions:
/// 1. Create projected schema
/// 2. Create async stream that sets up store and calls read_zarr_async
/// 3. Collect batches and wrap in RecordBatchStreamAdapter
fn execute_async_read<F, Fut>(
params: AsyncReadParams,
store_setup: F,
debug_name: &'static str,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: std::future::Future<
Output = datafusion::error::Result<(
AsyncReadableListableStorage,
ObjectPath,
Option<ZarrStoreMeta>,
)>,
> + Send,
{
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::{self, TryStreamExt};

debug!(path = %path, has_cached_remote = cached_remote.is_some(), "Setting up remote execution stream");

// Create a stream that will perform the async read when polled
let projected_schema = if let Some(ref indices) = projection {
Arc::new(Schema::new(
indices
.iter()
.map(|&i| schema.field(i).clone())
.collect::<Vec<_>>(),
))
} else {
schema.clone()
};
let projected_schema = build_exec_projected_schema(&params.schema, params.projection.as_ref());

// Use try_flatten to handle the Result<Stream, Error> pattern
let stream = stream::once(async move {
debug!("Remote stream polled - starting async execution");
debug!("{} stream polled - starting async execution", debug_name);

// Use cached store and metadata if available
let (store, prefix, cached_meta) = if let Some((store, prefix, meta)) = cached_remote {
info!("Using cached async store and metadata");
(store, prefix, Some(meta))
} else {
debug!("Creating async store (no cache)");
let (store, prefix) = create_async_store(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
debug!(prefix = %prefix, "Async store created");
(store, prefix, None)
};
// Set up the store using the provided function
let (store, prefix, cached_meta) = store_setup().await?;

// Read the data
debug!("Starting read_zarr_async");
debug!("Starting read_zarr_async for {}", debug_name);
let result_stream = read_zarr_async(
store,
&prefix,
schema,
projection,
limit,
Some(stats),
params.schema,
params.projection,
params.limit,
Some(params.stats),
cached_meta,
coord_filters,
params.coord_filters,
)
.await?;

// Collect into batches and return as stream
debug!("Collecting batches");
debug!("Collecting batches from {}", debug_name);
let batches: Vec<RecordBatch> = result_stream.try_collect().await?;
info!(num_batches = batches.len(), "Remote read complete");
info!(num_batches = batches.len(), "{} read complete", debug_name);

Ok::<_, DataFusionError>(stream::iter(batches.into_iter().map(Ok)))
})
Expand All @@ -315,6 +333,47 @@ fn execute_remote(
)))
}

/// Execute a read against a remote object store.
///
/// Bundles the read options into an `AsyncReadParams` and hands them to
/// `execute_async_read` together with a deferred store-setup closure, using the
/// label `"Remote"` for log messages.
///
/// The closure picks one of two paths:
/// * `cached_remote` is `Some((store, prefix, meta))`: the cached store, prefix
///   and metadata are reused as-is.
/// * `cached_remote` is `None`: a new store is created from `path` via
///   `create_async_store`, and no metadata is supplied.
///
/// # Errors
///
/// A failure from `create_async_store` is wrapped in `DataFusionError::External`
/// and returned from the setup closure.
fn execute_remote(
    path: String,
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
    stats: SharedIoStats,
    cached_remote: CachedRemoteStore,
    coord_filters: Option<CoordFilters>,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
    use crate::reader::storage::create_async_store;

    debug!(path = %path, has_cached_remote = cached_remote.is_some(), "Setting up remote execution stream");

    execute_async_read(
        AsyncReadParams {
            schema,
            projection,
            limit,
            stats,
            coord_filters,
        },
        // Store resolution is packaged as a closure so the helper decides when to run it.
        move || async move {
            match cached_remote {
                Some((store, prefix, meta)) => {
                    info!("Using cached async store and metadata");
                    Ok((store, prefix, Some(meta)))
                }
                None => {
                    debug!("Creating async store (no cache)");
                    let (store, prefix) = create_async_store(&path)
                        .await
                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
                    debug!(prefix = %prefix, "Async store created");
                    Ok((store, prefix, None))
                }
            }
        },
        "Remote",
    )
}

/// Execute read from VirtualiZarr Parquet reference store
fn execute_virtualizarr(
path: String,
Expand All @@ -325,68 +384,31 @@ fn execute_virtualizarr(
coord_filters: Option<CoordFilters>,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
use crate::reader::schema_inference::discover_arrays;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::{self, TryStreamExt};
use zarrs::storage::AsyncReadableListableStorage;

debug!(path = %path, "Setting up VirtualiZarr execution stream");

// Pre-discover metadata from local .zmetadata (avoids async discovery issues)
let cached_meta = discover_arrays(&path).map_err(DataFusionError::External)?;

// Create projected schema
let projected_schema = if let Some(ref indices) = projection {
Arc::new(Schema::new(
indices
.iter()
.map(|&i| schema.field(i).clone())
.collect::<Vec<_>>(),
))
} else {
schema.clone()
let params = AsyncReadParams {
schema,
projection,
limit,
stats,
coord_filters,
};

// Create the stream that performs async read when polled
let stream = stream::once(async move {
debug!("VirtualiZarr stream polled - starting async execution");

// Create VirtualStoreAdapter
let adapter = VirtualStoreAdapter::new(&path).map_err(DataFusionError::External)?;

// Wrap in Arc for zarrs
let store: AsyncReadableListableStorage = Arc::new(adapter);

// Empty prefix - VirtualiZarr stores use relative paths internally
let prefix = ObjectPath::from("");

// Read the data with cached metadata (avoids async discovery)
debug!("Starting read_zarr_async for VirtualiZarr");
let result_stream = read_zarr_async(
store,
&prefix,
schema,
projection,
limit,
Some(stats),
Some(cached_meta), // Use pre-loaded metadata
coord_filters,
)
.await?;

// Collect into batches and return as stream
debug!("Collecting batches from VirtualiZarr");
let batches: Vec<RecordBatch> = result_stream.try_collect().await?;
info!(num_batches = batches.len(), "VirtualiZarr read complete");

Ok::<_, DataFusionError>(stream::iter(batches.into_iter().map(Ok)))
})
.try_flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(
projected_schema,
stream,
)))
execute_async_read(
params,
move || async move {
// Create VirtualStoreAdapter
let adapter = VirtualStoreAdapter::new(&path).map_err(DataFusionError::External)?;
let store: AsyncReadableListableStorage = Arc::new(adapter);
let prefix = ObjectPath::from("");
Ok((store, prefix, Some(cached_meta)))
},
"VirtualiZarr",
)
}

/// Execute read from a cached VirtualiZarr adapter (for remote VirtualiZarr stores)
Expand All @@ -398,62 +420,23 @@ fn execute_virtualizarr_with_adapter(
stats: SharedIoStats,
coord_filters: Option<CoordFilters>,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::{self, TryStreamExt};

debug!("Setting up remote VirtualiZarr execution with cached adapter");

// Create projected schema
let projected_schema = if let Some(ref indices) = projection {
Arc::new(Schema::new(
indices
.iter()
.map(|&i| schema.field(i).clone())
.collect::<Vec<_>>(),
))
} else {
schema.clone()
let params = AsyncReadParams {
schema,
projection,
limit,
stats,
coord_filters,
};

// Create the stream that performs async read when polled
let stream = stream::once(async move {
debug!("Remote VirtualiZarr stream polled - starting async execution");

// Use the pre-loaded adapter directly
let store: AsyncReadableListableStorage = adapter;

// Empty prefix - VirtualiZarr stores use relative paths internally
let prefix = ObjectPath::from("");

// Read the data (metadata will be discovered from the adapter)
debug!("Starting read_zarr_async for remote VirtualiZarr");
let result_stream = read_zarr_async(
store,
&prefix,
schema,
projection,
limit,
Some(stats),
None, // Let it discover metadata from the adapter
coord_filters,
)
.await?;

// Collect into batches and return as stream
debug!("Collecting batches from remote VirtualiZarr");
let batches: Vec<RecordBatch> = result_stream.try_collect().await?;
info!(
num_batches = batches.len(),
"Remote VirtualiZarr read complete"
);

Ok::<_, DataFusionError>(stream::iter(batches.into_iter().map(Ok)))
})
.try_flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(
projected_schema,
stream,
)))
execute_async_read(
params,
move || async move {
let store: AsyncReadableListableStorage = adapter;
let prefix = ObjectPath::from("");
Ok((store, prefix, None))
},
"Remote VirtualiZarr",
)
}
Loading
Loading