Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ known-first-party = ["httpx2", "httpcore2"]
ignore_missing_imports = true
strict = true

[[tool.mypy.overrides]]
module = "tests.httpx2.*"
disallow_untyped_defs = false
check_untyped_defs = true

[tool.pytest.ini_options]
addopts = "-rxXs --import-mode=importlib"
testpaths = ["tests"]
Expand Down
69 changes: 37 additions & 32 deletions tests/httpx2/client/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

import httpx2

if typing.TYPE_CHECKING:
from conftest import TestServer


@pytest.mark.anyio
async def test_get(server):
async def test_get(server: TestServer) -> None:
url = server.url
async with httpx2.AsyncClient(http2=True) as client:
response = await client.get(url)
Expand All @@ -30,14 +33,14 @@ async def test_get(server):
],
)
@pytest.mark.anyio
async def test_get_invalid_url(server, url):
async def test_get_invalid_url(server: TestServer, url: str) -> None:
async with httpx2.AsyncClient() as client:
with pytest.raises((httpx2.UnsupportedProtocol, httpx2.LocalProtocolError)):
await client.get(url)


@pytest.mark.anyio
async def test_build_request(server):
async def test_build_request(server: TestServer) -> None:
url = server.url.copy_with(path="/echo_headers")
headers = {"Custom-header": "value"}
async with httpx2.AsyncClient() as client:
Expand All @@ -52,23 +55,23 @@ async def test_build_request(server):


@pytest.mark.anyio
async def test_post(server):
async def test_post(server: TestServer) -> None:
url = server.url
async with httpx2.AsyncClient() as client:
response = await client.post(url, content=b"Hello, world!")
assert response.status_code == 200


@pytest.mark.anyio
async def test_post_json(server):
async def test_post_json(server: TestServer) -> None:
url = server.url
async with httpx2.AsyncClient() as client:
response = await client.post(url, json={"text": "Hello, world!"})
assert response.status_code == 200


@pytest.mark.anyio
async def test_stream_response(server):
async def test_stream_response(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
async with client.stream("GET", server.url) as response:
body = await response.aread()
Expand All @@ -79,7 +82,7 @@ async def test_stream_response(server):


@pytest.mark.anyio
async def test_access_content_stream_response(server):
async def test_access_content_stream_response(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
async with client.stream("GET", server.url) as response:
pass
Expand All @@ -90,7 +93,7 @@ async def test_access_content_stream_response(server):


@pytest.mark.anyio
async def test_stream_request(server):
async def test_stream_request(server: TestServer) -> None:
async def hello_world() -> typing.AsyncIterator[bytes]:
yield b"Hello, "
yield b"world!"
Expand All @@ -101,7 +104,7 @@ async def hello_world() -> typing.AsyncIterator[bytes]:


@pytest.mark.anyio
async def test_cannot_stream_sync_request(server):
async def test_cannot_stream_sync_request(server: TestServer) -> None:
def hello_world() -> typing.Iterator[bytes]: # pragma: no cover
yield b"Hello, "
yield b"world!"
Expand All @@ -112,7 +115,7 @@ def hello_world() -> typing.Iterator[bytes]: # pragma: no cover


@pytest.mark.anyio
async def test_raise_for_status(server):
async def test_raise_for_status(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
for status_code in (200, 400, 404, 500, 505):
response = await client.request("GET", server.url.copy_with(path=f"/status/{status_code}"))
Expand All @@ -126,45 +129,45 @@ async def test_raise_for_status(server):


@pytest.mark.anyio
async def test_options(server):
async def test_options(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
response = await client.options(server.url)
assert response.status_code == 200
assert response.text == "Hello, world!"


@pytest.mark.anyio
async def test_head(server):
async def test_head(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
response = await client.head(server.url)
assert response.status_code == 200
assert response.text == ""


@pytest.mark.anyio
async def test_put(server):
async def test_put(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
response = await client.put(server.url, content=b"Hello, world!")
assert response.status_code == 200


@pytest.mark.anyio
async def test_patch(server):
async def test_patch(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
response = await client.patch(server.url, content=b"Hello, world!")
assert response.status_code == 200


@pytest.mark.anyio
async def test_delete(server):
async def test_delete(server: TestServer) -> None:
async with httpx2.AsyncClient() as client:
response = await client.delete(server.url)
assert response.status_code == 200
assert response.text == "Hello, world!"


@pytest.mark.anyio
async def test_100_continue(server):
async def test_100_continue(server: TestServer) -> None:
headers = {"Expect": "100-continue"}
content = b"Echo request body"

Expand All @@ -176,23 +179,24 @@ async def test_100_continue(server):


@pytest.mark.anyio
async def test_context_managed_transport():
async def test_context_managed_transport() -> None:
class Transport(httpx2.AsyncBaseTransport):
def __init__(self) -> None:
self.events: list[str] = []

async def aclose(self):
async def aclose(self) -> None:
# The base implementation of httpx2.AsyncBaseTransport just
# calls into `.aclose`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__aenter__`/`__aexit__`.
self.events.append("transport.aclose")

async def __aenter__(self):
async def __aenter__(self) -> Transport:
await super().__aenter__()
self.events.append("transport.__aenter__")
return self

async def __aexit__(self, *args):
async def __aexit__(self, *args: typing.Any) -> None:
await super().__aexit__(*args)
self.events.append("transport.__aexit__")

Expand All @@ -208,24 +212,25 @@ async def __aexit__(self, *args):


@pytest.mark.anyio
async def test_context_managed_transport_and_mount():
async def test_context_managed_transport_and_mount() -> None:
class Transport(httpx2.AsyncBaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
self.events: list[str] = []

async def aclose(self):
async def aclose(self) -> None:
# The base implementation of httpx2.AsyncBaseTransport just
# calls into `.aclose`, so simple transport cases can just override
# this method for any cleanup, where more complex cases
# might want to additionally override `__aenter__`/`__aexit__`.
self.events.append(f"{self.name}.aclose")

async def __aenter__(self):
async def __aenter__(self) -> Transport:
await super().__aenter__()
self.events.append(f"{self.name}.__aenter__")
return self

async def __aexit__(self, *args):
async def __aexit__(self, *args: typing.Any) -> None:
await super().__aexit__(*args)
self.events.append(f"{self.name}.__aexit__")

Expand All @@ -246,12 +251,12 @@ async def __aexit__(self, *args):
]


def hello_world(request):
def hello_world(request: httpx2.Request) -> httpx2.Response:
return httpx2.Response(200, text="Hello, world!")


@pytest.mark.anyio
async def test_client_closed_state_using_implicit_open():
async def test_client_closed_state_using_implicit_open() -> None:
client = httpx2.AsyncClient(transport=httpx2.MockTransport(hello_world))

assert not client.is_closed
Expand All @@ -272,7 +277,7 @@ async def test_client_closed_state_using_implicit_open():


@pytest.mark.anyio
async def test_client_closed_state_using_with_block():
async def test_client_closed_state_using_with_block() -> None:
transport = httpx2.MockTransport(hello_world)
async with httpx2.AsyncClient(transport=transport) as client:
assert not client.is_closed
Expand All @@ -294,7 +299,7 @@ def mounted(request: httpx2.Request) -> httpx2.Response:


@pytest.mark.anyio
async def test_mounted_transport():
async def test_mounted_transport() -> None:
transport = httpx2.MockTransport(unmounted)
mounts = {"custom://": httpx2.MockTransport(mounted)}

Expand All @@ -309,7 +314,7 @@ async def test_mounted_transport():


@pytest.mark.anyio
async def test_async_mock_transport():
async def test_async_mock_transport() -> None:
async def hello_world(request: httpx2.Request) -> httpx2.Response:
return httpx2.Response(200, text="Hello, world!")

Expand All @@ -322,7 +327,7 @@ async def hello_world(request: httpx2.Request) -> httpx2.Response:


@pytest.mark.anyio
async def test_cancellation_during_stream():
async def test_cancellation_during_stream() -> None:
"""
If any BaseException is raised during streaming the response, then the
stream should be closed.
Expand All @@ -338,7 +343,7 @@ async def test_cancellation_during_stream():
"""
stream_was_closed = False

def response_with_cancel_during_stream(request):
def response_with_cancel_during_stream(request: httpx2.Request) -> httpx2.Response:
class CancelledStream(httpx2.AsyncByteStream):
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
yield b"Hello"
Expand All @@ -360,7 +365,7 @@ async def aclose(self) -> None:


@pytest.mark.anyio
async def test_server_extensions(server):
async def test_server_extensions(server: TestServer) -> None:
url = server.url
async with httpx2.AsyncClient(http2=True) as client:
response = await client.get(url)
Expand Down
2 changes: 1 addition & 1 deletion tests/httpx2/client/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ async def handle_async_request(self, request: httpx2.Request) -> httpx2.Response


@pytest.mark.anyio
async def test_digest_auth_unavailable_streaming_body():
async def test_digest_auth_unavailable_streaming_body() -> None:
url = "https://example.org/"
auth = httpx2.DigestAuth(username="user", password="password123")
app = DigestApp()
Expand Down
Loading
Loading