Skip to content

Commit 69cf2d3

Browse files
shangxinliclaude
andauthored
feat: Implement PositionDeleteWriter for position delete files (#582)
Implement the PositionDeleteWriter following the same PIMPL pattern as DataWriter. The writer supports both buffered WriteDelete(file_path, pos) calls and direct Write(ArrowArray*) for pre-formed batches. Metadata reports content=kPositionDeletes with sort_order_id=nullopt per spec, and tracks referenced_data_file when all deletes target a single file. --------- Co-authored-by: shangxinli <shangxinli@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d43455d commit 69cf2d3

4 files changed

Lines changed: 366 additions & 8 deletions

File tree

src/iceberg/arrow_c_data_guard_internal.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
namespace iceberg::internal {
2323

2424
ArrowArrayGuard::~ArrowArrayGuard() {
25-
if (array_ != nullptr) {
25+
if (array_ != nullptr && array_->release != nullptr) {
2626
ArrowArrayRelease(array_);
2727
}
2828
}
2929

3030
ArrowSchemaGuard::~ArrowSchemaGuard() {
31-
if (schema_ != nullptr) {
31+
if (schema_ != nullptr && schema_->release != nullptr) {
3232
ArrowSchemaRelease(schema_);
3333
}
3434
}

src/iceberg/data/position_delete_writer.cc

Lines changed: 198 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,219 @@
1919

2020
#include "iceberg/data/position_delete_writer.h"
2121

22+
#include <map>
23+
#include <set>
24+
#include <vector>
25+
26+
#include <nanoarrow/nanoarrow.h>
27+
28+
#include "iceberg/arrow/nanoarrow_status_internal.h"
29+
#include "iceberg/arrow_c_data_guard_internal.h"
30+
#include "iceberg/file_writer.h"
31+
#include "iceberg/manifest/manifest_entry.h"
32+
#include "iceberg/metadata_columns.h"
33+
#include "iceberg/schema.h"
34+
#include "iceberg/schema_internal.h"
35+
#include "iceberg/util/macros.h"
36+
2237
namespace iceberg {
2338

2439
class PositionDeleteWriter::Impl {
2540
public:
41+
static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions options) {
42+
auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
43+
MetadataColumns::kDeleteFilePath,
44+
MetadataColumns::kDeleteFilePos,
45+
});
46+
47+
WriterOptions writer_options{
48+
.path = options.path,
49+
.schema = delete_schema,
50+
.io = options.io,
51+
.properties = WriterProperties::FromMap(options.properties),
52+
};
53+
54+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
55+
WriterFactoryRegistry::Open(options.format, writer_options));
56+
57+
return std::unique_ptr<Impl>(
58+
new Impl(std::move(options), std::move(delete_schema), std::move(writer)));
59+
}
60+
61+
Status Write(ArrowArray* data) {
62+
ICEBERG_PRECHECK(buffered_paths_.empty(),
63+
"Cannot write batch data when there are buffered deletes.");
64+
// TODO(anyone): Extract file paths from ArrowArray to update referenced_paths_.
65+
return writer_->Write(data);
66+
}
67+
68+
Status WriteDelete(std::string_view file_path, int64_t pos) {
69+
// TODO(anyone): check if the sort order of file_path and pos observes the spec.
70+
buffered_paths_.emplace_back(file_path);
71+
buffered_positions_.push_back(pos);
72+
referenced_paths_.emplace(file_path);
73+
74+
if (buffered_paths_.size() >= options_.flush_threshold) {
75+
return FlushBuffer();
76+
}
77+
return {};
78+
}
79+
80+
Result<int64_t> Length() const { return writer_->length(); }
81+
82+
Status Close() {
83+
if (closed_) {
84+
return {};
85+
}
86+
if (!buffered_paths_.empty()) {
87+
ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
88+
}
89+
ICEBERG_RETURN_UNEXPECTED(writer_->Close());
90+
closed_ = true;
91+
return {};
92+
}
93+
94+
Result<FileWriter::WriteResult> Metadata() {
95+
ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
96+
97+
ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
98+
ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
99+
auto split_offsets = writer_->split_offsets();
100+
101+
// Filter out metrics for delete metadata columns (file_path, pos) to avoid
102+
// bloating the manifest, matching Java's PositionDeleteWriter behavior.
103+
// Always remove field counts; also remove bounds when referencing multiple files.
104+
const auto path_id = MetadataColumns::kDeleteFilePathColumnId;
105+
const auto pos_id = MetadataColumns::kDeleteFilePosColumnId;
106+
107+
metrics.value_counts.erase(path_id);
108+
metrics.value_counts.erase(pos_id);
109+
metrics.null_value_counts.erase(path_id);
110+
metrics.null_value_counts.erase(pos_id);
111+
metrics.nan_value_counts.erase(path_id);
112+
metrics.nan_value_counts.erase(pos_id);
113+
114+
if (referenced_paths_.size() > 1) {
115+
metrics.lower_bounds.erase(path_id);
116+
metrics.lower_bounds.erase(pos_id);
117+
metrics.upper_bounds.erase(path_id);
118+
metrics.upper_bounds.erase(pos_id);
119+
}
120+
121+
// Serialize literal bounds to binary format
122+
std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
123+
for (const auto& [col_id, literal] : metrics.lower_bounds) {
124+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
125+
lower_bounds_map[col_id] = std::move(serialized);
126+
}
127+
std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
128+
for (const auto& [col_id, literal] : metrics.upper_bounds) {
129+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
130+
upper_bounds_map[col_id] = std::move(serialized);
131+
}
132+
133+
// Set referenced_data_file if all deletes reference the same data file
134+
std::optional<std::string> referenced_data_file;
135+
if (referenced_paths_.size() == 1) {
136+
referenced_data_file = *referenced_paths_.begin();
137+
}
138+
139+
auto data_file = std::make_shared<DataFile>(DataFile{
140+
.content = DataFile::Content::kPositionDeletes,
141+
.file_path = options_.path,
142+
.file_format = options_.format,
143+
.partition = options_.partition,
144+
.record_count = metrics.row_count.value_or(-1),
145+
.file_size_in_bytes = length,
146+
.column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
147+
.value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
148+
.null_value_counts = {metrics.null_value_counts.begin(),
149+
metrics.null_value_counts.end()},
150+
.nan_value_counts = {metrics.nan_value_counts.begin(),
151+
metrics.nan_value_counts.end()},
152+
.lower_bounds = std::move(lower_bounds_map),
153+
.upper_bounds = std::move(upper_bounds_map),
154+
.split_offsets = std::move(split_offsets),
155+
.sort_order_id = std::nullopt,
156+
.referenced_data_file = std::move(referenced_data_file),
157+
});
158+
159+
FileWriter::WriteResult result;
160+
result.data_files.push_back(std::move(data_file));
161+
return result;
162+
}
163+
164+
private:
165+
Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema> delete_schema,
166+
std::unique_ptr<Writer> writer)
167+
: options_(std::move(options)),
168+
delete_schema_(std::move(delete_schema)),
169+
writer_(std::move(writer)) {}
170+
171+
Status FlushBuffer() {
172+
ArrowSchema arrow_schema;
173+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));
174+
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
175+
176+
ArrowArray array;
177+
ArrowError error;
178+
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
179+
ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error);
180+
internal::ArrowArrayGuard array_guard(&array);
181+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array));
182+
183+
for (size_t i = 0; i < buffered_paths_.size(); ++i) {
184+
ArrowStringView path_view(buffered_paths_[i].data(),
185+
static_cast<int64_t>(buffered_paths_[i].size()));
186+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(
187+
ArrowArrayAppendString(array.children[0], path_view));
188+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(
189+
ArrowArrayAppendInt(array.children[1], buffered_positions_[i]));
190+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array));
191+
}
192+
193+
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
194+
ArrowArrayFinishBuildingDefault(&array, &error), error);
195+
196+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(&array));
197+
198+
buffered_paths_.clear();
199+
buffered_positions_.clear();
200+
return {};
201+
}
202+
203+
PositionDeleteWriterOptions options_;
204+
std::shared_ptr<Schema> delete_schema_;
205+
std::unique_ptr<Writer> writer_;
206+
bool closed_ = false;
207+
std::vector<std::string> buffered_paths_;
208+
std::vector<int64_t> buffered_positions_;
209+
std::set<std::string> referenced_paths_;
26210
};
27211

212+
PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr<Impl> impl)
213+
: impl_(std::move(impl)) {}
214+
28215
PositionDeleteWriter::~PositionDeleteWriter() = default;
29216

30-
Status PositionDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); }
217+
Result<std::unique_ptr<PositionDeleteWriter>> PositionDeleteWriter::Make(
218+
const PositionDeleteWriterOptions& options) {
219+
ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
220+
return std::unique_ptr<PositionDeleteWriter>(new PositionDeleteWriter(std::move(impl)));
221+
}
222+
223+
Status PositionDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); }
31224

32225
Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) {
33-
return NotImplemented("");
226+
return impl_->WriteDelete(file_path, pos);
34227
}
35228

36-
Result<int64_t> PositionDeleteWriter::Length() const { return NotImplemented(""); }
229+
Result<int64_t> PositionDeleteWriter::Length() const { return impl_->Length(); }
37230

38-
Status PositionDeleteWriter::Close() { return NotImplemented(""); }
231+
Status PositionDeleteWriter::Close() { return impl_->Close(); }
39232

40233
Result<FileWriter::WriteResult> PositionDeleteWriter::Metadata() {
41-
return NotImplemented("");
234+
return impl_->Metadata();
42235
}
43236

44237
} // namespace iceberg

src/iceberg/data/position_delete_writer.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ struct ICEBERG_EXPORT PositionDeleteWriterOptions {
4646
PartitionValues partition;
4747
FileFormatType format = FileFormatType::kParquet;
4848
std::shared_ptr<FileIO> io;
49-
std::shared_ptr<Schema> row_schema; // Optional row data schema
49+
int64_t flush_threshold = 1000; // Number of buffered deletes before auto-flush
5050
std::unordered_map<std::string, std::string> properties;
5151
};
5252

@@ -55,6 +55,10 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
5555
public:
5656
~PositionDeleteWriter() override;
5757

58+
/// \brief Create a new PositionDeleteWriter instance.
59+
static Result<std::unique_ptr<PositionDeleteWriter>> Make(
60+
const PositionDeleteWriterOptions& options);
61+
5862
Status Write(ArrowArray* data) override;
5963
Status WriteDelete(std::string_view file_path, int64_t pos);
6064
Result<int64_t> Length() const override;
@@ -64,6 +68,8 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
6468
private:
6569
class Impl;
6670
std::unique_ptr<Impl> impl_;
71+
72+
explicit PositionDeleteWriter(std::unique_ptr<Impl> impl);
6773
};
6874

6975
} // namespace iceberg

0 commit comments

Comments
 (0)