Skip to content

Commit 1afe65c

Browse files
authored
feat: Implement EqualityDeleteWriter for equality delete files (#583)
Implement the EqualityDeleteWriter following the same PIMPL pattern as DataWriter. The writer accepts Arrow data matching the equality delete schema (columns for the equality field values) and produces metadata with content=kEqualityDeletes, equality_ids set from options, and sort_order_id propagated from options.
1 parent 69cf2d3 commit 1afe65c

6 files changed

Lines changed: 269 additions & 19 deletions

File tree

src/iceberg/data/data_writer.cc

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "iceberg/file_writer.h"
2525
#include "iceberg/manifest/manifest_entry.h"
26+
#include "iceberg/partition_spec.h"
2627
#include "iceberg/util/macros.h"
2728

2829
namespace iceberg {
@@ -43,18 +44,11 @@ class DataWriter::Impl {
4344
return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
4445
}
4546

46-
Status Write(ArrowArray* data) {
47-
ICEBERG_DCHECK(writer_, "Writer not initialized");
48-
return writer_->Write(data);
49-
}
47+
Status Write(ArrowArray* data) { return writer_->Write(data); }
5048

51-
Result<int64_t> Length() const {
52-
ICEBERG_DCHECK(writer_, "Writer not initialized");
53-
return writer_->length();
54-
}
49+
Result<int64_t> Length() const { return writer_->length(); }
5550

5651
Status Close() {
57-
ICEBERG_DCHECK(writer_, "Writer not initialized");
5852
if (closed_) {
5953
// Idempotent: no-op if already closed
6054
return {};
@@ -100,6 +94,8 @@ class DataWriter::Impl {
10094
.upper_bounds = std::move(upper_bounds_map),
10195
.split_offsets = std::move(split_offsets),
10296
.sort_order_id = options_.sort_order_id,
97+
.partition_spec_id =
98+
options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt,
10399
});
104100

105101
FileWriter::WriteResult result;

src/iceberg/data/equality_delete_writer.cc

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,127 @@
1919

2020
#include "iceberg/data/equality_delete_writer.h"
2121

22+
#include <map>
23+
24+
#include "iceberg/file_writer.h"
25+
#include "iceberg/manifest/manifest_entry.h"
26+
#include "iceberg/partition_spec.h"
27+
#include "iceberg/util/macros.h"
28+
2229
namespace iceberg {
2330

2431
/// \brief Private implementation of EqualityDeleteWriter.
///
/// Owns the format-specific Writer obtained from WriterFactoryRegistry and, once the
/// writer has been closed, converts its metrics into a DataFile whose content is
/// kEqualityDeletes.
class EqualityDeleteWriter::Impl {
 public:
  /// \brief Open the underlying file writer and wrap it in an Impl.
  ///
  /// \param options Writer configuration. path, schema, io and properties are
  ///        forwarded to the file writer; the remaining fields are kept for Metadata().
  /// \return The new Impl, a validation error when no equality field ids are given, or
  ///         the error returned by WriterFactoryRegistry::Open.
  static Result<std::unique_ptr<Impl>> Make(EqualityDeleteWriterOptions options) {
    // The Iceberg table spec requires equality_ids on equality delete files, so an
    // empty id list is rejected before the underlying writer is opened.
    ICEBERG_CHECK(!options.equality_field_ids.empty(),
                  "Cannot create equality delete writer without equality field ids");

    WriterOptions writer_options{
        .path = options.path,
        .schema = options.schema,
        .io = options.io,
        .properties = WriterProperties::FromMap(options.properties),
    };

    ICEBERG_ASSIGN_OR_RAISE(auto writer,
                            WriterFactoryRegistry::Open(options.format, writer_options));

    return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
  }

  /// \brief Forward one batch of Arrow data to the underlying file writer.
  Status Write(ArrowArray* data) { return writer_->Write(data); }

  /// \brief Return the length reported by the underlying file writer.
  Result<int64_t> Length() const { return writer_->length(); }

  /// \brief Close the underlying file writer.
  ///
  /// Calling Close() again after a successful close is a no-op. closed_ is only set
  /// once the underlying Close() has succeeded, so a failed close is not recorded as
  /// closed.
  Status Close() {
    if (closed_) {
      return {};
    }
    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
    closed_ = true;
    return {};
  }

  /// \brief Build the DataFile metadata describing the written equality delete file.
  ///
  /// \return A WriteResult holding exactly one DataFile, a validation error when the
  ///         writer has not been closed yet, or any error raised while reading the
  ///         writer's metrics/length or serializing the column bounds.
  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");

    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
    auto split_offsets = writer_->split_offsets();

    // Serialize literal bounds to binary format
    ICEBERG_ASSIGN_OR_RAISE(auto lower_bounds_map, SerializeBounds(metrics.lower_bounds));
    ICEBERG_ASSIGN_OR_RAISE(auto upper_bounds_map, SerializeBounds(metrics.upper_bounds));

    // TODO(anyone): add encryption key metadata for encrypted delete files
    auto data_file = std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kEqualityDeletes,
        .file_path = options_.path,
        .file_format = options_.format,
        .partition = options_.partition,
        // -1 is used when the writer did not report a row count.
        .record_count = metrics.row_count.value_or(-1),
        .file_size_in_bytes = length,
        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
        .null_value_counts = {metrics.null_value_counts.begin(),
                              metrics.null_value_counts.end()},
        .nan_value_counts = {metrics.nan_value_counts.begin(),
                             metrics.nan_value_counts.end()},
        .lower_bounds = std::move(lower_bounds_map),
        .upper_bounds = std::move(upper_bounds_map),
        .split_offsets = std::move(split_offsets),
        .equality_ids = options_.equality_field_ids,
        .sort_order_id = options_.sort_order_id,
        // The spec id is only known when a partition spec was supplied.
        .partition_spec_id =
            options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt,
    });

    FileWriter::WriteResult result;
    result.data_files.push_back(std::move(data_file));
    return result;
  }

  /// \brief View of the equality field ids this writer was configured with.
  ///
  /// The span refers to storage owned by this Impl.
  std::span<const int32_t> equality_field_ids() const {
    return options_.equality_field_ids;
  }

 private:
  Impl(EqualityDeleteWriterOptions options, std::unique_ptr<Writer> writer)
      : options_(std::move(options)), writer_(std::move(writer)) {}

  /// \brief Serialize every (column id, literal) bound into its binary form.
  ///
  /// Stops at, and returns, the first serialization error.
  template <typename BoundsMap>
  static Result<std::map<int32_t, std::vector<uint8_t>>> SerializeBounds(
      const BoundsMap& bounds) {
    std::map<int32_t, std::vector<uint8_t>> serialized_bounds;
    for (const auto& [col_id, literal] : bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      serialized_bounds[col_id] = std::move(serialized);
    }
    return serialized_bounds;
  }

  EqualityDeleteWriterOptions options_;
  std::unique_ptr<Writer> writer_;
  bool closed_ = false;
};
27119

120+
EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr<Impl> impl)
121+
: impl_(std::move(impl)) {}
122+
28123
EqualityDeleteWriter::~EqualityDeleteWriter() = default;
29124

30-
Status EqualityDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); }
125+
/// Builds the implementation from a copy of `options` and wraps it in a writer.
/// Any error from Impl::Make is returned unchanged.
Result<std::unique_ptr<EqualityDeleteWriter>> EqualityDeleteWriter::Make(
    const EqualityDeleteWriterOptions& options) {
  ICEBERG_ASSIGN_OR_RAISE(auto writer_impl, Impl::Make(options));
  // Constructed with `new`, as the original does; ownership goes straight to the
  // unique_ptr.
  auto* raw_writer = new EqualityDeleteWriter(std::move(writer_impl));
  return std::unique_ptr<EqualityDeleteWriter>(raw_writer);
}
130+
131+
/// Delegates the batch to the implementation object.
Status EqualityDeleteWriter::Write(ArrowArray* data) {
  return impl_->Write(data);
}
31132

32-
Result<int64_t> EqualityDeleteWriter::Length() const { return NotImplemented(""); }
133+
/// Delegates to the implementation object.
Result<int64_t> EqualityDeleteWriter::Length() const {
  return impl_->Length();
}
33134

34-
Status EqualityDeleteWriter::Close() { return NotImplemented(""); }
135+
/// Delegates to the implementation object.
Status EqualityDeleteWriter::Close() {
  return impl_->Close();
}
35136

36137
/// Delegates to the implementation object.
Result<FileWriter::WriteResult> EqualityDeleteWriter::Metadata() {
  auto write_result = impl_->Metadata();
  return write_result;
}
39140

40-
std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const { return {}; }
141+
/// Delegates to the implementation object, which owns the id storage.
std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const {
  const std::span<const int32_t> field_ids = impl_->equality_field_ids();
  return field_ids;
}
41144

42145
} // namespace iceberg

src/iceberg/data/equality_delete_writer.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,18 @@ struct ICEBERG_EXPORT EqualityDeleteWriterOptions {
5050
std::vector<int32_t> equality_field_ids;
5151
std::optional<int32_t> sort_order_id;
5252
std::unordered_map<std::string, std::string> properties;
53+
// TODO(anyone): add key_metadata for encryption
5354
};
5455

5556
/// \brief Writer for Iceberg equality delete files.
5657
class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
5758
public:
5859
~EqualityDeleteWriter() override;
5960

61+
/// \brief Create a new EqualityDeleteWriter instance.
62+
static Result<std::unique_ptr<EqualityDeleteWriter>> Make(
63+
const EqualityDeleteWriterOptions& options);
64+
6065
Status Write(ArrowArray* data) override;
6166
Result<int64_t> Length() const override;
6267
Status Close() override;
@@ -67,6 +72,8 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
6772
private:
6873
class Impl;
6974
std::unique_ptr<Impl> impl_;
75+
76+
explicit EqualityDeleteWriter(std::unique_ptr<Impl> impl);
7077
};
7178

7279
} // namespace iceberg

src/iceberg/data/position_delete_writer.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "iceberg/file_writer.h"
3131
#include "iceberg/manifest/manifest_entry.h"
3232
#include "iceberg/metadata_columns.h"
33+
#include "iceberg/partition_spec.h"
3334
#include "iceberg/schema.h"
3435
#include "iceberg/schema_internal.h"
3536
#include "iceberg/util/macros.h"
@@ -154,6 +155,8 @@ class PositionDeleteWriter::Impl {
154155
.split_offsets = std::move(split_offsets),
155156
.sort_order_id = std::nullopt,
156157
.referenced_data_file = std::move(referenced_data_file),
158+
.partition_spec_id =
159+
options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt,
157160
});
158161

159162
FileWriter::WriteResult result;

src/iceberg/test/data_writer_test.cc

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2929
#include "iceberg/avro/avro_register.h"
30+
#include "iceberg/data/equality_delete_writer.h"
3031
#include "iceberg/data/position_delete_writer.h"
3132
#include "iceberg/file_format.h"
3233
#include "iceberg/manifest/manifest_entry.h"
@@ -423,4 +424,142 @@ TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) {
423424
EXPECT_GT(data_file->file_size_in_bytes, 0);
424425
}
425426

427+
class EqualityDeleteWriterTest : public DataWriterTest {
428+
protected:
429+
EqualityDeleteWriterOptions MakeDeleteOptions(
430+
std::vector<int32_t> equality_field_ids = {1, 2},
431+
std::optional<int32_t> sort_order_id = std::nullopt) {
432+
return EqualityDeleteWriterOptions{
433+
.path = "test_eq_deletes.parquet",
434+
.schema = schema_,
435+
.spec = partition_spec_,
436+
.partition = PartitionValues{},
437+
.format = FileFormatType::kParquet,
438+
.io = file_io_,
439+
.equality_field_ids = std::move(equality_field_ids),
440+
.sort_order_id = sort_order_id,
441+
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
442+
};
443+
}
444+
445+
void WriteTestDataToEqualityWriter(EqualityDeleteWriter* writer) {
446+
auto test_data = CreateTestData();
447+
ArrowArray arrow_array;
448+
ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
449+
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
450+
}
451+
};
452+
453+
// After one batch the writer reports a positive length and closes without error.
TEST_F(EqualityDeleteWriterTest, WriteAndClose) {
  auto made = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  WriteTestDataToEqualityWriter(writer.get());

  auto bytes_written = writer->Length();
  ASSERT_THAT(bytes_written, IsOk());
  EXPECT_GT(bytes_written.value(), 0);

  ASSERT_THAT(writer->Close(), IsOk());
}
466+
467+
// Once closed, the writer reports a single equality-delete DataFile carrying the
// configured path, format, partition spec id and equality field ids.
TEST_F(EqualityDeleteWriterTest, MetadataAfterClose) {
  auto made = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  WriteTestDataToEqualityWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata = writer->Metadata();
  ASSERT_THAT(metadata, IsOk());

  const auto& files = metadata.value().data_files;
  ASSERT_EQ(files.size(), 1);

  const auto& delete_file = files[0];
  EXPECT_EQ(delete_file->content, DataFile::Content::kEqualityDeletes);
  EXPECT_EQ(delete_file->file_path, "test_eq_deletes.parquet");
  EXPECT_EQ(delete_file->file_format, FileFormatType::kParquet);
  EXPECT_GT(delete_file->file_size_in_bytes, 0);

  // The partition spec id comes from the spec passed in the options.
  ASSERT_TRUE(delete_file->partition_spec_id.has_value());
  EXPECT_EQ(delete_file->partition_spec_id.value(), PartitionSpec::kInitialSpecId);

  // The default options use equality field ids {1, 2}.
  const auto& equality_ids = delete_file->equality_ids;
  ASSERT_EQ(equality_ids.size(), 2);
  EXPECT_EQ(equality_ids[0], 1);
  EXPECT_EQ(equality_ids[1], 2);
}
496+
497+
// Asking for metadata on a writer that is still open is a validation failure.
TEST_F(EqualityDeleteWriterTest, MetadataBeforeCloseReturnsError) {
  auto made = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  auto premature_metadata = writer->Metadata();
  ASSERT_THAT(premature_metadata, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(premature_metadata,
              HasErrorMessage("Cannot get metadata before closing the writer"));
}
507+
508+
// Close() may be called repeatedly; every call must succeed.
TEST_F(EqualityDeleteWriterTest, CloseIsIdempotent) {
  auto made = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  WriteTestDataToEqualityWriter(writer.get());

  // The first call closes the writer; the next two hit the already-closed path.
  for (int attempt = 0; attempt < 3; ++attempt) {
    ASSERT_THAT(writer->Close(), IsOk());
  }
}
519+
520+
// The sort order id given in the options must be propagated to the DataFile metadata.
TEST_F(EqualityDeleteWriterTest, SortOrderIdInMetadata) {
  const int32_t sort_order_id = 7;
  auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions({1}, sort_order_id));
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  WriteTestDataToEqualityWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());
  // Guard the index below: an empty result should fail the test, not invoke
  // undefined behaviour.
  ASSERT_EQ(metadata_result.value().data_files.size(), 1);
  const auto& data_file = metadata_result.value().data_files[0];
  ASSERT_TRUE(data_file->sort_order_id.has_value());
  EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id);
}
535+
536+
// equality_field_ids() exposes the ids passed in the options, in the same order.
TEST_F(EqualityDeleteWriterTest, EqualityFieldIdsAccessor) {
  std::vector<int32_t> field_ids = {1, 2, 3};
  auto made = EqualityDeleteWriter::Make(MakeDeleteOptions(field_ids));
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  const auto reported_ids = writer->equality_field_ids();
  ASSERT_EQ(reported_ids.size(), 3);
  EXPECT_EQ(reported_ids[0], 1);
  EXPECT_EQ(reported_ids[1], 2);
  EXPECT_EQ(reported_ids[2], 3);
}
548+
549+
// Writing more than one batch still yields a single, non-empty equality delete file.
TEST_F(EqualityDeleteWriterTest, WriteMultipleBatches) {
  auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  WriteTestDataToEqualityWriter(writer.get());
  WriteTestDataToEqualityWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());
  // Guard the index below: an empty result should fail the test, not invoke
  // undefined behaviour.
  ASSERT_EQ(metadata_result.value().data_files.size(), 1);
  const auto& data_file = metadata_result.value().data_files[0];
  EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes);
  EXPECT_GT(data_file->file_size_in_bytes, 0);
}
564+
426565
} // namespace iceberg

src/iceberg/type_fwd.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,22 @@ class ManifestEvaluator;
142142
class ResidualEvaluator;
143143
class StrictMetricsEvaluator;
144144

145-
/// \brief Scan.
145+
/// \brief Scan task.
146146
class ChangelogScanTask;
147-
class DataTableScan;
148147
class FileScanTask;
148+
class ScanTask;
149+
150+
/// \brief Table scan.
151+
class DataTableScan;
149152
template <typename ScanTaskType>
150153
class IncrementalScan;
151154
class IncrementalAppendScan;
152155
class IncrementalChangelogScan;
153-
class ScanTask;
154156
class TableScan;
157+
158+
/// \brief Scan builder.
155159
template <typename ScanType>
156160
class TableScanBuilder;
157-
158-
// Type aliases for incremental scan builders
159161
using DataTableScanBuilder = TableScanBuilder<DataTableScan>;
160162
using IncrementalAppendScanBuilder = TableScanBuilder<IncrementalAppendScan>;
161163
using IncrementalChangelogScanBuilder = TableScanBuilder<IncrementalChangelogScan>;

0 commit comments

Comments
 (0)