@@ -21,10 +21,11 @@ import scala.collection.mutable
import scala.reflect.classTag

import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.ops.EncodeTypeOps
import org.apache.spark.util.ArrayImplicits._

/**
@@ -91,7 +92,6 @@ object RowEncoder extends DataTypeErrorsBase {
case TimestampNTZType => LocalDateTimeEncoder
case DateType if SqlApiConf.get.datetimeJava8ApiEnabled => LocalDateEncoder(lenient)
case DateType => DateEncoder(lenient)
-case _: TimeType => LocalTimeEncoder
case CalendarIntervalType => CalendarIntervalEncoder
case _: DayTimeIntervalType => DayTimeIntervalEncoder
case _: YearMonthIntervalType => YearMonthIntervalEncoder
@@ -123,6 +123,7 @@ object RowEncoder extends DataTypeErrorsBase {
case g: GeographyType => GeographyEncoder(g)
case g: GeometryType => GeometryEncoder(g)

+case _ if EncodeTypeOps.supports(dataType) => EncodeTypeOps(dataType).getEncoder
case _ =>
throw new AnalysisException(
errorClass = "UNSUPPORTED_DATA_TYPE_FOR_ENCODER",
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.types.{DataType, TimeType}

// Encodes type values to external types, for instance to Java types.
trait EncodeTypeOps {
  // Gets an agnostic encoder that contains all the info needed to convert internal
  // row values of the given type to specific external objects.
  def getEncoder: AgnosticEncoder[_]
}

object EncodeTypeOps {
  private val supportedDataTypes: Set[DataType] =
    Set(TimeType.MIN_PRECISION to TimeType.MAX_PRECISION map TimeType.apply: _*)

  def supports(dt: DataType): Boolean = supportedDataTypes.contains(dt)
  def apply(dt: DataType): EncodeTypeOps = TypeApiOps(dt).asInstanceOf[EncodeTypeOps]
}
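For reference, a minimal usage sketch (not part of the diff; `EncoderDispatchExample` is a hypothetical name). It resolves an encoder for a TIME column through the new ops dispatch, mirroring the fallback case added to RowEncoder above:

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.types.{DataType, TimeType}
import org.apache.spark.sql.types.ops.EncodeTypeOps

object EncoderDispatchExample {
  // Mirrors the new RowEncoder case: guard with supports, then ask for the encoder.
  def encoderFor(dataType: DataType): Option[AgnosticEncoder[_]] =
    if (EncodeTypeOps.supports(dataType)) Some(EncodeTypeOps(dataType).getEncoder)
    else None

  def main(args: Array[String]): Unit = {
    // Every precision from TimeType.MIN_PRECISION to MAX_PRECISION is supported.
    println(encoderFor(TimeType())) // Some(LocalTimeEncoder)
  }
}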
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.types.{DataType, TimeType}

// Formats type values as strings.
trait FormatTypeOps {
  // Formats times according to the pattern `HH:mm:ss.[..fff..]`, where `[..fff..]` is a
  // fraction of a second up to microsecond resolution. Trailing zeros in the fraction
  // are not output.
  def format(v: Any): String

  // Converts the given value to a SQL typed literal.
  def toSQLValue(v: Any): String
}

object FormatTypeOps {
  private val supportedDataTypes: Set[DataType] =
    Set(TimeType.MIN_PRECISION to TimeType.MAX_PRECISION map TimeType.apply: _*)

  def supports(dt: DataType): Boolean = supportedDataTypes.contains(dt)
  def apply(dt: DataType): FormatTypeOps = TypeApiOps(dt).asInstanceOf[FormatTypeOps]
}
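A hedged usage sketch (not in the diff; `TimeFmtExample` is a made-up name) showing both trait methods against a TIME value. The expected outputs follow from the fraction formatter's no-trailing-zeros behavior documented above:

import java.time.LocalTime

import org.apache.spark.sql.types.TimeType
import org.apache.spark.sql.types.ops.FormatTypeOps

object TimeFmtExample {
  def main(args: Array[String]): Unit = {
    val ops = FormatTypeOps(TimeType()) // dispatches to TimeTypeApiOps via TypeApiOps
    val t = LocalTime.of(12, 34, 56, 123000000)
    println(ops.format(t))     // 12:34:56.123
    println(ops.toSQLValue(t)) // TIME '12:34:56.123'
  }
}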
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import java.time.LocalTime

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.LocalTimeEncoder
import org.apache.spark.sql.catalyst.util.TimeFormatter
import org.apache.spark.sql.types.TimeType

class TimeTypeApiOps(t: TimeType)
extends TypeApiOps
with EncodeTypeOps
with FormatTypeOps
with Serializable {

[Review comment from dongjoon-hyun (Member)] nit. indentation?

[Reply from MaxGekk (Member, Author)] @dongjoon-hyun I have to violate the coding style because some GA check fails and requires the formatting produced by:

$ ./build/mvn scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl sql/api -pl sql/connect/common -pl sql/connect/server -pl sql/connect/shims -pl sql/connect/client/jvm
  private lazy val fracFormatter = TimeFormatter.getFractionFormatter()

  override def getEncoder: AgnosticEncoder[_] = LocalTimeEncoder

  override def format(v: Any): String = v match {
    case l: Long => fracFormatter.format(l)
    case lt: LocalTime => fracFormatter.format(lt)
    case other =>
      throw SparkException.internalError(
        s"Unsupported external type ${other.getClass.getName} for the type ${t.sql}")
  }

  override def toSQLValue(v: Any): String = s"TIME '${format(v)}'"
}
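Worth noting: `format` accepts both shapes a TIME value can take. Catalyst stores TIME as a Long of nanoseconds since midnight (see the removed TimeConverter further down, which used DateTimeUtils.localTimeToNanos), while external rows carry java.time.LocalTime. A small sketch with a hypothetical driver object:

import org.apache.spark.sql.types.TimeType
import org.apache.spark.sql.types.ops.TimeTypeApiOps

object TimeOpsShapes {
  def main(args: Array[String]): Unit = {
    val ops = new TimeTypeApiOps(TimeType())
    // 12:34:56.123 as nanoseconds since midnight:
    // (12 * 3600 + 34 * 60 + 56) * 1000000000 + 123000000
    println(ops.format(45296123000000L)) // 12:34:56.123
    // Any other input type is an internal error:
    // ops.format("12:34:56") would throw SparkException.internalError(...)
  }
}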
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.SparkException
import org.apache.spark.sql.types.{DataType, TimeType}

// Operations over a data type
trait TypeApiOps

// The factory of type operation objects.
object TypeApiOps {
  def apply(dt: DataType): TypeApiOps = dt match {
    case tt: TimeType => new TimeTypeApiOps(tt)
    case other =>
      throw SparkException.internalError(
        s"Cannot create an operation object of the type ${other.sql}")
  }
}
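To make the dispatch chain concrete: the capability companions above funnel into this factory and then downcast, so their `supports` checks are what keep the `asInstanceOf` safe. A minimal sketch, assuming a hypothetical driver name:

import org.apache.spark.sql.types.{StringType, TimeType}
import org.apache.spark.sql.types.ops.{EncodeTypeOps, FormatTypeOps}

object OpsDispatchDemo {
  def main(args: Array[String]): Unit = {
    val tt = TimeType()
    // Guarded path: supports(tt) is true, so the cast inside EncodeTypeOps.apply is safe.
    if (EncodeTypeOps.supports(tt)) println(EncodeTypeOps(tt).getEncoder) // LocalTimeEncoder
    // Unsupported types never reach the factory; calling TypeApiOps on them would
    // throw SparkException.internalError("Cannot create an operation object ...").
    assert(!FormatTypeOps.supports(StringType))
  }
}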
@@ -30,6 +30,7 @@ import scala.language.existentials
import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.ops.ExternalTypeOps
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -76,7 +77,6 @@ object CatalystTypeConverters {
new GeometryConverter(g)
case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
case DateType => DateConverter
-case _: TimeType => TimeConverter
case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
case TimestampType => TimestampConverter
case TimestampNTZType => TimestampNTZConverter
@@ -90,6 +90,7 @@ object CatalystTypeConverters {
case DoubleType => DoubleConverter
case DayTimeIntervalType(_, endField) => DurationConverter(endField)
case YearMonthIntervalType(_, endField) => PeriodConverter(endField)
+case _ if ExternalTypeOps.supports(dataType) => ExternalTypeOps(dataType)
case dataType: DataType => IdentityConverter(dataType)
}
converter.asInstanceOf[CatalystTypeConverter[Any, Any, Any]]
@@ -102,7 +103,7 @@ object CatalystTypeConverters {
* @tparam ScalaOutputType The type of Scala values returned when converting Catalyst to Scala.
* @tparam CatalystType The internal Catalyst type used to represent values of this Scala type.
*/
-private abstract class CatalystTypeConverter[ScalaInputType, ScalaOutputType, CatalystType]
+trait CatalystTypeConverter[ScalaInputType, ScalaOutputType, CatalystType]
extends Serializable {

/**
@@ -436,18 +437,6 @@
DateTimeUtils.daysToLocalDate(row.getInt(column))
}

-private object TimeConverter extends CatalystTypeConverter[LocalTime, LocalTime, Any] {
-  override def toCatalystImpl(scalaValue: LocalTime): Long = {
-    DateTimeUtils.localTimeToNanos(scalaValue)
-  }
-  override def toScala(catalystValue: Any): LocalTime = {
-    if (catalystValue == null) null
-    else DateTimeUtils.nanosToLocalTime(catalystValue.asInstanceOf[Long])
-  }
-  override def toScalaImpl(row: InternalRow, column: Int): LocalTime =
-    DateTimeUtils.nanosToLocalTime(row.getLong(column))
-}

private object TimestampConverter extends CatalystTypeConverter[Any, Timestamp, Any] {
override def toCatalystImpl(scalaValue: Any): Long = scalaValue match {
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
@@ -634,7 +623,7 @@ object CatalystTypeConverters {
case c: Char => StringConverter.toCatalyst(c.toString)
case d: Date => DateConverter.toCatalyst(d)
case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
-case t: LocalTime => TimeConverter.toCatalyst(t)
+case t: LocalTime => ExternalTypeOps(TimeType()).toCatalyst(t)
case t: Timestamp => TimestampConverter.toCatalyst(t)
case i: Instant => InstantConverter.toCatalyst(i)
case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
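With the dedicated TimeConverter gone, LocalTime values now reach Catalyst through the ops dispatch. A hedged round-trip sketch using the public converter factories; the expected Long encoding (nanoseconds since midnight) is grounded in the removed TimeConverter above, and `TimeRoundTrip` is an illustrative name:

import java.time.LocalTime

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.TimeType

object TimeRoundTrip {
  def main(args: Array[String]): Unit = {
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(TimeType())
    val nanos = toCatalyst(LocalTime.of(1, 2, 3)) // 3723000000000L
    val toScala = CatalystTypeConverters.createToScalaConverter(TimeType())
    println(toScala(nanos)) // 01:02:03
  }
}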
@@ -23,8 +23,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, CalendarIntervalEncoder, NullEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, SparkDecimalEncoder, VariantEncoder}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
+import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal}

/**
@@ -99,10 +100,10 @@ object EncoderUtils {

def dataTypeJavaClass(dt: DataType): Class[_] = {
dt match {
+case _ if PhyTypeOps.supports(dt) => PhyTypeOps(dt).getJavaClass
[Comment from MaxGekk (Member, Author), Jul 14, 2025] In the future, we will get an Ops object here and match on it instead of calling PhyTypeOps.supports. The current implementation is just a workaround to avoid passing a TypeOps instead of a DataType.

case _: DecimalType => classOf[Decimal]
case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
case _: YearMonthIntervalType => classOf[PhysicalIntegerType.InternalType]
-case _: TimeType => classOf[PhysicalLongType.InternalType]
case _: StringType => classOf[UTF8String]
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
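PhyTypeOps itself is not among the files captured here. The sketch below reconstructs its apparent surface purely from the call sites in these hunks (the supports/apply guards, getJavaClass here and in CodeGenerator, getMutableValue in SpecificInternalRow); every name, signature, and body is an assumption, grounded only in the TimeType-to-Long mappings these hunks remove, and the author's comment above signals the shape is transitional anyway:

import org.apache.spark.sql.catalyst.expressions.{MutableLong, MutableValue}
import org.apache.spark.sql.types.{DataType, TimeType}

// Hypothetical reconstruction, not the PR's actual file.
trait PhyTypeOpsSketch {
  def getJavaClass: Class[_]
  def getMutableValue: MutableValue
}

object PhyTypeOpsSketch {
  // Assumption: mirrors the supports/apply companions of EncodeTypeOps/FormatTypeOps.
  def supports(dt: DataType): Boolean = dt.isInstanceOf[TimeType]

  def apply(dt: DataType): PhyTypeOpsSketch = new PhyTypeOpsSketch {
    // TIME is physically a Long (nanoseconds), per the cases removed in these hunks.
    def getJavaClass: Class[_] = java.lang.Long.TYPE
    def getMutableValue: MutableValue = new MutableLong
  }
}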
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import scala.annotation.tailrec

+import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps
import org.apache.spark.sql.types._

/**
@@ -196,10 +197,11 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen

@tailrec
private[this] def dataTypeToMutableValue(dataType: DataType): MutableValue = dataType match {
+case _ if PhyTypeOps.supports(dataType) => PhyTypeOps(dataType).getMutableValue
// We use INT for DATE and YearMonthIntervalType internally
case IntegerType | DateType | _: YearMonthIntervalType => new MutableInt
// We use Long for Timestamp, Timestamp without time zone and DayTimeInterval internally
-case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType | _: TimeType =>
+case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
new MutableLong
case FloatType => new MutableFloat
case DoubleType => new MutableDouble
@@ -22,11 +22,12 @@ import java.time.ZoneOffset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.util.{ArrayData, CharVarcharCodegenUtils, DateFormatter, FractionTimeFormatter, IntervalStringStyles, IntervalUtils, MapData, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{ArrayData, CharVarcharCodegenUtils, DateFormatter, IntervalStringStyles, IntervalUtils, MapData, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.ops.FormatTypeOps
import org.apache.spark.unsafe.UTF8StringBuilder
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.ArrayImplicits._
@@ -35,7 +36,6 @@ import org.apache.spark.util.SparkStringUtils
trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression =>

private lazy val dateFormatter = DateFormatter()
-private lazy val timeFormatter = new FractionTimeFormatter()
private lazy val timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId)
private lazy val timestampNTZFormatter = TimestampFormatter.getFractionFormatter(ZoneOffset.UTC)

@@ -75,8 +75,6 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression =>
acceptAny[Long](t => UTF8String.fromString(timestampFormatter.format(t)))
case TimestampNTZType =>
acceptAny[Long](t => UTF8String.fromString(timestampNTZFormatter.format(t)))
-case _: TimeType =>
-  acceptAny[Long](t => UTF8String.fromString(timeFormatter.format(t)))
case ArrayType(et, _) =>
acceptAny[ArrayData](array => {
val builder = new UTF8StringBuilder
@@ -176,6 +174,8 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression =>
case _: DecimalType if useDecimalPlainString =>
acceptAny[Decimal](d => UTF8String.fromString(d.toPlainString))
case _: StringType => acceptAny[UTF8String](identity[UTF8String])
+case _ if FormatTypeOps.supports(from) =>
+  t => UTF8String.fromString(FormatTypeOps(from).format(t))
case _ => o => UTF8String.fromString(o.toString)
}

@@ -228,10 +228,11 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression =>
ctx.addReferenceObj("timestampNTZFormatter", timestampNTZFormatter),
timestampNTZFormatter.getClass)
(c, evPrim) => code"$evPrim = UTF8String.fromString($tf.format($c));"
-case _: TimeType =>
+case t if FormatTypeOps.supports(t) =>
+  val formatter = FormatTypeOps(t)
   val tf = JavaCode.global(
-    ctx.addReferenceObj("timeFormatter", timeFormatter),
-    timeFormatter.getClass)
+    ctx.addReferenceObj("formatter", formatter),
+    formatter.getClass)
(c, evPrim) => code"$evPrim = UTF8String.fromString($tf.format($c));"
case CalendarIntervalType =>
(c, evPrim) => code"$evPrim = UTF8String.fromString($c.toString());"
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.encoders.HashableWeakReference
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.types.ops.PhyTypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, CollationAwareUTF8String, CollationFactory, CollationSupport, MapData, SQLOrderingUtil, UnsafeRowUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -1991,11 +1992,12 @@

@tailrec
def javaClass(dt: DataType): Class[_] = dt match {
+case _ if PhyTypeOps.supports(dt) => PhyTypeOps(dt).getJavaClass
case BooleanType => java.lang.Boolean.TYPE
case ByteType => java.lang.Byte.TYPE
case ShortType => java.lang.Short.TYPE
case IntegerType | DateType | _: YearMonthIntervalType => java.lang.Integer.TYPE
-case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType | _: TimeType =>
+case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
java.lang.Long.TYPE
case FloatType => java.lang.Float.TYPE
case DoubleType => java.lang.Double.TYPE