@@ -19,8 +19,89 @@

package org.apache.comet.iceberg

import java.lang.reflect.Method

import org.apache.spark.internal.Logging

/** Cached Iceberg classes and methods to avoid repeated reflection lookups. */
case class IcebergReflectionCache(
    contentScanTaskClass: Class[_],
    fileScanTaskClass: Class[_],
    contentFileClass: Class[_],
    deleteFileClass: Class[_],
    schemaParserClass: Class[_],
    schemaClass: Class[_],
    partitionSpecParserClass: Class[_],
    partitionSpecClass: Class[_],
    fileMethod: Method,
    startMethod: Method,
    lengthMethod: Method,
    partitionMethod: Method,
    residualMethod: Method,
    taskSchemaMethod: Method,
    deletesMethod: Method,
    specMethod: Method,
    schemaToJsonMethod: Method,
    specToJsonMethod: Method,
    deleteContentMethod: Method,
    deleteSpecIdMethod: Method,
    deleteEqualityIdsMethod: Method)
@coderfender (Contributor) commented on Feb 23, 2026:

I am not sure this cache is truly helpful, or whether the reward is worth the complexity it introduces. Here are my concerns; I would love to know your opinion:

  1. We have ~20+ fields, which makes this difficult to maintain and manage.
  2. We cache a mix of methods and classes. Can we be consistent, and perhaps bucket them logically?
  3. What happens if a class or method is not found? Would we rather throw an error or fail silently? What do you think is the path of least resistance here?
  4. Why did we extend Logging for the companion object?
  5. Why a factory instead of a singleton approach? Perhaps a single shared instance would be useful across the board? (A sketch of what I mean follows this list.)
  6. Can we guarantee that this is thread-safe and only populated when Iceberg is involved?
  7. Are we catering to / handling all supported Iceberg versions and their signatures?
  8. nit: we might also want to rename the class to indicate that it belongs to the Iceberg subsystem.
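
To make point 5 concrete, here is a minimal sketch of the lazy, fail-fast singleton I have in mind. It is illustrative only: it shows just two of the ~20 cached members, the name IcebergReflectionCacheSketch is hypothetical, and failing loudly on a missing class or method is one possible answer to point 3, not what this PR currently does.

import java.lang.reflect.Method

final case class IcebergReflectionCacheSketch(
    contentScanTaskClass: Class[_],
    fileMethod: Method)

object IcebergReflectionCacheSketch {

  // `lazy val` initialization is thread-safe per the Scala spec, and nothing
  // is resolved until Iceberg code first touches the cache (point 6).
  lazy val instance: IcebergReflectionCacheSketch =
    try {
      // scalastyle:off classforname
      val contentScanTaskClass = Class.forName("org.apache.iceberg.ContentScanTask")
      // scalastyle:on classforname
      IcebergReflectionCacheSketch(
        contentScanTaskClass = contentScanTaskClass,
        fileMethod = contentScanTaskClass.getMethod("file"))
    } catch {
      case e: ReflectiveOperationException =>
        // Fail loudly: a missing class or method signature means an
        // unsupported Iceberg version on the classpath, and that should
        // surface immediately rather than degrade silently (point 3).
        throw new IllegalStateException(
          "Failed to initialize the Iceberg reflection cache; " +
            "check the Iceberg version on the classpath",
          e)
    }
}

A single shared instance per JVM would also remove the need to thread the cache through every method signature, at the cost of holding the Class/Method references for the lifetime of the executor.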


object IcebergReflectionCache extends Logging {

  def create(): IcebergReflectionCache = {
    // scalastyle:off classforname
    val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
    val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
    val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
    val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
    val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
    val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
    val partitionSpecParserClass =
      Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
    val partitionSpecClass = Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC)
    // scalastyle:on classforname

    val fileMethod = contentScanTaskClass.getMethod("file")
    val startMethod = contentScanTaskClass.getMethod("start")
    val lengthMethod = contentScanTaskClass.getMethod("length")
    val partitionMethod = contentScanTaskClass.getMethod("partition")
    val residualMethod = contentScanTaskClass.getMethod("residual")
    val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
    val deletesMethod = fileScanTaskClass.getMethod("deletes")
    val specMethod = fileScanTaskClass.getMethod("spec")
    val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
    schemaToJsonMethod.setAccessible(true)
    val specToJsonMethod = partitionSpecParserClass.getMethod("toJson", partitionSpecClass)
    val deleteContentMethod = deleteFileClass.getMethod("content")
    val deleteSpecIdMethod = deleteFileClass.getMethod("specId")
    val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds")

    IcebergReflectionCache(
      contentScanTaskClass = contentScanTaskClass,
      fileScanTaskClass = fileScanTaskClass,
      contentFileClass = contentFileClass,
      deleteFileClass = deleteFileClass,
      schemaParserClass = schemaParserClass,
      schemaClass = schemaClass,
      partitionSpecParserClass = partitionSpecParserClass,
      partitionSpecClass = partitionSpecClass,
      fileMethod = fileMethod,
      startMethod = startMethod,
      lengthMethod = lengthMethod,
      partitionMethod = partitionMethod,
      residualMethod = residualMethod,
      taskSchemaMethod = taskSchemaMethod,
      deletesMethod = deletesMethod,
      specMethod = specMethod,
      schemaToJsonMethod = schemaToJsonMethod,
      specToJsonMethod = specToJsonMethod,
      deleteContentMethod = deleteContentMethod,
      deleteSpecIdMethod = deleteSpecIdMethod,
      deleteEqualityIdsMethod = deleteEqualityIdsMethod)
  }
}

/**
* Shared reflection utilities for Iceberg operations.
*
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceR
import org.apache.spark.sql.types._

import org.apache.comet.ConfigEntry
-import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
+import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection, IcebergReflectionCache}
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField}
@@ -218,33 +218,25 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
}
}

/**
* Extracts delete files from an Iceberg FileScanTask as a list (for deduplication).
*/
private def extractDeleteFilesList(
task: Any,
-contentFileClass: Class[_],
-fileScanTaskClass: Class[_]): Seq[OperatorOuterClass.IcebergDeleteFile] = {
+cache: IcebergReflectionCache): Seq[OperatorOuterClass.IcebergDeleteFile] = {
try {
-// scalastyle:off classforname
-val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
-// scalastyle:on classforname
-
-val deletes = IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
+val deletes = cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+val deletesList = if (deletes == null) new java.util.ArrayList[Any]() else deletes

-deletes.asScala.flatMap { deleteFile =>
+deletesList.asScala.flatMap { deleteFile =>
try {
IcebergReflection
-.extractFileLocation(contentFileClass, deleteFile)
+.extractFileLocation(cache.contentFileClass, deleteFile)
.map { deletePath =>
val deleteBuilder =
OperatorOuterClass.IcebergDeleteFile.newBuilder()
deleteBuilder.setFilePath(deletePath)

val contentType =
try {
-val contentMethod = deleteFileClass.getMethod("content")
-val content = contentMethod.invoke(deleteFile)
+val content = cache.deleteContentMethod.invoke(deleteFile)
content.toString match {
case IcebergReflection.ContentTypes.POSITION_DELETES =>
IcebergReflection.ContentTypes.POSITION_DELETES
@@ -260,21 +252,20 @@

val specId =
try {
-val specIdMethod = deleteFileClass.getMethod("specId")
-specIdMethod.invoke(deleteFile).asInstanceOf[Int]
+cache.deleteSpecIdMethod.invoke(deleteFile).asInstanceOf[Int]
} catch {
case _: Exception =>
0
}
deleteBuilder.setPartitionSpecId(specId)

try {
-val equalityIdsMethod =
-deleteFileClass.getMethod("equalityFieldIds")
-val equalityIds = equalityIdsMethod
+val equalityIds = cache.deleteEqualityIdsMethod
.invoke(deleteFile)
.asInstanceOf[java.util.List[Integer]]
-equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
+if (equalityIds != null) {
+equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
+}
} catch {
case _: Exception =>
}
@@ -297,38 +288,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
}
}

/**
* Serializes partition spec and data from an Iceberg FileScanTask.
*
* Extracts partition specification (field definitions and transforms) and partition data
* (actual values) from the task. This information is used by the native execution engine to
* build a constants_map for identity-transformed partition columns and to handle
* partition-level filtering.
*/
private def serializePartitionData(
task: Any,
-contentScanTaskClass: Class[_],
-fileScanTaskClass: Class[_],
+cache: IcebergReflectionCache,
taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder,
commonBuilder: OperatorOuterClass.IcebergScanCommon.Builder,
partitionTypeToPoolIndex: mutable.HashMap[String, Int],
partitionSpecToPoolIndex: mutable.HashMap[String, Int],
partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = {
try {
-val specMethod = fileScanTaskClass.getMethod("spec")
-val spec = specMethod.invoke(task)
+val spec = cache.specMethod.invoke(task)

if (spec != null) {
// Deduplicate partition spec
try {
-// scalastyle:off classforname
-val partitionSpecParserClass =
-Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
-val toJsonMethod = partitionSpecParserClass.getMethod(
-"toJson",
-Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC))
-// scalastyle:on classforname
-val partitionSpecJson = toJsonMethod
+val partitionSpecJson = cache.specToJsonMethod
.invoke(null, spec)
.asInstanceOf[String]

@@ -345,8 +319,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
}

// Get partition data from the task (via file().partition())
-val partitionMethod = contentScanTaskClass.getMethod("partition")
-val partitionData = partitionMethod.invoke(task)
+val partitionData = cache.partitionMethod.invoke(task)

if (partitionData != null) {
// Get the partition type/schema from the spec
@@ -770,25 +743,9 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
commonBuilder.addRequiredSchema(field.build())
}

-// Load Iceberg classes once (avoid repeated class loading in loop)
-// scalastyle:off classforname
-val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
-val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
-val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
-val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
-val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
-// scalastyle:on classforname
-
-// Cache method lookups (avoid repeated getMethod in loop)
-val fileMethod = contentScanTaskClass.getMethod("file")
-val startMethod = contentScanTaskClass.getMethod("start")
-val lengthMethod = contentScanTaskClass.getMethod("length")
-val residualMethod = contentScanTaskClass.getMethod("residual")
-val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
-val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
-toJsonMethod.setAccessible(true)
+val cache = IcebergReflectionCache.create()
+val fieldIdMappingCache = mutable.HashMap[AnyRef, Map[String, Int]]()

// Access inputRDD - safe now, DPP is resolved
scanExec.inputRDD match {
case rdd: DataSourceRDD =>
val partitions = rdd.partitions
@@ -817,10 +774,10 @@

val taskBuilder = OperatorOuterClass.IcebergFileScanTask.newBuilder()

-val dataFile = fileMethod.invoke(task)
+val dataFile = cache.fileMethod.invoke(task)

val filePathOpt =
-IcebergReflection.extractFileLocation(contentFileClass, dataFile)
+IcebergReflection.extractFileLocation(cache.contentFileClass, dataFile)

filePathOpt match {
case Some(filePath) =>
@@ -832,17 +789,17 @@
throw new RuntimeException(msg)
}

-val start = startMethod.invoke(task).asInstanceOf[Long]
+val start = cache.startMethod.invoke(task).asInstanceOf[Long]
taskBuilder.setStart(start)

-val length = lengthMethod.invoke(task).asInstanceOf[Long]
+val length = cache.lengthMethod.invoke(task).asInstanceOf[Long]
taskBuilder.setLength(length)

-val taskSchema = taskSchemaMethod.invoke(task)
+val taskSchema = cache.taskSchemaMethod.invoke(task)

-val deletes =
-IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
-val hasDeletes = !deletes.isEmpty
+val deletes = cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+val deletesList = if (deletes == null) new java.util.ArrayList[Any]() else deletes
+val hasDeletes = !deletesList.isEmpty

val schema: AnyRef =
if (hasDeletes) {
@@ -869,13 +826,17 @@
val schemaIdx = schemaToPoolIndex.getOrElseUpdate(
schema, {
val idx = schemaToPoolIndex.size
-val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String]
+val schemaJson =
+cache.schemaToJsonMethod.invoke(null, schema).asInstanceOf[String]
commonBuilder.addSchemaPool(schemaJson)
idx
})
taskBuilder.setSchemaIdx(schemaIdx)

-val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema)
+// Use cached field ID mapping to avoid repeated reflection per-task
+val nameToFieldId = fieldIdMappingCache.getOrElseUpdate(
+schema,
+IcebergReflection.buildFieldIdMapping(schema))

val projectFieldIds = output.flatMap { attr =>
nameToFieldId
Expand All @@ -898,8 +859,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
})
taskBuilder.setProjectFieldIdsIdx(projectFieldIdsIdx)

-val deleteFilesList =
-extractDeleteFilesList(task, contentFileClass, fileScanTaskClass)
+val deleteFilesList = extractDeleteFilesList(task, cache)
if (deleteFilesList.nonEmpty) {
val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate(
deleteFilesList, {
@@ -914,7 +874,7 @@

val residualExprOpt =
try {
-val residualExpr = residualMethod.invoke(task)
+val residualExpr = cache.residualMethod.invoke(task)
val catalystExpr = convertIcebergExpression(residualExpr, output)
catalystExpr.flatMap { expr =>
exprToProto(expr, output, binding = false)
@@ -939,8 +899,7 @@

serializePartitionData(
task,
-contentScanTaskClass,
-fileScanTaskClass,
+cache,
taskBuilder,
commonBuilder,
partitionTypeToPoolIndex,