Skip to content

Commit 9640746

Browse files
committed
fix(parquet): fix parquet writer metrics conversion
1 parent 97ea870 commit 9640746

11 files changed

Lines changed: 387 additions & 122 deletions

src/iceberg/file_writer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ class ICEBERG_EXPORT WriterProperties : public ConfigBase<WriterProperties> {
6060
"zstd"};
6161
inline static Entry<std::string> kParquetCompressionLevel{
6262
"write.parquet.compression-level", ""};
63+
/// \brief Maximum number of rows in each Parquet row group.
64+
inline static Entry<int64_t> kParquetMaxRowGroupRows{"write.parquet.max-row-group-rows",
65+
1024 * 1024};
6366

6467
/// TODO(gangwu): add table properties with write.avro|parquet|orc.*
6568

src/iceberg/metrics.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace iceberg {
3636
/// lower/upper bounds for a specific field identified by its field_id.
3737
struct ICEBERG_EXPORT FieldMetrics {
3838
/// \brief The field ID this metrics belongs to.
39-
int32_t field_id;
39+
int32_t field_id = -1;
4040

4141
/// \brief The total number of values (including nulls) for this field.
4242
/// A negative value indicates the count is unknown.

src/iceberg/metrics_config.cc

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,21 +125,20 @@ const std::shared_ptr<MetricsConfig>& MetricsConfig::Default() {
125125

126126
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) {
127127
ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema());
128-
auto sort_order = table.sort_order();
129-
return MakeInternal(table.properties(), *schema,
130-
*sort_order.value_or(SortOrder::Unsorted()));
128+
auto order = table.sort_order().value_or(SortOrder::Unsorted());
129+
return MakeInternal(table.properties(), schema.get(), order.get());
131130
}
132131

133132
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(
134133
std::unordered_map<std::string, std::string> properties) {
135134
// Create a minimal TableProperties wrapper for the properties
136135
TableProperties props = TableProperties::FromMap(std::move(properties));
137136

138-
return MakeInternal(props, Schema({}), *SortOrder::Unsorted());
137+
return MakeInternal(props, /*schema=*/nullptr, /*order=*/nullptr);
139138
}
140139

141140
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
142-
const TableProperties& props, const Schema& schema, const SortOrder& order) {
141+
const TableProperties& props, const Schema* schema, const SortOrder* order) {
143142
ColumnModeMap column_modes;
144143

145144
MetricsMode default_mode = kDefaultMetricsMode;
@@ -148,16 +147,16 @@ Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
148147
props.Get(TableProperties::kDefaultWriteMetricsMode);
149148
ICEBERG_ASSIGN_OR_RAISE(default_mode,
150149
ParseMode(configured_metrics_mode, kDefaultMetricsMode));
151-
} else {
150+
} else if (schema != nullptr) {
152151
int32_t max_inferred_columns = MaxInferredColumns(props);
153152
GetProjectedIdsVisitor visitor(/*include_struct_ids=*/true);
154-
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
153+
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(*schema));
155154
auto projected_columns = static_cast<int32_t>(visitor.Finish().size());
156155
if (max_inferred_columns < projected_columns) {
157156
ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids,
158-
LimitFieldIds(schema, max_inferred_columns));
157+
LimitFieldIds(*schema, max_inferred_columns));
159158
for (auto id : limit_field_ids) {
160-
ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema.FindColumnNameById(id));
159+
ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema->FindColumnNameById(id));
161160
ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in schema", id);
162161
column_modes[std::string(column_name.value())] = kDefaultMetricsMode;
163162
}
@@ -167,10 +166,12 @@ Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
167166
}
168167

169168
// First set sorted column with sorted column default (can be overridden by user)
170-
auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
171-
auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order);
172-
for (const auto& sorted_column : sorted_columns) {
173-
column_modes[std::string(sorted_column)] = sorted_col_default_mode;
169+
if (schema != nullptr && order != nullptr) {
170+
auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
171+
auto sorted_columns = SortOrder::OrderPreservingSortedColumns(*schema, *order);
172+
for (const auto& sorted_column : sorted_columns) {
173+
column_modes[std::string(sorted_column)] = sorted_col_default_mode;
174+
}
174175
}
175176

176177
// Handle user overrides of defaults

src/iceberg/metrics_config.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,14 @@ class ICEBERG_EXPORT MetricsConfig {
101101
///
102102
/// \param props will be read for metrics overrides (write.metadata.metrics.column.*)
103103
/// and default(write.metadata.metrics.default)
104-
/// \param schema table schema
105-
/// \param order sort order columns, will be promoted to truncate(16)
104+
/// \param schema table schema, or nullptr when only properties are available
105+
/// \param order table sort order, or nullptr when unavailable. If provided, sorted
106+
/// columns use at least the default truncate metrics mode (`truncate(16)`) when
107+
/// the default mode is `none` or `counts`; explicit column overrides still win.
106108
/// \return metrics configuration
107109
static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const TableProperties& props,
108-
const Schema& schema,
109-
const SortOrder& order);
110+
const Schema* schema,
111+
const SortOrder* order);
110112

111113
ColumnModeMap column_modes_;
112114
MetricsMode default_mode_;

0 commit comments

Comments
 (0)