[SPARK-56543] Add RTM stateless benchmark #55420
Conversation
@viirya thank you for your review! I have addressed your comments. PTAL. |
@viirya I have addressed your comments. PTAL. Thanks in advance!
The source-level run instructions are now consistent with Spark benchmark style: sql-kafka-0-10/Test/runMain ..., and output support via SPARK_GENERATE_BENCHMARK_FILES=1 is aligned with BenchmarkBase.
The PR description is stale: it still says RTMKafkaKafkaBenchmarkSuite and testOnly *RTMKafkaKafkaBenchmarkSuite. That should be updated to the new object name and Test/runMain command, but that is documentation cleanup rather than a code blocker.
private var spark: SparkSession = _
private var testUtils: KafkaTestUtils = _

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
BenchmarkBase.main calls runBenchmarkSuite(args) and only calls afterAll() afterwards; it does not wrap runBenchmarkSuite in try/finally. This benchmark starts embedded Kafka and a local-cluster Spark session in runBenchmarkSuite, then relies on afterAll() for teardown. If benchmark(...) times out, the query fails, getLatencies throws, or setup partially fails after Kafka starts, afterAll() will not run, leaving Kafka/Spark resources behind. Since this benchmark intentionally runs heavyweight local resources, it should handle its own exception path, e.g. wrap setup/run in try/finally or call an idempotent cleanup method on failure.
Sure, will add. Though the resources will not really be leaked, since 1) Kafka runs in the same process and 2) the workers shut themselves down when the driver is unreachable.
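The pattern the reviewer asks for can be sketched as below. This is a minimal, self-contained illustration (all names here are hypothetical stand-ins, not the PR's actual code): the benchmark body runs inside try/finally with an idempotent cleanup, so teardown happens even when setup partially fails or the query throws, since `BenchmarkBase.main` itself does not wrap `runBenchmarkSuite` in try/finally.

```scala
// Sketch of the suggested cleanup pattern (hypothetical names).
object CleanupPatternSketch {
  @volatile private var kafkaStarted = false
  @volatile private var sparkStarted = false

  // Stand-in for starting embedded Kafka and the local-cluster SparkSession.
  private def setup(): Unit = {
    kafkaStarted = true
    sparkStarted = true
  }

  // Idempotent teardown: checks state, so it is safe after a partial setup
  // and safe to call more than once.
  def cleanup(): Unit = {
    if (sparkStarted) sparkStarted = false // spark.stop() in the real benchmark
    if (kafkaStarted) kafkaStarted = false // testUtils.teardown() in the real benchmark
  }

  def resourcesReleased: Boolean = !kafkaStarted && !sparkStarted

  def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    try {
      setup()
      // benchmark(...) would run here; it may throw or time out
    } finally {
      cleanup() // runs on both the success and the failure path
    }
  }
}
```

With this shape, an exception anywhere in the body still reaches the caller, but Kafka and Spark are torn down first.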
viirya left a comment
It looks almost good to merge except for one minor issue and PR description cleanup. After fixing that, we can merge this.
@viirya added.
 * Example output from a recent run (Linux x86_64, OpenJDK 17):
 * {{{
 * Kafka to kafka query e2e_latency in milliseconds is
 * p0: 45
Could you take a look at the existing benchmark result files? We usually have benchmark environment in the result file. We should have it in this benchmark result too.
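For reference, the environment header that the Spark benchmark framework writes at the top of committed result files looks roughly like the fragment below; the exact JVM, OS, and CPU strings are illustrative, not taken from this PR:

```
OpenJDK 64-Bit Server VM 17.0.9+9 on Linux 5.15.0
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
```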
What changes were proposed in this pull request?
Adds RTMKafkaKafkaBenchmark, a standalone benchmark program for the Real-Time Mode (RTM) trigger in Structured Streaming. It is a stateless end-to-end Kafka-to-Kafka latency benchmark.
The benchmark is implemented as an object extending org.apache.spark.benchmark.BenchmarkBase, following the same convention as other Spark benchmarks (e.g.
StateStoreBasicOperationsBenchmark, MapStatusesSerDeserBenchmark). It is not a ScalaTest suite, so it is not discovered or executed by SBT test or Maven surefire — it only runs when
invoked explicitly via runMain or spark-submit.
The benchmark:
- … timestamp.
- … setup partially fails, the streaming query times out, or post-run analysis throws.
Sample benchmark results
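The sample output above reports end-to-end latency percentiles (p0, …). A minimal sketch of how such percentiles could be computed over collected latencies is shown below; this is a hypothetical nearest-rank helper for illustration, not the PR's actual `getLatencies` implementation.

```scala
// Hypothetical nearest-rank percentile helper (illustrative only).
object LatencyPercentiles {
  // Returns the p-th percentile (0 <= p <= 100) of e2e latencies in millis.
  // p = 0 yields the minimum, p = 100 the maximum.
  def percentile(latencies: Seq[Long], p: Double): Long = {
    require(latencies.nonEmpty, "need at least one latency sample")
    require(p >= 0 && p <= 100, "percentile must be in [0, 100]")
    val sorted = latencies.sorted
    // Nearest-rank: ceil(p/100 * n), clamped to at least 1.
    val rank = math.ceil(p / 100.0 * sorted.size).toInt.max(1)
    sorted(rank - 1)
  }
}
```

For example, `percentile(samples, 0)` reproduces a "p0" line like the one in the sample output (the minimum observed latency).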
Why are the changes needed?
There is currently no benchmark to measure RTM stateless Kafka-to-Kafka latency. This makes it hard to quantify regressions or improvements to the RTM code path during local development
or before merging changes. This benchmark provides a repeatable, self-contained way to measure that, and follows the existing Spark benchmark framework so result files can be committed
and diffed across runs.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
N/A. Only a benchmark was added.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6 (claude-sonnet-4-6)