Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,30 @@ class CompositeMessageDeliverer(deliverers: List<MessageDeliverer>) : MessageDel
?: return DeliveryResult.PermanentFailure("No deliverer registered for type '${entry.deliveryType}'")
return messageDeliverer.deliver(entry)
}

/**
* Groups entries by [OutboxEntry.deliveryType] and delegates each sub-batch
* to the matching deliverer's [MessageDeliverer.deliverBatch]. Results are
* re-assembled in original input order.
*
* Entries whose type has no registered deliverer are mapped to
* [DeliveryResult.PermanentFailure] (consistent with [deliver]).
*/
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
if (entries.isEmpty()) return emptyList()

val resultByEntry: Map<OutboxEntry, DeliveryResult> = entries
.groupBy { it.deliveryType }
.flatMap { (type, group) ->
val deliverer = registry[type]
if (deliverer != null) {
deliverer.deliverBatch(group)
} else {
group.map { it to DeliveryResult.PermanentFailure("No deliverer registered for type '$type'") }
}
}
.toMap()

return entries.map { entry -> entry to (resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,18 @@ interface MessageDeliverer {
val type: String

fun deliver(entry: OutboxEntry): DeliveryResult

/**
* Delivers a batch of entries, returning per-entry results in the same order
* as the input list.
*
* The default implementation delegates to [deliver] sequentially and is
* appropriate for any transport. Implementations whose underlying I/O can
* be overlapped (e.g. Kafka's internal record batching, parallel HTTP
* `sendAsync`) should override this method to exploit that.
*
* Per-entry result classification (Success / RetriableFailure / PermanentFailure)
* is preserved — callers receive one [DeliveryResult] per input entry.
*/
fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> = entries.map { entry -> entry to deliver(entry) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,27 @@ class OutboxEntryProcessor(
private val retryPolicy: RetryPolicy,
private val clock: Clock,
) {
fun process(entry: OutboxEntry): OutboxEntry {
fun process(entry: OutboxEntry): OutboxEntry = applyResult(entry, deliverer.deliver(entry), clock.instant())

/**
* Processes a batch of entries via [MessageDeliverer.deliverBatch], applying
* the retry policy per result. Returns processed entries in the same order
* as the input list.
*/
fun processBatch(entries: List<OutboxEntry>): List<OutboxEntry> {
if (entries.isEmpty()) return emptyList()
val now = clock.instant()
return when (val result = deliverer.deliver(entry)) {
is DeliveryResult.Success -> entry.toDelivered(now)
is DeliveryResult.RetriableFailure ->
if (retryPolicy.shouldRetry(entry.retries)) {
entry.retry(now, result.error)
} else {
entry.toFailed(now, result.error)
}
is DeliveryResult.PermanentFailure -> entry.toFailed(now, result.error)
}
return deliverer.deliverBatch(entries).map { (entry, result) -> applyResult(entry, result, now) }
}

private fun applyResult(entry: OutboxEntry, result: DeliveryResult, now: java.time.Instant): OutboxEntry = when (result) {
is DeliveryResult.Success -> entry.toDelivered(now)
is DeliveryResult.RetriableFailure ->
if (retryPolicy.shouldRetry(entry.retries)) {
entry.retry(now, result.error)
} else {
entry.toFailed(now, result.error)
}
is DeliveryResult.PermanentFailure -> entry.toFailed(now, result.error)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,46 @@ import java.time.Duration

/**
* Orchestrates a single processing cycle: claims pending entries from [OutboxStore],
* delegates each to [OutboxEntryProcessor], and persists the result.
* delegates batch delivery to [OutboxEntryProcessor], and persists each result.
*
* An optional [OutboxProcessorListener] is notified after each entry and after the
* full batch. Exceptions in the listener are caught and logged — they never break
* processing. Transaction management is the caller's responsibility.
*
* In the batch processing path, the per-entry `duration` reported in
* [OutboxProcessingEvent] reflects the **wall-clock duration of the whole batch**
* (because transports may overlap their per-entry I/O internally — e.g. Kafka's
* fire-flush-await — making per-entry timing meaningless). Use
* [OutboxProcessorListener.onBatchProcessed] when you need batch-level timing.
*/
class OutboxProcessor(
private val store: OutboxStore,
private val entryProcessor: OutboxEntryProcessor,
private val listener: OutboxProcessorListener? = null,
private val clock: Clock = Clock.systemUTC(),
) {
/**
* Claims up to [limit] pending entries, processes them as a batch, and persists
* each result. Returns the number of entries processed (0 if the store had nothing).
*/
@JvmOverloads
fun processNext(limit: Int = 10) {
fun processNext(limit: Int = 10): Int {
val batchStart = clock.instant()
var count = 0
store.claimPending(limit).forEach { entry ->
val entryStart = clock.instant()
val updated = entryProcessor.process(entry)
val deliveryDuration = Duration.between(entryStart, clock.instant())
val claimed = store.claimPending(limit)
if (claimed.isEmpty()) {
notifyBatch(0, Duration.between(batchStart, clock.instant()))
return 0
}

val processed = entryProcessor.processBatch(claimed)
val batchDuration = Duration.between(batchStart, clock.instant())

processed.forEach { updated ->
store.updateAfterProcessing(updated)
count++
notifyEntry(updated, deliveryDuration)
notifyEntry(updated, batchDuration)
}
notifyBatch(count, Duration.between(batchStart, clock.instant()))
notifyBatch(processed.size, Duration.between(batchStart, clock.instant()))
return processed.size
}

private fun notifyEntry(updated: OutboxEntry, duration: Duration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.softwaremill.okapi.core

import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.string.shouldContain
import io.kotest.matchers.types.shouldBeInstanceOf
import java.time.Instant

private fun deliveryInfo(t: String) = object : DeliveryInfo {
override val type = t
override fun serialize(): String = """{"type":"$t"}"""
}

private fun entryOfType(t: String, id: Int): OutboxEntry =
OutboxEntry.createPending(OutboxMessage("evt-$id", "{}"), deliveryInfo(t), Instant.EPOCH)

private fun fixedDeliverer(t: String, result: DeliveryResult) = object : MessageDeliverer {
override val type = t
override fun deliver(entry: OutboxEntry): DeliveryResult = result
}

class CompositeMessageDelivererTest : FunSpec({
test("deliverBatch groups entries by type, delegates to each transport, preserves input order") {
val composite = CompositeMessageDeliverer(
listOf(
fixedDeliverer("kafka", DeliveryResult.Success),
fixedDeliverer("http", DeliveryResult.RetriableFailure("503")),
),
)
val entries = listOf(
entryOfType("kafka", 1),
entryOfType("http", 2),
entryOfType("kafka", 3),
entryOfType("http", 4),
)

val results = composite.deliverBatch(entries)

results.size shouldBe 4
results.map { it.first } shouldBe entries
results[0].second shouldBe DeliveryResult.Success
results[1].second shouldBe DeliveryResult.RetriableFailure("503")
results[2].second shouldBe DeliveryResult.Success
results[3].second shouldBe DeliveryResult.RetriableFailure("503")
}

test("deliverBatch fails permanently for entries with no registered deliverer") {
val composite = CompositeMessageDeliverer(
listOf(fixedDeliverer("kafka", DeliveryResult.Success)),
)
val entries = listOf(
entryOfType("kafka", 1),
entryOfType("missing", 2),
)

val results = composite.deliverBatch(entries)

results.size shouldBe 2
results[0].second shouldBe DeliveryResult.Success
results[1].second.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
(results[1].second as DeliveryResult.PermanentFailure).error shouldContain "missing"
}

test("deliverBatch with empty input returns empty list") {
val composite = CompositeMessageDeliverer(emptyList())
composite.deliverBatch(emptyList()) shouldBe emptyList()
}

test("deliverBatch uses each transport's overridden deliverBatch (not just deliver)") {
var batchCallsKafka = 0
var batchCallsHttp = 0
val kafkaDeliverer = object : MessageDeliverer {
override val type = "kafka"
override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
batchCallsKafka++
return entries.map { it to DeliveryResult.Success }
}
}
val httpDeliverer = object : MessageDeliverer {
override val type = "http"
override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
batchCallsHttp++
return entries.map { it to DeliveryResult.Success }
}
}
val composite = CompositeMessageDeliverer(listOf(kafkaDeliverer, httpDeliverer))

composite.deliverBatch(
listOf(
entryOfType("kafka", 1),
entryOfType("http", 2),
entryOfType("kafka", 3),
),
)

batchCallsKafka shouldBe 1
batchCallsHttp shouldBe 1
}
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.softwaremill.okapi.core

import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.time.Instant

private val stubDeliveryInfo = object : DeliveryInfo {
override val type = "stub"
override fun serialize(): String = """{"type":"stub"}"""
}

private fun stubEntry(id: Int) = OutboxEntry.createPending(OutboxMessage("evt-$id", "{}"), stubDeliveryInfo, Instant.EPOCH)

private fun delivererReturning(vararg results: DeliveryResult) = object : MessageDeliverer {
override val type = "stub"
private var idx = 0
override fun deliver(entry: OutboxEntry): DeliveryResult = results[idx++]
}

class MessageDelivererTest : FunSpec({
test("default deliverBatch delegates to deliver and preserves input order") {
val deliverer = delivererReturning(
DeliveryResult.Success,
DeliveryResult.RetriableFailure("err1"),
DeliveryResult.PermanentFailure("err2"),
)
val entries = listOf(stubEntry(1), stubEntry(2), stubEntry(3))

val results = deliverer.deliverBatch(entries)

results.size shouldBe 3
results[0].first shouldBe entries[0]
results[0].second shouldBe DeliveryResult.Success
results[1].first shouldBe entries[1]
results[1].second shouldBe DeliveryResult.RetriableFailure("err1")
results[2].first shouldBe entries[2]
results[2].second shouldBe DeliveryResult.PermanentFailure("err2")
}

test("default deliverBatch on empty input returns empty list without calling deliver") {
var deliverCalls = 0
val deliverer = object : MessageDeliverer {
override val type = "stub"
override fun deliver(entry: OutboxEntry): DeliveryResult {
deliverCalls++
return DeliveryResult.Success
}
}

deliverer.deliverBatch(emptyList()) shouldBe emptyList()
deliverCalls shouldBe 0
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,50 @@ class OutboxEntryProcessorTest :
}
}

given("processBatch() — mixed results") {
`when`("called with three entries returning Success, RetriableFailure, PermanentFailure") {
val deliverer = object : MessageDeliverer {
override val type = "stub"
private val results = listOf(
DeliveryResult.Success,
DeliveryResult.RetriableFailure("retry me"),
DeliveryResult.PermanentFailure("never"),
)
private var idx = 0
override fun deliver(entry: OutboxEntry): DeliveryResult = results[idx++]
}
val processor = OutboxEntryProcessor(deliverer, retryPolicy, fixedClock)
val entries = listOf(pendingEntry(), pendingEntry(), pendingEntry())
val results = processor.processBatch(entries)

then("preserves input order") {
results.size shouldBe 3
}
then("first entry is DELIVERED") {
results[0].status shouldBe OutboxStatus.DELIVERED
}
then("second entry is PENDING (retriable, retries remaining)") {
results[1].status shouldBe OutboxStatus.PENDING
results[1].lastError shouldBe "retry me"
}
then("third entry is FAILED (permanent)") {
results[2].status shouldBe OutboxStatus.FAILED
results[2].lastError shouldBe "never"
}
}
}

given("processBatch() — empty input") {
`when`("called with empty list") {
val processor = OutboxEntryProcessor(stubDeliverer(DeliveryResult.Success), retryPolicy, fixedClock)
val results = processor.processBatch(emptyList())

then("returns empty list without invoking deliverer") {
results shouldBe emptyList()
}
}
}

given("process() — PermanentFailure") {
`when`("called with retries=0") {
val processor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class OutboxProcessorTest :
retryPolicy = RetryPolicy(maxRetries = 3),
clock = fixedClock,
)
OutboxProcessor(store, entryProcessor).processNext(limit = 10)
val returnedCount = OutboxProcessor(store, entryProcessor).processNext(limit = 10)
val results = processedEntries.toList()

then("both entries are updated") {
Expand All @@ -68,6 +68,25 @@ class OutboxProcessorTest :
then("both are DELIVERED") {
results.all { it.status == OutboxStatus.DELIVERED } shouldBe true
}
then("processNext returns the count of processed entries") {
returnedCount shouldBe 2
}
}
}

given("processNext() with empty store — return value") {
`when`("called") {
pendingEntries = emptyList()
val entryProcessor = OutboxEntryProcessor(
deliverer = stubDeliverer(DeliveryResult.Success),
retryPolicy = RetryPolicy(maxRetries = 3),
clock = fixedClock,
)
val returnedCount = OutboxProcessor(store, entryProcessor).processNext()

then("returns 0") {
returnedCount shouldBe 0
}
}
}

Expand Down