Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,17 @@ graph BT

## Performance

Throughput baseline (single instance, sync sequential delivery, MacBook M3 Max, JDK 25 LTS, April 2026):
Throughput on a single instance (MacBook M3 Max, JDK 25 LTS, May 2026):

| Transport | batchSize=10 | batchSize=100 |
|-----------|--------------|----------------|
| Kafka (`acks=all`, localhost broker) | ~110 msg/s | ~115 msg/s |
| HTTP @ webhook latency 20 ms | ~33 msg/s | ~36 msg/s |
| HTTP @ webhook latency 100 ms | ~9 msg/s | ~9 msg/s |
| Kafka (`acks=all`, localhost broker, async batch via `deliverBatch`) | **~1,470 msg/s** | **~4,720 msg/s** |
| HTTP @ webhook latency 20 ms (sync sequential — KOJAK-74 in progress) | ~33 msg/s | ~36 msg/s |
| HTTP @ webhook latency 100 ms (sync sequential — KOJAK-74 in progress) | ~9 msg/s | ~9 msg/s |

These numbers reflect the current sync-sequential delivery model. Throughput is bounded by per-message round-trip time × batch size. Performance work to lift these limits (async batch delivery, multi-threaded scheduler) is tracked under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14).
Kafka throughput jumped 13–41× over the original sync-sequential baseline thanks to the [KOJAK-73](https://softwaremill.atlassian.net/browse/KOJAK-73) `deliverBatch` fire-flush-await pattern. HTTP is next ([KOJAK-74](https://softwaremill.atlassian.net/browse/KOJAK-74)), and multi-threaded scheduler scaling ([KOJAK-77](https://softwaremill.atlassian.net/browse/KOJAK-77)) is on the roadmap. The full optimization plan lives under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14).

Full methodology, raw JMH results, and reproduction instructions: [`benchmarks/`](benchmarks/).
Full methodology, raw JMH results, and before/after comparisons for each change: [`benchmarks/`](benchmarks/).

## Build

Expand Down
157 changes: 157 additions & 0 deletions benchmarks/kojak-73-kafka.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
[
{
"jmhVersion" : "1.37",
"benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll",
"mode" : "avgt",
"threads" : 1,
"forks" : 1,
"jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java",
"jvmArgs" : [
],
"jdkVersion" : "25.0.2",
"vmName" : "OpenJDK 64-Bit Server VM",
"vmVersion" : "25.0.2+10-LTS",
"warmupIterations" : 1,
"warmupTime" : "10 s",
"warmupBatchSize" : 1,
"measurementIterations" : 2,
"measurementTime" : "15 s",
"measurementBatchSize" : 1,
"params" : {
"batchSize" : "10"
},
"primaryMetric" : {
"score" : 0.680696006383041,
"scoreError" : "NaN",
"scoreConfidence" : [
"NaN",
"NaN"
],
"scorePercentiles" : {
"0.0" : 0.6561445592105263,
"50.0" : 0.680696006383041,
"90.0" : 0.7052474535555555,
"95.0" : 0.7052474535555555,
"99.0" : 0.7052474535555555,
"99.9" : 0.7052474535555555,
"99.99" : 0.7052474535555555,
"99.999" : 0.7052474535555555,
"99.9999" : 0.7052474535555555,
"100.0" : 0.7052474535555555
},
"scoreUnit" : "ms/op",
"rawData" : [
[
0.7052474535555555,
0.6561445592105263
]
]
},
"secondaryMetrics" : {
}
},
{
"jmhVersion" : "1.37",
"benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll",
"mode" : "avgt",
"threads" : 1,
"forks" : 1,
"jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java",
"jvmArgs" : [
],
"jdkVersion" : "25.0.2",
"vmName" : "OpenJDK 64-Bit Server VM",
"vmVersion" : "25.0.2+10-LTS",
"warmupIterations" : 1,
"warmupTime" : "10 s",
"warmupBatchSize" : 1,
"measurementIterations" : 2,
"measurementTime" : "15 s",
"measurementBatchSize" : 1,
"params" : {
"batchSize" : "50"
},
"primaryMetric" : {
"score" : 0.26791908345521237,
"scoreError" : "NaN",
"scoreConfidence" : [
"NaN",
"NaN"
],
"scorePercentiles" : {
"0.0" : 0.2562269965675676,
"50.0" : 0.26791908345521237,
"90.0" : 0.27961117034285715,
"95.0" : 0.27961117034285715,
"99.0" : 0.27961117034285715,
"99.9" : 0.27961117034285715,
"99.99" : 0.27961117034285715,
"99.999" : 0.27961117034285715,
"99.9999" : 0.27961117034285715,
"100.0" : 0.27961117034285715
},
"scoreUnit" : "ms/op",
"rawData" : [
[
0.27961117034285715,
0.2562269965675676
]
]
},
"secondaryMetrics" : {
}
},
{
"jmhVersion" : "1.37",
"benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll",
"mode" : "avgt",
"threads" : 1,
"forks" : 1,
"jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java",
"jvmArgs" : [
],
"jdkVersion" : "25.0.2",
"vmName" : "OpenJDK 64-Bit Server VM",
"vmVersion" : "25.0.2+10-LTS",
"warmupIterations" : 1,
"warmupTime" : "10 s",
"warmupBatchSize" : 1,
"measurementIterations" : 2,
"measurementTime" : "15 s",
"measurementBatchSize" : 1,
"params" : {
"batchSize" : "100"
},
"primaryMetric" : {
"score" : 0.21151745586904763,
"scoreError" : "NaN",
"scoreConfidence" : [
"NaN",
"NaN"
],
"scorePercentiles" : {
"0.0" : 0.21086217661904763,
"50.0" : 0.21151745586904763,
"90.0" : 0.2121727351190476,
"95.0" : 0.2121727351190476,
"99.0" : 0.2121727351190476,
"99.9" : 0.2121727351190476,
"99.99" : 0.2121727351190476,
"99.999" : 0.2121727351190476,
"99.9999" : 0.2121727351190476,
"100.0" : 0.2121727351190476
},
"scoreUnit" : "ms/op",
"rawData" : [
[
0.2121727351190476,
0.21086217661904763
]
]
},
"secondaryMetrics" : {
}
}
]


67 changes: 67 additions & 0 deletions benchmarks/results-postopt-kojak-73.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# KOJAK-73: Kafka deliverBatch fire-flush-await — Results

Measured 2026-05-04 on the same hardware as the KOJAK-68 baseline (MacBook M3 Max,
JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers, smoke-run JMH config:
`fork=1, warmup=1, iter=2, warmup=10s, measurement=15s`).

## Headline numbers — Kafka throughput

| batchSize | Baseline (ms/op) | KOJAK-73 (ms/op) | **Improvement** |
|-----------|------------------|------------------|-----------------|
| 10 | 9.168 | 0.681 | **13.5×** |
| 50 | 8.665 | 0.268 | **32.3×** |
| 100 | 8.701 | 0.212 | **41.0×** |

Translated to msg/s:

| batchSize | Baseline | KOJAK-73 | Improvement |
|-----------|----------|--------------|-------------|
| 10 | ~109 | **~1,468** | 13.5× |
| 50 | ~115 | **~3,731** | 32.3× |
| 100 | ~115 | **~4,717** | 41.0× |

Raw JSON: [`kojak-73-kafka.json`](kojak-73-kafka.json).

## What changed

`KafkaMessageDeliverer.deliverBatch` now uses fire-flush-await:
1. **Fire** — call `producer.send()` for every entry (non-blocking; records go to producer's internal buffer)
2. **Flush** — a single `producer.flush()` call blocks until every queued record has been delivered to the broker, letting the producer ship them in per-partition batches instead of waiting out `linger.ms`
3. **Await** — `Future.get()` per entry returns immediately because completion is settled by `flush()`

Previously, each entry incurred a full `producer.send().get()` round-trip sequentially. With ~9 ms localhost Kafka RTT (`acks=all`), 1000 entries × 9 ms = ~9 s regardless of `batchSize`.

## Reading the table

- **`batchSize` is now load-bearing.** Pre-KOJAK-73 throughput was flat across `batchSize`
values (109 → 115 → 115 msg/s) — confirming the bottleneck was per-record blocking I/O.
Post-KOJAK-73 throughput scales with `batchSize` (1,468 → 3,731 → 4,717), proving that
Kafka's internal record batching is now being exploited.
- **Sublinear scaling 50 → 100** (the gain grows only from 32× to 41×, i.e. throughput rises ~1.26× where doubling `batchSize` would ideally double it). Indicates that DB UPDATE
overhead per entry is now significant relative to the (now-fast) Kafka path. This is exactly
what motivates KOJAK-75 (batch UPDATE via `executeBatch`) — at small batch sizes the
per-message DB cost was hidden by 9 ms Kafka RTT; with Kafka latency removed, the N
individual UPDATE statements become the next bottleneck to attack.
- **batchSize=10 lowest gain (13.5×)** — at that batch size only 10 records can amortize
one RTT, so the per-batch overhead (claimPending, transaction begin/commit, 10 UPDATEs) is
proportionally larger.

## Verification context

- Unit tests: `KafkaMessageDelivererBatchTest` covers empty input, all-success ordering,
single flush call (verified via flush counter), synchronous send exception (Permanent +
Retriable variants), and future-based async exception (driven via `MockProducer` override
that completes/errors per-position inside flush).
- Integration tests in `okapi-integration-tests` continue to pass with real Postgres + Kafka.
- ktlint clean, configuration cache reuses across modules.

## What's next

1. **KOJAK-74** — analogous fire-all-await for HTTP via parallel `httpClient.sendAsync`.
Expected impact at realistic webhook latency (`httpLatencyMs ∈ {20, 100}`):
from ~33 / ~9 msg/s baseline to **~500-2,000 msg/s** range, depending on host/connection
pool reuse.
2. **KOJAK-75** — batch UPDATE via `executeBatch`. Now load-bearing: at `batchSize=100`
the N individual UPDATE statements have become the dominant per-batch cost. Expected
to shift `batchSize=100` Kafka throughput from ~4,700 toward the ~10,000 msg/s range.
3. **KOJAK-77** — `concurrency` fan-out. Multiplies all of the above by N workers.
1 change: 1 addition & 0 deletions okapi-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
implementation(project(":okapi-core"))
implementation(libs.jacksonModuleKotlin)
implementation(libs.jacksonDatatypeJsr310)
implementation(libs.slf4jApi)
compileOnly(libs.kafkaClients)

testImplementation(libs.kafkaClients)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import com.softwaremill.okapi.core.MessageDeliverer
import com.softwaremill.okapi.core.OutboxEntry
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.RetriableException
import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

/**
* [MessageDeliverer] that publishes outbox entries to Kafka topics.
*
* Uses the provided [Producer] to send records synchronously.
* Kafka [RetriableException]s map to [DeliveryResult.RetriableFailure];
* all other errors map to [DeliveryResult.PermanentFailure].
*/
Expand All @@ -20,26 +23,96 @@ class KafkaMessageDeliverer(
) : MessageDeliverer {
override val type: String = KafkaDeliveryInfo.TYPE

override fun deliver(entry: OutboxEntry): DeliveryResult {
val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata)
val record =
ProducerRecord(info.topic, info.partitionKey, entry.payload).apply {
info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) }
}
override fun deliver(entry: OutboxEntry): DeliveryResult = try {
producer.send(buildRecord(entry)).get()
DeliveryResult.Success
} catch (e: ExecutionException) {
classifyException(e.cause ?: e)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
DeliveryResult.RetriableFailure(e.message ?: e.javaClass.simpleName)
} catch (e: Exception) {
classifyException(e)
}

/**
* Uses fire-flush-await: send all entries, then a single `flush()` (which
* bypasses `linger.ms`), then collect outcomes via non-blocking `Future.get()`
* since completion is already settled. A failing `send()` does not abort the
* batch; the result list mirrors input order.
*
* If `flush()` itself fails (interrupt, fatal producer state), per-entry
* futures still surface their own exception via `get()` and are classified
* individually — the batch as a whole is never abandoned.
*/
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
if (entries.isEmpty()) return emptyList()

return try {
producer.send(record).get()
val inflight: List<Pair<OutboxEntry, SendOutcome>> = entries.map { entry ->
entry to fireOne(entry)
}

try {
producer.flush()
} catch (e: InterruptException) {
Thread.currentThread().interrupt()
logger.warn("Kafka producer.flush() interrupted; per-entry futures will surface the cause", e)
} catch (e: Exception) {
logger.warn("Kafka producer.flush() failed for batch of {}; classifying per-entry from future state", entries.size, e)
}

return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) }
}

private fun fireOne(entry: OutboxEntry): SendOutcome = try {
SendOutcome.Sent(producer.send(buildRecord(entry)))
} catch (e: Exception) {
val classified = classifyException(e)
logger.debug("Kafka send rejected synchronously for entry {}: {}", entry.outboxId, e.toString())
SendOutcome.ImmediateFailure(classified)
}

private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) {
is SendOutcome.ImmediateFailure -> outcome.result
is SendOutcome.Sent -> try {
outcome.future.get()
DeliveryResult.Success
} catch (e: ExecutionException) {
classifyException(e.cause ?: e)
} catch (e: InterruptedException) {
// Thread was interrupted while waiting for an in-flight future. The interrupt may have
// come from flush() (already restored the flag) or from an outer cancellation; either
// way, the entry is unsent — retry semantics, not a poison pill.
Thread.currentThread().interrupt()
DeliveryResult.RetriableFailure(e.message ?: e.javaClass.simpleName)
} catch (e: Exception) {
classifyException(e)
}
}

private fun classifyException(e: Throwable): DeliveryResult = if (e is RetriableException) {
DeliveryResult.RetriableFailure(e.message ?: "Retriable Kafka error")
} else {
DeliveryResult.PermanentFailure(e.message ?: "Permanent Kafka error")
private fun buildRecord(entry: OutboxEntry): ProducerRecord<String?, String> {
val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata)
return ProducerRecord<String?, String>(info.topic, info.partitionKey, entry.payload).apply {
info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) }
}
}

private fun classifyException(e: Throwable): DeliveryResult {
val message = e.message ?: e.javaClass.simpleName
return if (e is RetriableException) {
DeliveryResult.RetriableFailure(message)
} else {
DeliveryResult.PermanentFailure(message)
}
}

private sealed interface SendOutcome {
@JvmInline
value class Sent(val future: Future<RecordMetadata>) : SendOutcome
data class ImmediateFailure(val result: DeliveryResult) : SendOutcome
}

companion object {
private val logger = LoggerFactory.getLogger(KafkaMessageDeliverer::class.java)
}
}
Loading