
[SPARK-56543] Add RTM stateless benchmark#55420

Open
jerrypeng wants to merge 7 commits into apache:master from jerrypeng:SPARK-56543

Conversation

@jerrypeng
Contributor

@jerrypeng jerrypeng commented Apr 20, 2026

What changes were proposed in this pull request?

Adds RTMKafkaKafkaBenchmark, a standalone benchmark program for the Real-Time Mode (RTM) trigger in Structured Streaming. It is a stateless end-to-end Kafka-to-Kafka latency benchmark.

The benchmark is implemented as an object extending org.apache.spark.benchmark.BenchmarkBase, following the same convention as other Spark benchmarks (e.g.
StateStoreBasicOperationsBenchmark, MapStatusesSerDeserBenchmark). It is not a ScalaTest suite, so it is not discovered or executed by SBT test or Maven surefire — it only runs when
invoked explicitly via runMain or spark-submit.

The benchmark:

  1. Spins up a local-cluster Spark context (local-cluster[3, 5, 1024]) and a live embedded Kafka broker via KafkaTestUtils.
  2. Generates synthetic records at 1,000 records/sec into an input Kafka topic (5 partitions) from a background producer thread.
  3. Runs a stateless pipeline with RealTimeTrigger: reads from Kafka → base64-encodes the value → stamps a source-timestamp header → writes to an output Kafka topic.
  4. Captures per-batch processing latency via Spark's observe() API.
  5. After N batches complete, reads back the output topic and reports e2e latency percentiles (p0, p50, p90, p95, p99, p100) by comparing the source-timestamp header to the Kafka sink
    timestamp.
  6. Owns its own teardown via a try { ... } finally { cleanup() } inside runBenchmarkSuite, with an idempotent cleanup() that stops Spark and tears down the embedded Kafka broker even if
    setup partially fails, the streaming query times out, or post-run analysis throws.
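The stateless pipeline in steps 3–4 can be sketched roughly as follows. This is an illustrative reconstruction, not the PR's actual code: the topic names, bootstrap server, header key, and the `RealTimeTrigger` construction are all assumptions.

```scala
// Sketch of the Kafka-to-Kafka RTM pipeline (steps 3-4 above).
// Topic names, options, and trigger construction are illustrative assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark: SparkSession = ???  // local-cluster session created during setup

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // embedded broker address
  .option("subscribe", "rtm-input")
  .load()

// Base64-encode the value and stamp a source-timestamp header so the
// post-run analysis can compare it against the Kafka sink timestamp.
val transformed = input
  .select(
    col("key"),
    base64(col("value")).cast("binary").as("value"),
    array(struct(
      lit("source-ts").as("key"),
      col("timestamp").cast("string").cast("binary").as("value"))).as("headers"))
  // observe() captures per-batch metrics via QueryProgress without a separate action.
  .observe("metrics", count(lit(1)).as("rows"))

val query = transformed.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "rtm-output")
  .option("includeHeaders", "true")  // required so the header survives to the sink
  .trigger(??? /* RealTimeTrigger; exact constructor assumed */)
  .start()
```

The `includeHeaders` option is what lets the source timestamp ride along to the output topic, where step 5 reads it back for the latency computation.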

Sample benchmark results

 Kafka to kafka query e2e_latency in milliseconds is
  p0:   45
  p50:  70
  p90:  78
  p95:  81
  p99:  85
  p100: 331
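Percentiles like those above can be computed from the collected per-record latencies with a nearest-rank selection over the sorted values. A self-contained sketch (not the PR's actual aggregation logic):

```scala
// Nearest-rank percentile over e2e latencies in milliseconds.
// Illustrative only; the benchmark's real reporting code may differ.
def percentile(sorted: Seq[Long], p: Double): Long = {
  require(sorted.nonEmpty, "no latencies collected")
  val rank = math.ceil(p / 100.0 * sorted.length).toInt - 1
  sorted(rank.max(0).min(sorted.length - 1))
}

val latencies: Seq[Long] = Seq(70L, 45L, 331L, 78L, 85L, 81L).sorted
for (p <- Seq(0, 50, 90, 95, 99, 100)) {
  println(s"p$p: ${percentile(latencies, p)}")
}
```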

Why are the changes needed?

There is currently no benchmark to measure RTM stateless Kafka-to-Kafka latency. This makes it hard to quantify regressions or improvements to the RTM code path during local development
or before merging changes. This benchmark provides a repeatable, self-contained way to measure that, and follows the existing Spark benchmark framework so result files can be committed
and diffed across runs.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

N/A. Only a benchmark was added.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6 (claude-sonnet-4-6)

@jerrypeng
Contributor Author

@viirya thank you for your review! I have addressed your comments. PTAL.


@jerrypeng jerrypeng requested a review from viirya April 28, 2026 05:09
@jerrypeng
Contributor Author

@viirya I have addressed your comments. PTAL. Thanks in advance!

Member

@viirya viirya left a comment


The source-level run instructions are now consistent with Spark benchmark style: sql-kafka-0-10/Test/runMain ..., and output support via SPARK_GENERATE_BENCHMARK_FILES=1 is aligned with BenchmarkBase.

The PR description is stale: it still says RTMKafkaKafkaBenchmarkSuite and testOnly *RTMKafkaKafkaBenchmarkSuite. That should be updated to the new object name and Test/runMain command, but that is documentation cleanup rather than a code blocker.

private var spark: SparkSession = _
private var testUtils: KafkaTestUtils = _

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
Member


BenchmarkBase.main calls runBenchmarkSuite(args) and only calls afterAll() afterwards; it does not wrap runBenchmarkSuite in try/finally. This benchmark starts embedded Kafka and a local-cluster Spark session in runBenchmarkSuite, then relies on afterAll() for teardown. If benchmark(...) times out, the query fails, getLatencies throws, or setup partially fails after Kafka starts, afterAll() will not run, leaving Kafka/Spark resources behind. Since this benchmark intentionally runs heavyweight local resources, it should handle its own exception path, e.g. wrap setup/run in try/finally or call an idempotent cleanup method on failure.
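The idempotent-cleanup pattern this review comment asks for looks roughly like the following. Method and field names (`setup()`, `runBenchmarks()`, the guarded `cleanup()`) are assumptions for illustration, not the PR's exact code:

```scala
// Idempotent teardown owned by the benchmark itself, since BenchmarkBase.main
// does not wrap runBenchmarkSuite in try/finally. Names are illustrative.
@volatile private var cleaned = false

private def cleanup(): Unit = synchronized {
  if (!cleaned) {
    cleaned = true
    // Each step is guarded so a null or failed resource doesn't block the rest.
    try { if (spark != null) spark.stop() }
    catch { case e: Exception => e.printStackTrace() }
    try { if (testUtils != null) testUtils.teardown() }
    catch { case e: Exception => e.printStackTrace() }
  }
}

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
  try {
    setup()          // start embedded Kafka, then the local-cluster Spark session
    runBenchmarks()  // may time out, fail the query, or throw in analysis
  } finally {
    cleanup()        // runs even if setup partially failed
  }
}
```

Making `cleanup()` idempotent also keeps it safe to call from both the failure path and `afterAll()`.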

Contributor Author


Sure, will add. Though the resources will not really be leaked, as 1) Kafka runs in the same process and 2) workers will shut themselves down when the driver is not reachable.

Member

@viirya viirya left a comment


It looks almost good to merge except for one minor issue and PR description cleanup. After fixing that, we can merge this.

Member

@viirya viirya left a comment


Btw, you added a new benchmark but didn't add a benchmark result file?

Without a benchmark result file, how do we know whether later PRs make an improvement or cause a regression?

Could you run it and add the benchmark result too?

@jerrypeng
Contributor Author

@viirya added.

* Example output from a recent run (Linux x86_64, OpenJDK 17):
* {{{
* Kafka to kafka query e2e_latency in milliseconds is
* p0: 45
Member


Could you take a look at the existing benchmark result files? We usually include the benchmark environment in the result file. We should have it in this benchmark's result too.
