Skip to content

Commit 69b665f

Browse files
committed
queue backpressure
1 parent 8b0c95d commit 69b665f

2 files changed

Lines changed: 46 additions & 3 deletions

File tree

src/reactpy/core/layout.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ async def __aexit__(
9999
await t
100100

101101
await self._unmount_model_states([root_model_state])
102+
await self._rendering_queue.close()
102103

103104
# delete attributes here to avoid access after exiting context manager
104105
del self._event_handlers
@@ -773,17 +774,39 @@ def __init__(self) -> None:
773774
self._loop = get_running_loop()
774775
self._queue: Queue[_Type] = Queue(REACTPY_MAX_QUEUE_SIZE.current)
775776
self._pending: set[_Type] = set()
777+
self._put_tasks: dict[_Type, Task[None]] = {}
776778

777779
def put(self, value: _Type) -> None:
778780
if value not in self._pending:
779781
self._pending.add(value)
780-
self._loop.call_soon_threadsafe(self._queue.put_nowait, value)
782+
self._loop.call_soon_threadsafe(self._schedule_put, value)
783+
784+
def _schedule_put(self, value: _Type) -> None:
785+
self._put_tasks[value] = create_task(self._put_with_backpressure(value))
786+
787+
async def _put_with_backpressure(self, value: _Type) -> None:
788+
try:
789+
await self._queue.put(value)
790+
except BaseException:
791+
self._pending.discard(value)
792+
raise
793+
finally:
794+
self._put_tasks.pop(value, None)
781795

782796
async def get(self) -> _Type:
783797
value = await self._queue.get()
784798
self._pending.remove(value)
785799
return value
786800

801+
async def close(self) -> None:
802+
for task in list(self._put_tasks.values()):
803+
task.cancel()
804+
for task in list(self._put_tasks.values()):
805+
with suppress(CancelledError):
806+
await task
807+
self._put_tasks.clear()
808+
self._pending.clear()
809+
787810

788811
def _get_children_info(
789812
children: list[VdomChild],

tests/test_core/test_layout.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111

1212
import reactpy
1313
from reactpy import html
14-
from reactpy.config import REACTPY_ASYNC_RENDERING, REACTPY_DEBUG
14+
from reactpy.config import (
15+
REACTPY_ASYNC_RENDERING,
16+
REACTPY_DEBUG,
17+
REACTPY_MAX_QUEUE_SIZE,
18+
)
1519
from reactpy.core.component import component
1620
from reactpy.core.events import EventHandler
1721
from reactpy.core.hooks import use_async_effect, use_effect, use_state
18-
from reactpy.core.layout import Layout
22+
from reactpy.core.layout import Layout, _ThreadSafeQueue
1923
from reactpy.testing import (
2024
HookCatcher,
2125
StaticEventHandler,
@@ -102,6 +106,22 @@ def SimpleComponent():
102106
)
103107

104108

109+
async def test_thread_safe_queue_applies_backpressure():
110+
with patch.object(REACTPY_MAX_QUEUE_SIZE, "current", 1):
111+
queue = _ThreadSafeQueue[int]()
112+
113+
queue.put(1)
114+
queue.put(2)
115+
116+
await asyncio.sleep(0)
117+
assert await asyncio.wait_for(queue.get(), 1) == 1
118+
119+
await asyncio.sleep(0)
120+
assert await asyncio.wait_for(queue.get(), 1) == 2
121+
122+
await queue.close()
123+
124+
105125
async def test_nested_component_layout():
106126
parent_set_state = reactpy.Ref(None)
107127
child_set_state = reactpy.Ref(None)

0 commit comments

Comments
 (0)