Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions encodings/parquet-variant/src/arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::Array as _;
use arrow_array::ArrayRef as ArrowArrayRef;
use arrow_array::cast::AsArray;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
use parquet_variant_compute::VariantArray as ArrowVariantArray;
use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VTable;
use vortex_array::arrow::ArrowExport;
use vortex_array::arrow::ArrowExportVTable;
use vortex_array::arrow::ArrowImport;
use vortex_array::arrow::ArrowImportVTable;
use vortex_array::arrow::ArrowSession;
use vortex_array::dtype::DType;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::registry::CachedId;
use vortex_session::registry::Id;

use crate::ParquetVariant;
use crate::ParquetVariantArrayExt;

/// Arrow canonical extension name for Parquet Variant storage.
pub const PARQUET_VARIANT_ARROW_EXTENSION_NAME: &str = "arrow.parquet.variant";

static ARROW_PARQUET_VARIANT: CachedId = CachedId::new(PARQUET_VARIANT_ARROW_EXTENSION_NAME);

impl ArrowExportVTable for ParquetVariant {
fn arrow_ext_id(&self) -> Id {
*ARROW_PARQUET_VARIANT
}

fn vortex_id(&self) -> Id {
ParquetVariant.id()
}

fn to_arrow_field(
Comment thread
robert3005 marked this conversation as resolved.
&self,
_name: &str,
_dtype: &DType,
_session: &ArrowSession,
) -> VortexResult<Option<Field>> {
Ok(None)
}

fn execute_arrow(
&self,
array: ArrayRef,
target: &Field,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrowExport> {
if target
.metadata()
.get(EXTENSION_TYPE_NAME_KEY)
.map(String::as_str)
!= Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME)
|| !array.dtype().is_variant()
{
return Ok(ArrowExport::Unsupported(array));
}

let executed = array.execute_until::<ParquetVariant>(ctx)?;
let parquet_array = executed
.as_opt::<ParquetVariant>()
.ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant storage"))?;
let arrow_variant = parquet_array.to_arrow(ctx)?;
Ok(ArrowExport::Exported(Arc::new(arrow_variant.into_inner())))
}
}

impl ArrowImportVTable for ParquetVariant {
fn arrow_ext_id(&self) -> Id {
*ARROW_PARQUET_VARIANT
}

fn from_arrow_field(&self, field: &Field) -> VortexResult<Option<DType>> {
if field
.metadata()
.get(EXTENSION_TYPE_NAME_KEY)
.map(String::as_str)
!= Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME)
{
return Ok(None);
}

Ok(Some(DType::Variant(field.is_nullable().into())))
}

fn from_arrow_array(
&self,
array: ArrowArrayRef,
field: &Field,
dtype: &DType,
) -> VortexResult<ArrowImport> {
if !matches!(dtype, DType::Variant(_))
|| field
.metadata()
.get(EXTENSION_TYPE_NAME_KEY)
.map(String::as_str)
!= Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME)
|| !matches!(array.data_type(), DataType::Struct(_))
{
return Ok(ArrowImport::Unsupported(array));
}

let arrow_variant = ArrowVariantArray::try_new(array.as_struct())?;
let imported = if dtype.is_nullable() {
ParquetVariant::from_arrow_variant_nullable(&arrow_variant)?
} else {
ParquetVariant::from_arrow_variant(&arrow_variant)?
};
Ok(ArrowImport::Imported(imported.into_array()))
}
}
99 changes: 99 additions & 0 deletions encodings/parquet-variant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,110 @@
//! [Arrow canonical extension type]: https://arrow.apache.org/docs/format/CanonicalExtensions.html#parquet-variant

mod array;
mod arrow;
mod kernel;
mod operations;
mod validity;
mod vtable;

use std::sync::Arc;

pub use array::ParquetVariantArrayExt;
pub use arrow::PARQUET_VARIANT_ARROW_EXTENSION_NAME;
use vortex_array::arrow::ArrowSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;
pub use vtable::ParquetVariant;
pub use vtable::ParquetVariantArray;

/// Register Parquet Variant array and Arrow extension support with a session.
pub fn initialize(session: &VortexSession) {
session.arrays().register(ParquetVariant);
session.arrow().register_exporter(Arc::new(ParquetVariant));
session.arrow().register_importer(Arc::new(ParquetVariant));
}

#[cfg(test)]
mod arrow_session_tests {
use std::sync::Arc;

use arrow_array::Array as _;
use arrow_array::ArrayRef as ArrowArrayRef;
use arrow_array::StructArray;
use arrow_array::cast::AsArray;
use arrow_schema::Field;
use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
use parquet_variant::Variant as PqVariant;
use parquet_variant_compute::VariantArrayBuilder;
use vortex_array::VortexSessionExecute;
use vortex_array::arrow::ArrowSessionExt;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::session::ArraySession;
use vortex_error::VortexResult;
use vortex_session::VortexSession;

use crate::ParquetVariant;

fn session() -> VortexSession {
let session = VortexSession::empty().with::<ArraySession>();
crate::initialize(&session);
session
}

fn arrow_variant_storage() -> StructArray {
let mut builder = VariantArrayBuilder::new(3);
builder.append_variant(PqVariant::from(42i8));
builder.append_variant(PqVariant::from(true));
builder.append_variant(PqVariant::from("vortex"));
builder.build().into_inner()
}

fn arrow_variant_field(storage: &StructArray) -> Field {
Field::new("variant", storage.data_type().clone(), false).with_metadata(
[(
EXTENSION_TYPE_NAME_KEY.to_string(),
"arrow.parquet.variant".to_string(),
)]
.into(),
)
}

#[test]
fn arrow_session_imports_parquet_variant_extension_array() -> VortexResult<()> {
let session = session();
let storage = arrow_variant_storage();
let field = arrow_variant_field(&storage);
let imported = session
.arrow()
.from_arrow_array(Arc::new(storage) as ArrowArrayRef, &field)?;

assert_eq!(imported.dtype(), &DType::Variant(Nullability::NonNullable));
assert!(imported.as_opt::<ParquetVariant>().is_some());
Ok(())
}

#[test]
fn arrow_session_exports_parquet_variant_extension_array() -> VortexResult<()> {
let session = session();
let storage = arrow_variant_storage();
let field = arrow_variant_field(&storage);
let imported = session
.arrow()
.from_arrow_array(Arc::new(storage.clone()) as ArrowArrayRef, &field)?;

let mut ctx = session.create_execution_ctx();
let exported = session
.arrow()
.execute_arrow(imported, Some(&field), &mut ctx)?;
let exported = exported.as_struct();

assert_eq!(exported.len(), storage.len());
assert_eq!(exported.column_names(), storage.column_names());
assert_eq!(exported.fields(), storage.fields());
for (actual, expected) in exported.columns().iter().zip(storage.columns()) {
assert_eq!(actual.to_data(), expected.to_data());
}
Ok(())
}
}
43 changes: 19 additions & 24 deletions vortex-array/src/arrow/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ use crate::dtype::Nullability;
use crate::dtype::StructFields;
use crate::dtype::arrow::FromArrowType;
use crate::dtype::arrow::to_data_type_naive;
use crate::dtype::extension::ExtDTypeRef;
use crate::dtype::extension::ExtId;
use crate::extension::datetime::AnyTemporal;
use crate::extension::uuid::Uuid;
use crate::validity::Validity;
Expand Down Expand Up @@ -99,17 +97,17 @@ pub trait ArrowExportVTable: 'static + Send + Sync + Debug {
/// The Arrow extension ID this plugin produces.
fn arrow_ext_id(&self) -> Id;

/// The Vortex extension ID this plugin maps from. Used only for inference by
/// The Vortex array or extension ID this plugin maps from. Used only for inference by
/// [`ArrowSession::to_arrow_field`] / [`ArrowSession::to_arrow_schema`]; never as a
/// dispatch key for [`execute_arrow`][Self::execute_arrow].
fn vortex_ext_id(&self) -> ExtId;
fn vortex_id(&self) -> Id;

/// Build the Arrow [`Field`] this plugin produces for the given Vortex extension
/// `dtype`. Used during schema inference.
fn to_arrow_field(
&self,
name: &str,
dtype: &ExtDTypeRef,
dtype: &DType,
session: &ArrowSession,
) -> VortexResult<Option<Field>>;

Expand All @@ -125,7 +123,7 @@ pub trait ArrowExportVTable: 'static + Send + Sync + Debug {
) -> VortexResult<ArrowExport>;
}

/// Plugin layer for importing an Arrow extension-typed array into a Vortex extension array.
/// Plugin layer for importing an Arrow extension-typed array into a Vortex array.
///
/// Plugins are dispatched by `arrow_ext_id`.
///
Expand All @@ -140,15 +138,16 @@ pub trait ArrowImportVTable: 'static + Send + Sync + Debug {
#[allow(clippy::wrong_self_convention)]
fn from_arrow_field(&self, field: &Field) -> VortexResult<Option<DType>>;

/// Convert an Arrow array into a Vortex extension array of `dtype`.
/// Convert an Arrow array into a Vortex array of `dtype`.
///
/// Returns ownership of `array` via [`ArrowImport::Unsupported`] when the plugin cannot
/// handle the input.
#[allow(clippy::wrong_self_convention)]
fn from_arrow_array(
&self,
array: ArrowArrayRef,
dtype: &ExtDTypeRef,
field: &Field,
dtype: &DType,
) -> VortexResult<ArrowImport>;
}

Expand All @@ -157,7 +156,7 @@ pub type ArrowImportVTableRef = Arc<dyn ArrowImportVTable>;

type ExportMap = HashMap<Id, Arc<[ArrowExportVTableRef]>>;
type ImportMap = HashMap<Id, Arc<[ArrowImportVTableRef]>>;
type ExportDTypeMap = HashMap<ExtId, Arc<[ArrowExportVTableRef]>>;
type ExportDTypeMap = HashMap<Id, Arc<[ArrowExportVTableRef]>>;

/// Session-scoped registry of Arrow extension plugins.
///
Expand Down Expand Up @@ -199,11 +198,7 @@ impl ArrowSession {
exporter.arrow_ext_id(),
ArrowExportVTableRef::clone(&exporter),
);
Self::insert(
&self.exporters_by_vortex,
exporter.vortex_ext_id(),
exporter,
);
Self::insert(&self.exporters_by_vortex, exporter.vortex_id(), exporter);
}

/// Register an [`ArrowImportVTable`] under its source Arrow extension name.
Expand Down Expand Up @@ -234,7 +229,7 @@ impl ArrowSession {
.unwrap_or_else(|| Arc::from([]))
}

fn exporters_by_vortex(&self, id: &ExtId) -> Arc<[ArrowExportVTableRef]> {
fn exporters_by_vortex(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> {
self.exporters_by_vortex
.load()
.get(id)
Expand Down Expand Up @@ -286,7 +281,9 @@ impl ArrowSession {
}
DType::Extension(ext) if !ext.is::<AnyTemporal>() => {
for plugin in self.exporters_by_vortex(&ext.id()).iter() {
if let Some(field) = plugin.to_arrow_field(name, ext, self)? {
if let Some(field) =
plugin.to_arrow_field(name, &DType::Extension(ext.clone()), self)?
{
return Ok(field);
}
}
Expand Down Expand Up @@ -490,16 +487,14 @@ impl ArrowSession {
let importers = self.importers(&Id::new(extension_name));
if !importers.is_empty() {
let dtype = self.from_arrow_field(field)?;
if let DType::Extension(ext_dtype) = dtype {
let mut current = array;
for plugin in importers.iter() {
match plugin.from_arrow_array(current, &ext_dtype)? {
ArrowImport::Imported(arr) => return Ok(arr),
ArrowImport::Unsupported(arr) => current = arr,
}
let mut current = array;
for plugin in importers.iter() {
match plugin.from_arrow_array(current, field, &dtype)? {
ArrowImport::Imported(arr) => return Ok(arr),
ArrowImport::Unsupported(arr) => current = arr,
}
return ArrayRef::from_arrow(current.as_ref(), field.is_nullable());
}
return ArrayRef::from_arrow(current.as_ref(), field.is_nullable());
}
}
self.from_arrow_array_canonical(array, field)
Expand Down
12 changes: 7 additions & 5 deletions vortex-array/src/extension/uuid/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::dtype::extension::ExtDType;
use crate::dtype::extension::ExtDTypeRef;
use crate::dtype::extension::ExtId;
use crate::dtype::extension::ExtVTable;
use crate::extension::uuid::Uuid;
use crate::extension::uuid::UuidMetadata;
Expand All @@ -61,15 +59,15 @@ impl ArrowExportVTable for Uuid {
*ARROW_UUID
}

fn vortex_ext_id(&self) -> ExtId {
fn vortex_id(&self) -> Id {
Uuid.id()
}

// Encode all of these.
fn to_arrow_field(
&self,
name: &str,
dtype: &ExtDTypeRef,
dtype: &DType,
_session: &ArrowSession,
) -> VortexResult<Option<Field>> {
let mut field = Field::new(
Expand Down Expand Up @@ -125,8 +123,12 @@ impl ArrowImportVTable for Uuid {
fn from_arrow_array(
&self,
array: ArrowArrayRef,
dtype: &ExtDTypeRef,
_field: &Field,
dtype: &DType,
) -> VortexResult<ArrowImport> {
let DType::Extension(dtype) = dtype else {
return Ok(ArrowImport::Unsupported(array));
};
if !matches!(array.data_type(), DataType::FixedSizeBinary(UUID_BYTE_LEN))
|| !dtype.is::<Uuid>()
{
Expand Down
Loading
Loading