1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
@@ -156,6 +156,7 @@ add_subdirectory(util)
if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES
arrow/arrow_fs_file_io.cc
arrow/metadata_column_util.cc
avro/avro_data_util.cc
avro/avro_direct_decoder.cc
avro/avro_direct_encoder.cc
56 changes: 56 additions & 0 deletions src/iceberg/arrow/metadata_column_util.cc
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <arrow/array.h>
#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/memory_pool.h>

#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/metadata_column_util_internal.h"

namespace iceberg::arrow {

Result<std::shared_ptr<::arrow::Array>> MakeFilePathArray(const std::string& file_path,
int64_t num_rows,
::arrow::MemoryPool* pool) {
::arrow::StringBuilder builder(pool);
ICEBERG_ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
for (int64_t i = 0; i < num_rows; ++i) {
ICEBERG_ARROW_RETURN_NOT_OK(builder.Append(file_path));
}
std::shared_ptr<::arrow::Array> array;
ICEBERG_ARROW_RETURN_NOT_OK(builder.Finish(&array));
return array;
}

Result<std::shared_ptr<::arrow::Array>> MakeRowPositionArray(int64_t start_position,
int64_t num_rows,
::arrow::MemoryPool* pool) {
::arrow::Int64Builder builder(pool);
ICEBERG_ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
for (int64_t i = 0; i < num_rows; ++i) {
ICEBERG_ARROW_RETURN_NOT_OK(builder.Append(start_position + i));
}
std::shared_ptr<::arrow::Array> array;
ICEBERG_ARROW_RETURN_NOT_OK(builder.Finish(&array));
return array;
}

} // namespace iceberg::arrow
65 changes: 65 additions & 0 deletions src/iceberg/arrow/metadata_column_util_internal.h
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/arrow/metadata_column_util_internal.h
/// Utility functions for populating metadata columns (_file and _pos) in readers.

#include <cstdint>
#include <memory>
#include <string>

#include <arrow/type_fwd.h>

#include "iceberg/result.h"

namespace iceberg::arrow {

/// \brief Context for populating metadata columns during reading.
struct MetadataColumnContext {
Collaborator: Does it make sense to move this structure to metadata_columns.h?

Member Author: I've thought about this but finally decided not to. The reason is that metadata_columns.h covers many more metadata columns, and we only need these two here.

Collaborator: Makes sense, thanks for the explanation.

std::string file_path; // The file path used to populate the _file column.
int64_t next_file_pos = 0; // The next row position used to populate the _pos column.
};

/// \brief Create a constant string array for _file column.
///
/// Creates an Arrow StringArray where all values are the same file path string.
///
/// \param file_path The file path value.
/// \param num_rows Number of rows in the batch.
/// \param pool Arrow memory pool.
/// \return Arrow StringArray with constant file_path value.
Result<std::shared_ptr<::arrow::Array>> MakeFilePathArray(const std::string& file_path,
int64_t num_rows,
::arrow::MemoryPool* pool);

/// \brief Create a sequential int64 array for _pos column.
///
/// Creates an Arrow Int64Array with sequential values starting from start_position.
///
/// \param start_position Starting row position (inclusive).
/// \param num_rows Number of rows in the batch.
/// \param pool Arrow memory pool.
/// \return Arrow Int64Array with values [start_position, start_position + num_rows).
Result<std::shared_ptr<::arrow::Array>> MakeRowPositionArray(int64_t start_position,
int64_t num_rows,
::arrow::MemoryPool* pool);

} // namespace iceberg::arrow
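
For reference, a minimal usage sketch of the two helpers together with MetadataColumnContext (not part of the diff; the file path, batch size, and function name below are illustrative, and error checking is elided):

#include <arrow/memory_pool.h>

#include "iceberg/arrow/metadata_column_util_internal.h"

// Sketch: populate the _file and _pos arrays for one decoded batch.
void PopulateMetadataColumnsSketch() {
  iceberg::arrow::MetadataColumnContext context{
      .file_path = "warehouse/db/table/data/part-00000.avro",  // made-up path
      .next_file_pos = 0};
  const int64_t batch_rows = 1024;  // rows decoded in the current batch

  // _file column: the same file path repeated for every row.
  auto file_column = iceberg::arrow::MakeFilePathArray(
      context.file_path, batch_rows, ::arrow::default_memory_pool());

  // _pos column: sequential positions [0, 1024) for this first batch.
  auto pos_column = iceberg::arrow::MakeRowPositionArray(
      context.next_file_pos, batch_rows, ::arrow::default_memory_pool());

  // Advance the context so the next batch continues the position sequence.
  context.next_file_pos += batch_rows;

  // file_column and pos_column are Result<std::shared_ptr<::arrow::Array>>;
  // real code would check for errors before attaching them to the record batch.
}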
64 changes: 44 additions & 20 deletions src/iceberg/avro/avro_data_util.cc
@@ -17,8 +17,6 @@
* under the License.
*/

#include <ranges>

#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_decimal.h>
#include <arrow/array/builder_nested.h>
@@ -35,6 +33,7 @@
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/schema.h"
#include "iceberg/schema_util.h"
#include "iceberg/util/checked_cast.h"
@@ -51,13 +50,15 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const FieldProjection& projection,
const SchemaField& projected_field,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder);

/// \brief Append Avro record data to Arrow struct builder.
Status AppendStructToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const std::span<const FieldProjection>& projections,
const StructType& struct_type,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder) {
if (avro_node->type() != ::avro::AVRO_RECORD) {
return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node));
@@ -83,9 +84,21 @@ Status AppendStructToBuilder(const ::avro::NodePtr& avro_node,
const auto& avro_field_datum = avro_record.fieldAt(avro_field_index);
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(avro_field_node, avro_field_datum,
field_projection, expected_field,
field_builder));
metadata_context, field_builder));
} else if (field_projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull());
} else if (field_projection.kind == FieldProjection::Kind::kMetadata) {
int32_t field_id = expected_field.field_id();
if (field_id == MetadataColumns::kFilePathColumnId) {
auto string_builder =
internal::checked_cast<::arrow::StringBuilder*>(field_builder);
ICEBERG_ARROW_RETURN_NOT_OK(string_builder->Append(metadata_context.file_path));
} else if (field_id == MetadataColumns::kFilePositionColumnId) {
auto int_builder = internal::checked_cast<::arrow::Int64Builder*>(field_builder);
ICEBERG_ARROW_RETURN_NOT_OK(int_builder->Append(metadata_context.next_file_pos));
} else {
return NotSupported("Unsupported metadata column field id: {}", field_id);
}
} else {
return NotImplemented("Unsupported field projection kind: {}",
ToString(field_projection.kind));
@@ -99,6 +112,7 @@ Status AppendListToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const FieldProjection& element_projection,
const ListType& list_type,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder) {
if (avro_node->type() != ::avro::AVRO_ARRAY) {
return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node));
@@ -113,8 +127,9 @@ Status AppendListToBuilder(const ::avro::NodePtr& avro_node,
const auto& element_field = list_type.fields().back();

for (const auto& element : avro_array.value()) {
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
element_node, element, element_projection, element_field, value_builder));
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(element_node, element,
element_projection, element_field,
metadata_context, value_builder));
}
return {};
}
@@ -124,7 +139,9 @@ Status AppendMapToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const FieldProjection& key_projection,
const FieldProjection& value_projection,
const MapType& map_type, ::arrow::ArrayBuilder* array_builder) {
const MapType& map_type,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder) {
auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder);

if (avro_node->type() == ::avro::AVRO_MAP) {
@@ -143,10 +160,12 @@ Status AppendMapToBuilder(const ::avro::NodePtr& avro_node,
auto* item_builder = map_builder->item_builder();

for (const auto& entry : map_entries) {
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
key_node, entry.first, key_projection, key_field, key_builder));
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
value_node, entry.second, value_projection, value_field, item_builder));
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(key_node, entry.first,
key_projection, key_field,
metadata_context, key_builder));
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(value_node, entry.second,
value_projection, value_field,
metadata_context, item_builder));
}

return {};
@@ -173,10 +192,12 @@ Status AppendMapToBuilder(const ::avro::NodePtr& avro_node,

for (const auto& entry : array_entries) {
const auto& record = entry.value<::avro::GenericRecord>();
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
key_node, record.fieldAt(0), key_projection, key_field, key_builder));
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
value_node, record.fieldAt(1), value_projection, value_field, item_builder));
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(key_node, record.fieldAt(0),
key_projection, key_field,
metadata_context, key_builder));
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(value_node, record.fieldAt(1),
value_projection, value_field,
metadata_context, item_builder));
}

return {};
@@ -191,12 +212,13 @@ Status AppendNestedValueToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const std::span<const FieldProjection>& projections,
const NestedType& projected_type,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder) {
switch (projected_type.type_id()) {
case TypeId::kStruct: {
const auto& struct_type = internal::checked_cast<const StructType&>(projected_type);
return AppendStructToBuilder(avro_node, avro_datum, projections, struct_type,
array_builder);
metadata_context, array_builder);
}

case TypeId::kList: {
@@ -206,7 +228,7 @@ Status AppendNestedValueToBuilder(const ::avro::NodePtr& avro_node,
}
const auto& list_type = internal::checked_cast<const ListType&>(projected_type);
return AppendListToBuilder(avro_node, avro_datum, projections[0], list_type,
array_builder);
metadata_context, array_builder);
}

case TypeId::kMap: {
@@ -216,7 +238,7 @@ Status AppendNestedValueToBuilder(const ::avro::NodePtr& avro_node,
}
const auto& map_type = internal::checked_cast<const MapType&>(projected_type);
return AppendMapToBuilder(avro_node, avro_datum, projections[0], projections[1],
map_type, array_builder);
map_type, metadata_context, array_builder);
}

default:
@@ -420,6 +442,7 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const FieldProjection& projection,
const SchemaField& projected_field,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder) {
if (avro_node->type() == ::avro::AVRO_UNION) {
size_t branch = avro_datum.unionBranch();
@@ -428,7 +451,7 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
return {};
} else {
return AppendFieldToBuilder(avro_node->leafAt(branch), avro_datum, projection,
projected_field, array_builder);
projected_field, metadata_context, array_builder);
}
}

@@ -439,7 +462,7 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
} else {
const auto& nested_type = internal::checked_cast<const NestedType&>(projected_type);
return AppendNestedValueToBuilder(avro_node, avro_datum, projection.children,
nested_type, array_builder);
nested_type, metadata_context, array_builder);
}
}

@@ -449,9 +472,10 @@ Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const SchemaProjection& projection,
const Schema& projected_schema,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder) {
return AppendNestedValueToBuilder(avro_node, avro_datum, projection.fields,
projected_schema, array_builder);
projected_schema, metadata_context, array_builder);
}

namespace {
3 changes: 3 additions & 0 deletions src/iceberg/avro/avro_data_util_internal.h
@@ -22,6 +22,7 @@
#include <arrow/array/builder_base.h>
#include <avro/GenericDatum.hh>

#include "iceberg/arrow/metadata_column_util_internal.h"
#include "iceberg/schema_util.h"

namespace iceberg::avro {
@@ -35,12 +36,14 @@ namespace iceberg::avro {
/// \param avro_datum The Avro data to append
/// \param projection Schema projection from `projected_schema` to `avro_node`
/// \param projected_schema The projected schema
/// \param metadata_context Context for populating metadata columns
/// \param array_builder The Arrow array builder to append to (must be a struct builder)
/// \return Status indicating success or failure
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
const ::avro::GenericDatum& avro_datum,
const SchemaProjection& projection,
const Schema& projected_schema,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder);

/// \brief Extract an Avro datum from an Arrow array.
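
Finally, a hedged sketch of the calling side: a loop that passes the metadata context into AppendDatumToBuilder and advances next_file_pos after each row so the _pos column stays in sync. The decoded_records input, the function name, and the include set are assumptions for illustration, not taken from this PR:

#include <vector>

#include <avro/GenericDatum.hh>

#include "iceberg/avro/avro_data_util_internal.h"

// Sketch only: `decoded_records` stands in for however the reader iterates
// decoded Avro records. Assumes ICEBERG_RETURN_UNEXPECTED is visible, as in
// avro_data_util.cc above.
iceberg::Status AppendAllRecordsSketch(
    const ::avro::NodePtr& avro_node,
    const std::vector<::avro::GenericDatum>& decoded_records,  // hypothetical input
    const iceberg::SchemaProjection& projection,
    const iceberg::Schema& projected_schema,
    iceberg::arrow::MetadataColumnContext& context,
    ::arrow::ArrayBuilder* struct_builder) {
  for (const auto& datum : decoded_records) {
    ICEBERG_RETURN_UNEXPECTED(iceberg::avro::AppendDatumToBuilder(
        avro_node, datum, projection, projected_schema, context, struct_builder));
    // One row appended; advance so the next row receives the correct _pos value.
    ++context.next_file_pos;
  }
  return {};
}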