Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions src/qasync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@

from ._common import with_logger # noqa

# strong references to running background tasks
background_tasks = set()


@with_logger
class _QThreadWorker(QtCore.QThread):
Expand Down Expand Up @@ -437,6 +440,8 @@ def run_until_complete(self, future):
raise RuntimeError("Event loop already running")

self.__log_debug("Running %s until complete", future)

# future may actually be a coroutine. This ensures it is wrapped in a Task.
future = asyncio.ensure_future(future, loop=self)

def stop(*args):
Expand Down Expand Up @@ -855,23 +860,27 @@ def asyncClose(fn):

@functools.wraps(fn)
def wrapper(*args, **kwargs):
f = asyncio.ensure_future(fn(*args, **kwargs))
while not f.done():
QApplication.instance().processEvents()
loop = asyncio.get_running_loop()
assert isinstance(loop, QEventLoop)
task = loop.create_task(fn(*args, **kwargs))
while not task.done():
QApplication.processEvents(AllEvents)
try:
return task.result()
except asyncio.CancelledError:
pass

return wrapper


def asyncSlot(*args, **kwargs):
"""Make a Qt async slot run on asyncio loop."""

def _error_handler(task):
async def _error_handler(fn, args, kwargs):
try:
task.result()
await fn(*args, **kwargs)
except Exception:
sys.excepthook(*sys.exc_info())
except asyncio.CancelledError:
pass

def outer_decorator(fn):
@Slot(*args, **kwargs)
Expand All @@ -894,9 +903,10 @@ def wrapper(*args, **kwargs):
"asyncSlot was not callable from Signal. Potential signature mismatch."
)
else:
task = asyncio.ensure_future(fn(*args, **kwargs))
task.add_done_callback(_error_handler)
return task
task = asyncio.create_task(_error_handler(fn, args, kwargs))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
return

return wrapper

Expand Down
120 changes: 114 additions & 6 deletions tests/test_qeventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from unittest import mock

import pytest

Expand Down Expand Up @@ -595,8 +596,8 @@

loop._add_reader(c_sock.fileno(), cb1)

_client_task = asyncio.ensure_future(client_coro())
_server_task = asyncio.ensure_future(server_coro())
_client_task = loop.create_task(client_coro())
_server_task = loop.create_task(server_coro())

both_done = asyncio.gather(client_done, server_done)
loop.run_until_complete(asyncio.wait_for(both_done, timeout=1.0))
Expand Down Expand Up @@ -640,8 +641,8 @@
loop._remove_reader(c_sock.fileno())
assert (await loop.sock_recv(c_sock, 3)) == b"foo"

client_done = asyncio.ensure_future(client_coro())
server_done = asyncio.ensure_future(server_coro())
client_done = loop.create_task(client_coro())
server_done = loop.create_task(server_coro())

both_done = asyncio.wait(
[server_done, client_done], return_when=asyncio.FIRST_EXCEPTION
Expand Down Expand Up @@ -758,7 +759,7 @@
handler_called = True

loop.set_exception_handler(exct_handler)
asyncio.ensure_future(future_except())
loop.create_task(future_except())
loop.run_forever()

assert coro_run
Expand All @@ -775,7 +776,11 @@
loop.set_exception_handler(exct_handler)
fut1 = asyncio.Future()
fut1.set_exception(ExceptionTester())
asyncio.ensure_future(fut1)

async def coro(future):
await future

loop.create_task(coro(fut1))
del fut1
loop.call_later(0.1, loop.stop)
loop.run_forever()
Expand All @@ -798,6 +803,8 @@
no_args_called = asyncio.Event()
with_args_called = asyncio.Event()
trailing_args_called = asyncio.Event()
error_called = asyncio.Event()
cancel_called = asyncio.Event()

async def slot_no_args():
no_args_called.set()
Expand All @@ -812,6 +819,14 @@

async def slot_signature_mismatch(_: bool): ...

async def slot_with_error():
error_called.set()
raise ValueError("Test")

async def slot_with_cancel():
cancel_called.set()
raise asyncio.CancelledError()

async def main():
# passing kwargs to the underlying Slot such as name, arguments, return
sig = qasync._make_signaller(qasync.QtCore)
Expand All @@ -836,6 +851,73 @@
)
await asyncio.wait_for(all_done, timeout=1.0)

with mock.patch.object(sys, "excepthook") as excepthook:
sig3 = qasync._make_signaller(qasync.QtCore)
sig3.signal.connect(qasync.asyncSlot()(slot_with_error))
sig3.signal.emit()
await asyncio.wait_for(error_called.wait(), timeout=1.0)
excepthook.assert_called_once()
assert isinstance(excepthook.call_args[0][1], ValueError)

with mock.patch.object(sys, "excepthook") as excepthook:
sig4 = qasync._make_signaller(qasync.QtCore)
sig4.signal.connect(qasync.asyncSlot()(slot_with_cancel))
sig4.signal.emit()
await asyncio.wait_for(cancel_called.wait(), timeout=1.0)
excepthook.assert_not_called()

loop.run_until_complete(main())


def test_async_close(loop, application):
close_called = asyncio.Event()
close_err_called = asyncio.Event()
close_hang_called = asyncio.Event()

@qasync.asyncClose
async def close():
close_called.set()
return 33

@qasync.asyncClose
async def close_err():
close_err_called.set()
raise ValueError("Test")

@qasync.asyncClose
async def close_hang():
# do an actual cancel instead of directly raising, for completeness.
current = asyncio.current_task()
assert current is not None

async def killer():
await asyncio.sleep(0.001)
current.cancel()

asyncio.create_task(killer())
close_hang_called.set()
await asyncio.Event().wait()
assert False, "Should have been cancelled"

Check warning on line 900 in tests/test_qeventloop.py

View workflow job for this annotation

GitHub Actions / collect coverage

Missing coverage

Missing coverage on line 900

# need to run in async context to have a running event loop
async def main():
# close() is a synchronous top level call, need
# to wrap it to be able to enter event loop

# test that a regular close works
assert await qasync.asyncWrap(close) == 33
assert close_called.is_set()

# test that an exception in the async close is propagated
with pytest.raises(ValueError) as err:
await qasync.asyncWrap(close_err)
assert err.value.args[0] == "Test"
assert close_err_called.is_set()

# test that a CancelledError is not propagated
assert await qasync.asyncWrap(close_hang) is None
assert close_hang_called.is_set()

loop.run_until_complete(main())


Expand Down Expand Up @@ -915,6 +997,14 @@
assert loop.run_until_complete(asyncio.wait_for(coro(), timeout=1)) == 42


def test_run_until_complete_future(loop):
"""Test that run_until_complete accepts futures"""

fut = asyncio.Future()
loop.call_soon(lambda: fut.set_result(42))
assert loop.run_until_complete(fut) == 42


def test_run_forever_custom_exit_code(loop, application):
if hasattr(application, "exec"):
orig_exec = application.exec
Expand All @@ -932,6 +1022,24 @@
application.exec_ = orig_exec


def test_loop_non_reentrant(loop):
async def noop():
pass

async def task():
t = loop.create_task(noop())
with pytest.raises(RuntimeError):
loop.run_forever()

with pytest.raises(RuntimeError):
loop.run_until_complete(t)
return 43

t = loop.create_task(task())
loop.run_until_complete(t)
assert t.result() == 43


@pytest.mark.parametrize("qtparent", [False, True])
def test_qeventloop_in_qthread(qtparent):
class CoroutineExecutorThread(qasync.QtCore.QThread):
Expand Down
Loading