Skip to content

Commit dba8f92

Browse files
authored
feat: add snapshot util (#420)
1 parent 49c54d4 commit dba8f92

17 files changed

+1049
-29
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ set(ICEBERG_SOURCES
8383
util/decimal.cc
8484
util/gzip_internal.cc
8585
util/murmurhash3_internal.cc
86+
util/snapshot_util.cc
8687
util/temporal_util.cc
8788
util/timepoint.cc
8889
util/truncate_util.cc

src/iceberg/avro/avro_stream_internal.cc

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ bool AvroInputStream::next(const uint8_t** data, size_t* len) {
6666
}
6767

6868
void AvroInputStream::backup(size_t len) {
69-
ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available",
70-
len, buffer_pos_);
69+
ICEBERG_CHECK_OR_DIE(len <= buffer_pos_,
70+
"Cannot backup {} bytes, only {} bytes available", len,
71+
buffer_pos_);
7172

7273
buffer_pos_ -= len;
7374
byte_count_ -= len;
@@ -88,7 +89,8 @@ size_t AvroInputStream::byteCount() const { return byte_count_; }
8889

8990
void AvroInputStream::seek(int64_t position) {
9091
auto status = input_stream_->Seek(position);
91-
ICEBERG_CHECK(status.ok(), "Failed to seek to {}, got {}", position, status.ToString());
92+
ICEBERG_CHECK_OR_DIE(status.ok(), "Failed to seek to {}, got {}", position,
93+
status.ToString());
9294

9395
buffer_pos_ = 0;
9496
available_bytes_ = 0;
@@ -116,8 +118,9 @@ bool AvroOutputStream::next(uint8_t** data, size_t* len) {
116118
}
117119

118120
void AvroOutputStream::backup(size_t len) {
119-
ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available",
120-
len, buffer_pos_);
121+
ICEBERG_CHECK_OR_DIE(len <= buffer_pos_,
122+
"Cannot backup {} bytes, only {} bytes available", len,
123+
buffer_pos_);
121124
buffer_pos_ -= len;
122125
}
123126

@@ -126,12 +129,12 @@ uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_po
126129
void AvroOutputStream::flush() {
127130
if (buffer_pos_ > 0) {
128131
auto status = output_stream_->Write(buffer_.data(), buffer_pos_);
129-
ICEBERG_CHECK(status.ok(), "Write failed {}", status.ToString());
132+
ICEBERG_CHECK_OR_DIE(status.ok(), "Write failed {}", status.ToString());
130133
flushed_bytes_ += buffer_pos_;
131134
buffer_pos_ = 0;
132135
}
133136
auto status = output_stream_->Flush();
134-
ICEBERG_CHECK(status.ok(), "Flush failed {}", status.ToString());
137+
ICEBERG_CHECK_OR_DIE(status.ok(), "Flush failed {}", status.ToString());
135138
}
136139

137140
const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream()

src/iceberg/exception.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class ICEBERG_EXPORT ExpressionError : public IcebergError {
4444
explicit ExpressionError(const std::string& what) : IcebergError(what) {}
4545
};
4646

47-
#define ICEBERG_CHECK(condition, ...) \
47+
#define ICEBERG_CHECK_OR_DIE(condition, ...) \
4848
do { \
4949
if (!(condition)) [[unlikely]] { \
5050
throw iceberg::IcebergError(std::format(__VA_ARGS__)); \

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ iceberg_sources = files(
105105
'util/decimal.cc',
106106
'util/gzip_internal.cc',
107107
'util/murmurhash3_internal.cc',
108+
'util/snapshot_util.cc',
108109
'util/temporal_util.cc',
109110
'util/timepoint.cc',
110111
'util/truncate_util.cc',

src/iceberg/table_metadata.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
7070
}
7171

7272
Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
73-
const std::optional<int32_t>& schema_id) const {
73+
std::optional<int32_t> schema_id) const {
7474
auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
75-
return schema->schema_id() == schema_id;
75+
return schema != nullptr && schema->schema_id() == schema_id;
7676
});
7777
if (iter == schemas.end()) {
7878
return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
@@ -82,7 +82,7 @@ Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
8282

8383
Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
8484
auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) {
85-
return spec->spec_id() == default_spec_id;
85+
return spec != nullptr && spec->spec_id() == default_spec_id;
8686
});
8787
if (iter == partition_specs.end()) {
8888
return NotFound("Default partition spec is not found");
@@ -92,7 +92,7 @@ Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
9292

9393
Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
9494
auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) {
95-
return order->order_id() == default_sort_order_id;
95+
return order != nullptr && order->order_id() == default_sort_order_id;
9696
});
9797
if (iter == sort_orders.end()) {
9898
return NotFound("Default sort order is not found");
@@ -106,7 +106,7 @@ Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
106106

107107
Result<std::shared_ptr<Snapshot>> TableMetadata::SnapshotById(int64_t snapshot_id) const {
108108
auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) {
109-
return snapshot->snapshot_id == snapshot_id;
109+
return snapshot != nullptr && snapshot->snapshot_id == snapshot_id;
110110
});
111111
if (iter == snapshots.end()) {
112112
return NotFound("Snapshot with ID {} is not found", snapshot_id);

src/iceberg/table_metadata.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ struct ICEBERG_EXPORT TableMetadata {
128128
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
129129
/// \brief Get the current schema by ID, return NotFoundError if not found
130130
Result<std::shared_ptr<iceberg::Schema>> SchemaById(
131-
const std::optional<int32_t>& schema_id) const;
131+
std::optional<int32_t> schema_id) const;
132132
/// \brief Get the current partition spec, return NotFoundError if not found
133133
Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpec() const;
134134
/// \brief Get the current sort order, return NotFoundError if not found

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ add_iceberg_test(table_test
7070
SOURCES
7171
metrics_config_test.cc
7272
snapshot_test.cc
73+
snapshot_util_test.cc
7374
table_metadata_builder_test.cc
7475
table_requirement_test.cc
7576
table_requirements_test.cc

src/iceberg/test/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ iceberg_tests = {
4747
'sources': files(
4848
'metrics_config_test.cc',
4949
'snapshot_test.cc',
50+
'snapshot_util_test.cc',
5051
'table_metadata_builder_test.cc',
5152
'table_requirement_test.cc',
5253
'table_requirements_test.cc',

0 commit comments

Comments
 (0)