Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/advanced/authentication.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ HTTP digest authentication is a challenge-response authentication scheme. Unlike
[<Response [401 UNAUTHORIZED]>]
```

HTTPX also supports digest authentication using `auth-int` quality-of-protection, which provides message integrity protection. When `qop="auth-int"` is used, the authentication response includes a hash of the request body, protecting against tampering with the message content during transmission. This provides additional security over `qop="auth"` by ensuring that both the authentication credentials and the message body integrity are verified.

```pycon
>>> auth = httpx.DigestAuth(username="olivia", password="secret")
>>> client = httpx.Client(auth=auth)
>>> response = client.get("https://httpbin.org/digest-auth/auth-int/olivia/secret")
>>> response
<Response [200 OK]>
>>> response.history
[<Response [401 UNAUTHORIZED]>]
```

## NetRC authentication

HTTPX can be configured to use [a `.netrc` config file](https://everything.curl.dev/usingcurl/netrc) for authentication.
Expand Down Expand Up @@ -229,4 +241,4 @@ class MyCustomAuth(httpx.Auth):

async def async_auth_flow(self, request):
raise RuntimeError("Cannot use a sync authentication class with httpx.AsyncClient")
```
```
12 changes: 7 additions & 5 deletions httpx/_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ def digest(data: bytes) -> bytes:

path = request.url.raw_path
A2 = b":".join((request.method.encode(), path))
# TODO: implement auth-int
if challenge.qop == b"auth-int":
A2 += b":" + digest(request.content)
HA2 = digest(A2)

nc_value = b"%08x" % self._nonce_count
Expand Down Expand Up @@ -294,7 +295,7 @@ def digest(data: bytes) -> bytes:
if challenge.opaque:
format_args["opaque"] = challenge.opaque
if qop:
format_args["qop"] = b"auth"
format_args["qop"] = qop
format_args["nc"] = nc_value
format_args["cnonce"] = cnonce

Expand Down Expand Up @@ -330,12 +331,13 @@ def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
if qop is None:
return None
qops = re.split(b", ?", qop)

# Defer to the strongest supplied qop (auth-int > auth)
if b"auth-int" in qops:
return b"auth-int"
if b"auth" in qops:
return b"auth"

if qops == [b"auth-int"]:
raise NotImplementedError("Digest auth-int support is not yet implemented")

message = f'Unexpected qop value "{qop!r}" in digest auth'
raise ProtocolError(message, request=request)

Expand Down
43 changes: 39 additions & 4 deletions tests/client/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,15 +501,50 @@ async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str)
assert len(response.history) == 1


@pytest.mark.parametrize(
"algorithm,expected_hash_length,expected_response_length",
[
("MD5", 64, 32),
("MD5-SESS", 64, 32),
("SHA", 64, 40),
("SHA-SESS", 64, 40),
("SHA-256", 64, 64),
("SHA-256-SESS", 64, 64),
("SHA-512", 64, 128),
("SHA-512-SESS", 64, 128),
],
)
@pytest.mark.anyio
async def test_digest_auth_qop_auth_int_not_implemented() -> None:
async def test_digest_auth_qop_auth_int(
algorithm: str, expected_hash_length: int, expected_response_length: int
) -> None:
url = "https://example.org/"
auth = httpx.DigestAuth(username="user", password="password123")
app = DigestApp(qop="auth-int")
app = DigestApp(qop="auth-int", algorithm=algorithm)

async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
with pytest.raises(NotImplementedError):
await client.get(url, auth=auth)
response = await client.get(url, auth=auth)

assert response.status_code == 200
assert len(response.history) == 1

authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
scheme, _, fields = authorization.partition(" ")
assert scheme == "Digest"

response_fields = [field.strip() for field in fields.split(",")]
digest_data = dict(field.split("=") for field in response_fields)

assert digest_data["username"] == '"user"'
assert digest_data["realm"] == '"httpx@example.org"'
assert "nonce" in digest_data
assert digest_data["uri"] == '"/"'
assert len(digest_data["response"]) == expected_response_length + 2 # extra quotes
assert len(digest_data["opaque"]) == expected_hash_length + 2
assert digest_data["algorithm"] == algorithm
assert digest_data["qop"] == "auth-int"
assert digest_data["nc"] == "00000001"
assert len(digest_data["cnonce"]) == 16 + 2


@pytest.mark.anyio
Expand Down
124 changes: 120 additions & 4 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
'qop="auth, auth-int", '
"qop=auth, "
"algorithm=MD5, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
Expand All @@ -232,7 +232,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth" in request.headers["Authorization"]
assert "qop=auth," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
Expand Down Expand Up @@ -268,7 +268,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
'qop="auth, auth-int", '
"qop=auth, "
"algorithm=SHA-256, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
Expand All @@ -292,7 +292,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth" in request.headers["Authorization"]
assert "qop=auth," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
Expand All @@ -306,3 +306,119 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)


def test_digest_auth_int_rfc_7616_md5(monkeypatch):
def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()

auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)

request = httpx.Request("GET", "https://www.example.com/dir/index.html")

# The initial request should not include an auth header.
flow = auth.sync_auth_flow(request)
request = next(flow)
assert "Authorization" not in request.headers

# If a 401 response is returned, then a digest auth request is made.
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
"qop=auth-int, "
"algorithm=MD5, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
)
}
response = httpx.Response(
content=b"Auth required", status_code=401, headers=headers, request=request
)
request = flow.send(response)
assert request.headers["Authorization"].startswith("Digest")
assert 'username="Mufasa"' in request.headers["Authorization"]
assert 'realm="http-auth@example.org"' in request.headers["Authorization"]
assert 'uri="/dir/index.html"' in request.headers["Authorization"]
assert "algorithm=MD5" in request.headers["Authorization"]
assert (
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"'
in request.headers["Authorization"]
)
assert "nc=00000001" in request.headers["Authorization"]
assert (
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth-int," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
)
assert (
'response="8804a53d3640a40a4f73cea12c5ba451"'
in request.headers["Authorization"]
)

# No other requests are made.
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)


def test_digest_auth_int_rfc7616_sha256(monkeypatch):
def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes:
return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode()

auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life")
monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce)

request = httpx.Request("GET", "https://www.example.com/dir/index.html")

# The initial request should not include an auth header.
flow = auth.sync_auth_flow(request)
request = next(flow)
assert "Authorization" not in request.headers

# If a 401 response is returned, then a digest auth request is made.
headers = {
"WWW-Authenticate": (
'Digest realm="http-auth@example.org", '
"qop=auth-int, "
"algorithm=SHA-256, "
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", '
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
)
}
response = httpx.Response(
content=b"Auth required", status_code=401, headers=headers, request=request
)
request = flow.send(response)
assert request.headers["Authorization"].startswith("Digest")
assert 'username="Mufasa"' in request.headers["Authorization"]
assert 'realm="http-auth@example.org"' in request.headers["Authorization"]
assert 'uri="/dir/index.html"' in request.headers["Authorization"]
assert "algorithm=SHA-256" in request.headers["Authorization"]
assert (
'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"'
in request.headers["Authorization"]
)
assert "nc=00000001" in request.headers["Authorization"]
assert (
'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"'
in request.headers["Authorization"]
)
assert "qop=auth-int," in request.headers["Authorization"]
assert (
'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"'
in request.headers["Authorization"]
)
assert (
'response="8bdf6f15638e260831e905028de5450562816d093c9bfc5c13d3a46adcdde940"'
in request.headers["Authorization"]
)

# No other requests are made.
response = httpx.Response(content=b"Hello, world!", status_code=200)
with pytest.raises(StopIteration):
flow.send(response)