Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
6706625
feat(processing): replace StreamBatcher with StreamEventAggregator
aleksandar-apostolov Apr 22, 2026
0fb3a73
fix(processing): harden aggregator against throwing extractors/deseri…
aleksandar-apostolov Apr 22, 2026
7dae14c
fix(processing): address review feedback — thread safety, validation,…
aleksandar-apostolov Apr 22, 2026
2ce0b8b
refactor(processing): use StreamRestartableChannel, remove auto-start…
aleksandar-apostolov Apr 22, 2026
eae6cf4
test(processing): assert all three stress test guarantees
aleksandar-apostolov Apr 22, 2026
ef95b74
fix(processing): address remaining review comments
aleksandar-apostolov Apr 22, 2026
5f51987
test(processing): add factory parameter validation tests
aleksandar-apostolov Apr 22, 2026
398efc3
refactor(processing): introduce StreamEventAggregationPolicy
aleksandar-apostolov Apr 22, 2026
a069e0a
fix(processing): use runCatchingCancellable throughout aggregator
aleksandar-apostolov Apr 22, 2026
034ed06
fix(processing): explicit shutdown catch, runCatchingCancellable for …
aleksandar-apostolov Apr 22, 2026
729ff79
test(processing): close coverage gaps in StreamEventAggregatorImpl
aleksandar-apostolov Apr 22, 2026
bcb810c
refactor(processing): pass policy directly to StreamEventAggregatorImpl
aleksandar-apostolov Apr 22, 2026
843c8d3
fix(test): read policy fields through aggregator.policy in config test
aleksandar-apostolov Apr 23, 2026
8bd6aef
fix(test): make stress test resilient to slow CI
aleksandar-apostolov Apr 23, 2026
083cc7a
refactor(aggregator): change StreamAggregatedEvent from Map to List
aleksandar-apostolov Apr 28, 2026
7da11a5
docs: add Event Aggregator section to README
aleksandar-apostolov Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Stream Android Core uses annotations to distinguish between stable public APIs a
- [Batcher](#batcher)
- [Debouncer](#debouncer)
- [Throttler](#throttler)
- [Event Aggregator](#event-aggregator)
- [Threading Utilities](#threading-utilities)
- [Token Management](#token-management)
- [WebSocket Connections](#websocket-connections)
Expand Down Expand Up @@ -361,6 +362,24 @@ enum class Retention {
- `AUTO_REMOVE`: UI components, fragments, activities (automatic cleanup)
- `KEEP_UNTIL_CANCELLED`: Long-lived services, singletons (explicit lifecycle)

#### Handling Aggregated Events

When the [Event Aggregator](#event-aggregator) is active, listeners registered via `StreamSubscriptionManager` may receive `StreamAggregatedEvent` instances during traffic spikes in addition to individual events. Your listener **must** handle both:

```kotlin
subscriptionManager.forEach { listener ->
when (event) {
is StreamAggregatedEvent<*> -> {
// Batch of events in arrival order — process sequentially
event.events.forEach { listener.onEvent(it) }
}
else -> listener.onEvent(event)
}
}
```

See the [Event Aggregator](#event-aggregator) section for full details on ordering guarantees and atomic state updates.

---

### Serial Processing Queue
Expand Down Expand Up @@ -636,6 +655,87 @@ throttler.reset()

---

### Event Aggregator

Adaptive event aggregator that switches between individual and batched event delivery based on traffic volume.

#### How It Works

During normal traffic, WebSocket events are deserialized and dispatched one at a time. During spikes, the aggregator collects events within a time window and delivers them as a single `StreamAggregatedEvent` — a flat list preserving arrival order.

```kotlin
import io.getstream.android.core.api.processing.StreamEventAggregator
import io.getstream.android.core.api.processing.StreamEventAggregationPolicy
import io.getstream.android.core.api.processing.StreamAggregatedEvent

val aggregator = StreamEventAggregator<MyEvent>(
scope = scope,
policy = StreamEventAggregationPolicy.from(
typeExtractor = { raw -> extractType(raw) },
deserializer = { raw -> Result.success(deserialize(raw)) },
aggregationThreshold = 10, // Aggregate when >= 10 events accumulated
maxWindowMs = 500, // Max collection window before flushing
),
)

aggregator.onEvent { event ->
when (event) {
is StreamAggregatedEvent<*> -> {
// Spike — process all events sequentially in one atomic state update
event.events.forEach { singleEvent ->
handleEvent(singleEvent)
}
}
is MyEvent -> {
// Normal traffic — single event
handleEvent(event)
}
}
}

aggregator.start()
```

#### Event Ordering

`StreamAggregatedEvent.events` is a **flat `List<T>` in arrival order** — not grouped by type. This is critical for correctness: events like "reaction added" must be processed before "reaction removed" for the same entity. Grouping by type key would lose this inter-type ordering.

Product SDKs should process the list sequentially and apply all updates in one atomic state mutation, so UI recomposes once per batch instead of once per event.

#### Handling Aggregated Events in Subscriptions

When subscribing to `StreamClient` events via `StreamClientListener`, your `onEvent` handler may receive either an individual event or a `StreamAggregatedEvent`. Product SDKs must handle both:

```kotlin
val listener = object : StreamClientListener {
override fun onEvent(event: Any) {
when (event) {
is StreamAggregatedEvent<*> -> {
// Apply all events in one state transaction
val events = event.events.filterIsInstance<MyProductEvent>()
stateStore.atomicUpdate { state ->
events.fold(state) { acc, e -> applyEvent(acc, e) }
}
}
is MyProductEvent -> {
// Single event — apply directly
stateStore.update { state -> applyEvent(state, event) }
}
}
}
}
```

Failing to handle `StreamAggregatedEvent` means aggregated events are silently ignored during traffic spikes.

#### Use Cases

- WebSocket event delivery (automatic — built into `StreamSocketSession`)
- High-frequency event streams (chat rooms with many participants)
- Any scenario where burst traffic causes excessive UI recomposition

---

### Threading Utilities

Safe cross-thread execution with timeout protection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleS
import io.getstream.android.core.api.model.connection.network.StreamNetworkState
import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor
import io.getstream.android.core.api.observers.network.StreamNetworkMonitor
import io.getstream.android.core.api.processing.StreamBatcher
import io.getstream.android.core.api.processing.StreamEventAggregationPolicy
import io.getstream.android.core.api.processing.StreamEventAggregator
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
Expand Down Expand Up @@ -249,14 +250,7 @@ public fun StreamClient(
socketFactory =
components.socketFactory
?: StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory")),
batcher =
components.batcher
?: StreamBatcher(
scope = scope,
batchSize = socketConfig.batchSize,
initialDelayMs = socketConfig.batchInitialDelayMs,
maxDelayMs = socketConfig.batchMaxDelayMs,
),
eventAggregator = components.eventAggregator,
healthMonitor =
components.healthMonitor
?: StreamHealthMonitor(
Expand Down Expand Up @@ -320,13 +314,7 @@ internal fun createStreamClientInternal(
connectionIdHolder: StreamConnectionIdHolder = StreamConnectionIdHolder(),
socketFactory: StreamWebSocketFactory =
StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory")),
batcher: StreamBatcher<String> =
StreamBatcher(
scope = scope,
batchSize = socketConfig.batchSize,
initialDelayMs = socketConfig.batchInitialDelayMs,
maxDelayMs = socketConfig.batchMaxDelayMs,
),
eventAggregator: StreamEventAggregator<*>? = null,

// Monitoring
healthMonitor: StreamHealthMonitor =
Expand Down Expand Up @@ -427,6 +415,28 @@ internal fun createStreamClientInternal(
),
)

val eventParser =
StreamCompositeEventSerializationImpl(
internal =
serializationConfig.eventParser ?: StreamEventSerialization(compositeSerialization),
external = serializationConfig.productEventSerializers,
)

val resolvedAggregator =
eventAggregator
?: StreamEventAggregator(
scope = clientScope,
policy =
StreamEventAggregationPolicy.from(
typeExtractor = { raw -> eventParser.peekType(raw) },
deserializer = { raw -> eventParser.deserialize(raw) },
aggregationThreshold = socketConfig.aggregationThreshold,
maxWindowMs = socketConfig.aggregationMaxWindowMs,
dispatchQueueCapacity = socketConfig.aggregationDispatchQueueCapacity,
),
logger = logProvider.taggedLogger("SCEventAggregator"),
)

val mutableConnectionState = MutableStateFlow<StreamConnectionState>(StreamConnectionState.Idle)
return StreamClientImpl(
user = user,
Expand All @@ -446,15 +456,9 @@ internal fun createStreamClientInternal(
products = products,
config = socketConfig,
jsonSerialization = compositeSerialization,
eventParser =
StreamCompositeEventSerializationImpl(
internal =
serializationConfig.eventParser
?: StreamEventSerialization(compositeSerialization),
external = serializationConfig.productEventSerializers,
),
eventParser = eventParser,
healthMonitor = healthMonitor,
batcher = batcher,
aggregator = resolvedAggregator,
internalSocket = socket,
subscriptionManager =
StreamSubscriptionManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.getstream.android.core.api.components.StreamAndroidComponentsProvider
import io.getstream.android.core.api.log.StreamLoggerProvider
import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor
import io.getstream.android.core.api.observers.network.StreamNetworkMonitor
import io.getstream.android.core.api.processing.StreamBatcher
import io.getstream.android.core.api.processing.StreamEventAggregator
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
Expand Down Expand Up @@ -63,7 +63,7 @@ import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
* @param tokenManager Token lifecycle manager.
* @param connectionIdHolder Connection ID storage.
* @param socketFactory WebSocket factory.
* @param batcher WebSocket message batcher.
* @param eventAggregator WebSocket event aggregator.
* @param healthMonitor Connection health monitor.
* @param networkMonitor Network connectivity monitor.
* @param lifecycleMonitor App lifecycle monitor.
Expand All @@ -80,7 +80,7 @@ public data class StreamComponentProvider(
val tokenManager: StreamTokenManager? = null,
val connectionIdHolder: StreamConnectionIdHolder? = null,
val socketFactory: StreamWebSocketFactory? = null,
val batcher: StreamBatcher<String>? = null,
val eventAggregator: StreamEventAggregator<*>? = null,
val healthMonitor: StreamHealthMonitor? = null,
val networkMonitor: StreamNetworkMonitor? = null,
val lifecycleMonitor: StreamLifecycleMonitor? = null,
Expand Down
Loading