-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-50593][SQL] SPJ: Support truncate transform via generalized ReducibleFunction API #55885
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2fdd9d2
81d63dd
c51a582
f65ac19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.sql.connector.catalog.functions; | ||
|
|
||
| import org.apache.spark.annotation.Evolving; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * Container for reducible function literal parameters. | ||
| * Provides type-safe access to parameters of various types. | ||
| * | ||
| * Examples: | ||
| * <ul> | ||
| * <li>bucket(4, col) → ReducibleParameters([4])</li> | ||
| * <li>truncate(col, 3) → ReducibleParameters([3])</li> | ||
| * <li>range_bucket(col, 0L, 100L, 10) → ReducibleParameters([0L, 100L, 10])</li> | ||
| * <li>custom_transform(col, "param") → ReducibleParameters(["param"])</li> | ||
| * </ul> | ||
| * | ||
| * @since 5.0.0 | ||
| */ | ||
| @Evolving | ||
| public class ReducibleParameters { | ||
| public static final ReducibleParameters EMPTY = new ReducibleParameters(); | ||
|
|
||
| private final List<Object> values; | ||
|
|
||
| private ReducibleParameters() { | ||
| this.values = new ArrayList<>(); | ||
| } | ||
|
|
||
| public ReducibleParameters(List<Object> values) { | ||
| this.values = values; | ||
| } | ||
|
|
||
| public ReducibleParameters(Object... values) { | ||
| this.values = Arrays.asList(values); | ||
| } | ||
|
|
||
| /** | ||
| * Get the number of parameters. | ||
| */ | ||
| public int count() { | ||
| return values.size(); | ||
| } | ||
|
|
||
| /** | ||
| * Check if this container has parameters. | ||
| */ | ||
| public boolean isEmpty() { | ||
| return values.isEmpty(); | ||
| } | ||
|
|
||
| /** | ||
| * Get parameter at index as Integer. | ||
| * @throws ClassCastException if parameter is not an Integer | ||
| * @throws IndexOutOfBoundsException if index is invalid | ||
| */ | ||
| public int getInt(int index) { | ||
| return (Integer) values.get(index); | ||
| } | ||
|
|
||
| /** | ||
| * Get parameter at index as Long. | ||
| * @throws ClassCastException if parameter is not a Long | ||
| * @throws IndexOutOfBoundsException if index is invalid | ||
| */ | ||
| public long getLong(int index) { | ||
| return (Long) values.get(index); | ||
| } | ||
|
|
||
| /** | ||
| * Get parameter at index as String. | ||
| * @throws ClassCastException if parameter is not a String | ||
| * @throws IndexOutOfBoundsException if index is invalid | ||
| */ | ||
| public String getString(int index) { | ||
| return (String) values.get(index); | ||
| } | ||
|
|
||
| /** | ||
| * Get parameter at index as Double. | ||
| * @throws ClassCastException if parameter is not a Double | ||
| * @throws IndexOutOfBoundsException if index is invalid | ||
| */ | ||
| public double getDouble(int index) { | ||
| return (Double) values.get(index); | ||
| } | ||
|
|
||
| /** | ||
| * Get parameter at index as Float. | ||
| * @throws ClassCastException if parameter is not a Float | ||
| * @throws IndexOutOfBoundsException if index is invalid | ||
| */ | ||
| public float getFloat(int index) { | ||
| return (Float) values.get(index); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
/**
* Get parameter at index as BigDecimal.
* @throws ClassCastException if parameter is not a BigDecimal
* @throws IndexOutOfBoundsException if index is invalid
*/
public java.math.BigDecimal getBigDecimal(int index) {
return (java.math.BigDecimal) values.get(index);
}Same gap exists for any other type the Spark side might convert (binary, interval, etc.) — see the |
||
|
|
||
| /** | ||
| * Get raw parameter value at index. | ||
| */ | ||
| public Object get(int index) { | ||
| return values.get(index); | ||
| } | ||
|
|
||
| /** | ||
| * Get all parameter values as a list. | ||
| */ | ||
| public List<Object> getAll() { | ||
| return values; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| ReducibleParameters that = (ReducibleParameters) o; | ||
| return values.equals(that.values); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return values.hashCode(); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "ReducibleParameters(" + values + ")"; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Structural alternative — reuse
V2Literalinstead of introducingReducibleParameters. Surfacing this even though it's late in the cycle, because it's the kind of design choice worth weighing before locking in a new public class.The proposal: drop
ReducibleParametersentirely and have the generalized reducer takeorg.apache.spark.sql.connector.expressions.Literal(the V2 literal type already used everywhere else in the connector API):Connector use:
What this fixes simultaneously:
.value()and dispatch on.dataType(). No more "we forgotgetBigDecimal" / "we'll needgetCalendarInterval" / etc.extractParameterspartial-conversion concern (General-section bullet Removed reference to incubation in Spark user docs. #2) goes away.V2LiteralcarriesdataType()alongside the value, so the connector interprets it correctly regardless of whether it'sString,BigDecimal,byte[],CalendarInterval, etc. No Catalyst-internal types leaking, no type-by-type special-casing —extractParametersshrinks to a Catalyst-Literal → V2-Literal conversion (essentially the inverse of whatV2ExpressionUtils.toCatalystalready does for V2 literals).V2Literalis already public, already@Evolving→ stable, already used by every connector that authors V2 transforms (Expressions.literal(N),Expressions.bucket(N, col)). Receiving them back through the reducer API is symmetric round-trip with how transforms are constructed — V2 connectors never have to cross the Catalyst boundary in this public surface.Trade-offs being honest about:
(Integer) params[0].value()vsparams.getInt(0)). Real but small.Literal<?>[]produces unchecked-warning ceremony.List<Literal<?>>orLiteral<?>...varargs are cleaner alternatives.Literal<Integer>).cc @sunchao @szehon-ho — would value your read on this trade-off, given your existing reviews of
ReducibleParameters. Should the API rebase onV2Literal, or stay with the new class as drafted?