2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -106,7 +106,7 @@ repos:
   - id: pyupgrade
     args: ['--py37-plus']
 - repo: https://github.com/PyCQA/flake8
-  rev: '7.2.0'
+  rev: '7.3.0'
   hooks:
   - id: flake8
     additional_dependencies:
1 change: 1 addition & 0 deletions CHANGES/11269.feature.rst
@@ -0,0 +1 @@
+Added initial trailer parsing logic to Python HTTP parser -- by :user:`Dreamsorcerer`.
1 change: 1 addition & 0 deletions CHANGES/11273.bugfix.rst
@@ -0,0 +1 @@
+Fixed :py:meth:`ClientSession.close() <aiohttp.ClientSession.close>` hanging indefinitely when using HTTPS requests through HTTP proxies -- by :user:`bdraco`.
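The hang fixed here is easiest to see from the client side. A minimal repro sketch, assuming any plain-HTTP proxy in front of an HTTPS URL (the proxy address is a placeholder):

```python
import asyncio

import aiohttp


async def main() -> None:
    # HTTPS through an HTTP proxy forces the CONNECT-tunnel code path
    # patched in aiohttp/connector.py below.
    async with aiohttp.ClientSession() as session:
        async with session.get(
            "https://example.org", proxy="http://proxy.local:8080"  # placeholder proxy
        ) as resp:
            await resp.read()
    # Before the fix, the tunnel connection was parked in the pool and its
    # 'closed' future never resolved, so the implicit session.close() on
    # exiting the context manager could wait forever.


asyncio.run(main())
```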
22 changes: 21 additions & 1 deletion aiohttp/connector.py
@@ -202,6 +202,26 @@ def closed(self) -> bool:
         return self._protocol is None or not self._protocol.is_connected()
 
 
+class _ConnectTunnelConnection(Connection):
+    """Special connection wrapper for CONNECT tunnels that must never be pooled.
+
+    This connection wraps the proxy connection that will be upgraded with TLS.
+    It must never be released to the pool because:
+    1. Its 'closed' future will never complete, causing session.close() to hang
+    2. It represents an intermediate state, not a reusable connection
+    3. The real connection (with TLS) will be created separately
+    """
+
+    def release(self) -> None:
+        """Do nothing - don't pool or close the connection.
+
+        These connections are an intermediate state during the CONNECT tunnel
+        setup and will be cleaned up naturally after the TLS upgrade. If they
+        were to be pooled, they would never be properly closed, causing
+        session.close() to wait forever for their 'closed' future.
+        """
+
+
 class _TransportPlaceholder:
     """placeholder for BaseConnector.connect function"""
 
@@ -1496,7 +1516,7 @@ async def _create_proxy_connection(
         key = req.connection_key._replace(
             proxy=None, proxy_auth=None, proxy_headers_hash=None
         )
-        conn = Connection(self, key, proto, self._loop)
+        conn = _ConnectTunnelConnection(self, key, proto, self._loop)
         proxy_resp = await proxy_req.send(conn)
         try:
             protocol = conn._protocol
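The override is the whole fix: a regular `Connection.release()` hands its protocol back to the connector pool, while the tunnel wrapper's `release()` deliberately does nothing. A runnable sketch of the contrast, built with mocks the same way the new test below is (these are aiohttp internals, so import paths may shift between versions):

```python
import asyncio
from unittest import mock

import aiohttp
from aiohttp.client_proto import ResponseHandler
from aiohttp.client_reqrep import ConnectionKey
from aiohttp.connector import Connection, _ConnectTunnelConnection


async def main() -> None:
    loop = asyncio.get_running_loop()
    connector = mock.create_autospec(aiohttp.BaseConnector, spec_set=True, instance=True)
    key = mock.create_autospec(ConnectionKey, spec_set=True, instance=True)
    proto = mock.create_autospec(ResponseHandler, spec_set=True, instance=True)

    # A plain Connection returns its protocol to the pool on release() ...
    plain = Connection(connector, key, proto, loop)
    plain.release()
    assert connector._release.called

    connector._release.reset_mock()

    # ... while the CONNECT-tunnel wrapper is a deliberate no-op, so the
    # half-upgraded socket can never be parked in (and later hang) the pool.
    tunnel = _ConnectTunnelConnection(connector, key, proto, loop)
    tunnel.release()
    connector._release.assert_not_called()
    tunnel.close()  # explicit close remains the only teardown path


asyncio.run(main())
```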
71 changes: 37 additions & 34 deletions aiohttp/http_parser.py
@@ -123,7 +123,6 @@ class ChunkState(IntEnum):
     PARSE_CHUNKED_SIZE = 0
     PARSE_CHUNKED_CHUNK = 1
     PARSE_CHUNKED_CHUNK_EOF = 2
-    PARSE_MAYBE_TRAILERS = 3
     PARSE_TRAILERS = 4
 
 
@@ -142,8 +141,8 @@ def parse_headers(
         # note: "raw" does not mean inclusion of OWS before/after the field value
         raw_headers = []
 
-        lines_idx = 1
-        line = lines[1]
+        lines_idx = 0
+        line = lines[lines_idx]
         line_count = len(lines)
 
         while line:
@@ -394,6 +393,7 @@ def get_content_length() -> Optional[int]:
                     response_with_body=self.response_with_body,
                     auto_decompress=self._auto_decompress,
                     lax=self.lax,
+                    headers_parser=self._headers_parser,
                 )
                 if not payload_parser.done:
                     self._payload_parser = payload_parser
@@ -412,6 +412,7 @@ def get_content_length() -> Optional[int]:
                     compression=msg.compression,
                     auto_decompress=self._auto_decompress,
                     lax=self.lax,
+                    headers_parser=self._headers_parser,
                 )
             elif not empty_body and length is None and self.read_until_eof:
                 payload = StreamReader(
@@ -430,6 +431,7 @@ def get_content_length() -> Optional[int]:
                     response_with_body=self.response_with_body,
                     auto_decompress=self._auto_decompress,
                     lax=self.lax,
+                    headers_parser=self._headers_parser,
                 )
                 if not payload_parser.done:
                     self._payload_parser = payload_parser
@@ -467,6 +469,10 @@ def get_content_length() -> Optional[int]:
 
                 eof = True
                 data = b""
+                if isinstance(
+                    underlying_exc, (InvalidHeader, TransferEncodingError)
+                ):
+                    raise
 
         if eof:
             start_pos = 0
@@ -629,7 +635,7 @@ def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
             compression,
             upgrade,
             chunked,
-        ) = self.parse_headers(lines)
+        ) = self.parse_headers(lines[1:])
 
         if close is None:  # then the headers weren't set in the request
             if version_o <= HttpVersion10:  # HTTP 1.0 must ask to not close
@@ -715,7 +721,7 @@ def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
             compression,
             upgrade,
             chunked,
-        ) = self.parse_headers(lines)
+        ) = self.parse_headers(lines[1:])
 
         if close is None:
             if version_o <= HttpVersion10:
@@ -758,6 +764,8 @@ def __init__(
         response_with_body: bool = True,
         auto_decompress: bool = True,
         lax: bool = False,
+        *,
+        headers_parser: HeadersParser,
     ) -> None:
         self._length = 0
         self._type = ParseState.PARSE_UNTIL_EOF
@@ -766,6 +774,8 @@ def __init__(
         self._chunk_tail = b""
         self._auto_decompress = auto_decompress
         self._lax = lax
+        self._headers_parser = headers_parser
+        self._trailer_lines: list[bytes] = []
         self.done = False
 
         # payload decompression wrapper
@@ -833,7 +843,7 @@ def feed_data(
                             size_b = chunk[:i]  # strip chunk-extensions
                             # Verify no LF in the chunk-extension
                             if b"\n" in (ext := chunk[i:pos]):
-                                exc = BadHttpMessage(
+                                exc = TransferEncodingError(
                                     f"Unexpected LF in chunk-extension: {ext!r}"
                                 )
                                 set_exception(self.payload, exc)
@@ -854,7 +864,7 @@ def feed_data(
 
                     chunk = chunk[pos + len(SEP) :]
                     if size == 0:  # eof marker
-                        self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
+                        self._chunk = ChunkState.PARSE_TRAILERS
                         if self._lax and chunk.startswith(b"\r"):
                             chunk = chunk[1:]
                     else:
@@ -888,38 +898,31 @@ def feed_data(
                         self._chunk_tail = chunk
                         return False, b""
 
-                # if stream does not contain trailer, after 0\r\n
-                # we should get another \r\n otherwise
-                # trailers needs to be skipped until \r\n\r\n
-                if self._chunk == ChunkState.PARSE_MAYBE_TRAILERS:
-                    head = chunk[: len(SEP)]
-                    if head == SEP:
-                        # end of stream
-                        self.payload.feed_eof()
-                        return True, chunk[len(SEP) :]
-                    # Both CR and LF, or only LF may not be received yet. It is
-                    # expected that CRLF or LF will be shown at the very first
-                    # byte next time, otherwise trailers should come. The last
-                    # CRLF which marks the end of response might not be
-                    # contained in the same TCP segment which delivered the
-                    # size indicator.
-                    if not head:
-                        return False, b""
-                    if head == SEP[:1]:
-                        self._chunk_tail = head
-                        return False, b""
-                    self._chunk = ChunkState.PARSE_TRAILERS
-
                 # read and discard trailer up to the CRLF terminator
                 if self._chunk == ChunkState.PARSE_TRAILERS:
                     pos = chunk.find(SEP)
-                    if pos >= 0:
-                        chunk = chunk[pos + len(SEP) :]
-                        self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
-                    else:
-                        self._chunk_tail = chunk
-                        return False, b""
+                    if pos < 0:  # No line found
+                        self._chunk_tail = chunk
+                        return False, b""
+
+                    line = chunk[:pos]
+                    chunk = chunk[pos + len(SEP) :]
+                    if SEP == b"\n":  # For lax response parsing
+                        line = line.rstrip(b"\r")
+                    self._trailer_lines.append(line)
+
+                    # \r\n\r\n found, end of stream
+                    if self._trailer_lines[-1] == b"":
+                        # Headers and trailers are defined the same way,
+                        # so we reuse the HeadersParser here.
+                        try:
+                            trailers, raw_trailers = self._headers_parser.parse_headers(
+                                self._trailer_lines
+                            )
+                        finally:
+                            self._trailer_lines.clear()
+                        self.payload.feed_eof()
+                        return True, chunk
+
         # Read all bytes until eof
         elif self._type == ParseState.PARSE_UNTIL_EOF:
             self.payload.feed_data(chunk)
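The net effect of these parser changes: after the terminating `0\r\n` chunk, each trailer line is buffered in `_trailer_lines` until the blank line arrives, and the block is then parsed by the same `HeadersParser` used for ordinary headers (which is also why `parse_headers` now receives the lines *without* a start line). A minimal sketch of that reuse, assuming the `HeadersParser` constructor defaults:

```python
from aiohttp.http_parser import HeadersParser

# Trailer section as it appears on the wire after the final "0\r\n":
#     Expires: Wed, 01 Jan 2025 00:00:00 GMT\r\n
#     X-Checksum: abc123\r\n
#     \r\n
trailer_lines = [
    b"Expires: Wed, 01 Jan 2025 00:00:00 GMT",
    b"X-Checksum: abc123",  # illustrative field names
    b"",  # the blank line that terminates the trailer section
]

trailers, raw_trailers = HeadersParser().parse_headers(trailer_lines)
print(trailers["Expires"])  # case-insensitive lookup of a trailer field
```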
2 changes: 1 addition & 1 deletion aiohttp/multipart.py
@@ -781,7 +781,7 @@ async def _read_boundary(self) -> None:
             raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")
 
     async def _read_headers(self) -> "CIMultiDictProxy[str]":
-        lines = [b""]
+        lines = []
         while True:
             chunk = await self._content.readline()
             chunk = chunk.strip()
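This one-line change follows from the new `parse_headers` calling convention above: the parser now starts at index 0 and stops at the first empty line, so a leftover `b""` placeholder in front would terminate parsing immediately and yield empty headers. A small sketch of the difference, again assuming `HeadersParser` defaults:

```python
from aiohttp.http_parser import HeadersParser

new_style = [b"Content-Type: text/plain", b""]
old_style = [b"", b"Content-Type: text/plain", b""]  # dummy start-line slot

headers, _ = HeadersParser().parse_headers(new_style)
assert "Content-Type" in headers

empty, _ = HeadersParser().parse_headers(old_style)  # loop exits at the b"" in slot 0
assert len(empty) == 0
```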
29 changes: 29 additions & 0 deletions tests/test_connector.py
@@ -47,6 +47,7 @@
     AddrInfoType,
     Connection,
     TCPConnector,
+    _ConnectTunnelConnection,
     _DNSCacheTable,
 )
 from aiohttp.pytest_plugin import AiohttpClient, AiohttpServer
@@ -4454,3 +4455,31 @@ async def test_available_connections_no_limits(
     connection1.close()
     assert conn._available_connections(key) == 1
     assert conn._available_connections(other_host_key2) == 1
+
+
+async def test_connect_tunnel_connection_release(
+    loop: asyncio.AbstractEventLoop,
+) -> None:
+    """Test _ConnectTunnelConnection.release() does not pool the connection."""
+    connector = mock.create_autospec(
+        aiohttp.BaseConnector, spec_set=True, instance=True
+    )
+    key = mock.create_autospec(ConnectionKey, spec_set=True, instance=True)
+    protocol = mock.create_autospec(ResponseHandler, spec_set=True, instance=True)
+
+    # Create a connect tunnel connection
+    conn = _ConnectTunnelConnection(connector, key, protocol, loop)
+
+    # Verify protocol is set
+    assert conn._protocol is protocol
+
+    # Release should do nothing (not pool the connection)
+    conn.release()
+
+    # Protocol should still be there (not released to pool)
+    assert conn._protocol is protocol
+    # Connector._release should NOT have been called
+    connector._release.assert_not_called()
+
+    # Clean up to avoid resource warning
+    conn.close()