-
Notifications
You must be signed in to change notification settings - Fork 101
Python 3.14 compatibility + EZSP-over-TCP stability fixes #720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
2374548
3f34b5c
7cf20c0
3ad85ef
f43c38f
384975d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import asyncio | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| import functools | ||
| import inspect | ||
| import logging | ||
|
|
||
| LOGGER = logging.getLogger(__name__) | ||
|
|
@@ -14,7 +15,7 @@ def __init__(self): | |
| self.thread_complete = None | ||
|
|
||
| def run_coroutine_threadsafe(self, coroutine): | ||
| current_loop = asyncio.get_event_loop() | ||
| current_loop = asyncio.get_running_loop() | ||
| future = asyncio.run_coroutine_threadsafe(coroutine, self.loop) | ||
| return asyncio.wrap_future(future, loop=current_loop) | ||
|
|
||
|
|
@@ -30,7 +31,7 @@ def _thread_main(self, init_task): | |
| self.loop = None | ||
|
|
||
| async def start(self): | ||
| current_loop = asyncio.get_event_loop() | ||
| current_loop = asyncio.get_running_loop() | ||
| if self.loop is not None and not self.loop.is_closed(): | ||
| return | ||
|
|
||
|
|
@@ -95,11 +96,21 @@ def func_wrapper(*args, **kwargs): | |
| if loop == curr_loop: | ||
| return call() | ||
| if loop.is_closed(): | ||
| # Disconnected | ||
| LOGGER.warning("Attempted to use a closed event loop") | ||
| return | ||
| if asyncio.iscoroutinefunction(func): | ||
| future = asyncio.run_coroutine_threadsafe(call(), loop) | ||
| raise ConnectionError( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] The two clauses double up — "closed event loop, the connection may have been lost". Either |
||
| "Attempted to use a closed event loop, " | ||
| "the connection may have been lost" | ||
| ) | ||
| if inspect.iscoroutinefunction(func): | ||
| coro = call() | ||
| try: | ||
| future = asyncio.run_coroutine_threadsafe(coro, loop) | ||
| except RuntimeError: | ||
| # Loop closed between is_closed() check and dispatch | ||
| coro.close() | ||
| raise ConnectionError( | ||
| "Attempted to use a closed event loop, " | ||
| "the connection may have been lost" | ||
| ) | ||
| return asyncio.wrap_future(future, loop=curr_loop) | ||
| else: | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,12 +14,16 @@ | |
|
|
||
|
|
||
| class Gateway(zigpy.serial.SerialProtocol): | ||
| def __init__(self, api, connection_done_future=None): | ||
| def __init__(self, api, connection_done_future=None, loop=None): | ||
| super().__init__() | ||
| self._api = api | ||
|
|
||
| self._reset_future = None | ||
| self._startup_reset_future = None | ||
| # Pre-create so reset frames arriving immediately after connect are | ||
| # captured by reset_received() instead of triggering enter_failed_state(). | ||
| # Tests construct Gateway without a loop and expect None here; in that | ||
| # case wait_for_startup_reset() will lazily create the future. | ||
| self._startup_reset_future = loop.create_future() if loop is not None else None | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [must-address]
Both go away if the pre-creation is gated to socket ports at the call site in loop = asyncio.get_running_loop()
device_path = config[zigpy.config.CONF_DEVICE_PATH]
is_tcp = urllib.parse.urlparse(device_path).scheme == "socket"
...
gateway = Gateway(api, connection_done_future, loop=loop if is_tcp else None)The lazy creation at L60 then continues to cover the non-TCP path (currently unreachable, but stays correct). |
||
| self._connection_done_future = connection_done_future | ||
|
|
||
| async def send_data(self, data: bytes) -> None: | ||
|
|
@@ -52,8 +56,8 @@ def error_received(self, code: t.NcpResetCode) -> None: | |
|
|
||
| async def wait_for_startup_reset(self) -> None: | ||
| """Wait for the first reset frame on startup.""" | ||
| assert self._startup_reset_future is None | ||
| self._startup_reset_future = asyncio.get_running_loop().create_future() | ||
| if self._startup_reset_future is None: | ||
| self._startup_reset_future = asyncio.get_running_loop().create_future() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [optional / consider] Once |
||
|
|
||
| try: | ||
| await self._startup_reset_future | ||
|
|
@@ -98,19 +102,19 @@ async def reset(self): | |
| return await self._reset_future | ||
|
|
||
| self._transport.send_reset() | ||
| self._reset_future = asyncio.get_event_loop().create_future() | ||
| self._reset_future = asyncio.get_running_loop().create_future() | ||
| self._reset_future.add_done_callback(self._reset_cleanup) | ||
|
|
||
| async with asyncio_timeout(RESET_TIMEOUT): | ||
| return await self._reset_future | ||
|
|
||
|
|
||
| async def _connect(config, api): | ||
| loop = asyncio.get_event_loop() | ||
| loop = asyncio.get_running_loop() | ||
|
|
||
| connection_done_future = loop.create_future() | ||
|
|
||
| gateway = Gateway(api, connection_done_future) | ||
| gateway = Gateway(api, connection_done_future, loop=loop) | ||
| protocol = AshProtocol(gateway) | ||
|
|
||
| if config[zigpy.config.CONF_DEVICE_FLOW_CONTROL] is None: | ||
|
|
@@ -135,7 +139,7 @@ async def _connect(config, api): | |
|
|
||
| async def connect(config, api, use_thread=True): | ||
| if use_thread: | ||
| api = ThreadsafeProxy(api, asyncio.get_event_loop()) | ||
| api = ThreadsafeProxy(api, asyncio.get_running_loop()) | ||
| thread = EventLoopThread() | ||
| await thread.start() | ||
| try: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional / consider] This suppresses all
ConnectionError, not just the closed-loop case the newThreadsafeProxyraises. Today that's the only source, but if a future serial backend (serialx surfacing a low-level IO error during a final flush, for instance) ever raisesConnectionErrorhere, the gateway reference would be dropped silently. Probably the desired semantics for disconnect-on-cleanup — but if you want to narrow it later, a marker subclass raised by the proxy would do the trick:Not blocking.