Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Stream Android Core uses annotations to distinguish between stable public APIs a
- [Batcher](#batcher)
- [Debouncer](#debouncer)
- [Throttler](#throttler)
- [Event Aggregator](#event-aggregator)
- [Threading Utilities](#threading-utilities)
- [Token Management](#token-management)
- [WebSocket Connections](#websocket-connections)
Expand Down Expand Up @@ -361,6 +362,24 @@ enum class Retention {
- `AUTO_REMOVE`: UI components, fragments, activities (automatic cleanup)
- `KEEP_UNTIL_CANCELLED`: Long-lived services, singletons (explicit lifecycle)

#### Handling Aggregated Events

When the [Event Aggregator](#event-aggregator) is active, listeners registered via `StreamSubscriptionManager` may receive `StreamAggregatedEvent` instances during traffic spikes in addition to individual events. Your listener **must** handle both:

```kotlin
subscriptionManager.forEach { listener ->
when (event) {
is StreamAggregatedEvent<*> -> {
// Batch of events in arrival order — process sequentially
event.events.forEach { listener.onEvent(it) }
}
else -> listener.onEvent(event)
}
}
```

See the [Event Aggregator](#event-aggregator) section for full details on ordering guarantees and atomic state updates.

---

### Serial Processing Queue
Expand Down Expand Up @@ -636,6 +655,87 @@ throttler.reset()

---

### Event Aggregator

Adaptive event aggregator that switches between individual and batched event delivery based on traffic volume.

#### How It Works

During normal traffic, WebSocket events are deserialized and dispatched one at a time. During spikes, the aggregator collects events within a time window and delivers them as a single `StreamAggregatedEvent` — a flat list preserving arrival order.

```kotlin
import io.getstream.android.core.api.processing.StreamEventAggregator
import io.getstream.android.core.api.processing.StreamEventAggregationPolicy
import io.getstream.android.core.api.processing.StreamAggregatedEvent

val aggregator = StreamEventAggregator<MyEvent>(
scope = scope,
policy = StreamEventAggregationPolicy.from(
typeExtractor = { raw -> extractType(raw) },
deserializer = { raw -> Result.success(deserialize(raw)) },
aggregationThreshold = 10, // Aggregate when >= 10 events accumulated
maxWindowMs = 500, // Max collection window before flushing
),
)

aggregator.onEvent { event ->
when (event) {
is StreamAggregatedEvent<*> -> {
// Spike — process all events sequentially in one atomic state update
event.events.forEach { singleEvent ->
handleEvent(singleEvent)
}
}
is MyEvent -> {
// Normal traffic — single event
handleEvent(event)
}
}
}

aggregator.start()
```

#### Event Ordering

`StreamAggregatedEvent.events` is a **flat `List<T>` in arrival order** — not grouped by type. This is critical for correctness: events like "reaction added" must be processed before "reaction removed" for the same entity. Grouping by type key would lose this inter-type ordering.

Product SDKs should process the list sequentially and apply all updates in one atomic state mutation, so UI recomposes once per batch instead of once per event.

#### Handling Aggregated Events in Subscriptions

When subscribing to `StreamClient` events via `StreamClientListener`, your `onEvent` handler may receive either an individual event or a `StreamAggregatedEvent`. Product SDKs must handle both:

```kotlin
val listener = object : StreamClientListener {
override fun onEvent(event: Any) {
when (event) {
is StreamAggregatedEvent<*> -> {
// Apply all events in one state transaction
val events = event.events.filterIsInstance<MyProductEvent>()
stateStore.atomicUpdate { state ->
events.fold(state) { acc, e -> applyEvent(acc, e) }
}
}
is MyProductEvent -> {
// Single event — apply directly
stateStore.update { state -> applyEvent(state, event) }
}
}
}
}
```

Failing to handle `StreamAggregatedEvent` means aggregated events are silently ignored during traffic spikes.

#### Use Cases

- WebSocket event delivery (automatic — built into `StreamSocketSession`)
- High-frequency event streams (chat rooms with many participants)
- Any scenario where burst traffic causes excessive UI recomposition

---

### Threading Utilities

Safe cross-thread execution with timeout protection.
Expand Down
Loading