From 50e1645e6cd07c58540a3f4bd94ee763372c9761 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 9 Jun 2026 14:12:05 -0700 Subject: [PATCH 1/2] impl ArrowExportTable for WKB Signed-off-by: Andrew Duffy --- Cargo.lock | 55 +++++++++++++++ vortex-geo/Cargo.toml | 3 + vortex-geo/src/extension/wkb.rs | 115 ++++++++++++++++++++++++++++++++ vortex-geo/src/lib.rs | 112 +++++++++++++++++++++++++++++++ 4 files changed, 285 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 781932651ca..79e0c622cae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4052,6 +4052,45 @@ dependencies = [ "serde", ] +[[package]] +name = "geoarrow" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec42ac7fb4fdcd6982dab92d24faf436f18c36e47c3f813a33619a2728718a30" +dependencies = [ + "geoarrow-array", + "geoarrow-schema", +] + +[[package]] +name = "geoarrow-array" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dafe7b7de3fab1a8b7099fd6a6434ca955fa65065f9c19f0f8a133693f3c2b0e" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-schema", + "geo-traits", + "geoarrow-schema", + "num-traits", + "wkb", + "wkt", +] + +[[package]] +name = "geoarrow-schema" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4a7edb2a1d87024a93805332a9c8184a0354836271d42c0d18cf628a5e3cd0" +dependencies = [ + "arrow-schema", + "geo-traits", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "get_dir" version = "0.5.0" @@ -10224,8 +10263,11 @@ dependencies = [ name = "vortex-geo" version = "0.1.0" dependencies = [ + "arrow-array", + "arrow-schema", "geo-traits", "geo-types", + "geoarrow", "prost 0.14.4", "vortex-array", "vortex-error", @@ -11385,6 +11427,19 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "wkt" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb2b923ccc882312e559ffaa832a055ba9d1ac0cc8e86b3e25453247e4b81d7" +dependencies = [ + "geo-traits", + "geo-types", + "log", + "num-traits", + "thiserror 1.0.69", +] + [[package]] name = "writeable" version = "0.6.3" diff --git a/vortex-geo/Cargo.toml b/vortex-geo/Cargo.toml index 8033cb8ea31..4e76e2eefcb 100644 --- a/vortex-geo/Cargo.toml +++ b/vortex-geo/Cargo.toml @@ -14,6 +14,9 @@ rust-version.workspace = true version.workspace = true [dependencies] +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +geoarrow = { workspace = true } prost = { workspace = true } vortex-array = { workspace = true } vortex-error = { workspace = true } diff --git a/vortex-geo/src/extension/wkb.rs b/vortex-geo/src/extension/wkb.rs index 65da396b820..39b3448f49e 100644 --- a/vortex-geo/src/extension/wkb.rs +++ b/vortex-geo/src/extension/wkb.rs @@ -3,11 +3,28 @@ use std::fmt::Display; use std::ops::Deref; +use std::sync::Arc; +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::extension::ExtensionType; +use geoarrow::array::GenericWkbArray; +use geoarrow::array::IntoArrow; +use geoarrow::array::WkbViewArray; +use geoarrow::datatypes::Crs; +use geoarrow::datatypes::Metadata; +use geoarrow::datatypes::WkbType; use prost::Message; use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowExport; +use vortex_array::arrow::ArrowExportVTable; +use vortex_array::arrow::ArrowSession; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::dtype::DType; use vortex_array::dtype::extension::ExtDType; use vortex_array::dtype::extension::ExtId; use vortex_array::dtype::extension::ExtVTable; @@ -17,6 +34,8 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_err; +use vortex_session::registry::CachedId; +use vortex_session::registry::Id; use wkb::reader::GeometryType; use crate::extension::GeoMetadata; @@ -140,3 +159,99 @@ impl ExtVTable for WellKnownBinary { Wkb::try_from_bytes(storage_value.as_binary().as_slice()) } } + +static ARROW_WKB: CachedId = CachedId::new(WkbType::NAME); + +impl ArrowExportVTable for WellKnownBinary { + fn arrow_ext_id(&self) -> Id { + *ARROW_WKB + } + + fn vortex_id(&self) -> Id { + self.id() + } + + fn to_arrow_field( + &self, + name: &str, + dtype: &DType, + session: &ArrowSession, + ) -> VortexResult> { + let ext_type = dtype.as_extension(); + let geo_metadata = ext_type.metadata::(); + + let mut field = session.to_arrow_field(name, ext_type.storage_dtype())?; + field.try_with_extension_type(wkb_type(geo_metadata))?; + + Ok(Some(field)) + } + + fn execute_arrow( + &self, + array: ArrayRef, + target: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let is_wkb = array + .dtype() + .as_extension_opt() + .map(|ext| ext.is::()) + .unwrap_or(false); + if !is_wkb { + return Ok(ArrowExport::Unsupported(array)); + } + + let Ok(wkb_meta) = target.try_extension_type::() else { + return Ok(ArrowExport::Unsupported(array)); + }; + + let executed = array.execute::(ctx)?; + let storage = executed.storage_array().clone(); + + let storage_field = Field::new( + String::new(), + target.data_type().clone(), + target.is_nullable(), + ); + let session = ctx.session().clone(); + let arrow_storage = session + .arrow() + .execute_arrow(storage, Some(&storage_field), ctx)?; + + // Round-trip through the GeoArrow WKB array types: this validates that the storage + // is a binary-family Arrow array and produces the canonical physical representation + // expected for a `WkbType` extension field. + let arrow_ref: ArrowArrayRef = match target.data_type() { + DataType::Binary => Arc::new( + GenericWkbArray::::try_from((arrow_storage.as_ref(), wkb_meta)) + .map_err(|e| vortex_err!("failed to construct WkbArray: {e}"))? + .into_arrow(), + ), + DataType::LargeBinary => Arc::new( + GenericWkbArray::::try_from((arrow_storage.as_ref(), wkb_meta)) + .map_err(|e| vortex_err!("failed to construct LargeWkbArray: {e}"))? + .into_arrow(), + ), + DataType::BinaryView => Arc::new( + WkbViewArray::try_from((arrow_storage.as_ref(), wkb_meta)) + .map_err(|e| vortex_err!("failed to construct WkbViewArray: {e}"))? + .into_arrow(), + ), + _ => unreachable!("target data type was validated above"), + }; + + Ok(ArrowExport::Exported(arrow_ref)) + } +} + +fn wkb_type(geo_metadata: &GeoMetadata) -> WkbType { + let metadata = Metadata::new( + geo_metadata + .crs + .as_ref() + .map(|crs| Crs::from_unknown_crs_type(crs.to_string())) + .unwrap_or_default(), + None, + ); + WkbType::new(Arc::new(metadata)) +} diff --git a/vortex-geo/src/lib.rs b/vortex-geo/src/lib.rs index a36a4fdde6b..d8a9b3d0154 100644 --- a/vortex-geo/src/lib.rs +++ b/vortex-geo/src/lib.rs @@ -1,6 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::sync::Arc; + +use vortex_array::arrow::ArrowSessionExt; use vortex_array::dtype::session::DTypeSessionExt; use vortex_session::VortexSession; @@ -11,21 +14,29 @@ pub mod extension; pub fn initialize(session: &VortexSession) { // register geospatial extension types session.dtypes().register(WellKnownBinary); + session.arrow().register_exporter(Arc::new(WellKnownBinary)); } #[cfg(test)] mod tests { use std::sync::LazyLock; + use arrow_array::Array as _; + use arrow_array::cast::AsArray; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::extension::ExtensionType as _; use geo_traits::to_geo::ToGeoGeometry; use geo_types::Coord; use geo_types::Geometry; use geo_types::LineString; use geo_types::Polygon; + use geoarrow::datatypes::WkbType; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::varbin::builder::VarBinBuilder; + use vortex_array::arrow::ArrowSessionExt; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::extension::ExtDType; @@ -92,4 +103,105 @@ mod tests { Ok(()) } + + fn wkb_extension_array() -> VortexResult<(Vec, vortex_array::ArrayRef)> { + let polygon = Geometry::Polygon(Polygon::new( + LineString::new(vec![ + Coord::zero(), + Coord { x: 100.0, y: 0.0 }, + Coord { x: 100.0, y: 100.0 }, + Coord { x: 0.0, y: 100.0 }, + Coord::zero(), + ]), + vec![], + )); + let mut buf = Vec::new(); + wkb::writer::write_geometry(&mut buf, &polygon, &WriteOptions::default()) + .map_err(|e| vortex_err!("writing WKB failed: {e}"))?; + + let mut builder = VarBinBuilder::::with_capacity(3); + builder.append_value(&buf); + builder.append_value(&buf); + builder.append_value(&buf); + + let dtype = ExtDType::::try_new( + GeoMetadata { + crs: Some("EPSG:4326".to_string()), + }, + DType::Binary(Nullability::NonNullable), + )?; + let storage = builder.finish(DType::Binary(Nullability::NonNullable)); + let array = ExtensionArray::new(dtype.erased(), storage.into_array()).into_array(); + Ok((buf, array)) + } + + #[test] + fn export_arrow_field_carries_wkb_extension() -> VortexResult<()> { + let (_, array) = wkb_extension_array()?; + let field = SESSION.arrow().to_arrow_field("geom", array.dtype())?; + assert_eq!(field.extension_type_name(), Some(WkbType::NAME)); + // Vortex's canonical mapping of `DType::Binary` is Arrow's `BinaryView`. + assert_eq!(field.data_type(), &DataType::BinaryView); + Ok(()) + } + + #[test] + fn execute_arrow_exports_wkb_to_binary() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let (wkb_bytes, array) = wkb_extension_array()?; + + let field = Field::new("geom", DataType::Binary, false) + .with_extension_type(WkbType::new(Default::default())); + let exported = SESSION + .arrow() + .execute_arrow(array, Some(&field), &mut ctx)?; + + assert_eq!(exported.data_type(), &DataType::Binary); + let binary = exported.as_binary::(); + assert_eq!(binary.len(), 3); + for idx in 0..3 { + assert_eq!(binary.value(idx), wkb_bytes.as_slice()); + } + Ok(()) + } + + #[test] + fn execute_arrow_exports_wkb_to_large_binary() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let (wkb_bytes, array) = wkb_extension_array()?; + + let field = Field::new("geom", DataType::LargeBinary, false) + .with_extension_type(WkbType::new(Default::default())); + let exported = SESSION + .arrow() + .execute_arrow(array, Some(&field), &mut ctx)?; + + assert_eq!(exported.data_type(), &DataType::LargeBinary); + let binary = exported.as_binary::(); + assert_eq!(binary.len(), 3); + for idx in 0..3 { + assert_eq!(binary.value(idx), wkb_bytes.as_slice()); + } + Ok(()) + } + + #[test] + fn execute_arrow_exports_wkb_to_binary_view() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let (wkb_bytes, array) = wkb_extension_array()?; + + let field = Field::new("geom", DataType::BinaryView, false) + .with_extension_type(WkbType::new(Default::default())); + let exported = SESSION + .arrow() + .execute_arrow(array, Some(&field), &mut ctx)?; + + assert_eq!(exported.data_type(), &DataType::BinaryView); + let binary = exported.as_binary_view(); + assert_eq!(binary.len(), 3); + for idx in 0..3 { + assert_eq!(binary.value(idx), wkb_bytes.as_slice()); + } + Ok(()) + } } From d15353fff2e0a080b3f5616a8ab3e6c0ae05ae48 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 9 Jun 2026 14:22:28 -0700 Subject: [PATCH 2/2] ArrowImportTable impl Signed-off-by: Andrew Duffy --- vortex-geo/src/extension/wkb.rs | 60 +++++++++++++++++ vortex-geo/src/lib.rs | 116 ++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+) diff --git a/vortex-geo/src/extension/wkb.rs b/vortex-geo/src/extension/wkb.rs index 39b3448f49e..172da41b93b 100644 --- a/vortex-geo/src/extension/wkb.rs +++ b/vortex-geo/src/extension/wkb.rs @@ -18,12 +18,16 @@ use geoarrow::datatypes::WkbType; use prost::Message; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::arrow::ArrowExport; use vortex_array::arrow::ArrowExportVTable; +use vortex_array::arrow::ArrowImport; +use vortex_array::arrow::ArrowImportVTable; use vortex_array::arrow::ArrowSession; use vortex_array::arrow::ArrowSessionExt; +use vortex_array::arrow::FromArrowArray; use vortex_array::dtype::DType; use vortex_array::dtype::extension::ExtDType; use vortex_array::dtype::extension::ExtId; @@ -244,6 +248,49 @@ impl ArrowExportVTable for WellKnownBinary { } } +impl ArrowImportVTable for WellKnownBinary { + fn arrow_ext_id(&self) -> Id { + *ARROW_WKB + } + + fn from_arrow_field(&self, field: &Field) -> VortexResult> { + let Ok(wkb_meta) = field.try_extension_type::() else { + return Ok(None); + }; + + let storage_dtype = DType::Binary(field.is_nullable().into()); + Ok(Some(DType::Extension( + ExtDType::try_with_vtable(WellKnownBinary, geo_metadata(&wkb_meta), storage_dtype)? + .erased(), + ))) + } + + fn from_arrow_array( + &self, + array: ArrowArrayRef, + field: &Field, + dtype: &DType, + ) -> VortexResult { + let Some(ext_dtype) = dtype.as_extension_opt() else { + return Ok(ArrowImport::Unsupported(array)); + }; + if !ext_dtype.is::() + || field.try_extension_type::().is_err() + || !matches!( + array.data_type(), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView + ) + { + return Ok(ArrowImport::Unsupported(array)); + } + + let storage = ArrayRef::from_arrow(array.as_ref(), field.is_nullable())?; + Ok(ArrowImport::Imported( + ExtensionArray::new(ext_dtype.clone(), storage).into_array(), + )) + } +} + fn wkb_type(geo_metadata: &GeoMetadata) -> WkbType { let metadata = Metadata::new( geo_metadata @@ -255,3 +302,16 @@ fn wkb_type(geo_metadata: &GeoMetadata) -> WkbType { ); WkbType::new(Arc::new(metadata)) } + +fn geo_metadata(wkb_type: &WkbType) -> GeoMetadata { + let crs = wkb_type.metadata().crs().crs_value().map(|value| { + // `Crs::from_unknown_crs_type` stores the user's string verbatim as a JSON string + // value, so prefer the raw string when available to round-trip cleanly. For other + // CRS encodings (PROJJSON object, etc.), fall back to the JSON-encoded form. + value + .as_str() + .map(str::to_string) + .unwrap_or_else(|| value.to_string()) + }); + GeoMetadata { crs } +} diff --git a/vortex-geo/src/lib.rs b/vortex-geo/src/lib.rs index d8a9b3d0154..513caf85d92 100644 --- a/vortex-geo/src/lib.rs +++ b/vortex-geo/src/lib.rs @@ -15,13 +15,19 @@ pub fn initialize(session: &VortexSession) { // register geospatial extension types session.dtypes().register(WellKnownBinary); session.arrow().register_exporter(Arc::new(WellKnownBinary)); + session.arrow().register_importer(Arc::new(WellKnownBinary)); } #[cfg(test)] mod tests { + use std::sync::Arc; use std::sync::LazyLock; use arrow_array::Array as _; + use arrow_array::ArrayRef as ArrowArrayRef; + use arrow_array::BinaryArray; + use arrow_array::BinaryViewArray; + use arrow_array::LargeBinaryArray; use arrow_array::cast::AsArray; use arrow_schema::DataType; use arrow_schema::Field; @@ -31,6 +37,8 @@ mod tests { use geo_types::Geometry; use geo_types::LineString; use geo_types::Polygon; + use geoarrow::datatypes::Crs; + use geoarrow::datatypes::Metadata; use geoarrow::datatypes::WkbType; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; @@ -204,4 +212,112 @@ mod tests { } Ok(()) } + + fn wkb_field(name: &str, data_type: DataType, nullable: bool, crs: Option<&str>) -> Field { + let crs = crs + .map(|crs| Crs::from_unknown_crs_type(crs.to_string())) + .unwrap_or_default(); + let wkb_type = WkbType::new(Arc::new(Metadata::new(crs, None))); + Field::new(name, data_type, nullable).with_extension_type(wkb_type) + } + + fn assert_imported_wkb_dtype(dtype: &DType, expected_crs: Option<&str>, nullable: bool) { + let DType::Extension(ext) = dtype else { + panic!("expected Extension dtype, got {dtype}"); + }; + assert!(ext.is::()); + assert_eq!(ext.storage_dtype(), &DType::Binary(nullable.into())); + let geo = ext.metadata::(); + assert_eq!(geo.crs.as_deref(), expected_crs); + } + + #[test] + fn import_arrow_field_recovers_wkb_extension() -> VortexResult<()> { + let field = wkb_field("geom", DataType::Binary, false, Some("EPSG:4326")); + let dtype = SESSION.arrow().from_arrow_field(&field)?; + assert_imported_wkb_dtype(&dtype, Some("EPSG:4326"), false); + Ok(()) + } + + #[test] + fn import_arrow_field_without_crs() -> VortexResult<()> { + let field = wkb_field("geom", DataType::BinaryView, true, None); + let dtype = SESSION.arrow().from_arrow_field(&field)?; + assert_imported_wkb_dtype(&dtype, None, true); + Ok(()) + } + + #[test] + fn import_arrow_array_from_binary() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let (wkb_bytes, _) = wkb_extension_array()?; + let arrow: ArrowArrayRef = Arc::new(BinaryArray::from_iter_values([ + wkb_bytes.as_slice(), + wkb_bytes.as_slice(), + wkb_bytes.as_slice(), + ])); + let field = wkb_field("geom", DataType::Binary, false, Some("EPSG:4326")); + + let imported = SESSION.arrow().from_arrow_array(arrow, &field)?; + assert_imported_wkb_dtype(imported.dtype(), Some("EPSG:4326"), false); + + for idx in 0..3 { + let geom = imported.execute_scalar(idx, &mut ctx)?; + assert_eq!(geom.value().unwrap().as_binary().as_slice(), wkb_bytes); + } + Ok(()) + } + + #[test] + fn import_arrow_array_from_large_binary() -> VortexResult<()> { + let (wkb_bytes, _) = wkb_extension_array()?; + let arrow: ArrowArrayRef = Arc::new(LargeBinaryArray::from_iter_values([ + wkb_bytes.as_slice(), + wkb_bytes.as_slice(), + ])); + let field = wkb_field("geom", DataType::LargeBinary, false, Some("EPSG:4326")); + + let imported = SESSION.arrow().from_arrow_array(arrow, &field)?; + assert_imported_wkb_dtype(imported.dtype(), Some("EPSG:4326"), false); + assert_eq!(imported.len(), 2); + Ok(()) + } + + #[test] + fn import_arrow_array_from_binary_view() -> VortexResult<()> { + let (wkb_bytes, _) = wkb_extension_array()?; + let arrow: ArrowArrayRef = Arc::new(BinaryViewArray::from_iter_values([ + wkb_bytes.as_slice(), + wkb_bytes.as_slice(), + wkb_bytes.as_slice(), + wkb_bytes.as_slice(), + ])); + let field = wkb_field("geom", DataType::BinaryView, false, None); + + let imported = SESSION.arrow().from_arrow_array(arrow, &field)?; + assert_imported_wkb_dtype(imported.dtype(), None, false); + assert_eq!(imported.len(), 4); + Ok(()) + } + + #[test] + fn import_arrow_array_roundtrips_through_export() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let (wkb_bytes, original) = wkb_extension_array()?; + + let field = Field::new("geom", DataType::BinaryView, false) + .with_extension_type(WkbType::new(Default::default())); + let exported = SESSION + .arrow() + .execute_arrow(original, Some(&field), &mut ctx)?; + let arrow_field = wkb_field("geom", DataType::BinaryView, false, Some("EPSG:4326")); + let reimported = SESSION.arrow().from_arrow_array(exported, &arrow_field)?; + + assert_imported_wkb_dtype(reimported.dtype(), Some("EPSG:4326"), false); + for idx in 0..3 { + let geom = reimported.execute_scalar(idx, &mut ctx)?; + assert_eq!(geom.value().unwrap().as_binary().as_slice(), wkb_bytes); + } + Ok(()) + } }