
Core, Data, Flink: Moving Flink to use the new FormatModel API #15329

Open

pvary wants to merge 2 commits into apache:main from pvary:flink_model

Conversation

@pvary (Contributor) commented Feb 15, 2026

Part of: #12298
Implementation of the new API: #12774

FlinkFormatModel and related changes

abstract class FlinkSchemaVisitor<T> {

-  static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
+  public static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
Contributor:

is this public change required?

Author (pvary):

Nope, it remained from old versions. Reverted.

    visitor.beforeStruct(struct.asStructType());

    LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
    try {
Contributor:

why do we need to switch the visit impl to try-finally? if there is any exception, it would just fail. is it important to call the after methods?

Author (pvary):

We use the same pattern for every field: the after method is called in the finally block. Since I added afterStruct, I decided to follow the same pattern here as well.
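
For illustration, a minimal sketch of that pattern, assuming a generic visitor with paired before/after callbacks; the names here are invented for the example and are not the actual FlinkSchemaVisitor API:

// Hedged sketch: every beforeField call is paired with an afterField call in a
// finally block, so the after hook runs even if visiting the field throws.
import java.util.ArrayList;
import java.util.List;

abstract class FieldVisitor<T> {
  void beforeField(String name) {}

  void afterField(String name) {}

  abstract T field(String name);

  static <T> List<T> visitAll(List<String> names, FieldVisitor<T> visitor) {
    List<T> results = new ArrayList<>();
    for (String name : names) {
      visitor.beforeField(name);
      try {
        results.add(visitor.field(name));
      } finally {
        // Keeps before/after calls balanced, mirroring the per-field pattern
        // that afterStruct now follows as well.
        visitor.afterField(name);
      }
    }
    return results;
  }
}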

.split(task.start(), task.length())
.caseSensitive(caseSensitive)
.filter(task.residual())
.reuseContainers()
Contributor:

The old newOrcIterable method doesn't set this boolean flag, since the ORC ReadBuilder doesn't support it. I forgot how the new API handles the difference.

Author (pvary):

We check that the value is always set to the expected value:

public org.apache.iceberg.io.CloseableIterable<D> build() {
  Preconditions.checkNotNull(reuseContainers, "Reuse containers is required for ORC read");
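
As a rough illustration of that approach (a hedged sketch with assumed names, not the actual Iceberg ORC read builder), the builder can keep the flag as a nullable Boolean and fail fast in build() if the caller never set it:

// Hypothetical builder, sketched only to show the validation style quoted above.
import com.google.common.base.Preconditions;
import java.util.Collections;

class ExampleOrcReadBuilder<D> {
  private Boolean reuseContainers = null; // stays null until the caller decides

  ExampleOrcReadBuilder<D> reuseContainers(boolean reuse) {
    this.reuseContainers = reuse;
    return this;
  }

  Iterable<D> build() {
    // Guava's checkNotNull rejects a builder whose flag was never set, so a
    // format that expects a specific value cannot be silently misconfigured.
    Preconditions.checkNotNull(reuseContainers, "Reuse containers is required for ORC read");
    return Collections.emptyList(); // placeholder for the real iterable
  }
}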

public void testPromotedFlinkDataType() throws Exception {
  Schema iSchema =
      new Schema(
          Types.NestedField.required(
Contributor:

can you help me understand the purpose of expanded types in this test?

Author (pvary):

I remember that in one of the versions of the code I needed to test embedded objects. It's kind of late here; it was somewhat related to the FlinkSchemaVisitor change, but I can't find the usage anymore, so it might not be needed in the end.

Let me revert the changes and see if any tests fail. That will help me remember the history in the morning.
