13 changes: 11 additions & 2 deletions sql/api/src/main/scala/org/apache/spark/sql/Row.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.errors.DataTypeErrors.{toSQLType, toSQLValue}
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.ops.TypeApiOps
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.ArrayImplicits._

@@ -627,8 +628,16 @@ trait Row extends Serializable {
}

// Convert a value to json.
def toJson(value: Any, dataType: DataType): JValue = (value, dataType) match {
case (null, _) => JNull
def toJson(value: Any, dataType: DataType): JValue =
if (value == null) {
JNull
} else {
TypeApiOps(dataType)
.map(ops => JString(ops.format(value)))
.getOrElse(toJsonDefault(value, dataType))
}

def toJsonDefault(value: Any, dataType: DataType): JValue = (value, dataType) match {
case (b: Boolean, _) => JBool(b)
case (b: Byte, _) => JLong(b)
case (s: Short, _) => JLong(s)
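A quick sketch of what the new dispatch does (illustration only, not part of the patch; it assumes, per TimeTypeApiOps later in this diff, that TIME values are internally Long nanoseconds of day):

    toJson(null, TimeType(6))             // JNull: null is handled before the ops lookup
    toJson(37845123456000L, TimeType(6))  // JString("10:30:45.123456") via TypeApiOps.format
    toJson(true, BooleanType)             // JBool(true) via the toJsonDefault fallback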
RowEncoder.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, B
import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.ops.TypeApiOps
import org.apache.spark.util.ArrayImplicits._

/**
@@ -70,6 +71,13 @@ object RowEncoder extends DataTypeErrorsBase {
}

private[sql] def encoderForDataType(dataType: DataType, lenient: Boolean): AgnosticEncoder[_] =
TypeApiOps(dataType)
.map(_.getEncoder)
.getOrElse(encoderForDataTypeDefault(dataType, lenient))

private def encoderForDataTypeDefault(
dataType: DataType,
lenient: Boolean): AgnosticEncoder[_] =
dataType match {
case NullType => NullEncoder
case BooleanType => BoxedBooleanEncoder
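The same lookup-then-fallback shape as in Row.toJson. A hedged sketch of the dispatch outcome, using encoders named in this file's imports:

    encoderForDataType(TimeType(6), lenient = false)  // LocalTimeEncoder via TypeApiOps.getEncoder
    encoderForDataType(BooleanType, lenient = false)  // BoxedBooleanEncoder via the default match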
SqlApiConf.scala
@@ -53,6 +53,7 @@ private[sql] trait SqlApiConf {
def parserDfaCacheFlushRatio: Double
def legacyParameterSubstitutionConstantsOnly: Boolean
def legacyIdentifierClauseOnly: Boolean
def typesFrameworkEnabled: Boolean
}

private[sql] object SqlApiConf {
@@ -110,4 +111,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
override def parserDfaCacheFlushRatio: Double = -1.0
override def legacyParameterSubstitutionConstantsOnly: Boolean = false
override def legacyIdentifierClauseOnly: Boolean = false
override def typesFrameworkEnabled: Boolean = false
}
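This flag is the single gate for every framework lookup; the SQL conf key that backs typesFrameworkEnabled is defined elsewhere in the PR and is not shown in this diff. A sketch of the off-by-default behavior:

    SqlApiConf.get.typesFrameworkEnabled  // false under DefaultSqlApiConf
    TypeApiOps(TimeType(6))               // None while the flag is off, so every
                                          // call site falls back to its legacy path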
TimeTypeApiOps.scala (new file)
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.LocalTimeEncoder
import org.apache.spark.sql.catalyst.util.{FractionTimeFormatter, TimeFormatter}
import org.apache.spark.sql.types.{DataType, TimeType}

/**
* Client-side (spark-api) operations for TimeType.
*
* This class implements all TypeApiOps methods for the TIME data type:
* - String formatting: uses FractionTimeFormatter for consistent output
* - Row encoding: uses LocalTimeEncoder for java.time.LocalTime
*
* RELATIONSHIP TO TimeTypeOps: TimeTypeOps (in catalyst package) extends this class to inherit
* client-side operations while adding server-side operations (physical type, literals, etc.).
*
* @param t
* The TimeType with precision information
* @since 4.2.0
*/
class TimeTypeApiOps(val t: TimeType) extends TypeApiOps {

override def dataType: DataType = t

// ==================== String Formatting ====================

@transient
private lazy val timeFormatter: TimeFormatter = new FractionTimeFormatter()

override def format(v: Any): String = {
timeFormatter.format(v.asInstanceOf[Long])
}

override def toSQLValue(v: Any): String = {
s"TIME '${format(v)}'"
}

// ==================== Row Encoding ====================

override def getEncoder: AgnosticEncoder[_] = LocalTimeEncoder
}
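A usage sketch, assuming TIME's internal value is nanoseconds since midnight (consistent with format() casting to Long above):

    import java.time.LocalTime
    val ops = new TimeTypeApiOps(TimeType(6))
    val nanos = LocalTime.of(10, 30, 45, 123456000).toNanoOfDay  // 37845123456000L
    ops.format(nanos)       // "10:30:45.123456"
    ops.toSQLValue(nanos)   // "TIME '10:30:45.123456'"
    ops.getEncoder          // LocalTimeEncoder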
126 changes: 126 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/types/ops/TypeApiOps.scala
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types.ops

import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types.{DataType, TimeType}
import org.apache.spark.unsafe.types.UTF8String

/**
* Client-side (spark-api) type operations for the Types Framework.
*
* This trait consolidates all client-side operations that a data type must implement to be usable
* in the Spark SQL API layer. All methods are mandatory because a type cannot function correctly
* without string formatting (needed for CAST to STRING, EXPLAIN, SHOW) or encoding (needed for
* Dataset[T] operations).
*
* This single-interface design was chosen over separate FormatTypeOps/EncodeTypeOps traits to
* make it clear what a new type must implement - there is one mandatory interface, and it
* contains everything required. Optional capabilities (e.g., proto, Arrow, JDBC) are defined as
* separate traits that can be mixed in incrementally.
*
* RELATIONSHIP TO TypeOps:
* - TypeOps (catalyst): Server-side operations - physical types, literals, conversions
* - TypeApiOps (spark-api): Client-side operations - formatting, encoding
*
* The split exists because sql/api cannot depend on sql/catalyst. For TimeType, TimeTypeOps
* (catalyst) extends TimeTypeApiOps (sql-api) to inherit both sets of operations.
*
* @see
* TimeTypeApiOps for reference implementation
* @since 4.2.0
*/
trait TypeApiOps extends Serializable {

/** The DataType this Ops instance handles. */
def dataType: DataType

// ==================== String Formatting ====================

/**
* Formats an internal value as a display string.
*
* Used by CAST to STRING, EXPLAIN output, SHOW commands.
*
* @param v
* the internal value (e.g., Long nanoseconds for TimeType)
* @return
* formatted string (e.g., "10:30:45.123456")
*/
def format(v: Any): String
Review thread:
Contributor: where do we call it at the client side?
Contributor Author: at the moment it's only called from formatUTF8 below, but in one of the later phases it will be called from Row.scala (client/api) and HiveResult.scala (core).
Contributor Author: actually, looking at it now there's no reason not to add it immediately, it fits purposefully into this PR... adding it


/**
* Formats an internal value as a UTF8String.
*
* Default implementation wraps format(). Override for performance if needed.
*/
def formatUTF8(v: Any): UTF8String = UTF8String.fromString(format(v))
Review thread:
Contributor: do we expect any data type to override it? It seems weird if format and formatUTF8 are inconsistent. We can probably remove this API.
Contributor: nvm, I see the use case.


/**
* Formats an internal value as a SQL literal string.
*
* @param v
* the internal value
* @return
* SQL literal string (e.g., "TIME '10:30:00'")
*/
def toSQLValue(v: Any): String

// ==================== Row Encoding ====================

/**
* Returns the AgnosticEncoder for this type.
*
* Used by RowEncoder for Dataset[T] operations.
*
* @return
* AgnosticEncoder instance (e.g., LocalTimeEncoder for TimeType)
*/
def getEncoder: AgnosticEncoder[_]
}

/**
* Factory object for creating TypeApiOps instances.
*
* Returns Option to serve as both lookup and existence check - callers use getOrElse to fall
* through to legacy handling. The feature flag check is inside apply(), so callers don't need to
* check it separately.
*/
object TypeApiOps {

/**
* Returns a TypeApiOps instance for the given DataType, if supported by the framework.
*
* Returns None if the type is not supported or the framework is disabled. This is the single
* registration point for all client-side type operations.
*
* @param dt
* the DataType to get operations for
* @return
* Some(TypeApiOps) if supported, None otherwise
*/
def apply(dt: DataType): Option[TypeApiOps] = {
if (!SqlApiConf.get.typesFrameworkEnabled) return None
dt match {
case tt: TimeType => Some(new TimeTypeApiOps(tt))
// Add new types here - single registration point
case _ => None
}
}
}
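To make the "single registration point" concrete, here is a hypothetical sketch of onboarding a new type; MyType and myEncoder are illustrative names, not part of this PR:

    // One implementation covers formatting and encoding for the new type...
    class MyTypeApiOps(val t: MyType) extends TypeApiOps {
      override def dataType: DataType = t
      override def format(v: Any): String = v.toString               // display string
      override def toSQLValue(v: Any): String = s"MY_TYPE '${format(v)}'"
      override def getEncoder: AgnosticEncoder[_] = myEncoder        // assumed encoder
    }
    // ...plus one case in TypeApiOps.apply:
    //   case mt: MyType => Some(new MyTypeApiOps(mt))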
CatalystTypeConverters.scala
@@ -30,6 +30,7 @@ import scala.language.existentials
import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -62,6 +63,13 @@ object CatalystTypeConverters {

private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any, Any] = {
TypeUtils.failUnsupportedDataType(dataType, SQLConf.get)
TypeOps(dataType)
.map(ops => new TypeOpsConverter(ops))
.getOrElse(getConverterForTypeDefault(dataType))
}

private def getConverterForTypeDefault(
dataType: DataType): CatalystTypeConverter[Any, Any, Any] = {
val converter = dataType match {
case udt: UserDefinedType[_] => UDTConverter(udt)
case arrayType: ArrayType => ArrayConverter(arrayType.elementType)
@@ -150,6 +158,17 @@
override def toScalaImpl(row: InternalRow, column: Int): Any = row.get(column, dataType)
}

/**
* Adapter that wraps TypeOps to implement CatalystTypeConverter.
* Used by the Types Framework to provide type conversion for framework-supported types.
*/
private class TypeOpsConverter(ops: TypeOps)
extends CatalystTypeConverter[Any, Any, Any] {
override def toCatalystImpl(scalaValue: Any): Any = ops.toCatalystImpl(scalaValue)
override def toScala(catalystValue: Any): Any = ops.toScala(catalystValue)
override def toScalaImpl(row: InternalRow, column: Int): Any = ops.toScalaImpl(row, column)
}
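A hedged sketch of the intended round-trip (TypeOps itself lives in catalyst and its implementation is not shown in this diff):

    import java.time.LocalTime
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(TimeType(6))
    toCatalyst(LocalTime.of(10, 30))  // expected: 37800000000000L, nanos of day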

private case class UDTConverter[A >: Null](
udt: UserDefinedType[A]) extends CatalystTypeConverter[A, A, Any] {
// toCatalyst (it calls toCatalystImpl) will do null check.
InternalRow.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -168,8 +169,11 @@ object InternalRow {
/**
* Returns a writer for an `InternalRow` with given data type.
*/
@scala.annotation.tailrec
def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit =
TypeOps(dt).map(_.getRowWriter(ordinal)).getOrElse(getWriterDefault(ordinal, dt))

private def getWriterDefault(
ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
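A sketch of the writer contract both paths must honor (runnable against the default path; the framework path is expected to behave identically for its types):

    val writer = InternalRow.getWriter(0, IntegerType)
    val row = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(1)
    writer(row, 42)
    assert(row.getInt(0) == 42)
    // For TIME, TypeOps(dt).getRowWriter(ordinal) is expected to produce an
    // equivalent setLong-based writer.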
EncoderUtils.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, CalendarIntervalEncoder, NullEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, SparkDecimalEncoder, VariantEncoder}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal}
@@ -97,7 +98,10 @@ object EncoderUtils {
case _ => false
}

def dataTypeJavaClass(dt: DataType): Class[_] = {
def dataTypeJavaClass(dt: DataType): Class[_] =
TypeOps(dt).map(_.getJavaClass).getOrElse(dataTypeJavaClassDefault(dt))

private def dataTypeJavaClassDefault(dt: DataType): Class[_] = {
dt match {
case _: DecimalType => classOf[Decimal]
case _: DayTimeIntervalType => classOf[PhysicalLongType.InternalType]
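The contract here is that the framework and default paths agree on the physical Java class; for TIME, TypeOps.getJavaClass is expected to return the same long-backed class the legacy match would:

    dataTypeJavaClass(TimeType(6))  // expected: classOf[PhysicalLongType.InternalType]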
SpecificInternalRow.scala
@@ -17,8 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.types.ops.TypeOps
import org.apache.spark.sql.types._

/**
@@ -194,8 +193,13 @@
*/
final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {

@tailrec
private[this] def dataTypeToMutableValue(dataType: DataType): MutableValue = dataType match {
private[this] def dataTypeToMutableValue(dataType: DataType): MutableValue =
TypeOps(dataType)
.map(_.getMutableValue)
.getOrElse(dataTypeToMutableValueDefault(dataType))

private[this] def dataTypeToMutableValueDefault(
dataType: DataType): MutableValue = dataType match {
// We use INT for DATE and YearMonthIntervalType internally
case IntegerType | DateType | _: YearMonthIntervalType => new MutableInt
// We use Long for Timestamp, Timestamp without time zone and DayTimeInterval internally
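A usage sketch: SpecificInternalRow allocates one MutableValue slot per field, and TIME is long-backed, so TypeOps.getMutableValue is expected to return a MutableLong (that expectation is an assumption, not quoted from the patch):

    val row = new SpecificInternalRow(Seq(IntegerType, TimeType(6)))
    row.setInt(0, 42)
    row.setLong(1, 37845123456000L)  // TIME stored as nanos-of-day (assumption)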
ToStringBase.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.ops.TypeApiOps
import org.apache.spark.unsafe.UTF8StringBuilder
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.ArrayImplicits._
@@ -65,7 +66,12 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression =>
case NoConstraint => castToString(from)
}

private def castToString(from: DataType): Any => UTF8String = from match {
private def castToString(from: DataType): Any => UTF8String =
TypeApiOps(from)
.map(ops => acceptAny[Any](v => ops.formatUTF8(v)))
.getOrElse(castToStringDefault(from))

private def castToStringDefault(from: DataType): Any => UTF8String = from match {
case CalendarIntervalType =>
acceptAny[CalendarInterval](i => UTF8String.fromString(i.toString))
case BinaryType => acceptAny[Array[Byte]](binaryFormatter.apply)
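A hedged sketch of the effect on CAST(... AS STRING), again assuming nanos-of-day for TIME:

    castToString(TimeType(6))(37845123456000L)
      // == UTF8String.fromString("10:30:45.123456"), via TypeApiOps.formatUTF8
    castToString(CalendarIntervalType)  // still resolves through the castToStringDefault match above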