From 5430d3af1051c1c7d93216b54daf4b1c424c6b6e Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 10 Jun 2026 16:25:20 +0100 Subject: [PATCH] Add arrow import/export for vortex-json Signed-off-by: Adam Gutglick --- Cargo.lock | 3 + vortex-json/Cargo.toml | 3 + vortex-json/src/arrow.rs | 140 +++++++++++++++++++++++++++++++++++++++ vortex-json/src/lib.rs | 102 ++++++++++++++++++++++++++++ 4 files changed, 248 insertions(+) create mode 100644 vortex-json/src/arrow.rs diff --git a/Cargo.lock b/Cargo.lock index 781932651ca..0823979dd7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10306,8 +10306,11 @@ dependencies = [ name = "vortex-json" version = "0.1.0" dependencies = [ + "arrow-array", + "arrow-schema", "vortex-array", "vortex-error", + "vortex-session", ] [[package]] diff --git a/vortex-json/Cargo.toml b/vortex-json/Cargo.toml index 3b693b96f29..4afd224f6aa 100644 --- a/vortex-json/Cargo.toml +++ b/vortex-json/Cargo.toml @@ -17,5 +17,8 @@ version = { workspace = true } workspace = true [dependencies] +arrow-array = { workspace = true } +arrow-schema = { workspace = true, features = ["canonical_extension_types"] } vortex-array = { workspace = true, default-features = false } vortex-error = { workspace = true, default-features = false } +vortex-session = { workspace = true } diff --git a/vortex-json/src/arrow.rs b/vortex-json/src/arrow.rs new file mode 100644 index 00000000000..67dc376c9d0 --- /dev/null +++ b/vortex-json/src/arrow.rs @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Arrow import and export support for the JSON extension dtype. + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::extension::ExtensionType; +use arrow_schema::extension::Json as ArrowJson; +use vortex_array::ArrayRef; +use vortex_array::EmptyMetadata; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowExport; +use vortex_array::arrow::ArrowExportVTable; +use vortex_array::arrow::ArrowImport; +use vortex_array::arrow::ArrowImportVTable; +use vortex_array::arrow::ArrowSession; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::arrow::FromArrowArray; +use vortex_array::dtype::DType; +use vortex_array::dtype::arrow::FromArrowType; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::dtype::extension::ExtVTable; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_session::registry::CachedId; +use vortex_session::registry::Id; + +use crate::Json; + +/// Arrow's canonical JSON extension name cached as a registry id. +static ARROW_JSON: CachedId = CachedId::new(ArrowJson::NAME); + +/// Returns whether an Arrow field contains valid canonical JSON extension metadata. +fn has_valid_json_extension(field: &Field) -> bool { + field.extension_type_name() == Some(ArrowJson::NAME) + && ArrowJson::try_new_from_field_metadata(field.data_type(), field.metadata()).is_ok() +} + +impl ArrowExportVTable for Json { + fn arrow_ext_id(&self) -> Id { + *ARROW_JSON + } + + fn vortex_id(&self) -> Id { + Json.id() + } + + fn to_arrow_field( + &self, + name: &str, + dtype: &DType, + _session: &ArrowSession, + ) -> VortexResult> { + let DType::Extension(ext_dtype) = dtype else { + return Ok(None); + }; + if !ext_dtype.is::() { + return Ok(None); + } + + let mut field = Field::new(name, DataType::Utf8, dtype.is_nullable()); + field + .try_with_extension_type(ArrowJson::default()) + .vortex_expect("Utf8 is a valid storage type for Arrow JSON"); + Ok(Some(field)) + } + + fn execute_arrow( + &self, + array: ArrayRef, + target: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let is_json = array + .dtype() + .as_extension_opt() + .map(|ext_dtype| ext_dtype.is::()) + .unwrap_or(false); + if !is_json { + return Ok(ArrowExport::Unsupported(array)); + } + + ArrowJson::try_new_from_field_metadata(target.data_type(), target.metadata())?; + + let executed = array.execute::(ctx)?; + let storage = executed.storage_array().clone(); + let storage_field = Field::new( + String::new(), + target.data_type().clone(), + storage.dtype().is_nullable(), + ); + let session = ctx.session().clone(); + Ok(ArrowExport::Exported(session.arrow().execute_arrow( + storage, + Some(&storage_field), + ctx, + )?)) + } +} + +impl ArrowImportVTable for Json { + fn arrow_ext_id(&self) -> Id { + *ARROW_JSON + } + + fn from_arrow_field(&self, field: &Field) -> VortexResult> { + if !has_valid_json_extension(field) { + return Ok(None); + } + + let storage_dtype = DType::from_arrow(field); + Ok(Some(DType::Extension( + ExtDType::::try_new(EmptyMetadata, storage_dtype)?.erased(), + ))) + } + + fn from_arrow_array( + &self, + array: ArrowArrayRef, + field: &Field, + dtype: &DType, + ) -> VortexResult { + let DType::Extension(ext_dtype) = dtype else { + return Ok(ArrowImport::Unsupported(array)); + }; + if !ext_dtype.is::() || !has_valid_json_extension(field) { + return Ok(ArrowImport::Unsupported(array)); + } + + let storage = ArrayRef::from_arrow(array.as_ref(), field.is_nullable())?; + Ok(ArrowImport::Imported( + ExtensionArray::new(ext_dtype.clone(), storage).into_array(), + )) + } +} diff --git a/vortex-json/src/lib.rs b/vortex-json/src/lib.rs index 609ac44b861..049321c9578 100644 --- a/vortex-json/src/lib.rs +++ b/vortex-json/src/lib.rs @@ -9,6 +9,108 @@ //! Extension type and related functionality for a JSON extension type for Vortex. +mod arrow; mod dtype; +use std::sync::Arc; + pub use dtype::Json; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::dtype::session::DTypeSessionExt; +use vortex_session::VortexSession; + +/// Register JSON extension support with a session. +pub fn initialize(session: &VortexSession) { + session.dtypes().register(Json); + session.arrow().register_exporter(Arc::new(Json)); + session.arrow().register_importer(Arc::new(Json)); +} + +#[cfg(test)] +mod tests { + //! Tests for JSON extension Arrow export. + + use std::sync::Arc; + + use arrow_array::Array; + use arrow_array::ArrayRef as ArrowArrayRef; + use arrow_array::StringArray; + use arrow_array::cast::AsArray; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::extension::ExtensionType; + use arrow_schema::extension::Json as ArrowJson; + use vortex_array::EmptyMetadata; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::ExtensionArray; + use vortex_array::arrays::VarBinArray; + use vortex_array::arrow::ArrowSessionExt; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::extension::ExtDType; + use vortex_error::VortexExpect; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::Json; + use crate::initialize; + + /// Export a JSON extension array to Arrow's canonical JSON extension. + #[test] + fn exports_json_extension_array_as_arrow_json() -> VortexResult<()> { + let session = VortexSession::empty(); + initialize(&session); + + let storage = VarBinArray::from_iter( + [Some("{\"id\":1}"), Some("{\"id\":2}")], + vortex_array::dtype::DType::Utf8(Nullability::NonNullable), + ) + .into_array(); + let ext_dtype = ExtDType::::try_new(EmptyMetadata, storage.dtype().clone())?.erased(); + let array = ExtensionArray::new(ext_dtype, storage).into_array(); + + let field = session.arrow().to_arrow_field("data", array.dtype())?; + assert_eq!(field.extension_type_name(), Some(ArrowJson::NAME)); + ArrowJson::try_new_from_field_metadata(field.data_type(), field.metadata())?; + + let exported = session.arrow().execute_arrow( + array, + Some(&field), + &mut session.create_execution_ctx(), + )?; + assert_eq!(exported.data_type(), &DataType::Utf8); + + let strings = exported.as_string::(); + assert_eq!(strings.value(0), "{\"id\":1}"); + assert_eq!(strings.value(1), "{\"id\":2}"); + Ok(()) + } + + /// Import Arrow's canonical JSON extension as a Vortex JSON extension array. + #[test] + fn imports_arrow_json_extension_array_as_vortex_json() -> VortexResult<()> { + let session = VortexSession::empty(); + initialize(&session); + + let mut field = Field::new("data", DataType::Utf8, false); + field.try_with_extension_type(ArrowJson::default())?; + let array = Arc::new(StringArray::from(vec!["{\"id\":1}", "{\"id\":2}"])) as ArrowArrayRef; + + let imported = session.arrow().from_arrow_array(array, &field)?; + let ext_dtype = imported + .dtype() + .as_extension_opt() + .vortex_expect("expected JSON extension dtype"); + assert!(ext_dtype.is::()); + + let exported = session.arrow().execute_arrow( + imported, + Some(&field), + &mut session.create_execution_ctx(), + )?; + let strings = exported.as_string::(); + assert_eq!(strings.value(0), "{\"id\":1}"); + assert_eq!(strings.value(1), "{\"id\":2}"); + Ok(()) + } +}