Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ set(ICEBERG_SOURCES
manifest/manifest_list.cc
manifest/manifest_reader.cc
manifest/manifest_writer.cc
manifest/rolling_manifest_writer.cc
manifest/v1_metadata.cc
manifest/v2_metadata.cc
manifest/v3_metadata.cc
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class ICEBERG_EXPORT Writer {
virtual Result<Metrics> metrics() = 0;

/// \brief Get the file length.
/// Only valid after the file is closed.
/// This can be called while the writer is still open or after the file is closed.
virtual Result<int64_t> length() = 0;

/// \brief Returns a list of recommended split locations, if applicable, empty
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/manifest/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ enum class ManifestStatus {

/// \brief Get the relative manifest status type from int
ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
int status) noexcept {
int32_t status) noexcept {
switch (status) {
case 0:
return ManifestStatus::kExisting;
Expand Down Expand Up @@ -387,7 +387,7 @@ ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexc

/// \brief Get the relative data file content type from int
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
int content) noexcept {
int32_t content) noexcept {
switch (content) {
case 0:
return DataFile::Content::kData;
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/manifest/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }

// Forwards the request to writer_ and returns whatever it reports.
Result<Metrics> ManifestWriter::metrics() const {
  return writer_->metrics();
}

// Forwards the request to writer_ and returns the length it reports
// (or the error it produced).
Result<int64_t> ManifestWriter::length() const {
  return writer_->length();
}

Result<ManifestFile> ManifestWriter::ToManifestFile() const {
if (!closed_) [[unlikely]] {
return Invalid("Cannot get ManifestFile before closing the writer.");
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/manifest/manifest_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class ICEBERG_EXPORT ManifestWriter {
/// \note Only valid after the file is closed.
Result<Metrics> metrics() const;

/// \brief Get the current length of the manifest file in bytes.
/// \return The current length of the file, or an error if the operation fails.
Result<int64_t> length() const;

/// \brief Get the ManifestFile object.
/// \note Only valid after the file is closed.
Result<ManifestFile> ToManifestFile() const;
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/manifest/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ install_headers(
'manifest_list.h',
'manifest_reader.h',
'manifest_writer.h',
'rolling_manifest_writer.h',
],
subdir: 'iceberg/manifest',
)
121 changes: 121 additions & 0 deletions src/iceberg/manifest/rolling_manifest_writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/manifest/rolling_manifest_writer.h"

#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/result.h"
#include "iceberg/util/macros.h"

namespace iceberg {

RollingManifestWriter::RollingManifestWriter(
ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes)
: manifest_writer_factory_(std::move(manifest_writer_factory)),
target_file_size_in_bytes_(target_file_size_in_bytes) {}

// Best-effort cleanup: when Close() has not completed, close whatever manifest
// writer is still open. A destructor has no way to report a failure, so the
// resulting status is intentionally discarded.
RollingManifestWriter::~RollingManifestWriter() {
  if (closed_) {
    return;
  }
  static_cast<void>(CloseCurrentWriter());
}

// Forwards an added-entry write to the current manifest writer (obtained via
// CurrentWriter(), which may roll to a new file first). The per-file row
// counter is bumped only when the underlying write succeeds.
Status RollingManifestWriter::WriteAddedEntry(
    std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) {
  ICEBERG_ASSIGN_OR_RAISE(ManifestWriter * active, CurrentWriter());
  ICEBERG_RETURN_UNEXPECTED(
      active->WriteAddedEntry(std::move(file), data_sequence_number));
  current_file_rows_ += 1;
  return {};
}

// Forwards an existing-entry write to the current manifest writer (obtained
// via CurrentWriter(), which may roll to a new file first). The per-file row
// counter is bumped only when the underlying write succeeds.
Status RollingManifestWriter::WriteExistingEntry(
    std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
    int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) {
  ICEBERG_ASSIGN_OR_RAISE(ManifestWriter * active, CurrentWriter());
  ICEBERG_RETURN_UNEXPECTED(active->WriteExistingEntry(
      std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number));
  current_file_rows_ += 1;
  return {};
}

// Forwards a deleted-entry write to the current manifest writer (obtained via
// CurrentWriter(), which may roll to a new file first). The per-file row
// counter is bumped only when the underlying write succeeds.
Status RollingManifestWriter::WriteDeletedEntry(
    std::shared_ptr<DataFile> file, int64_t data_sequence_number,
    std::optional<int64_t> file_sequence_number) {
  ICEBERG_ASSIGN_OR_RAISE(ManifestWriter * active, CurrentWriter());
  ICEBERG_RETURN_UNEXPECTED(active->WriteDeletedEntry(
      std::move(file), data_sequence_number, file_sequence_number));
  current_file_rows_ += 1;
  return {};
}

// Finalizes the rolling writer. The open manifest writer (if any) is closed
// and recorded; closed_ is set only when that succeeds, so a failed Close()
// can be retried. Calling Close() on an already-closed writer does nothing.
Status RollingManifestWriter::Close() {
  if (closed_) {
    return {};
  }
  ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
  closed_ = true;
  return {};
}

// Returns a copy of the manifest files recorded so far. Only permitted once
// the writer has been closed; otherwise an Invalid error is returned.
Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const {
  if (closed_) {
    return manifest_files_;
  }
  return Invalid("Cannot get ManifestFile list from unclosed writer");
}

// Returns the manifest writer that the next entry must be written to.
//
// - Fails with an Invalid error if the rolling writer has been closed.
// - If a writer is open and ShouldRollToNewFile() says so, the open writer is
//   closed (and its ManifestFile recorded) before a replacement is created.
// - If no writer is open, one is created through the factory.
//
// Any error from closing the old writer or from the factory is propagated.
// The returned pointer is owned by this object and is only valid until the
// next roll or close.
Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() {
  // After Close() the manifest list is final. A writer created now would never
  // be closed (Close() and the destructor both skip work once closed_ is set)
  // and its file would never show up in ToManifestFiles(), so reject the write.
  if (closed_) {
    return Invalid("Cannot write to a closed RollingManifestWriter");
  }

  if (current_writer_ != nullptr && ShouldRollToNewFile()) {
    // On success this resets current_writer_, so the branch below replaces it.
    ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter());
  }

  if (current_writer_ == nullptr) {
    ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_());
  }

  return current_writer_.get();
}

// Decides whether the open manifest should be closed before the next write.
// This is a pure check; it never performs the roll itself.
bool RollingManifestWriter::ShouldRollToNewFile() const {
  // Without an open writer there is nothing to roll.
  if (!current_writer_) {
    return false;
  }

  // The file length is only consulted when the row count of the current file
  // is a multiple of kRowsDivisor.
  const bool at_check_point = (current_file_rows_ % kRowsDivisor) == 0;
  if (!at_check_point) {
    return false;
  }

  // If the length cannot be obtained, keep writing to the current file.
  const auto current_length = current_writer_->length();
  return current_length.has_value() &&
         current_length.value() >= target_file_size_in_bytes_;
}

// Closes the open manifest writer, appends its ManifestFile to
// manifest_files_, and resets the per-file state. Does nothing when no writer
// is open. If closing or obtaining the ManifestFile fails, the error is
// returned and the writer and row counter are left untouched.
Status RollingManifestWriter::CloseCurrentWriter() {
  if (current_writer_ == nullptr) {
    return {};
  }

  ICEBERG_RETURN_UNEXPECTED(current_writer_->Close());
  ICEBERG_ASSIGN_OR_RAISE(auto finished_manifest, current_writer_->ToManifestFile());
  manifest_files_.push_back(std::move(finished_manifest));

  current_writer_.reset();
  current_file_rows_ = 0;
  return {};
}

} // namespace iceberg
129 changes: 129 additions & 0 deletions src/iceberg/manifest/rolling_manifest_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/manifest/rolling_manifest_writer.h
/// Rolling manifest writer that can produce multiple manifest files.

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/result.h"

namespace iceberg {

/// \brief A rolling manifest writer that can produce multiple manifest files.
///
/// As opposed to ManifestWriter, a rolling writer could produce multiple manifest
/// files.
class ICEBERG_EXPORT RollingManifestWriter {
public:
/// \brief Factory function type for creating ManifestWriter instances.
using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>;

/// \brief Construct a rolling manifest writer.
/// \param manifest_writer_factory Factory function to create new ManifestWriter
/// instances.
/// \param target_file_size_in_bytes Target file size in bytes. When the current
/// file reaches this size (and row count is a multiple of 250), a new file
/// will be created.
RollingManifestWriter(ManifestWriterFactory manifest_writer_factory,
int64_t target_file_size_in_bytes);

~RollingManifestWriter();

/// \brief Add an added entry for a file.
///
/// \param file a data file
/// \return Status::OK() if the entry was written successfully
/// \note The entry's snapshot ID will be this manifest's snapshot ID. The
/// entry's data sequence number will be the provided data sequence number.
/// The entry's file sequence number will be assigned at commit.
Status WriteAddedEntry(std::shared_ptr<DataFile> file,
std::optional<int64_t> data_sequence_number = std::nullopt);

/// \brief Add an existing entry for a file.
///
/// \param file an existing data file
/// \param file_snapshot_id snapshot ID when the data file was added to the table
/// \param data_sequence_number a data sequence number of the file (assigned when
/// the file was added)
/// \param file_sequence_number a file sequence number (assigned when the file
/// was added)
/// \return Status::OK() if the entry was written successfully
/// \note The original data and file sequence numbers, snapshot ID, which were
/// assigned at commit, must be preserved when adding an existing entry.
Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id,
int64_t data_sequence_number,
std::optional<int64_t> file_sequence_number = std::nullopt);

/// \brief Add a delete entry for a file.
///
/// \param file a deleted data file
/// \param data_sequence_number a data sequence number of the file (assigned when
/// the file was added)
/// \param file_sequence_number a file sequence number (assigned when the file
/// was added)
/// \return Status::OK() if the entry was written successfully
/// \note The entry's snapshot ID will be this manifest's snapshot ID. However,
/// the original data and file sequence numbers of the file must be preserved
/// when the file is marked as deleted.
Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t data_sequence_number,
std::optional<int64_t> file_sequence_number = std::nullopt);

/// \brief Close the rolling manifest writer.
Status Close();

/// \brief Get the list of manifest files produced by this writer.
/// \return A vector of ManifestFile objects
/// \note Only valid after the writer is closed.
Result<std::vector<ManifestFile>> ToManifestFiles() const;

private:
/// \brief Get or create the current writer, rolling to a new file if needed.
/// \return The current ManifestWriter, or an error if creation fails
Result<ManifestWriter*> CurrentWriter();

/// \brief Check if we should roll to a new file.
///
/// This method checks if the current file has reached the target size
/// or the number of rows has reached the threshold. If so, it rolls to a new file.
bool ShouldRollToNewFile() const;

/// \brief Close the current writer and add its ManifestFile to the list.
Status CloseCurrentWriter();

static constexpr int64_t kRowsDivisor = 250;

ManifestWriterFactory manifest_writer_factory_;
int64_t target_file_size_in_bytes_;
std::vector<ManifestFile> manifest_files_;

int64_t current_file_rows_{0};
std::unique_ptr<ManifestWriter> current_writer_{nullptr};
bool closed_{false};
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ iceberg_sources = files(
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_writer.cc',
'manifest/rolling_manifest_writer.cc',
'manifest/v1_metadata.cc',
'manifest/v2_metadata.cc',
'manifest/v3_metadata.cc',
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ if(ICEBERG_BUILD_BUNDLE)
manifest_list_versions_test.cc
manifest_reader_stats_test.cc
manifest_reader_test.cc
manifest_writer_versions_test.cc)
manifest_writer_versions_test.cc
rolling_manifest_writer_test.cc)

add_iceberg_test(arrow_test
USE_BUNDLE
Expand Down
Loading
Loading