Skip to content

ENG-9028: Move event queue to backend#6267

Open
masenf wants to merge 55 commits intomainfrom
masenf/event-context-rb
Open

ENG-9028: Move event queue to backend#6267
masenf wants to merge 55 commits intomainfrom
masenf/event-context-rb

Conversation

@masenf
Copy link
Copy Markdown
Collaborator

@masenf masenf commented Apr 1, 2026

This patch moves the primary responsibility of chaining/queueing events to the backend. Previously the frontend was responsible for sending events to the backend. This created additional round-trip latency when chaining events using yield.

Summary: Event Processing & State Management Overhaul

Contexts are King

Instead of relying on ClassVar, module globals, and other hacks to track reflex objects, this PR introduces contextvars to the code base and leverages them to manage global state safely from anywhere in the app.

New Modules (in reflex_core._internal)

  • registry.py — ContextVar-based registry for BaseState classes and EventHandlers, replacing implicit global lookups. Currently used for states, events and stateful components. Eventually will be used for DECORATED_PAGES and other weird globals.
  • event/context.pyEventContext (inherits BaseContext) holding per-event metadata: token, state manager ref, enqueue/emit/delta callbacks
  • event/processor/ — New BaseStateEventProcessor and EventProcessor classes that own the full event lifecycle (previously spread across App and State methods)
  • context/base.py — Generic BaseContext ContextVar wrapper

Breaking Changes (0.9.0)

  • Event.token field removedEvent no longer carries a token; the token lives in EventContext
  • Event.substate_token replaced by Event.state_cls property (resolved via the registry)
  • Delta type refined from dict[str, Any]dict[str, dict[str, Any]] (nested by substate name)
  • StateManager.create() no longer takes a state= arg — state classes are discovered from the registry
  • StateToken introduced (reflex/istate/manager/token.py) — typed generic token replacing raw "client_token_substate" strings in all state managers (disk, memory, redis)
  • EventHandlerSetVar.state_cls.state field rename to align with EventHandler change.
  • fix_events removed from sortof public event API, replaced by Event.from_event_type()
  • get_hydrate_event removed — replaced by internal rehydration; simulated pre-hydrated states removed
  • App._background_tasks replaced by App.event_processor._tasks (processor manages background task lifecycle in the same way as normal event tasks)
  • AppHarness (reflex/testing.py) simplifiedstate_manager property and related methods removed

State Manager Changes

  • All three managers (disk, memory, redis) refactored to use StateToken[BaseState] instead of raw string tokens
  • State managers now close() old locks properly
  • OPLOCK_ENABLED support in state manager close/tests

Frontend (state.js)

  • Params passed around as a ref instead of inline values
  • StateUpdate only includes non-empty fields, reducing bytes over the wire.

Test Infrastructure

  • Large test suite overhaul: new fixtures in conftest.py for EventProcessor, registry, and context
  • get_app / mock_app dependencies removed from tests in favor of the new processor-based approach
  • New unit tests for registry, event context, base state processor, and event processor
  • Integration tests updated to replace all backend state assertions with in-app equivalents.

masenf added 30 commits March 17, 2026 15:03
Special case access patterns for BaseState retrieval to allow for other types
of state that have different semantics.
Update all unit test cases to use the new StateToken / BaseStateToken API
Create the EventProcessor class to manage the backend event queue.

Move event processing logic out of the BaseState and into a separate module.
* No tasks start until `.start()` is called
* add graceful shutdown timeout to allow tasks to finish before cancellation
* use more keyword only parameters
* move BaseState-specific processing to new BaseStateEventProcessor subclass
* add test fixtures for `mock_event_processor` that can process simple registered events
make Event.substate_token no longer work, because we're deprecating `token` as
an Event field, so we cannot rely on it under the covers.
Use the new mock_base_state_event_processor fixture to process arbitrary events
and assert on emitted events or deltas.
…Handler

This allows better control over which states and events are part of a given app
and avoiding true global variables makes cleanup and testing much simpler.
remove null/default fields when serializing Event from the frontend and
StateUpdate from the backend.
Fix issue with background task delta calculation not starting from the root state
remove extra `event_context` ContextVar being passed around
EventProcessor.enqueue now returns a Future that tracks the completion of the
event (and can be used to cancel the event)

EventProcessor.enqueue_stream_delta overrides the default emit_delta
implementation and instead yields deltas directly to the caller as the event is
processing.
The function () => params.current baked inside the ensureSocketConnected
function was getting a stale reference and the early events (hydrate, on load,
client state) were missing the query parameters in their router_data and thus
on_load was not working correctly.
Store the state_full_name to substate mapping in RegistrationContext

Make it easier to register / re-register select states and event handlers in a
new RegistrationContext

Store StatefulComponent cached components in RegistrationContext for easier
resetting/dropping after compilation or for use in testing.
masenf added 9 commits April 1, 2026 00:32
Update all associated tests to make assertions using the browser/app and not
attempting to fetch the backend state directly.

This makes the tests more robust, reduces state_manager related hacks, and
makes the tests easier to eventually migrate to an external process using
granian where direct state_manager access will not be available.
instead of telling the frontend to reload, just hydrate and run on_load
internally before processing the user's requested event.
raise coverage bar back up to 72 at least
Add unit test cases for reflex.istate.manager.token
@linear
Copy link
Copy Markdown

linear bot commented Apr 1, 2026

@codspeed-hq
Copy link
Copy Markdown

codspeed-hq bot commented Apr 1, 2026

Merging this PR will not alter performance

✅ 8 untouched benchmarks


Comparing masenf/event-context-rb (43f97ab) with main (980a97d)

Open in CodSpeed

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Apr 1, 2026

Greptile Summary

This is a large, well-structured architectural PR that moves event queuing and serialization from the frontend to the backend, reducing round-trip latency for chained events. The core ideas — ContextVar-based EventContext/RegistrationContext, the new EventProcessor/BaseStateEventProcessor classes, typed StateToken/BaseStateToken replacing raw strings, and the removal of the frontend event_processing gate — are all sound and internally consistent.

Key changes:

  • EventProcessor now owns the full event lifecycle via an asyncio.Queue; App.process(), _process_background(), and _background_tasks are gone.
  • Event.token removed; token lives in EventContext (contextvar), propagated via fork() for child events.
  • All three state managers (memory, disk, redis) updated to accept typed StateToken[T] instead of raw strings, eliminating the _split_substate_key/_substate_key helpers throughout.
  • Frontend state.js drops event_processing flag and the reload socket event; events can now be enqueued to the backend immediately without waiting for final: true.
  • AppHarness backend thread now explicitly copies the contextvar context so RegistrationContext is visible in the server thread.

Issues found:

  • P1enqueue_stream_delta in event_processor.py has a two-part bug in its inner _emit_delta_impl closure: cross-token deltas are emitted to the stream token instead of their real recipient, and all deltas (including those for other sessions) are unconditionally pushed into the local deltas queue and yielded back to the upload caller. This affects the upload streaming code path in _upload.py.
  • P2StateToken.deserialize does not enforce mutual exclusivity of data/fp; supplying both silently uses fp, and the error message for the neither-provided case is misleading.
  • P2base_state_processor._transform_event_arg carries over a copy-paste bug from old state.py: hinted_args is set or hinted_args is set and hinted_args is tuple or hinted_args is tuple are both tautologies (the or branch is dead code).
  • P2 — A # TODO comment in Event.from_event_type about real Python types in the backend queue should be tracked as a ticket and removed per project conventions.

Confidence Score: 4/5

Safe to merge after fixing the cross-token delta routing bug in enqueue_stream_delta; all other findings are style/correctness improvements.

One confirmed P1 logic bug: enqueue_stream_delta._emit_delta_impl sends cross-session deltas to the wrong token and leaks them into the upload stream. This directly affects the file-upload code path. The rest of the changes are architecturally sound, well-tested, and represent a clear improvement over the previous design. Score is 4 rather than 5 solely because of the streaming delta misrouting.

packages/reflex-core/src/reflex_core/_internal/event/processor/event_processor.py (lines 396–406) needs the _emit_delta_impl token fix before merging.

Important Files Changed

Filename Overview
packages/reflex-core/src/reflex_core/_internal/event/processor/event_processor.py New EventProcessor class owning the full event lifecycle via an asyncio queue; contains a logic bug in enqueue_stream_delta where cross-token deltas are misrouted and incorrectly yielded to the streaming caller.
packages/reflex-core/src/reflex_core/_internal/event/processor/base_state_processor.py New BaseStateEventProcessor handling state lock acquisition, rehydration, middleware, and background task dispatch; contains a pre-existing copy-paste error with duplicate type-coercion conditions for set/tuple.
packages/reflex-core/src/reflex_core/_internal/event/context.py New EventContext dataclass holding per-event metadata with a clean fork() mechanism for child event contexts.
packages/reflex-core/src/reflex_core/_internal/registry.py New RegistrationContext replacing implicit global lookups; well-structured with ensure_context() fallback.
reflex/istate/manager/token.py New StateToken/BaseStateToken typed generic tokens; deserialize has a validation gap.
reflex/app.py Significant simplification — process(), _process_background(), and _background_tasks removed in favour of EventProcessor.
packages/reflex-core/src/reflex_core/event.py Major refactor — Event.token removed, fix_events deprecated, get_fn_signature removed; TODO comment present.
reflex/state.py Large reduction — ~300 lines removed; class_subclasses replaced by RegistrationContext-backed get_substates().
packages/reflex-core/src/reflex_core/.templates/web/utils/state.js Frontend drops event_processing flag and reload event; ReflexEvent emits only non-empty fields.
packages/reflex-components-core/src/reflex_components_core/core/_upload.py Upload handlers delegate to event_processor; affected by the cross-token delta bug in enqueue_stream_delta.
reflex/istate/manager/init.py StateManager.state field removed; get_state_manager() now reads from EventContext.
reflex/testing.py AppHarness simplified; backend thread copies contextvar context for RegistrationContext visibility.

Reviews (1): Last reviewed commit: "Merge remote-tracking branch 'origin/mai..." | Re-trigger Greptile

Comment on lines +47 to +61
"""
if timeout is None:
return cls(drain_deadline=None)
return cls(drain_deadline=time.time() + timeout)

def __enter__(self) -> float:
"""Enter the context and yield the remaining time.

Returns:
The remaining time in seconds until the overall timeout is reached, or 0 if the timeout
has already been reached.
"""
if self.drain_deadline is not None:
return max(0, self.drain_deadline - time.time())
return 0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 DrainTimeoutManager.__enter__ returns 0 for no-deadline case, blocking no drain

When drain_deadline is None (i.e., graceful_shutdown_timeout=None), __enter__ returns 0. The callers in stop() guard drain/stop operations with if remaining_time > 0, so a None timeout means nothing is awaited before force-cancelling tasks. This is the intended behaviour for immediate shutdown, but returning 0 rather than a sentinel value (like None) means the semantics are implicit and easy to misread.

This is a style/readability concern — the current behaviour is functionally correct.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

masenf and others added 16 commits April 1, 2026 09:48
Use the delta's token when emitting to the processor queue.

Return after emitting to the processor queue so the caller does not get deltas from unrelated tokens.

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
…/base_state_processor.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Remove TODO now that issue is created in repo.
* Vendor the new async iterator `asyncio.as_completed` for < 3.13
* Alternative Queue.shutdown mechanism for < 3.13
* Alternative to `Task.get_context` for < 3.12
* from __future__ import annotations for new modules (so TYPE_CHECKING imports work)
* bug with cls super() call on dataclass with slots=True
* typing_extensions.deprecated
Join the queue that we None'd out to make sure all the tasks have flushed.

Fix some import issues being reported on my branch.
If there is another event to process, chain to processEvent
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant