
Flink: In upsert mode with equality fields on a partitioned table, partition column has to be in equalityFields #15900

@tomerr90

Description

Apache Iceberg version

1.10.1 (latest release)

Query engine

Flink

Please describe the bug 🐞

  1. https://github.com/apache/iceberg/blob/main/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java#L952

  2. https://github.com/apache/iceberg/blob/main/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java#L755

This validation looks wrong to me. In the case above (upsert + partitioned table + equality fields), it seems to assume that the distribution mode has to be 'hash', which it doesn't. If the distribution mode is left at 'none', rows with the same equality-field values are still routed to the same writer, so deletes are handled properly, and potentially several writers write to the same partition, which is completely fine.
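To make the routing argument concrete, here is a minimal, simplified model in plain Java (no Flink or Iceberg dependencies). It assumes a hypothetical `Row` whose only equality field is `id` and whose partition value is `day`, and it routes rows by hashing only the equality field modulo the parallelism. This is a stand-in for how Flink's `keyBy` assigns keys to subtasks, not a copy of it, and `EqualityKeyRouting`, `writerFor`, and `writersForPartition` are illustrative names.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical row: `id` is the only equality field, `day` is the partition value.
record Row(long id, String day) {}

class EqualityKeyRouting {
  // Pick a writer subtask from the equality-field values only.
  // Simplification: Flink's keyBy first hashes keys into key groups and then maps
  // key groups to subtasks; floorMod(hash, parallelism) stands in for that here.
  static int writerFor(Row row, int parallelism) {
    return Math.floorMod(Objects.hash(row.id()), parallelism);
  }

  // Collect every writer subtask that receives at least one row for `day`.
  static Set<Integer> writersForPartition(List<Row> rows, String day, int parallelism) {
    Set<Integer> writers = new TreeSet<>();
    for (Row row : rows) {
      if (row.day().equals(day)) {
        writers.add(writerFor(row, parallelism));
      }
    }
    return writers;
  }

  public static void main(String[] args) {
    int parallelism = 4;
    List<Row> rows = List.of(
        new Row(1L, "2024-01-01"),
        new Row(2L, "2024-01-01"),
        new Row(1L, "2024-01-02")); // same id as the first row, different partition

    // All changes for id=1 go to the same writer, whatever their partition.
    System.out.println(writerFor(rows.get(0), parallelism) == writerFor(rows.get(2), parallelism)); // prints true
    // Several writers can still write to the same partition.
    System.out.println(writersForPartition(rows, "2024-01-01", parallelism)); // prints [0, 1]
  }
}
```

The model only covers which writer receives a row; it does not model which partition an equality delete is written to or applies to.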
It is also just plain wrong, since if I use something like a creation time as a partition field, which is pretty common, equality will fail for the same row id at different times. Maybe the intention was to make the partition transform value, rather than the source column, part of the equality fields? That would maintain correctness and also help avoid an explosion in the number of writers.
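A rough sketch of both sides of this argument, again in plain Java with hypothetical names (`UpsertPartitionCheck`, `sourceColumnsCovered`, `rawKey`, `transformedKey`). It models the validation as "every partition source column must be an equality field", assumes a table partitioned by the day of a `create_time` column, and compares an equality key built from the raw `create_time` with one built from the day transform value, as suggested above. It is an illustration under those assumptions, not Iceberg's actual validation code.

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;

class UpsertPartitionCheck {
  // Hypothetical model of the validation: in upsert mode every partition
  // source column must also be one of the equality fields.
  static boolean sourceColumnsCovered(Set<String> equalityFields, List<String> partitionSourceColumns) {
    return equalityFields.containsAll(partitionSourceColumns);
  }

  // Equality key that includes the raw partition source column (create_time).
  static List<Object> rawKey(long id, LocalDateTime createTime) {
    return List.of(id, createTime);
  }

  // Alternative raised in the issue: include the partition transform value
  // (here a day transform of create_time) instead of the raw column.
  static List<Object> transformedKey(long id, LocalDateTime createTime) {
    return List.of(id, createTime.toLocalDate());
  }

  public static void main(String[] args) {
    LocalDateTime morning = LocalDateTime.of(2024, 1, 1, 9, 0);
    LocalDateTime evening = LocalDateTime.of(2024, 1, 1, 17, 30);

    // Partitioned by day(create_time) with only `id` as equality field: rejected.
    System.out.println(sourceColumnsCovered(Set.of("id"), List.of("create_time"))); // prints false
    // With the raw create_time in the key, two versions of id=1 no longer match.
    System.out.println(rawKey(1L, morning).equals(rawKey(1L, evening))); // prints false
    // With the day value in the key, they match again.
    System.out.println(transformedKey(1L, morning).equals(transformedKey(1L, evening))); // prints true
  }
}
```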
Am I misunderstanding something?

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

Metadata

Assignees

No one assigned

Labels

bug (Something isn't working)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
