perf: Add ReflectionCache for Iceberg serialization optimization [iceberg]#3558
Shekharrajak wants to merge 7 commits into apache:main
mbutrovich left a comment
Thanks @Shekharrajak! First round of feedback. I need to dig into failure handling on this PR next week. Whenever you push your next commit we'll get coverage on the Iceberg tests.
spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala
spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
 * Cache for Iceberg reflection metadata to avoid repeated class loading and method lookups.
 *
 * This cache is created once per serializePartitions() call and passed to helper methods. It
 * provides ~50% serialization speedup by eliminating redundant reflection operations that would
Cache at the right scope: create the cache once per serializePartitions() call, not per task. Pass the cache to helpers: don't let helper methods do their own class loading.
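The scoping rule above can be illustrated with a minimal sketch. It is not the PR's code: DemoReflectionCache and DemoSerializer are hypothetical names, and the cache resolves a JDK String method instead of Iceberg classes so the example runs without Iceberg on the classpath.

```scala
import java.lang.reflect.Method

// Hypothetical stand-in for the PR's ReflectionCache (assumption, not the real class).
final case class DemoReflectionCache(targetClass: Class[_], lengthMethod: Method)

object DemoReflectionCache {
  // All class loading and method lookup happens here, once per create() call.
  def create(): DemoReflectionCache = {
    val cls = Class.forName("java.lang.String")
    DemoReflectionCache(cls, cls.getMethod("length"))
  }
}

object DemoSerializer {
  // The helper receives the cache and performs no reflection lookups of its own.
  private def taskSize(task: AnyRef, cache: DemoReflectionCache): Int =
    cache.lengthMethod.invoke(task).asInstanceOf[Int]

  // Analogue of serializePartitions(): build the cache once, reuse it for every task.
  def serializeAll(tasks: Seq[String]): Seq[Int] = {
    val cache = DemoReflectionCache.create()
    tasks.map(t => taskSize(t, cache))
  }
}
```

With this shape, the number of Class.forName() and getMethod() calls is independent of the number of tasks; only Method.invoke runs per task.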
4ad1055 to 3697a89
3697a89 to 2840052
GitHub CI checks are looking fine.

I think we would benefit from some unit tests for the new cache case classes. Also, could we initialize this lazily? Not sure if I am missing something, but 'structLikeClass' does not seem to be used anywhere after init in the cache constructor.

Added.
Removed.
Lazy init would suit rarely-used fields, but all current fields are used per-task, so eager init is fine.
    specToJsonMethod: Method,
    deleteContentMethod: Method,
    deleteSpecIdMethod: Method,
    deleteEqualityIdsMethod: Method)
I am not sure this cache is truly helpful, or whether the reward is worth the complexity being introduced. Here are my concerns; I would love to know your opinion.
- We have 20+ fields, which makes this difficult to maintain and manage.
- We sometimes cache methods and sometimes classes. Can we be consistent and perhaps bucket these logically?
- What happens if a class or method is not found? Would we rather throw an error or fail silently? What do you think is the path of least resistance here?
- Why does the companion object extend Logging?
- Why a factory instead of a singleton? Perhaps a single instance could be shared across calls?
- Can we guarantee that this is thread safe and only populated when Iceberg is involved?
- Are we handling all supported Iceberg versions and their signatures?
- nit: might also want to rename the class to indicate that it is in the Iceberg subsystem.
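On the "throw or fail silently" question, one possible arrangement is to separate required lookups, which fail fast, from optional ones, which return None. The sketch below is only an illustration under that assumption; DemoLookups, requiredMethod, and optionalMethod are hypothetical names that do not appear in the PR.

```scala
import java.lang.reflect.Method
import scala.util.Try

object DemoLookups {
  // Required lookup: fail fast with a message naming exactly what was missing.
  // ClassNotFoundException and NoSuchMethodException both extend
  // ReflectiveOperationException.
  def requiredMethod(className: String, methodName: String): Method =
    try Class.forName(className).getMethod(methodName)
    catch {
      case e: ReflectiveOperationException =>
        throw new IllegalStateException(s"Cannot resolve $className#$methodName", e)
    }

  // Optional lookup: None when the class or method is absent, for example when
  // a method exists only in some versions of a dependency.
  def optionalMethod(className: String, methodName: String): Option[Method] =
    Try(Class.forName(className).getMethod(methodName)).toOption
}
```

Making the distinction explicit in the field types (Method versus Option[Method]) would also document which entries are expected to exist in every supported version.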
val cache = ReflectionCache.create()

for (_ <- 1 to 10) {
I am not sure what the purpose of this loop, or of this test, is.
It was to check that the class/method stays accessible on every access.
  }
}

test("ReflectionCache schemaToJsonMethod is accessible") {
Having a dedicated test just for this may be overkill?
val cache1 = ReflectionCache.create()
val cache2 = ReflectionCache.create()

assert(cache1.contentScanTaskClass != null)
I am not sure these tests truly check that the caches are independent. They should ultimately hold references to the same Iceberg methods / instances, right?
  } catch {
    case _: ClassNotFoundException => false
  }
}
It would be great to see tests across various Iceberg versions and with missing / partial constructors, asserting the expected behavior in each case. We might also want to add tests that check the returned methods are executable and were not corrupted while building the cache.
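As a rough illustration of an "is the cached method executable" check, the sketch below invokes a resolved Method and verifies its result, and compares two independent lookups with equals. DemoCacheChecks is a hypothetical name, and a JDK String method stands in for the Iceberg methods so the example runs on its own.

```scala
import java.lang.reflect.Method

object DemoCacheChecks {
  // Stand-in for one cached entry: resolves String.isEmpty via reflection.
  def lookup(): Method = Class.forName("java.lang.String").getMethod("isEmpty")

  // True only when the method can be invoked and returns the expected values.
  def isExecutable(m: Method): Boolean =
    m.invoke("").asInstanceOf[Boolean] && !m.invoke("x").asInstanceOf[Boolean]

  // Two independent lookups describe the same method, so they compare equal
  // under Method.equals even if they are distinct objects.
  def sameMethod(a: Method, b: Method): Boolean = a == b
}
```

A check of this kind asserts more than a non-null field does, since it exercises the invocation path that the serialization code relies on.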
Which issue does this PR close?
Closes #3456.
Rationale for this change
PR #3298 added reflection caching optimizations for Iceberg serialization, but these were lost during subsequent refactoring in #3349 and #3443. The current code performs redundant Class.forName() and getMethod() calls for every task (tens of thousands of times for large tables), causing significant serialization overhead.
What changes are included in this PR?
- ReflectionCache case class
- serializePartitions() to create cache once and pass to helper methods
- extractDeleteFilesList() and serializePartitionData() to use cached methods
- buildFieldIdMapping() calls per-task
- CometIcebergSerializationBenchmark to measure serialization performance
How are these changes tested?
Benchmark: