Describe the enhancement
`array_distinct` is currently marked `Incompatible` because the DataFusion implementation sorts the output elements, while Spark preserves the insertion order of first occurrences.

For example:
- Spark: `array_distinct([2, 1, 2, 3])` → `[2, 1, 3]` (insertion order)
- Comet: `array_distinct([2, 1, 2, 3])` → `[1, 2, 3]` (sorted)

This difference means users must opt in with `spark.comet.expr.ArrayDistinct.allowIncompatible=true`.
Proposed approach
Implement a custom `array_distinct` in `native/spark-expr/` that uses a hash set for deduplication while preserving insertion order, matching Spark's behavior. This would allow upgrading the support level from `Incompatible` to `Compatible`.
Key behaviors to match:
- Preserve insertion order of first occurrence
- De-duplicate NULL elements (keep only the first NULL)
- De-duplicate NaN values (Float and Double)
- Support all orderable element types
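The order-preserving, NULL-aware behavior described above can be sketched as follows. This is a minimal illustration over a nullable integer slice (`None` standing in for a NULL element), not the actual Comet code, which would operate on Arrow arrays; the function name is hypothetical.

```rust
use std::collections::HashSet;

// Hypothetical sketch of Spark-style array_distinct semantics:
// keep the first occurrence of each element, in encounter order,
// and keep only the first NULL.
fn distinct_preserving_order(input: &[Option<i64>]) -> Vec<Option<i64>> {
    let mut seen = HashSet::new();
    let mut seen_null = false;
    let mut out = Vec::new();
    for item in input {
        match item {
            None => {
                // De-duplicate NULLs: only the first one is emitted.
                if !seen_null {
                    seen_null = true;
                    out.push(None);
                }
            }
            Some(v) => {
                // HashSet::insert returns true only for the first
                // occurrence, so insertion order is preserved.
                if seen.insert(*v) {
                    out.push(Some(*v));
                }
            }
        }
    }
    out
}
```

For floating-point element types, the set key would additionally need a total-order encoding (e.g. something based on the value's bit pattern, with NaN canonicalized) so that NaN values de-duplicate as Spark requires; that detail is omitted here.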
Additional context
The current implementation delegates to DataFusion's built-in `array_distinct` UDF (`datafusion-functions-nested`, `set_ops.rs`), which uses `RowConverter` with `.sorted().dedup()`. A Spark-compatible version cannot simply drop the sort, since `.dedup()` only removes adjacent duplicates; it would need a different deduplication strategy entirely, such as a hash set keyed on the converted rows.
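The two strategies can be contrasted on plain integers (an illustrative sketch only; the real code paths work on Arrow arrays through `RowConverter`, and these helper names are hypothetical):

```rust
use std::collections::HashSet;

// DataFusion-style: sort first, then drop adjacent duplicates.
// The output comes back sorted, losing the original element order.
fn distinct_sorted(input: &[i32]) -> Vec<i32> {
    let mut v = input.to_vec();
    v.sort();
    v.dedup();
    v
}

// Spark-style: keep only the first occurrence of each element,
// in the order the elements are encountered.
fn distinct_first_occurrence(input: &[i32]) -> Vec<i32> {
    let mut seen = HashSet::new();
    input.iter().copied().filter(|v| seen.insert(*v)).collect()
}
```

On the issue's example, `distinct_sorted(&[2, 1, 2, 3])` yields `[1, 2, 3]` while `distinct_first_occurrence(&[2, 1, 2, 3])` yields `[2, 1, 3]`, matching the Comet and Spark outputs respectively.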
Test coverage was expanded in #3887.