Skip to content

Commit 8087ed1

Browse files
manuzhangcodex
andcommitted
fix: address unknown type review feedback
Enforce unknown as a v3-only optional type in schema validation and update paths, allow schema-update promotion from unknown to primitive types, reject unknown-to-nested projection, and reject unsupported Parquet writes for unknown list or map leaves. Co-authored-by: Codex <codex@openai.com>
1 parent d5b8661 commit 8087ed1

10 files changed

Lines changed: 285 additions & 20 deletions

File tree

src/iceberg/parquet/parquet_writer.cc

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
#include "iceberg/parquet/parquet_writer.h"
2121

2222
#include <memory>
23+
#include <optional>
2324
#include <string_view>
25+
#include <utility>
26+
#include <vector>
2427

2528
#include <arrow/c/bridge.h>
2629
#include <arrow/record_batch.h>
@@ -33,7 +36,11 @@
3336

3437
#include "iceberg/arrow/arrow_io_internal.h"
3538
#include "iceberg/arrow/arrow_status_internal.h"
39+
#include "iceberg/parquet/parquet_data_util_internal.h"
40+
#include "iceberg/schema.h"
3641
#include "iceberg/schema_internal.h"
42+
#include "iceberg/schema_util.h"
43+
#include "iceberg/type.h"
3744
#include "iceberg/util/macros.h"
3845

3946
namespace iceberg::parquet {
@@ -81,6 +88,75 @@ Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& propertie
8188
return level;
8289
}
8390

91+
Result<std::shared_ptr<Type>> PruneUnknownType(const std::shared_ptr<Type>& type);
92+
93+
Result<std::optional<SchemaField>> PruneUnknownField(const SchemaField& field,
94+
bool allow_skip_unknown) {
95+
if (field.type()->type_id() == TypeId::kUnknown) {
96+
if (!allow_skip_unknown) {
97+
return NotSupported(
98+
"Cannot write unknown field '{}' inside a list or map to Parquet",
99+
field.name());
100+
}
101+
return std::nullopt;
102+
}
103+
104+
ICEBERG_ASSIGN_OR_RAISE(auto pruned_type, PruneUnknownType(field.type()));
105+
return SchemaField(field.field_id(), field.name(), std::move(pruned_type),
106+
field.optional(), field.doc());
107+
}
108+
109+
Result<std::shared_ptr<Type>> PruneUnknownType(const std::shared_ptr<Type>& type) {
110+
switch (type->type_id()) {
111+
case TypeId::kUnknown:
112+
return NotSupported("Cannot write unknown field to Parquet");
113+
case TypeId::kStruct: {
114+
const auto& struct_type = static_cast<const StructType&>(*type);
115+
std::vector<SchemaField> fields;
116+
for (const auto& field : struct_type.fields()) {
117+
ICEBERG_ASSIGN_OR_RAISE(auto pruned_field,
118+
PruneUnknownField(field, /*allow_skip_unknown=*/true));
119+
if (pruned_field.has_value()) {
120+
fields.emplace_back(std::move(pruned_field.value()));
121+
}
122+
}
123+
return std::make_shared<StructType>(std::move(fields));
124+
}
125+
case TypeId::kList: {
126+
const auto& list_type = static_cast<const ListType&>(*type);
127+
ICEBERG_ASSIGN_OR_RAISE(
128+
auto pruned_element,
129+
PruneUnknownField(list_type.element(), /*allow_skip_unknown=*/false));
130+
return std::make_shared<ListType>(std::move(pruned_element.value()));
131+
}
132+
case TypeId::kMap: {
133+
const auto& map_type = static_cast<const MapType&>(*type);
134+
ICEBERG_ASSIGN_OR_RAISE(
135+
auto pruned_key,
136+
PruneUnknownField(map_type.key(), /*allow_skip_unknown=*/false));
137+
ICEBERG_ASSIGN_OR_RAISE(
138+
auto pruned_value,
139+
PruneUnknownField(map_type.value(), /*allow_skip_unknown=*/false));
140+
return std::make_shared<MapType>(std::move(pruned_key.value()),
141+
std::move(pruned_value.value()));
142+
}
143+
default:
144+
return type;
145+
}
146+
}
147+
148+
Result<std::shared_ptr<Schema>> PruneUnknownFields(const Schema& schema) {
149+
std::vector<SchemaField> fields;
150+
for (const auto& field : schema.fields()) {
151+
ICEBERG_ASSIGN_OR_RAISE(auto pruned_field,
152+
PruneUnknownField(field, /*allow_skip_unknown=*/true));
153+
if (pruned_field.has_value()) {
154+
fields.emplace_back(std::move(pruned_field.value()));
155+
}
156+
}
157+
return std::make_shared<Schema>(std::move(fields), schema.schema_id());
158+
}
159+
84160
} // namespace
85161

86162
class ParquetWriter::Impl {
@@ -97,8 +173,17 @@ class ParquetWriter::Impl {
97173
auto writer_properties = properties_builder.memory_pool(pool_)->build();
98174
auto arrow_writer_properties = ::parquet::default_arrow_writer_properties();
99175

176+
ArrowSchema input_c_schema;
177+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &input_c_schema));
178+
ICEBERG_ARROW_ASSIGN_OR_RETURN(input_arrow_schema_,
179+
::arrow::ImportSchema(&input_c_schema));
180+
181+
ICEBERG_ASSIGN_OR_RAISE(write_schema_, PruneUnknownFields(*options.schema));
182+
ICEBERG_ASSIGN_OR_RAISE(write_projection_, Project(*write_schema_, *options.schema,
183+
/*prune_source=*/false));
184+
100185
ArrowSchema c_schema;
101-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
186+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &c_schema));
102187
ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema));
103188

104189
std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
@@ -123,8 +208,12 @@ class ParquetWriter::Impl {
123208
}
124209

125210
Status Write(ArrowArray* array) {
126-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
127-
::arrow::ImportRecordBatch(array, arrow_schema_));
211+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
212+
auto input_batch, ::arrow::ImportRecordBatch(array, input_arrow_schema_));
213+
ICEBERG_ASSIGN_OR_RAISE(
214+
auto batch,
215+
ProjectRecordBatch(std::move(input_batch), arrow_schema_, *write_schema_,
216+
write_projection_, arrow::MetadataColumnContext{}, pool_));
128217

129218
ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
130219

@@ -168,6 +257,11 @@ class ParquetWriter::Impl {
168257
private:
169258
// TODO(gangwu): make memory pool configurable
170259
::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
260+
// The schema accepted from callers.
261+
std::shared_ptr<::arrow::Schema> input_arrow_schema_;
262+
// The Iceberg schema that has v3 unknown fields removed for physical writes.
263+
std::shared_ptr<Schema> write_schema_;
264+
SchemaProjection write_projection_;
171265
// Schema to write from the Parquet file.
172266
std::shared_ptr<::arrow::Schema> arrow_schema_;
173267
// The output stream to write Parquet file.

src/iceberg/schema.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
4646
int32_t schema_id,
4747
std::vector<int32_t> identifier_field_ids) {
4848
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
49+
for (const auto& field : schema->fields()) {
50+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
51+
}
4952

5053
if (!identifier_field_ids.empty()) {
5154
auto id_to_parent = IndexParents(*schema);
@@ -63,6 +66,9 @@ Result<std::unique_ptr<Schema>> Schema::Make(
6366
std::vector<SchemaField> fields, int32_t schema_id,
6467
const std::vector<std::string>& identifier_field_names) {
6568
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
69+
for (const auto& field : schema->fields()) {
70+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
71+
}
6672

6773
std::vector<int32_t> fresh_identifier_ids;
6874
for (const auto& name : identifier_field_names) {
@@ -288,6 +294,7 @@ Status Schema::Validate(int32_t format_version) const {
288294
// Check each field's type and defaults
289295
for (const auto& [field_id, field_ref] : id_to_field.get()) {
290296
const auto& field = field_ref.get();
297+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
291298

292299
// Check if the field's type requires a minimum format version
293300
if (auto it = TableMetadata::kMinFormatVersions.find(field.type()->type_id());

src/iceberg/schema_field.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,26 @@
2424

2525
#include "iceberg/type.h"
2626
#include "iceberg/util/formatter.h" // IWYU pragma: keep
27+
#include "iceberg/util/macros.h"
2728

2829
namespace iceberg {
2930

31+
namespace {
32+
33+
Status ValidateNestedFields(const Type& type) {
34+
if (!type.is_nested()) {
35+
return {};
36+
}
37+
38+
const auto& nested_type = static_cast<const NestedType&>(type);
39+
for (const auto& field : nested_type.fields()) {
40+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
41+
}
42+
return {};
43+
}
44+
45+
} // namespace
46+
3047
SchemaField::SchemaField(int32_t field_id, std::string_view name,
3148
std::shared_ptr<Type> type, bool optional, std::string_view doc)
3249
: field_id_(field_id),
@@ -62,6 +79,10 @@ Status SchemaField::Validate() const {
6279
if (type_ == nullptr) [[unlikely]] {
6380
return InvalidSchema("SchemaField cannot have null type");
6481
}
82+
if (type_->type_id() == TypeId::kUnknown && !optional_) [[unlikely]] {
83+
return InvalidSchema("Unknown type field '{}' must be optional", name_);
84+
}
85+
ICEBERG_RETURN_UNEXPECTED(ValidateNestedFields(*type_));
6586
return {};
6687
}
6788

src/iceberg/schema_util.cc

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,8 @@ Result<FieldProjection> ProjectField(const SchemaField& expected_field,
101101

102102
if (source_field.type()->type_id() == TypeId::kUnknown &&
103103
expected_field.type()->is_nested()) {
104-
if (!expected_field.optional()) {
105-
return InvalidSchema("Cannot project required field with id {} as null",
106-
expected_field.field_id());
107-
}
108-
projection.kind = FieldProjection::Kind::kNull;
109-
return projection;
104+
return InvalidSchema("Cannot project unknown field with id {} as nested type {}",
105+
expected_field.field_id(), *expected_field.type());
110106
}
111107

112108
if (source_field.type()->type_id() == TypeId::kUnknown && !expected_field.optional()) {

src/iceberg/table_metadata.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ struct ICEBERG_EXPORT TableMetadata {
7777
static constexpr int64_t kInitialSequenceNumber = 0;
7878
static constexpr int64_t kInitialRowId = 0;
7979

80-
static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {};
80+
static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {
81+
{TypeId::kUnknown, 3},
82+
};
8183

8284
/// An integer version number for the format
8385
int8_t format_version;

src/iceberg/test/parquet_test.cc

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,88 @@ TEST_F(ParquetReaderTest, ReadNestedUnknownProjection) {
533533
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
534534
}
535535

536+
TEST_F(ParquetReaderTest, WriteSkipsUnknownFields) {
537+
temp_parquet_file_ = "write_skips_unknown.parquet";
538+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
539+
SchemaField::MakeRequired(1, "id", int32()),
540+
SchemaField::MakeOptional(2, "mystery", unknown()),
541+
SchemaField::MakeOptional(3, "profile",
542+
std::make_shared<StructType>(std::vector<SchemaField>{
543+
SchemaField::MakeOptional(4, "name", string()),
544+
SchemaField::MakeOptional(5, "mystery", unknown()),
545+
})),
546+
});
547+
548+
ArrowSchema arrow_c_schema;
549+
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
550+
auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
551+
auto array = ::arrow::json::ArrayFromJSONString(arrow_type,
552+
R"([
553+
{"id": 1, "mystery": null, "profile": {"name": "Alice", "mystery": null}},
554+
{"id": 2, "mystery": null, "profile": {"name": "Bob", "mystery": null}}
555+
])")
556+
.ValueOrDie();
557+
558+
WriterProperties writer_properties;
559+
writer_properties.Set(WriterProperties::kParquetCompression,
560+
std::string("uncompressed"));
561+
ASSERT_THAT(WriteArray(array, {.path = temp_parquet_file_,
562+
.schema = schema,
563+
.io = file_io_,
564+
.properties = std::move(writer_properties)}),
565+
IsOk());
566+
567+
auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
568+
auto input_stream = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie();
569+
auto metadata = ::parquet::ReadMetaData(input_stream);
570+
ASSERT_EQ(metadata->schema()->num_columns(), 2);
571+
EXPECT_THAT(metadata->schema()->ToString(), ::testing::HasSubstr("id"));
572+
EXPECT_THAT(metadata->schema()->ToString(), ::testing::HasSubstr("name"));
573+
EXPECT_THAT(metadata->schema()->ToString(),
574+
::testing::Not(::testing::HasSubstr("mystery")));
575+
576+
ICEBERG_UNWRAP_OR_FAIL(
577+
auto reader,
578+
ReaderFactoryRegistry::Open(
579+
FileFormatType::kParquet,
580+
{.path = temp_parquet_file_, .io = file_io_, .projection = schema}));
581+
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader,
582+
R"([
583+
{"id": 1, "mystery": null, "profile": {"name": "Alice", "mystery": null}},
584+
{"id": 2, "mystery": null, "profile": {"name": "Bob", "mystery": null}}
585+
])"));
586+
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
587+
}
588+
589+
TEST_F(ParquetReaderTest, WriteRejectsUnknownListOrMapFields) {
590+
auto list_schema = std::make_shared<Schema>(std::vector<SchemaField>{
591+
SchemaField::MakeOptional(
592+
1, "mysteries",
593+
std::make_shared<ListType>(SchemaField::MakeOptional(2, "element", unknown()))),
594+
});
595+
auto list_writer_result = WriterFactoryRegistry::Open(
596+
FileFormatType::kParquet,
597+
{.path = "unknown_list.parquet", .schema = list_schema, .io = file_io_});
598+
ASSERT_THAT(list_writer_result, IsError(ErrorKind::kNotSupported));
599+
ASSERT_THAT(list_writer_result,
600+
HasErrorMessage("Cannot write unknown field 'element' inside a list or map "
601+
"to Parquet"));
602+
603+
auto map_schema = std::make_shared<Schema>(std::vector<SchemaField>{
604+
SchemaField::MakeOptional(
605+
1, "properties",
606+
std::make_shared<MapType>(SchemaField::MakeRequired(2, "key", string()),
607+
SchemaField::MakeOptional(3, "value", unknown()))),
608+
});
609+
auto map_writer_result = WriterFactoryRegistry::Open(
610+
FileFormatType::kParquet,
611+
{.path = "unknown_map.parquet", .schema = map_schema, .io = file_io_});
612+
ASSERT_THAT(map_writer_result, IsError(ErrorKind::kNotSupported));
613+
ASSERT_THAT(map_writer_result,
614+
HasErrorMessage("Cannot write unknown field 'value' inside a list or map "
615+
"to Parquet"));
616+
}
617+
536618
class ParquetReadWrite : public ::testing::Test {
537619
protected:
538620
static void SetUpTestSuite() { parquet::RegisterAll(); }

src/iceberg/test/schema_test.cc

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,53 @@ TEST(SchemaTest, Equality) {
9696
ASSERT_EQ(schema5, schema1);
9797
}
9898

99+
TEST(SchemaTest, UnknownTypeRequiresFormatV3) {
100+
iceberg::Schema schema({
101+
iceberg::SchemaField::MakeOptional(1, "mystery", iceberg::unknown()),
102+
iceberg::SchemaField::MakeOptional(
103+
2, "profile",
104+
iceberg::struct_({
105+
iceberg::SchemaField::MakeOptional(3, "nested", iceberg::unknown()),
106+
})),
107+
});
108+
109+
EXPECT_THAT(schema.Validate(2), iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
110+
EXPECT_THAT(schema.Validate(2),
111+
iceberg::HasErrorMessage("unknown is not supported until v3"));
112+
EXPECT_THAT(schema.Validate(3), iceberg::IsOk());
113+
}
114+
115+
TEST(SchemaTest, RequiredUnknownFieldIsInvalid) {
116+
iceberg::Schema direct_schema(
117+
{iceberg::SchemaField::MakeRequired(1, "mystery", iceberg::unknown())});
118+
EXPECT_THAT(direct_schema.Validate(3),
119+
iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
120+
EXPECT_THAT(direct_schema.Validate(3),
121+
iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional"));
122+
123+
auto schema = iceberg::Schema::Make(
124+
{iceberg::SchemaField::MakeRequired(1, "mystery", iceberg::unknown())}, 100,
125+
std::vector<int32_t>{});
126+
127+
EXPECT_THAT(schema, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
128+
EXPECT_THAT(schema,
129+
iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional"));
130+
}
131+
132+
TEST(SchemaTest, RequiredNestedUnknownFieldIsInvalid) {
133+
auto schema = iceberg::Schema::Make(
134+
{iceberg::SchemaField::MakeOptional(
135+
1, "profile",
136+
iceberg::struct_({
137+
iceberg::SchemaField::MakeRequired(2, "mystery", iceberg::unknown()),
138+
}))},
139+
100, std::vector<int32_t>{});
140+
141+
EXPECT_THAT(schema, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
142+
EXPECT_THAT(schema,
143+
iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional"));
144+
}
145+
99146
TEST(SchemaTest, IdentifierFields) {
100147
using iceberg::ErrorKind;
101148
using iceberg::Schema;

0 commit comments

Comments
 (0)