Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -37,7 +36,6 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
Expand Down Expand Up @@ -93,10 +91,30 @@ static org.apache.iceberg.Schema resolveSchema(
if (keep != null && !keep.isEmpty()) {
selectedFieldsBuilder.addAll(keep);
} else if (drop != null && !drop.isEmpty()) {
Set<String> fields =
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
drop.forEach(fields::remove);
selectedFieldsBuilder.addAll(fields);
// Get all field paths including nested ones
java.util.List<String> allPaths =
new java.util.ArrayList<>(
org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet());
java.util.Collections.sort(allPaths);

// Identify leaf fields only (fields that are not parents of other fields)
// This prevents selecting a parent struct from implicitly including dropped children
java.util.Set<String> leaves = new java.util.HashSet<>();
for (int i = 0; i < allPaths.size(); i++) {
String path = allPaths.get(i);
// If the next path starts with "path.", then "path" is a parent - skip it
if (i + 1 < allPaths.size() && allPaths.get(i + 1).startsWith(path + ".")) {
continue;
}
leaves.add(path);
}

// Remove fields that are dropped or are children of dropped fields
for (String d : drop) {
leaves.removeIf(f -> f.equals(d) || f.startsWith(d + "."));
}

selectedFieldsBuilder.addAll(leaves);
} else {
// default: include all columns
return schema;
Expand Down Expand Up @@ -327,7 +345,9 @@ void validate(Table table) {
param = "drop";
fieldsSpecified = newHashSet(checkNotNull(drop));
}
table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name()));
// Use findField() to support nested column paths (e.g., "colA.colB")
// Iceberg's Schema.findField() resolves dot-notation paths for nested fields
fieldsSpecified.removeIf(name -> table.schema().findField(name) != null);

checkArgument(
fieldsSpecified.isEmpty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,53 @@ public void testProjectedSchema() {
assertTrue(projectKeep.sameSchema(expectedKeep));
}

@Test
public void testNestedColumnPruningValidation() {
// Test that nested column paths (dot notation) are accepted in keep/drop configuration
org.apache.iceberg.Schema schemaWithNested =
new org.apache.iceberg.Schema(
required(1, "id", StringType.get()),
required(
2,
"data",
StructType.of(
required(3, "name", StringType.get()), required(4, "value", StringType.get()))),
required(5, "metadata", StringType.get()));

// Test that nested column path "data.name" is valid and can be selected
org.apache.iceberg.Schema projectNestedKeep =
resolveSchema(schemaWithNested, asList("id", "data.name"), null);

// Verify the projected schema contains the nested field
assertTrue(projectNestedKeep.findField("id") != null);
assertTrue(projectNestedKeep.findField("data.name") != null);
}

@Test
public void testNestedColumnDropValidation() {
// Test that nested column paths work correctly with drop configuration
org.apache.iceberg.Schema schemaWithNested =
new org.apache.iceberg.Schema(
required(1, "id", StringType.get()),
required(
2,
"data",
StructType.of(
required(3, "name", StringType.get()), required(4, "value", StringType.get()))),
required(5, "metadata", StringType.get()));

// Test dropping a nested field "data.name" - should keep id, data.value, metadata
org.apache.iceberg.Schema projectNestedDrop =
resolveSchema(schemaWithNested, null, asList("data.name"));

// Verify "data.name" is NOT in the projected schema
assertTrue(projectNestedDrop.findField("id") != null);
assertTrue(projectNestedDrop.findField("data.value") != null);
assertTrue(projectNestedDrop.findField("metadata") != null);
// data.name should be dropped
assertTrue(projectNestedDrop.findField("data.name") == null);
}

@Test
public void testSimpleScan() throws Exception {
TableIdentifier tableId =
Expand Down
Loading