Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/iceberg/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class ICEBERG_EXPORT WriterProperties : public ConfigBase<WriterProperties> {
"zstd"};
inline static Entry<std::string> kParquetCompressionLevel{
"write.parquet.compression-level", ""};
/// \brief Maximum number of rows in each Parquet row group.
inline static Entry<int64_t> kParquetMaxRowGroupRows{"write.parquet.max-row-group-rows",
1024 * 1024};

/// TODO(gangwu): add table properties with write.avro|parquet|orc.*

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace iceberg {
/// lower/upper bounds for a specific field identified by its field_id.
struct ICEBERG_EXPORT FieldMetrics {
/// \brief The field ID these metrics belong to.
int32_t field_id;
int32_t field_id = -1;

/// \brief The total number of values (including nulls) for this field.
/// A negative value indicates the count is unknown.
Expand Down
27 changes: 14 additions & 13 deletions src/iceberg/metrics_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,20 @@ const std::shared_ptr<MetricsConfig>& MetricsConfig::Default() {

Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(const Table& table) {
ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema());
auto sort_order = table.sort_order();
return MakeInternal(table.properties(), *schema,
*sort_order.value_or(SortOrder::Unsorted()));
auto order = table.sort_order().value_or(SortOrder::Unsorted());
return MakeInternal(table.properties(), schema.get(), order.get());
}

Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(
std::unordered_map<std::string, std::string> properties) {
// Create a minimal TableProperties wrapper for the properties
TableProperties props = TableProperties::FromMap(std::move(properties));

return MakeInternal(props, Schema({}), *SortOrder::Unsorted());
return MakeInternal(props, /*schema=*/nullptr, /*order=*/nullptr);
}

Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
const TableProperties& props, const Schema& schema, const SortOrder& order) {
const TableProperties& props, const Schema* schema, const SortOrder* order) {
ColumnModeMap column_modes;

MetricsMode default_mode = kDefaultMetricsMode;
Expand All @@ -148,16 +147,16 @@ Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
props.Get(TableProperties::kDefaultWriteMetricsMode);
ICEBERG_ASSIGN_OR_RAISE(default_mode,
ParseMode(configured_metrics_mode, kDefaultMetricsMode));
} else {
} else if (schema != nullptr) {
int32_t max_inferred_columns = MaxInferredColumns(props);
GetProjectedIdsVisitor visitor(/*include_struct_ids=*/true);
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema));
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(*schema));
auto projected_columns = static_cast<int32_t>(visitor.Finish().size());
if (max_inferred_columns < projected_columns) {
ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids,
LimitFieldIds(schema, max_inferred_columns));
LimitFieldIds(*schema, max_inferred_columns));
for (auto id : limit_field_ids) {
ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema.FindColumnNameById(id));
ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema->FindColumnNameById(id));
ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in schema", id);
column_modes[std::string(column_name.value())] = kDefaultMetricsMode;
}
Expand All @@ -167,10 +166,12 @@ Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
}

// First set sorted column with sorted column default (can be overridden by user)
auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order);
for (const auto& sorted_column : sorted_columns) {
column_modes[std::string(sorted_column)] = sorted_col_default_mode;
if (schema != nullptr && order != nullptr) {
auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
auto sorted_columns = SortOrder::OrderPreservingSortedColumns(*schema, *order);
for (const auto& sorted_column : sorted_columns) {
column_modes[std::string(sorted_column)] = sorted_col_default_mode;
}
}

// Handle user overrides of defaults
Expand Down
10 changes: 6 additions & 4 deletions src/iceberg/metrics_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ class ICEBERG_EXPORT MetricsConfig {
///
/// \param props will be read for metrics overrides (write.metadata.metrics.column.*)
/// and default(write.metadata.metrics.default)
/// \param schema table schema
/// \param order sort order columns, will be promoted to truncate(16)
/// \param schema table schema, or nullptr when only properties are available
/// \param order table sort order, or nullptr when unavailable. If provided, sorted
/// columns use at least the default truncate metrics mode (`truncate(16)`) when
/// the default mode is `none` or `counts`; explicit column overrides still win.
/// \return metrics configuration
static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const TableProperties& props,
const Schema& schema,
const SortOrder& order);
const Schema* schema,
const SortOrder* order);

ColumnModeMap column_modes_;
MetricsMode default_mode_;
Expand Down
Loading
Loading