Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 148 additions & 6 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

#include "iceberg/expression/binder.h"
#include "iceberg/expression/expression.h"
#include "iceberg/expression/residual_evaluator.h"
#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_group.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/util/content_file_util.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/snapshot_util_internal.h"
#include "iceberg/util/timepoint.h"
Expand Down Expand Up @@ -294,6 +296,26 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
return MakeArrowArrayStream(std::move(reader));
}

// ChangelogScanTask implementation

/// \brief Total number of bytes referenced by this task.
///
/// Starts from the data file's `file_size_in_bytes` and adds one term per
/// attached delete file: a deletion vector contributes its
/// `content_size_in_bytes`, every other delete file contributes its whole
/// `file_size_in_bytes`.
///
/// \return The summed size in bytes.
int64_t ChangelogScanTask::size_bytes() const {
  int64_t total_size = data_file_->file_size_in_bytes;
  for (const auto& delete_file : delete_files_) {
    if (delete_file->IsDeletionVector()) {
      // The content size is only read on this branch, so it is only required
      // here. Checking it for every delete file would trip the debug check on
      // non-deletion-vector files that legitimately carry no content size.
      ICEBERG_DCHECK(delete_file->content_size_in_bytes.has_value(),
                     "Delete file content size must be available");
      total_size += delete_file->content_size_in_bytes.value();
    } else {
      total_size += delete_file->file_size_in_bytes;
    }
  }
  return total_size;
}

int32_t ChangelogScanTask::files_count() const { return 1 + delete_files_.size(); }

/// \brief Estimated number of rows for this task, taken directly from the data
/// file's `record_count` (attached delete files are not taken into account).
int64_t ChangelogScanTask::estimated_row_count() const {
  const auto& file = *data_file_;
  return file.record_count;
}

// Generic template implementation for Make
template <typename ScanType>
Result<std::unique_ptr<TableScanBuilder<ScanType>>> TableScanBuilder<ScanType>::Make(
Expand Down Expand Up @@ -747,11 +769,13 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFi
// IncrementalChangelogScan implementation

/// \brief Create an incremental changelog scan.
///
/// \param metadata Table metadata to scan; must not be null.
/// \param schema Schema used by the scan; must not be null.
/// \param io File IO used to read manifests; must not be null.
/// \param context Scan options, forwarded unchanged to the scan.
/// \return The newly created scan. A violated precondition is reported through
///         ICEBERG_PRECHECK (macro defined elsewhere; presumably it returns an
///         error result -- confirm).
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
    std::shared_ptr<FileIO> io, internal::TableScanContext context) {
  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
  // NOTE(review): plain `new` instead of std::make_unique -- presumably the
  // constructor is not publicly accessible; confirm against the class declaration.
  return std::unique_ptr<IncrementalChangelogScan>(new IncrementalChangelogScan(
      std::move(metadata), std::move(schema), std::move(io), std::move(context)));
}

Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
Expand All @@ -762,7 +786,125 @@ IncrementalChangelogScan::PlanFiles() const {
/// \brief Plan the changelog tasks for a snapshot range.
///
/// Steps performed:
///  1. Collect the ancestors between the two snapshot IDs and keep every snapshot
///     whose operation is not `DataOperation::kReplace`.
///  2. Fail with NotSupported if any kept snapshot has delete manifests.
///  3. Assign each kept snapshot a change ordinal in iteration order (0-based).
///  4. Gather the data manifests that were added by one of the kept snapshots,
///     de-duplicated by manifest path.
///  5. Plan the manifest entries committed by the kept snapshots (existing
///     entries are ignored) into AddedRowsScanTask / DeletedDataFileScanTask.
///
/// \param from_snapshot_id_exclusive Optional lower bound of the range (exclusive).
/// \param to_snapshot_id_inclusive Upper bound of the range (inclusive).
/// \return The planned tasks (possibly empty), or the first error encountered.
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_exclusive,
                                    int64_t to_snapshot_id_inclusive) const {
  ICEBERG_ASSIGN_OR_RAISE(
      auto ancestors_snapshots,
      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
                                     from_snapshot_id_exclusive));

  // Each kept snapshot is paired with the cache used to load its manifests.
  std::vector<std::pair<std::shared_ptr<Snapshot>, std::unique_ptr<SnapshotCache>>>
      changelog_snapshots;

  // Walk the ancestors in reverse; the ordinals assigned below follow this order.
  // NOTE(review): presumably AncestorsBetween yields newest-first, making this
  // oldest-first -- confirm against SnapshotUtil.
  for (const auto& snapshot : std::ranges::reverse_view(ancestors_snapshots)) {
    auto operation = snapshot->Operation();
    if (!operation.has_value() || operation.value() != DataOperation::kReplace) {
      auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get());
      ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
                              snapshot_cache->DeleteManifests(io_));
      if (!delete_manifests.empty()) {
        return NotSupported(
            "Delete files are currently not supported in changelog scans");
      }
      changelog_snapshots.emplace_back(snapshot, std::move(snapshot_cache));
    }
  }
  if (changelog_snapshots.empty()) {
    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
  }

  std::unordered_set<int64_t> snapshot_ids;
  std::unordered_map<int64_t, int32_t> snapshot_ordinals;
  for (const auto& snapshot : changelog_snapshots) {
    const int64_t snapshot_id = snapshot.first->snapshot_id;
    // Read the size before inserting so the ordinal does not depend on the
    // evaluation order of `map[key] = map.size()`.
    const auto ordinal = static_cast<int32_t>(snapshot_ordinals.size());
    snapshot_ids.insert(snapshot_id);
    snapshot_ordinals[snapshot_id] = ordinal;
  }

  // Keep only manifests added by a snapshot in range, each manifest path once.
  std::vector<ManifestFile> data_manifests;
  std::unordered_set<std::string> seen_manifest_paths;
  for (const auto& snapshot : changelog_snapshots) {
    ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot.second->DataManifests(io_));
    for (auto& manifest : manifests) {
      if (snapshot_ids.contains(manifest.added_snapshot_id) &&
          seen_manifest_paths.insert(manifest.manifest_path).second) {
        data_manifests.push_back(manifest);
      }
    }
  }
  if (data_manifests.empty()) {
    return std::vector<std::shared_ptr<ChangelogScanTask>>{};
  }

  TableMetadataCache metadata_cache(metadata_.get());
  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());

  ICEBERG_ASSIGN_OR_RAISE(
      auto manifest_group,
      ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests),
                          /*delete_manifests=*/{}));

  manifest_group->CaseSensitive(context_.case_sensitive)
      .Select(ScanColumns())
      .FilterData(filter())
      // Only entries committed by one of the snapshots in range are relevant.
      .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
        return entry.snapshot_id.has_value() &&
               snapshot_ids.contains(entry.snapshot_id.value());
      })
      .IgnoreExisting()
      .ColumnsToKeepStats(context_.columns_to_keep_stats);

  if (context_.ignore_residuals) {
    manifest_group->IgnoreResiduals();
  }

  // Converts the manifest entries handed over by the manifest group into
  // changelog tasks. `snapshot_ordinals` is captured by reference and is only
  // used by the Plan() call below, within this function.
  auto create_tasks_func =
      [&snapshot_ordinals](
          std::vector<ManifestEntry>&& entries,
          const TaskContext& ctx) -> Result<std::vector<std::shared_ptr<ScanTask>>> {
    std::vector<std::shared_ptr<ScanTask>> tasks;
    tasks.reserve(entries.size());

    for (auto& entry : entries) {
      ICEBERG_PRECHECK(entry.snapshot_id.has_value() && entry.data_file,
                       "Invalid manifest entry with missing snapshot id or data file");

      int64_t commit_snapshot_id = entry.snapshot_id.value();
      auto ordinal_it = snapshot_ordinals.find(commit_snapshot_id);
      ICEBERG_PRECHECK(ordinal_it != snapshot_ordinals.end(),
                       "Invalid manifest entry with missing snapshot ordinal");

      int32_t change_ordinal = ordinal_it->second;

      if (ctx.drop_stats) {
        ContentFileUtil::DropAllStats(*entry.data_file);
      } else if (!ctx.columns_to_keep_stats.empty()) {
        ContentFileUtil::DropUnselectedStats(*entry.data_file, ctx.columns_to_keep_stats);
      }

      // Compute the residual before the data file is moved into the task.
      ICEBERG_ASSIGN_OR_RAISE(auto residual,
                              ctx.residuals->ResidualFor(entry.data_file->partition));

      switch (entry.status) {
        case ManifestStatus::kAdded:
          tasks.push_back(std::make_shared<AddedRowsScanTask>(
              change_ordinal, commit_snapshot_id, std::move(entry.data_file),
              std::vector<std::shared_ptr<DataFile>>{}, std::move(residual)));
          break;
        case ManifestStatus::kDeleted:
          tasks.push_back(std::make_shared<DeletedDataFileScanTask>(
              change_ordinal, commit_snapshot_id, std::move(entry.data_file),
              std::vector<std::shared_ptr<DataFile>>{}, std::move(residual)));
          break;
        case ManifestStatus::kExisting:
          return InvalidArgument("Unexpected entry status: EXISTING");
      }
    }
    return tasks;
  };

  ICEBERG_ASSIGN_OR_RAISE(auto tasks, manifest_group->Plan(create_tasks_func));
  // Every task created above is a ChangelogScanTask, so the static cast is safe.
  return tasks | std::views::transform([](const auto& task) {
           return std::static_pointer_cast<ChangelogScanTask>(task);
         }) |
         std::ranges::to<std::vector>();
}

} // namespace iceberg
127 changes: 123 additions & 4 deletions src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,122 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
std::shared_ptr<Expression> residual_filter_;
};

/// \brief Kind of change reported by a changelog scan task.
///
/// The explicit values match the implicit numbering of the declaration order.
enum class ChangelogOperation : uint8_t {
  /// Reported by AddedRowsScanTask::operation().
  kInsert = 0,
  /// Reported by DeletedDataFileScanTask::operation().
  kDelete = 1,
  /// Not produced by any task type visible in this file.
  /// NOTE(review): presumably the pre-update image of a changed row -- confirm.
  kUpdateBefore = 2,
  /// Not produced by any task type visible in this file.
  /// NOTE(review): presumably the post-update image of a changed row -- confirm.
  kUpdateAfter = 3,
};

/// \brief A scan task for reading changelog entries between snapshots.
class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
 public:
  /// \brief Construct a ChangelogScanTask.
  ///
  /// Concrete task types in this file inherit this constructor.
  ///
  /// \param change_ordinal Position in the changelog order (0-based).
  /// \param commit_snapshot_id The snapshot ID that committed this change.
  /// \param data_file The data file this change refers to.
  /// \param delete_files Delete files that apply to this data file.
  /// \param residual_filter Optional residual filter to apply after reading.
  ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id,
                    std::shared_ptr<DataFile> data_file,
                    std::vector<std::shared_ptr<DataFile>> delete_files = {},
                    std::shared_ptr<Expression> residual_filter = nullptr)
      : change_ordinal_(change_ordinal),
        commit_snapshot_id_(commit_snapshot_id),
        data_file_(std::move(data_file)),
        delete_files_(std::move(delete_files)),
        residual_filter_(std::move(residual_filter)) {}

  Kind kind() const override { return Kind::kChangelogScanTask; }

  // The three size/count accessors are declared here only and defined out of
  // line; keeping an inline stub next to the declaration would redeclare the
  // member.

  /// \brief Size in bytes of the data file plus the attached delete files.
  int64_t size_bytes() const override;
  /// \brief Number of files referenced: the data file plus the delete files.
  int32_t files_count() const override;
  /// \brief Estimated row count, taken from the data file's record count.
  int64_t estimated_row_count() const override;

  /// \brief The kind of change this task produces; defined by each subclass.
  virtual ChangelogOperation operation() const = 0;

  /// \brief The position of this change in the changelog order (0-based).
  int32_t change_ordinal() const { return change_ordinal_; }

  /// \brief The snapshot ID that committed this change.
  int64_t commit_snapshot_id() const { return commit_snapshot_id_; }

  /// \brief Residual filter to apply after reading.
  const std::shared_ptr<Expression>& residual_filter() const { return residual_filter_; }

 protected:
  int32_t change_ordinal_;
  int64_t commit_snapshot_id_;
  // Exposed through accessors declared by the subclasses.
  std::shared_ptr<DataFile> data_file_;
  std::vector<std::shared_ptr<DataFile>> delete_files_;
  std::shared_ptr<Expression> residual_filter_;
};

/// \brief A scan task for inserts generated by adding a data file to the table.
///
/// This task represents data files that were added to the table, along with any
/// delete files that should be applied when reading the data.
///
/// Added data files may have matching delete files. This may happen if a
/// matching position delete file is committed in the same snapshot or if changes
/// for multiple snapshots are squashed together.
///
/// Suppose snapshot S1 adds data files F1, F2, F3 and a position delete file,
/// D1, that marks particular records in F1 as deleted. A scan for changes
/// generated by S1 should include the following tasks:
/// - AddedRowsScanTask(file=F1, deletes=[D1], snapshot=S1)
/// - AddedRowsScanTask(file=F2, deletes=[], snapshot=S1)
/// - AddedRowsScanTask(file=F3, deletes=[], snapshot=S1)
///
/// Readers consuming these tasks should produce added records with metadata
/// like change ordinal and commit snapshot ID.
class ICEBERG_EXPORT AddedRowsScanTask : public ChangelogScanTask {
 public:
  // Reuse the base-class constructor unchanged.
  using ChangelogScanTask::ChangelogScanTask;

  /// \brief The data file containing the added rows.
  const std::shared_ptr<DataFile>& data_file() const { return data_file_; }

  /// \brief Delete files to apply when reading the data file in this task.
  ///
  /// \return A list of delete files to apply
  const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
    return delete_files_;
  }

  /// \brief Always ChangelogOperation::kInsert for this task type.
  ChangelogOperation operation() const override { return ChangelogOperation::kInsert; }
};

/// \brief A scan task for deletes generated by removing a data file from the table.
///
/// All historical delete files added earlier must be applied while reading the data file.
/// This is required to output only those data records that were live when the data file
/// was removed.
///
/// Suppose snapshot S1 contains data files F1, F2, F3. Then snapshot S2 adds a position
/// delete file, D1, that deletes records from F2 and snapshot S3 removes F2 entirely. A
/// scan for changes generated by S3 should include the following task:
/// - DeletedDataFileScanTask(file=F2, existing-deletes=[D1], snapshot=S3)
///
/// Readers consuming these tasks should produce deleted records with metadata like
/// change ordinal and commit snapshot ID.
class ICEBERG_EXPORT DeletedDataFileScanTask : public ChangelogScanTask {
 public:
  // Reuse the base-class constructor unchanged.
  using ChangelogScanTask::ChangelogScanTask;

  /// \brief The data file that was deleted.
  const std::shared_ptr<DataFile>& data_file() const { return data_file_; }

  /// \brief Previously added delete files to apply when reading the data file
  /// in this task.
  ///
  /// \return A list of delete files to apply
  const std::vector<std::shared_ptr<DataFile>>& existing_deletes() const {
    return delete_files_;
  }

  /// \brief Always ChangelogOperation::kDelete for this task type.
  ChangelogOperation operation() const override { return ChangelogOperation::kDelete; }
};

namespace internal {
Expand All @@ -133,6 +241,17 @@ struct TableScanContext {

// Validate the context parameters to see if they have conflicts.
[[nodiscard]] Status Validate() const;

/// \brief Returns true if this scan is a current lineage scan, which means it does not
/// specify from/to snapshot IDs.
bool IsScanCurrentLineage() const;

/// \brief Get the snapshot ID to scan up to (inclusive) based on the context.
Result<int64_t> ToSnapshotIdInclusive(const TableMetadata& metadata) const;

/// \brief Get the snapshot ID to scan from (exclusive) based on the context.
Result<std::optional<int64_t>> FromSnapshotIdExclusive(
const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const;
};

} // namespace internal
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ if(ICEBERG_BUILD_BUNDLE)
SOURCES
file_scan_task_test.cc
incremental_append_scan_test.cc
incremental_changelog_scan_test.cc
table_scan_test.cc)

add_iceberg_test(table_update_test
Expand Down
Loading
Loading