docs: add Event Aggregator section to README#61
docs: add Event Aggregator section to README#61aleksandar-apostolov wants to merge 16 commits intodevelopfrom
Conversation
Replace the batching approach in StreamSocketSession with an adaptive event aggregator that switches between individual and aggregated delivery based on traffic volume. Low traffic: events pass through individually (no overhead). Spike: events grouped by type into StreamAggregatedEvent, enabling product SDKs to apply one state update instead of thousands. Architecture: two decoupled coroutines (collector + dispatcher) with configurable threshold, maxWindow ceiling, and bounded dispatch queue. BREAKING CHANGE: StreamSocketConfig batch params replaced with aggregation params. StreamComponentProvider.batcher replaced with eventAggregator.
…alizers Wrap typeExtractor and deserializer calls in try-catch so a throwing function doesn't kill the collector coroutine. Previously only Result.failure was handled — a thrown exception would stop all event processing. Also adds 6 edge case tests: - typeExtractor throws → collector survives - deserializer throws (not Result.failure) → collector survives - handler throws → subsequent events still delivered - exactly-at-threshold boundary → triggers aggregation - all events fail deserialization → batch dropped, no crash - dispatch queue full → events dropped with warning, no crash
… sanitized logging - Defensive copy in StreamAggregatedEvent (mutable lists no longer leak) - AtomicReference for eventHandler (thread-safe handler registration) - Terminal stop() with closed guard (prevent resurrection after stop) - Parameter validation in factory (require threshold/window/capacity > 0) - Sanitize socket message logging (log byte count, not raw payload) - Explicit aggregator.start() in session connect (not just auto-start) - Handle listener dispatch Result failures with onFailure logging - Log aggregator.stop() failures in cleanup - KDoc "exceed" → "reach or exceed" for threshold semantics
…, add 10K stress test - Replace Channel with StreamRestartableChannel for inbox and dispatch queue — supports stop/start lifecycle across socket reconnect cycles - Remove auto-start from offer() — session manages lifecycle explicitly - Remove terminal closed guard (no longer needed with restartable channels) - Add braces to all single-line if statements - Add 10K event stress test: 10,000 events → 200 handler calls (50x reduction), 129ms total processing time - Replace "offer returns false after stop" test with "stop then start resumes processing" test verifying restartable behavior
- All events delivered (count == offered) - Each batch ≤ threshold events - Fewer handler calls than raw events
- Wire logger through factory constructor (no longer nullable internal var only) - Assert custom aggregation values in config factory test (not just existence) - Assert queue-full warning logged in dispatch queue test - Tighten aggregate shape assertion in session test (verify core events excluded from product aggregate, verify exact product event contents)
Test all require() guards: aggregationThreshold, maxWindowMs, and dispatchQueueCapacity reject zero and negative values.
Replace two loose lambdas (typeExtractor + deserializer) and separate config values with a validated policy object. Policy combines behavior (type extraction, deserialization) with tuning (threshold, window, queue capacity). Validated at construction via require() — once you have a policy, it's guaranteed valid. StreamSocketConfig keeps the tuning values for product SDK configuration. The factory builds the policy internally from config + event parser. Custom aggregator injection via StreamComponentProvider bypasses the policy entirely.
Replace manual try/catch blocks with runCatchingCancellable to ensure CancellationException is always rethrown (structured concurrency) and no catch blocks are left empty. All failure paths now log via onFailure.
…external calls Collector and dispatcher loops catch ClosedReceiveChannelException explicitly for graceful shutdown logging. CancellationException propagates naturally (structured concurrency). runCatchingCancellable is used only for external code (handler, deserializer, type extractor) that can throw unexpectedly.
- offer before start returns false - double start is idempotent (no duplicate workers) - handler exception on aggregated event does not break dispatcher Line coverage: 89% → 93%, Branch: 74% → 78%, Instruction: 91% → 92%.
The impl now takes StreamEventAggregationPolicy<T> instead of 5 separate fields. Policy is the single source of truth for behavior and configuration — no unpacking at the boundary.
Fields moved from impl to policy — reflection path updated.
Remove unnecessary Thread.sleep, assert each offer succeeds, increase latch timeout from 30s to 60s for loaded CI runners.
StreamAggregatedEvent now holds a flat List<T> in arrival order instead of Map<String, List<T>> grouped by type. Addresses Gian's review: event ordering across types matters for correctness (e.g. "reaction added" must precede "reaction removed" for the same entity). Grouping by type key loses this inter-type ordering. Product SDKs process the list sequentially in one atomic state update. - StreamAggregatedEvent: Map<String, List<T>> → List<T> - StreamEventAggregatorImpl: removed groupBy, collects flat list - StreamSocketSession.handleAggregatedEvent: iterates flat list - Removed unused safeExtractType from impl - Updated all tests
- New Feature Guide section for StreamEventAggregator covering adaptive behavior, event ordering (flat List, not Map), and code examples - Specific subsection on handling StreamAggregatedEvent in subscriptions — product SDKs must handle both individual and aggregated events - Note in Subscription Management section about aggregated events - Updated Table of Contents
|
wrong branch |
|
Caution Review failedPull request was closed or merged during review WalkthroughThis PR replaces the event batching mechanism with a new adaptive event aggregation pipeline. It introduces Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client/<br/>App
participant Session as Socket<br/>Session
participant Aggregator as Event<br/>Aggregator
participant Collector as Collector<br/>Coroutine
participant Dispatcher as Dispatcher<br/>Coroutine
participant Handler as Event<br/>Handler
participant Parser as Event<br/>Parser
Client->>Session: connect()
Session->>Aggregator: start()
Aggregator->>Collector: spawn (await first event or timeout)
Aggregator->>Dispatcher: spawn (await dispatch items)
Note over Session: Low traffic scenario
Session->>Aggregator: offer(raw string event)
Aggregator->>Collector: enqueue to inbox
Collector->>Parser: peekType & deserialize
Parser-->>Collector: T (decoded event)
Collector->>Aggregator: send Individual(T)
Aggregator->>Dispatcher: enqueue dispatch item
Dispatcher->>Handler: invoke(T)
Note over Session: High traffic / Spike scenario
Session->>Aggregator: offer(raw string 1)
Session->>Aggregator: offer(raw string 2)
Session->>Aggregator: offer(raw string N)
Aggregator->>Collector: collect up to threshold<br/>or timeout
Collector->>Parser: deserialize each
Parser-->>Collector: T1, T2, ..., TN
Collector->>Aggregator: send Aggregated([T1..TN])
Aggregator->>Dispatcher: enqueue dispatch item
Dispatcher->>Handler: invoke(StreamAggregatedEvent)
Handler->>Handler: process event.events sequentially
Client->>Session: disconnect()
Session->>Aggregator: stop()
Aggregator->>Collector: cancel job
Aggregator->>Dispatcher: cancel job
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 60 minutes.Comment |
Goal
Document the Event Aggregator in the README, with specific guidance on handling aggregated events in subscriptions.
Implementation
List<T>in arrival order (notMap<String, List<T>>)onEventhandling both pathsStreamAggregatedEventinStreamClientListenerTesting
StreamAggregatedEvent)pr: core
Summary by CodeRabbit
Release Notes
New Features
Documentation
Tests