Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
14d8d30
Add StateToken[TOKEN_TYPE] for flexible state manager
masenf Mar 8, 2026
c4b191f
Use StateToken with redis and memory managers
masenf Mar 8, 2026
cdacd22
disambiguate class name
masenf Mar 8, 2026
d463ef8
handle legacy tokens passed to app.modify_state
masenf Mar 17, 2026
b715c27
Add EventContext module
masenf Mar 17, 2026
57d0d01
Move BaseState event processing to reflex.ievent.processor package
masenf Mar 20, 2026
b8036bd
Overhaul EventProcessor lifecycle
masenf Mar 24, 2026
01befb5
Fix test_app to use BaseStateEventProcessor
masenf Mar 25, 2026
cfc706a
Avoid handling the same exception multiple times in EventProcessor
masenf Mar 25, 2026
8bbeab4
Attach cls to setter event handlers
masenf Mar 25, 2026
2562606
fix test_app, test_state and friends
masenf Mar 25, 2026
ea93b5b
ENG-9198: implement ContextVar-based registry for BaseState and Event…
masenf Mar 26, 2026
1cfd974
Remove `token` field from Event
masenf Mar 26, 2026
6be4f72
Clean up frontend Event and StateUpdate
masenf Mar 26, 2026
1e86d61
remove get_app dependency from get_state and background tasks
masenf Mar 26, 2026
a1bc4b5
Remove remaining get_app / mock_app dependency from tests
masenf Mar 26, 2026
fcb24f6
Merge remote-tracking branch 'origin/main' into masenf/event-context
masenf Mar 30, 2026
9256212
EventContext inherits from BaseContext
masenf Mar 26, 2026
0c04bc3
additional fixups
masenf Mar 30, 2026
5940c8d
apply changes to migrated files separately
masenf Mar 30, 2026
b13ad37
add missing import
masenf Mar 30, 2026
ada96c8
remove pyleak integration from base_state_processor
masenf Mar 30, 2026
a1eb180
EventProcessor.enqueue_stream_delta and task Future
masenf Mar 30, 2026
a073b0d
Adapt upload endpoint to new EventProcessor
masenf Mar 30, 2026
700dc74
Fix test_expiration.py and other new state tests
masenf Mar 30, 2026
aa15baa
Fix upload tests for new EventProcessor fixtures
masenf Mar 30, 2026
b709bee
add OPLOCK_ENABLED state_manager.close to tests
masenf Mar 31, 2026
9e706cc
state.js: pass around params as a ref
masenf Mar 31, 2026
53815fd
Merge remote-tracking branch 'origin/main' into masenf/event-context-rb
masenf Mar 31, 2026
fc311d3
registry: substate tracking and stateful component cache
masenf Apr 1, 2026
d9a996d
close old locks in disk/memory state manager
masenf Apr 1, 2026
55e0449
Remove state_manager from AppHarness
masenf Apr 1, 2026
d914ea0
Move reflex._internal to reflex_core._internal
masenf Apr 1, 2026
c50d2b2
move reflex.ievent to reflex_core._internal.event
masenf Apr 1, 2026
9650b0f
replace "reload" functionality with internal rehydration
masenf Apr 1, 2026
465e6d0
incldue coverage from subpackages
masenf Apr 1, 2026
06270b3
remove simulated pre-hydrated states
masenf Apr 1, 2026
5ed431f
Add unit test cases for new registry/context/processor modules
masenf Apr 1, 2026
3980c49
Merge remote-tracking branch 'origin/main' into masenf/event-context-rb
masenf Apr 1, 2026
7bdc7df
Use correct token in enqueue_stream_delta
masenf Apr 1, 2026
c046613
Fix StateToken.deserialize implementation
masenf Apr 1, 2026
945662b
Update packages/reflex-core/src/reflex_core/_internal/event/processor…
masenf Apr 1, 2026
28b105f
Fix StateToken mismerge (Thanks greptile)
masenf Apr 1, 2026
5021453
move EventChain import to avoid circular dep
masenf Apr 1, 2026
d54e178
fix StateToken deserialize tests
masenf Apr 1, 2026
aece85b
Python 3.11 and 3.12 compatibility
masenf Apr 1, 2026
458d99e
Merge remote-tracking branch 'origin/main' into masenf/event-context-rb
masenf Apr 1, 2026
0fff344
py3.10 Self compat
masenf Apr 1, 2026
8729b59
py3.10: typing_extensions Self
masenf Apr 1, 2026
b1ad918
ugh more py3.10 Self compat
masenf Apr 1, 2026
ed82a6c
fix reflex_core -> reflex import
masenf Apr 1, 2026
0f05b66
Handle py3.11 compatible queue shutdown better
masenf Apr 1, 2026
75e160a
state.js: pump the queue in processEvent
masenf Apr 1, 2026
f60c4bd
AppHarness: pre-register SharedState so it's available in the base Re…
masenf Apr 1, 2026
43f97ab
BaseStateEventProcessor: emit deltas before enqueuing events
masenf Apr 1, 2026
8987a7e
set RegistrationContext in ASGI middleware
masenf Apr 1, 2026
5d609e4
EventFuture: tracks execution of chained events
masenf Apr 2, 2026
be1a18d
Add BaseState to reflex_core.event namespace for docgen
masenf Apr 2, 2026
7f3271e
EventProcessor.enqueue only accepts a single Event
masenf Apr 2, 2026
b3c8178
attach the registration_context_middleware in App.__call__
masenf Apr 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from typing import TYPE_CHECKING, Any, BinaryIO, cast

from python_multipart.multipart import MultipartParser, parse_options_header
from reflex_core import constants
from reflex_core.utils import exceptions
from reflex_core.utils.format import json_dumps
from starlette.datastructures import Headers
from starlette.datastructures import UploadFile as StarletteUploadFile
from starlette.exceptions import HTTPException
Expand All @@ -22,11 +22,9 @@
from typing_extensions import Self

if TYPE_CHECKING:
from reflex_core.event import EventHandler
from reflex_core.utils.types import Receive, Scope, Send

from reflex.app import App
from reflex.state import BaseState


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -102,7 +100,7 @@ def __init__(self, *, maxsize: int = 8):
self._condition = asyncio.Condition()
self._closed = False
self._error: Exception | None = None
self._consumer_task: asyncio.Task[Any] | None = None
self._consumer_task: asyncio.Future[Any] | None = None

def __aiter__(self) -> Self:
"""Return the iterator itself.
Expand Down Expand Up @@ -135,7 +133,7 @@ async def __anext__(self) -> UploadChunk:
raise self._error
raise StopAsyncIteration

def set_consumer_task(self, task: asyncio.Task[Any]) -> None:
def set_consumer_task(self, task: asyncio.Future[Any]) -> None:
"""Track the task consuming this iterator.

Args:
Expand Down Expand Up @@ -206,7 +204,7 @@ def _raise_if_consumer_finished(self) -> None:
raise RuntimeError(msg) from task_exc
raise RuntimeError(msg)

def _wake_waiters(self, task: asyncio.Task[Any]) -> None:
def _wake_waiters(self, task: asyncio.Future[Any]) -> None:
"""Wake any producers or consumers blocked on the iterator condition.

Args:
Expand Down Expand Up @@ -446,51 +444,6 @@ def _require_upload_headers(request: Request) -> tuple[str, str]:
return token, handler


async def _get_upload_runtime_handler(
app: App,
token: str,
handler_name: str,
) -> tuple[BaseState, EventHandler]:
"""Resolve the runtime state and event handler for an upload request.

Args:
app: The Reflex app.
token: The client token.
handler_name: The fully qualified event handler name.

Returns:
The root state instance and resolved event handler.
"""
from reflex.state import _substate_key

substate_token = _substate_key(token, handler_name.rpartition(".")[0])
state = await app.state_manager.get_state(substate_token)
_current_state, event_handler = state._get_event_handler(handler_name)
return state, event_handler


def _seed_upload_router_data(state: BaseState, token: str) -> None:
"""Ensure upload-launched handlers have the client token in router state.

Background upload handlers use ``StateProxy`` which derives its mutable-state
token from ``self.router.session.client_token``. Upload requests do not flow
through the normal websocket event pipeline, so we seed the token here.

Args:
state: The root state instance.
token: The client token from the upload request.
"""
from reflex.state import RouterData

router_data = dict(state.router_data)
if router_data.get(constants.RouteVar.CLIENT_TOKEN) == token:
return

router_data[constants.RouteVar.CLIENT_TOKEN] = token
state.router_data = router_data
state.router = RouterData.from_router_data(router_data)


async def _upload_buffered_file(
request: Request,
app: App,
Expand All @@ -507,6 +460,8 @@ async def _upload_buffered_file(
from reflex_core.event import Event
from reflex_core.utils.exceptions import UploadValueError

from reflex.state import StateUpdate

try:
form_data = await request.form()
except ClientDisconnect:
Expand Down Expand Up @@ -545,7 +500,6 @@ def _create_upload_event() -> Event:
)

return Event(
token=token,
name=handler_name,
payload={handler_upload_param[0]: file_uploads},
)
Expand All @@ -567,12 +521,9 @@ async def _ndjson_updates():
Yields:
Each state update as newline-delimited JSON.
"""
async with app.state_manager.modify_state_with_links(
event.substate_token, event=event
) as state:
async for update in state._process(event):
update = await app._postprocess(state, event, update)
yield update.json() + "\n"
# Enqueue the task on the main event loop, but emit deltas to the local queue.
async for delta in app.event_processor.enqueue_stream_delta(token, event):
yield json_dumps(StateUpdate(delta=delta)) + "\n"

return _UploadStreamingResponse(
_ndjson_updates(),
Expand All @@ -583,10 +534,9 @@ async def _ndjson_updates():

def _background_upload_accepted_response() -> StreamingResponse:
"""Return a minimal ndjson response for background upload dispatch."""
from reflex.state import StateUpdate

def _accepted_updates():
yield StateUpdate(final=True).json() + "\n"
yield "{}\n"

return StreamingResponse(
_accepted_updates(),
Expand All @@ -613,23 +563,12 @@ async def _upload_chunk_file(

chunk_iter = UploadChunkIterator(maxsize=8)
event = Event(
token=token,
name=handler_name,
payload={handler_upload_param[0]: chunk_iter},
)
task_future = await app.event_processor.enqueue(token, event)

async with app.state_manager.modify_state_with_links(
event.substate_token,
event=event,
) as state:
_seed_upload_router_data(state, token)
task = app._process_background(state, event)

if task is None:
msg = f"@rx.event(background=True) is required for upload_files_chunk handler `{handler_name}`."
return JSONResponse({"detail": msg}, status_code=400)

chunk_iter.set_consumer_task(task)
chunk_iter.set_consumer_task(task_future)

parser = _UploadChunkMultipartParser(
headers=request.headers,
Expand All @@ -640,9 +579,9 @@ async def _upload_chunk_file(
try:
await parser.parse()
except ClientDisconnect:
task.cancel()
task_future.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
await task_future
return Response()
except (MultiPartException, RuntimeError, ValueError) as err:
await chunk_iter.fail(err)
Expand Down Expand Up @@ -682,15 +621,17 @@ async def upload_file(request: Request):
UploadTypeError: If a non-streaming upload is wired to a background task.
HTTPException: when the request does not include token / handler headers.
"""
from reflex_core._internal.registry import RegistrationContext
from reflex_core.event import (
resolve_upload_chunk_handler_param,
resolve_upload_handler_param,
)

token, handler_name = _require_upload_headers(request)
_state, event_handler = await _get_upload_runtime_handler(
app, token, handler_name
)
registered_event_handler = RegistrationContext.get().event_handlers[
handler_name
]
event_handler = registered_event_handler.handler

if event_handler.is_background:
try:
Expand Down
Loading
Loading