Skip to content

docs: add Event Aggregator section to README#61

Closed
aleksandar-apostolov wants to merge 16 commits intodevelopfrom
docs/event-aggregator-readme
Closed

docs: add Event Aggregator section to README#61
aleksandar-apostolov wants to merge 16 commits intodevelopfrom
docs/event-aggregator-readme

Conversation

@aleksandar-apostolov
Copy link
Copy Markdown
Collaborator

@aleksandar-apostolov aleksandar-apostolov commented Apr 29, 2026

Goal

Document the Event Aggregator in the README, with specific guidance on handling aggregated events in subscriptions.

Implementation

  • New Event Aggregator section in Feature Guides covering:
    • Adaptive behavior (individual vs batched delivery)
    • Event ordering: flat List<T> in arrival order (not Map<String, List<T>>)
    • Why ordering matters (e.g. "reaction added" before "reaction removed")
    • Code examples for onEvent handling both paths
    • Specific subsection on handling StreamAggregatedEvent in StreamClientListener
  • Updated Subscription Management section with a note about aggregated events
  • Updated Table of Contents

Testing

  • README renders correctly on GitHub
  • Code examples match current API (List-based StreamAggregatedEvent)

pr: core

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced event aggregation for improved performance during high-traffic periods; listeners may now receive batched events grouped together in addition to individual events.
  • Documentation

    • Updated README with comprehensive guidance on the new event aggregator feature, including configuration examples and best practices for handling aggregated events.
  • Tests

    • Added extensive test coverage for the event aggregation system.

Replace the batching approach in StreamSocketSession with an adaptive
event aggregator that switches between individual and aggregated
delivery based on traffic volume.

Low traffic: events pass through individually (no overhead).
Spike: events grouped by type into StreamAggregatedEvent, enabling
product SDKs to apply one state update instead of thousands.

Architecture: two decoupled coroutines (collector + dispatcher) with
configurable threshold, maxWindow ceiling, and bounded dispatch queue.

BREAKING CHANGE: StreamSocketConfig batch params replaced with
aggregation params. StreamComponentProvider.batcher replaced with
eventAggregator.
…alizers

Wrap typeExtractor and deserializer calls in try-catch so a throwing
function doesn't kill the collector coroutine. Previously only
Result.failure was handled — a thrown exception would stop all event
processing.

Also adds 6 edge case tests:
- typeExtractor throws → collector survives
- deserializer throws (not Result.failure) → collector survives
- handler throws → subsequent events still delivered
- exactly-at-threshold boundary → triggers aggregation
- all events fail deserialization → batch dropped, no crash
- dispatch queue full → events dropped with warning, no crash
… sanitized logging

- Defensive copy in StreamAggregatedEvent (mutable lists no longer leak)
- AtomicReference for eventHandler (thread-safe handler registration)
- Terminal stop() with closed guard (prevent resurrection after stop)
- Parameter validation in factory (require threshold/window/capacity > 0)
- Sanitize socket message logging (log byte count, not raw payload)
- Explicit aggregator.start() in session connect (not just auto-start)
- Handle listener dispatch Result failures with onFailure logging
- Log aggregator.stop() failures in cleanup
- KDoc "exceed" → "reach or exceed" for threshold semantics
…, add 10K stress test

- Replace Channel with StreamRestartableChannel for inbox and dispatch
  queue — supports stop/start lifecycle across socket reconnect cycles
- Remove auto-start from offer() — session manages lifecycle explicitly
- Remove terminal closed guard (no longer needed with restartable channels)
- Add braces to all single-line if statements
- Add 10K event stress test: 10,000 events → 200 handler calls (50x
  reduction), 129ms total processing time
- Replace "offer returns false after stop" test with "stop then start
  resumes processing" test verifying restartable behavior
- All events delivered (count == offered)
- Each batch ≤ threshold events
- Fewer handler calls than raw events
- Wire logger through factory constructor (no longer nullable internal var only)
- Assert custom aggregation values in config factory test (not just existence)
- Assert queue-full warning logged in dispatch queue test
- Tighten aggregate shape assertion in session test (verify core events
  excluded from product aggregate, verify exact product event contents)
Test all require() guards: aggregationThreshold, maxWindowMs, and
dispatchQueueCapacity reject zero and negative values.
Replace two loose lambdas (typeExtractor + deserializer) and separate
config values with a validated policy object. Policy combines behavior
(type extraction, deserialization) with tuning (threshold, window,
queue capacity). Validated at construction via require() — once you
have a policy, it's guaranteed valid.

StreamSocketConfig keeps the tuning values for product SDK
configuration. The factory builds the policy internally from config +
event parser. Custom aggregator injection via StreamComponentProvider
bypasses the policy entirely.
Replace manual try/catch blocks with runCatchingCancellable to ensure
CancellationException is always rethrown (structured concurrency) and
no catch blocks are left empty. All failure paths now log via onFailure.
…external calls

Collector and dispatcher loops catch ClosedReceiveChannelException
explicitly for graceful shutdown logging. CancellationException
propagates naturally (structured concurrency). runCatchingCancellable
is used only for external code (handler, deserializer, type extractor)
that can throw unexpectedly.
- offer before start returns false
- double start is idempotent (no duplicate workers)
- handler exception on aggregated event does not break dispatcher

Line coverage: 89% → 93%, Branch: 74% → 78%, Instruction: 91% → 92%.
The impl now takes StreamEventAggregationPolicy<T> instead of 5
separate fields. Policy is the single source of truth for behavior
and configuration — no unpacking at the boundary.
Fields moved from impl to policy — reflection path updated.
Remove unnecessary Thread.sleep, assert each offer succeeds, increase
latch timeout from 30s to 60s for loaded CI runners.
StreamAggregatedEvent now holds a flat List<T> in arrival order instead
of Map<String, List<T>> grouped by type.

Addresses Gian's review: event ordering across types matters for
correctness (e.g. "reaction added" must precede "reaction removed" for
the same entity). Grouping by type key loses this inter-type ordering.

Product SDKs process the list sequentially in one atomic state update.

- StreamAggregatedEvent: Map<String, List<T>> → List<T>
- StreamEventAggregatorImpl: removed groupBy, collects flat list
- StreamSocketSession.handleAggregatedEvent: iterates flat list
- Removed unused safeExtractType from impl
- Updated all tests
- New Feature Guide section for StreamEventAggregator covering adaptive
  behavior, event ordering (flat List, not Map), and code examples
- Specific subsection on handling StreamAggregatedEvent in subscriptions
  — product SDKs must handle both individual and aggregated events
- Note in Subscription Management section about aggregated events
- Updated Table of Contents
@aleksandar-apostolov
Copy link
Copy Markdown
Collaborator Author

wrong branch

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 29, 2026

Caution

Review failed

Pull request was closed or merged during review

Walkthrough

This PR replaces the event batching mechanism with a new adaptive event aggregation pipeline. It introduces StreamEventAggregator, StreamEventAggregationPolicy, and StreamAggregatedEvent to handle traffic spikes by collecting events into batches, delivering them either individually or as aggregated payloads. Socket configuration migrates from batch parameters to aggregation-specific tuning.

Changes

Cohort / File(s) Summary
Configuration & Socket Model
stream-android-core/.../StreamSocketConfig.kt, stream-android-core/.../StreamComponentProvider.kt
Removes batch parameters (batchSize, batchInitialDelayMs, batchMaxDelayMs) and introduces aggregation parameters (aggregationThreshold, aggregationMaxWindowMs, aggregationDispatchQueueCapacity). Data class property batcher replaced with eventAggregator.
Core Aggregator API & Implementation
stream-android-core/.../StreamEventAggregator.kt, stream-android-core/.../StreamEventAggregationPolicy.kt, stream-android-core/.../StreamAggregatedEvent.kt, stream-android-core/.../StreamEventAggregatorImpl.kt
Introduces public interface StreamEventAggregator<T> with lifecycle methods (start, stop, offer) and event handler registration. Policy class validates aggregation tuning with factory method. StreamAggregatedEvent<T> model holds batched events. Implementation uses coroutines for collection, deserialization, and dispatch with timeout-based threshold triggering.
Client & Socket Wiring
stream-android-core/.../StreamClient.kt, stream-android-core/.../StreamSocketSession.kt, stream-android-core/.../StreamCompositeEventSerializationImpl.kt
Removes StreamBatcher wiring; injects StreamEventAggregator into socket session. Socket session registers event handler that branches on aggregation vs. individual events. peekType visibility changed to internal for policy use.
Documentation & Tests
README.md, stream-android-core/.../StreamClientConfigFactoryTest.kt, stream-android-core/.../StreamClientFactoryTest.kt, stream-android-core/.../StreamSocketConfigTest.kt, stream-android-core/.../StreamEventAggregatorImplTest.kt, stream-android-core/.../StreamSocketSessionTest.kt
README documents aggregator semantics and configuration. Test suites updated to verify aggregation instead of batching; new comprehensive test class validates policy constraints, lifecycle idempotency, individual/aggregated dispatch, timeout-based dispatch, deserialization resilience, queue-full handling, and stress scenarios (10K events).

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client/<br/>App
    participant Session as Socket<br/>Session
    participant Aggregator as Event<br/>Aggregator
    participant Collector as Collector<br/>Coroutine
    participant Dispatcher as Dispatcher<br/>Coroutine
    participant Handler as Event<br/>Handler
    participant Parser as Event<br/>Parser

    Client->>Session: connect()
    Session->>Aggregator: start()
    Aggregator->>Collector: spawn (await first event or timeout)
    Aggregator->>Dispatcher: spawn (await dispatch items)

    Note over Session: Low traffic scenario
    Session->>Aggregator: offer(raw string event)
    Aggregator->>Collector: enqueue to inbox
    Collector->>Parser: peekType & deserialize
    Parser-->>Collector: T (decoded event)
    Collector->>Aggregator: send Individual(T)
    Aggregator->>Dispatcher: enqueue dispatch item
    Dispatcher->>Handler: invoke(T)

    Note over Session: High traffic / Spike scenario
    Session->>Aggregator: offer(raw string 1)
    Session->>Aggregator: offer(raw string 2)
    Session->>Aggregator: offer(raw string N)
    Aggregator->>Collector: collect up to threshold<br/>or timeout
    Collector->>Parser: deserialize each
    Parser-->>Collector: T1, T2, ..., TN
    Collector->>Aggregator: send Aggregated([T1..TN])
    Aggregator->>Dispatcher: enqueue dispatch item
    Dispatcher->>Handler: invoke(StreamAggregatedEvent)
    Handler->>Handler: process event.events sequentially

    Client->>Session: disconnect()
    Session->>Aggregator: stop()
    Aggregator->>Collector: cancel job
    Aggregator->>Dispatcher: cancel job
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

pr:breaking-change

Suggested reviewers

  • gpunto

Poem

🐰 Hop! Events now aggregate with grace,
When traffic spikes fill up the place,
Thresholds met, time windows pass,
Batched together, landing fast—
No more lonely queues, just grouped delight! 🐇✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 20.65% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: adding Event Aggregator documentation to the README.
Description check ✅ Passed The description covers Goal, Implementation, and Testing sections with specific details about the Event Aggregator documentation changes, but lacks a complete Checklist section.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch docs/event-aggregator-readme
⚔️ Resolve merge conflicts
  • Resolve merge conflict in branch docs/event-aggregator-readme

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
Review rate limit: 0/1 reviews remaining, refill in 60 minutes.

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr:documentation Documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant