From ad519929ceb895674a19b1aac873505a568e1176 Mon Sep 17 00:00:00 2001 From: jayendra13 Date: Wed, 4 Feb 2026 07:06:11 +0530 Subject: [PATCH 1/2] Unify execute functions in zarr_exec.rs (Phase 2) Extract common patterns from execute_remote, execute_virtualizarr, and execute_virtualizarr_with_adapter into shared helpers: - build_exec_projected_schema(): Common schema projection logic - AsyncReadParams: Encapsulates common async read parameters - execute_async_read(): Generic async execution with store setup closure This consolidates the repeated stream creation pattern: 1. Build projected schema 2. Create async stream with store setup 3. Call read_zarr_async 4. Collect batches and wrap in RecordBatchStreamAdapter Impact: -17 lines (132 additions, 149 deletions) with clearer separation of concerns. Each execute function now focuses only on its unique store setup logic. Part of: https://github.com/jayendra13/zarr-datafusion/issues/5 --- src/physical_plan/zarr_exec.rs | 281 ++++++++++++++++----------------- 1 file changed, 132 insertions(+), 149 deletions(-) diff --git a/src/physical_plan/zarr_exec.rs b/src/physical_plan/zarr_exec.rs index 1a168d5..d07a1fd 100644 --- a/src/physical_plan/zarr_exec.rs +++ b/src/physical_plan/zarr_exec.rs @@ -240,70 +240,88 @@ impl ExecutionPlan for ZarrExec { } } -/// Execute read from remote object store -fn execute_remote( - path: String, +// ============================================================================= +// Helper functions to reduce duplication across execute_* functions +// ============================================================================= + +/// Build projected schema from full schema and projection indices. +/// +/// This consolidates the identical schema projection logic used across all execute functions. +fn build_exec_projected_schema(schema: &SchemaRef, projection: Option<&Vec>) -> SchemaRef { + if let Some(indices) = projection { + Arc::new(Schema::new( + indices + .iter() + .map(|&i| schema.field(i).clone()) + .collect::>(), + )) + } else { + schema.clone() + } +} + +/// Parameters for async read execution. +/// +/// Encapsulates the common parameters passed to all async execute functions. +struct AsyncReadParams { schema: SchemaRef, projection: Option>, limit: Option, stats: SharedIoStats, - cached_remote: CachedRemoteStore, coord_filters: Option, -) -> datafusion::error::Result { - use crate::reader::storage::create_async_store; +} + +/// Execute an async read with the given store setup function. +/// +/// This consolidates the common stream creation pattern used across all async execute functions: +/// 1. Create projected schema +/// 2. Create async stream that sets up store and calls read_zarr_async +/// 3. Collect batches and wrap in RecordBatchStreamAdapter +fn execute_async_read( + params: AsyncReadParams, + store_setup: F, + debug_name: &'static str, +) -> datafusion::error::Result +where + F: FnOnce() -> Fut + Send + 'static, + Fut: std::future::Future< + Output = datafusion::error::Result<( + AsyncReadableListableStorage, + ObjectPath, + Option, + )>, + > + Send, +{ use arrow::record_batch::RecordBatch; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::stream::{self, TryStreamExt}; - debug!(path = %path, has_cached_remote = cached_remote.is_some(), "Setting up remote execution stream"); - - // Create a stream that will perform the async read when polled - let projected_schema = if let Some(ref indices) = projection { - Arc::new(Schema::new( - indices - .iter() - .map(|&i| schema.field(i).clone()) - .collect::>(), - )) - } else { - schema.clone() - }; + let projected_schema = build_exec_projected_schema(¶ms.schema, params.projection.as_ref()); - // Use try_flatten to handle the Result pattern let stream = stream::once(async move { - debug!("Remote stream polled - starting async execution"); + debug!("{} stream polled - starting async execution", debug_name); - // Use cached store and metadata if available - let (store, prefix, cached_meta) = if let Some((store, prefix, meta)) = cached_remote { - info!("Using cached async store and metadata"); - (store, prefix, Some(meta)) - } else { - debug!("Creating async store (no cache)"); - let (store, prefix) = create_async_store(&path) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; - debug!(prefix = %prefix, "Async store created"); - (store, prefix, None) - }; + // Set up the store using the provided function + let (store, prefix, cached_meta) = store_setup().await?; // Read the data - debug!("Starting read_zarr_async"); + debug!("Starting read_zarr_async for {}", debug_name); let result_stream = read_zarr_async( store, &prefix, - schema, - projection, - limit, - Some(stats), + params.schema, + params.projection, + params.limit, + Some(params.stats), cached_meta, - coord_filters, + params.coord_filters, ) .await?; // Collect into batches and return as stream - debug!("Collecting batches"); + debug!("Collecting batches from {}", debug_name); let batches: Vec = result_stream.try_collect().await?; - info!(num_batches = batches.len(), "Remote read complete"); + info!(num_batches = batches.len(), "{} read complete", debug_name); Ok::<_, DataFusionError>(stream::iter(batches.into_iter().map(Ok))) }) @@ -315,6 +333,47 @@ fn execute_remote( ))) } +/// Execute read from remote object store +fn execute_remote( + path: String, + schema: SchemaRef, + projection: Option>, + limit: Option, + stats: SharedIoStats, + cached_remote: CachedRemoteStore, + coord_filters: Option, +) -> datafusion::error::Result { + use crate::reader::storage::create_async_store; + + debug!(path = %path, has_cached_remote = cached_remote.is_some(), "Setting up remote execution stream"); + + let params = AsyncReadParams { + schema, + projection, + limit, + stats, + coord_filters, + }; + + execute_async_read( + params, + move || async move { + if let Some((store, prefix, meta)) = cached_remote { + info!("Using cached async store and metadata"); + Ok((store, prefix, Some(meta))) + } else { + debug!("Creating async store (no cache)"); + let (store, prefix) = create_async_store(&path) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + debug!(prefix = %prefix, "Async store created"); + Ok((store, prefix, None)) + } + }, + "Remote", + ) +} + /// Execute read from VirtualiZarr Parquet reference store fn execute_virtualizarr( path: String, @@ -325,68 +384,31 @@ fn execute_virtualizarr( coord_filters: Option, ) -> datafusion::error::Result { use crate::reader::schema_inference::discover_arrays; - use arrow::record_batch::RecordBatch; - use datafusion::physical_plan::stream::RecordBatchStreamAdapter; - use futures::stream::{self, TryStreamExt}; - use zarrs::storage::AsyncReadableListableStorage; debug!(path = %path, "Setting up VirtualiZarr execution stream"); // Pre-discover metadata from local .zmetadata (avoids async discovery issues) let cached_meta = discover_arrays(&path).map_err(DataFusionError::External)?; - // Create projected schema - let projected_schema = if let Some(ref indices) = projection { - Arc::new(Schema::new( - indices - .iter() - .map(|&i| schema.field(i).clone()) - .collect::>(), - )) - } else { - schema.clone() + let params = AsyncReadParams { + schema, + projection, + limit, + stats, + coord_filters, }; - // Create the stream that performs async read when polled - let stream = stream::once(async move { - debug!("VirtualiZarr stream polled - starting async execution"); - - // Create VirtualStoreAdapter - let adapter = VirtualStoreAdapter::new(&path).map_err(DataFusionError::External)?; - - // Wrap in Arc for zarrs - let store: AsyncReadableListableStorage = Arc::new(adapter); - - // Empty prefix - VirtualiZarr stores use relative paths internally - let prefix = ObjectPath::from(""); - - // Read the data with cached metadata (avoids async discovery) - debug!("Starting read_zarr_async for VirtualiZarr"); - let result_stream = read_zarr_async( - store, - &prefix, - schema, - projection, - limit, - Some(stats), - Some(cached_meta), // Use pre-loaded metadata - coord_filters, - ) - .await?; - - // Collect into batches and return as stream - debug!("Collecting batches from VirtualiZarr"); - let batches: Vec = result_stream.try_collect().await?; - info!(num_batches = batches.len(), "VirtualiZarr read complete"); - - Ok::<_, DataFusionError>(stream::iter(batches.into_iter().map(Ok))) - }) - .try_flatten(); - - Ok(Box::pin(RecordBatchStreamAdapter::new( - projected_schema, - stream, - ))) + execute_async_read( + params, + move || async move { + // Create VirtualStoreAdapter + let adapter = VirtualStoreAdapter::new(&path).map_err(DataFusionError::External)?; + let store: AsyncReadableListableStorage = Arc::new(adapter); + let prefix = ObjectPath::from(""); + Ok((store, prefix, Some(cached_meta))) + }, + "VirtualiZarr", + ) } /// Execute read from a cached VirtualiZarr adapter (for remote VirtualiZarr stores) @@ -398,62 +420,23 @@ fn execute_virtualizarr_with_adapter( stats: SharedIoStats, coord_filters: Option, ) -> datafusion::error::Result { - use arrow::record_batch::RecordBatch; - use datafusion::physical_plan::stream::RecordBatchStreamAdapter; - use futures::stream::{self, TryStreamExt}; - debug!("Setting up remote VirtualiZarr execution with cached adapter"); - // Create projected schema - let projected_schema = if let Some(ref indices) = projection { - Arc::new(Schema::new( - indices - .iter() - .map(|&i| schema.field(i).clone()) - .collect::>(), - )) - } else { - schema.clone() + let params = AsyncReadParams { + schema, + projection, + limit, + stats, + coord_filters, }; - // Create the stream that performs async read when polled - let stream = stream::once(async move { - debug!("Remote VirtualiZarr stream polled - starting async execution"); - - // Use the pre-loaded adapter directly - let store: AsyncReadableListableStorage = adapter; - - // Empty prefix - VirtualiZarr stores use relative paths internally - let prefix = ObjectPath::from(""); - - // Read the data (metadata will be discovered from the adapter) - debug!("Starting read_zarr_async for remote VirtualiZarr"); - let result_stream = read_zarr_async( - store, - &prefix, - schema, - projection, - limit, - Some(stats), - None, // Let it discover metadata from the adapter - coord_filters, - ) - .await?; - - // Collect into batches and return as stream - debug!("Collecting batches from remote VirtualiZarr"); - let batches: Vec = result_stream.try_collect().await?; - info!( - num_batches = batches.len(), - "Remote VirtualiZarr read complete" - ); - - Ok::<_, DataFusionError>(stream::iter(batches.into_iter().map(Ok))) - }) - .try_flatten(); - - Ok(Box::pin(RecordBatchStreamAdapter::new( - projected_schema, - stream, - ))) + execute_async_read( + params, + move || async move { + let store: AsyncReadableListableStorage = adapter; + let prefix = ObjectPath::from(""); + Ok((store, prefix, None)) + }, + "Remote VirtualiZarr", + ) } From 7c66defb1738f25f42868f0011d92b2dbb4562dc Mon Sep 17 00:00:00 2001 From: jayendra13 Date: Wed, 4 Feb 2026 07:17:06 +0530 Subject: [PATCH 2/2] Consolidate schema inference with build_schema_from_store_meta (Phase 3) Extract the duplicated schema building logic from three functions into a single `build_schema_from_store_meta()` helper: - infer_schema_from_zmetadata_json - infer_schema_with_meta - infer_schema_with_meta_async All three had identical logic for: 1. Building Dictionary-encoded coordinate fields (with CF time support) 2. Building regular array fields for data variables The helper centralizes this logic, making future schema changes easier to maintain consistently. Impact: -50 lines (57 additions, 107 deletions) Part of: https://github.com/jayendra13/zarr-datafusion/issues/5 --- src/reader/schema_inference.rs | 164 ++++++++++++--------------------- 1 file changed, 57 insertions(+), 107 deletions(-) diff --git a/src/reader/schema_inference.rs b/src/reader/schema_inference.rs index 92754e4..338bb61 100644 --- a/src/reader/schema_inference.rs +++ b/src/reader/schema_inference.rs @@ -44,6 +44,56 @@ pub enum ZarrVersion { V3, } +// ============================================================================= +// Schema building helper +// ============================================================================= + +/// Build an Arrow schema from Zarr store metadata. +/// +/// This consolidates the identical schema building logic used in: +/// - `infer_schema_from_zmetadata_json` +/// - `infer_schema_with_meta` +/// - `infer_schema_with_meta_async` +/// +/// Coordinates use Dictionary encoding for memory efficiency. +/// CF time coordinates use Dictionary with Timestamp(Microsecond, UTC) values. +pub fn build_schema_from_store_meta(meta: &ZarrStoreMeta) -> Schema { + let mut fields: Vec = Vec::new(); + + // Coordinates use Dictionary encoding for memory efficiency + // CF time coordinates use Dictionary with Timestamp values + for coord in &meta.coords { + let data_type = if coord + .cf_time_attrs + .as_ref() + .is_some_and(|a| a.is_time_coordinate()) + { + // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values + DataType::Dictionary( + Box::new(DataType::Int16), + Box::new(DataType::Timestamp( + TimeUnit::Microsecond, + Some("UTC".into()), + )), + ) + } else { + zarr_dtype_to_arrow_dictionary(&coord.data_type) + }; + fields.push(Field::new(&coord.name, data_type, false)); + } + + // Data variables use regular arrays + for var in &meta.data_vars { + fields.push(Field::new( + &var.name, + zarr_dtype_to_arrow(&var.data_type), + true, + )); + } + + Schema::new(fields) +} + /// Detect Zarr version by checking metadata files pub fn detect_zarr_version( store_path: &str, @@ -146,45 +196,9 @@ pub fn discover_arrays( pub fn infer_schema_from_zmetadata_json( metadata: &serde_json::Value, ) -> Result<(Schema, ZarrStoreMeta), Box> { - use arrow::datatypes::{DataType, Field, TimeUnit}; - let meta = discover_arrays_from_json(metadata)?.ok_or("No arrays found in .zmetadata JSON")?; - - // Build schema from meta (same logic as infer_schema_with_meta) - let mut fields: Vec = Vec::new(); - - // Coordinates use Dictionary encoding for memory efficiency - // CF time coordinates use Dictionary with Timestamp values - for coord in &meta.coords { - let data_type = if coord - .cf_time_attrs - .as_ref() - .is_some_and(|a| a.is_time_coordinate()) - { - // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values - DataType::Dictionary( - Box::new(DataType::Int16), - Box::new(DataType::Timestamp( - TimeUnit::Microsecond, - Some("UTC".into()), - )), - ) - } else { - zarr_dtype_to_arrow_dictionary(&coord.data_type) - }; - fields.push(Field::new(&coord.name, data_type, false)); - } - - // Data variables use regular arrays - for var in &meta.data_vars { - fields.push(Field::new( - &var.name, - zarr_dtype_to_arrow(&var.data_type), - true, - )); - } - - Ok((Schema::new(fields), meta)) + let schema = build_schema_from_store_meta(&meta); + Ok((schema, meta)) } /// Discover arrays from a pre-loaded .zmetadata JSON value @@ -727,44 +741,12 @@ pub fn infer_schema_with_meta( store_path: &str, ) -> Result<(Schema, ZarrStoreMeta), Box> { let meta = discover_arrays(store_path)?; - - let mut fields: Vec = Vec::new(); - - // Coordinates use Dictionary encoding for memory efficiency - // CF time coordinates use Dictionary with Timestamp values - for coord in &meta.coords { - let data_type = if coord - .cf_time_attrs - .as_ref() - .is_some_and(|a| a.is_time_coordinate()) - { - // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values - DataType::Dictionary( - Box::new(DataType::Int16), - Box::new(DataType::Timestamp( - TimeUnit::Microsecond, - Some("UTC".into()), - )), - ) - } else { - zarr_dtype_to_arrow_dictionary(&coord.data_type) - }; - fields.push(Field::new(&coord.name, data_type, false)); - } - - // Data variables use regular arrays - for var in &meta.data_vars { - fields.push(Field::new( - &var.name, - zarr_dtype_to_arrow(&var.data_type), - true, - )); - } + let schema = build_schema_from_store_meta(&meta); // Note: Schema metadata causes issues with DataFusion's optimizer schema comparisons. // Instead of storing metadata in the schema, we return ZarrStoreMeta which contains // all dimension info. The CLI can access this via the ZarrTable struct. - Ok((Schema::new(fields), meta)) + Ok((schema, meta)) } /// Parse CF time attributes from a JSON attributes object @@ -1316,45 +1298,13 @@ pub async fn infer_schema_with_meta_async( ) -> Result<(Schema, ZarrStoreMeta), Box> { debug!("Starting async schema inference"); let meta = discover_arrays_async(store, prefix).await?; - - let mut fields: Vec = Vec::new(); - - // Coordinates use Dictionary encoding for memory efficiency - // CF time coordinates use Dictionary with Timestamp values - for coord in &meta.coords { - let data_type = if coord - .cf_time_attrs - .as_ref() - .is_some_and(|a| a.is_time_coordinate()) - { - // CF time coordinate: Dictionary with Timestamp(Microsecond, UTC) values - DataType::Dictionary( - Box::new(DataType::Int16), - Box::new(DataType::Timestamp( - TimeUnit::Microsecond, - Some("UTC".into()), - )), - ) - } else { - zarr_dtype_to_arrow_dictionary(&coord.data_type) - }; - fields.push(Field::new(&coord.name, data_type, false)); - } - - // Data variables use regular arrays - for var in &meta.data_vars { - fields.push(Field::new( - &var.name, - zarr_dtype_to_arrow(&var.data_type), - true, - )); - } + let schema = build_schema_from_store_meta(&meta); // Note: Schema metadata causes issues with DataFusion's optimizer schema comparisons. // Instead of storing metadata in the schema, we return ZarrStoreMeta which contains // all dimension info. The CLI can access this via the ZarrTable struct. - info!(num_fields = fields.len(), "Schema inferred"); - Ok((Schema::new(fields), meta)) + info!(num_fields = schema.fields().len(), "Schema inferred"); + Ok((schema, meta)) } #[cfg(test)]