Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES/10991.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Optimized small HTTP requests/responses by coalescing headers and body into a single TCP packet -- by :user:`bdraco`.

This change enhances network efficiency by reducing the number of packets sent for small HTTP payloads, improving latency and reducing overhead. Most importantly, this fixes compatibility with memory-constrained IoT devices that can only perform a single read operation and expect HTTP requests in one packet. The optimization uses zero-copy ``writelines`` when coalescing data and works with both regular and chunked transfer encoding.

When ``aiohttp`` uses client middleware to communicate with an ``aiohttp`` server, connection reuse is more likely to occur since complete responses arrive in a single packet for small payloads.

This aligns ``aiohttp`` with other popular HTTP clients that already coalesce small requests.
7 changes: 7 additions & 0 deletions aiohttp/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ async def write_headers(
) -> None:
"""Write HTTP headers"""

def send_headers(self) -> None:
"""Force sending buffered headers if not already sent.

Required only if write_headers() buffers headers instead of sending immediately.
For backwards compatibility, this method does nothing by default.
"""


class AbstractAccessLogger(ABC):
"""Abstract writer to access log."""
Expand Down
5 changes: 5 additions & 0 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,8 @@ async def write_bytes(
"""
# 100 response
if self._continue is not None:
# Force headers to be sent before waiting for 100-continue
writer.send_headers()
await writer.drain()
await self._continue

Expand Down Expand Up @@ -763,7 +765,10 @@ async def send(self, conn: "Connection") -> "ClientResponse":

# status + headers
status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"

# Buffer headers for potential coalescing with body
await writer.write_headers(status_line, self.headers)

task: Optional["asyncio.Task[None]"]
if self.body or self._continue is not None or protocol.writing_paused:
coro = self.write_bytes(writer, conn, self._get_content_length())
Expand Down
158 changes: 144 additions & 14 deletions aiohttp/http_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import sys
from typing import ( # noqa
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Expand Down Expand Up @@ -71,6 +72,8 @@ def __init__(
self.loop = loop
self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent
self._on_headers_sent: _T_OnHeadersSent = on_headers_sent
self._headers_buf: Optional[bytes] = None
self._headers_written: bool = False

@property
def transport(self) -> Optional[asyncio.Transport]:
Expand Down Expand Up @@ -118,14 +121,58 @@ def _writelines(
else:
transport.writelines(chunks) # type: ignore[arg-type]

def _write_chunked_payload(
self, chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
) -> None:
"""Write a chunk with proper chunked encoding."""
chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
self._writelines((chunk_len_pre, chunk, b"\r\n"))

def _send_headers_with_payload(
self,
chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
is_eof: bool,
) -> None:
"""Send buffered headers with payload, coalescing into single write."""
# Mark headers as written
self._headers_written = True
headers_buf = self._headers_buf
self._headers_buf = None

if TYPE_CHECKING:
# Safe because callers (write() and write_eof()) only invoke this method
# after checking that self._headers_buf is truthy
assert headers_buf is not None

if not self.chunked:
# Non-chunked: coalesce headers with body
if chunk:
self._writelines((headers_buf, chunk))
else:
self._write(headers_buf)
return

# Coalesce headers with chunked data
if chunk:
chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
if is_eof:
self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n0\r\n\r\n"))
else:
self._writelines((headers_buf, chunk_len_pre, chunk, b"\r\n"))
elif is_eof:
self._writelines((headers_buf, b"0\r\n\r\n"))
else:
self._write(headers_buf)

async def write(
self,
chunk: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"],
*,
drain: bool = True,
LIMIT: int = 0x10000,
) -> None:
"""Writes chunk of data to a stream.
"""
Writes chunk of data to a stream.

write_eof() indicates end of stream.
writer can't be used after write_eof() method being called.
Expand Down Expand Up @@ -154,31 +201,75 @@ async def write(
if not chunk:
return

# Handle buffered headers for small payload optimization
if self._headers_buf and not self._headers_written:
self._send_headers_with_payload(chunk, False)
if drain and self.buffer_size > LIMIT:
self.buffer_size = 0
await self.drain()
return

if chunk:
if self.chunked:
self._writelines(
(f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n")
)
self._write_chunked_payload(chunk)
else:
self._write(chunk)

if self.buffer_size > LIMIT and drain:
if drain and self.buffer_size > LIMIT:
self.buffer_size = 0
await self.drain()

async def write_headers(
self, status_line: str, headers: "CIMultiDict[str]"
) -> None:
"""Write request/response status and headers."""
"""Write headers to the stream."""
if self._on_headers_sent is not None:
await self._on_headers_sent(headers)

# status + headers
buf = _serialize_headers(status_line, headers)
self._write(buf)
self._headers_written = False
self._headers_buf = buf

def send_headers(self) -> None:
"""Force sending buffered headers if not already sent."""
if not self._headers_buf or self._headers_written:
return

self._headers_written = True
headers_buf = self._headers_buf
self._headers_buf = None

if TYPE_CHECKING:
# Safe because we only enter this block when self._headers_buf is truthy
assert headers_buf is not None

self._write(headers_buf)

def set_eof(self) -> None:
"""Indicate that the message is complete."""
if self._eof:
return

# If headers haven't been sent yet, send them now
# This handles the case where there's no body at all
if self._headers_buf and not self._headers_written:
self._headers_written = True
headers_buf = self._headers_buf
self._headers_buf = None

if TYPE_CHECKING:
# Safe because we only enter this block when self._headers_buf is truthy
assert headers_buf is not None

# Combine headers and chunked EOF marker in a single write
if self.chunked:
self._writelines((headers_buf, b"0\r\n\r\n"))
else:
self._write(headers_buf)
elif self.chunked and self._headers_written:
# Headers already sent, just send the final chunk marker
self._write(b"0\r\n\r\n")

self._eof = True

async def write_eof(self, chunk: bytes = b"") -> None:
Expand All @@ -188,6 +279,7 @@ async def write_eof(self, chunk: bytes = b"") -> None:
if chunk and self._on_chunk_sent is not None:
await self._on_chunk_sent(chunk)

# Handle body/compression
if self._compress:
chunks: List[bytes] = []
chunks_len = 0
Expand All @@ -200,23 +292,61 @@ async def write_eof(self, chunk: bytes = b"") -> None:
chunks.append(flush_chunk)
assert chunks_len

# Send buffered headers with compressed data if not yet sent
if self._headers_buf and not self._headers_written:
self._headers_written = True
headers_buf = self._headers_buf
self._headers_buf = None

if self.chunked:
# Coalesce headers with compressed chunked data
chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
self._writelines(
(headers_buf, chunk_len_pre, *chunks, b"\r\n0\r\n\r\n")
)
else:
# Coalesce headers with compressed data
self._writelines((headers_buf, *chunks))
await self.drain()
self._eof = True
return

# Headers already sent, just write compressed data
if self.chunked:
chunk_len_pre = f"{chunks_len:x}\r\n".encode("ascii")
self._writelines((chunk_len_pre, *chunks, b"\r\n0\r\n\r\n"))
elif len(chunks) > 1:
self._writelines(chunks)
else:
self._write(chunks[0])
elif self.chunked:
await self.drain()
self._eof = True
return

# No compression - send buffered headers if not yet sent
if self._headers_buf and not self._headers_written:
# Use helper to send headers with payload
self._send_headers_with_payload(chunk, True)
await self.drain()
self._eof = True
return

# Handle remaining body
if self.chunked:
if chunk:
chunk_len_pre = f"{len(chunk):x}\r\n".encode("ascii")
self._writelines((chunk_len_pre, chunk, b"\r\n0\r\n\r\n"))
# Write final chunk with EOF marker
self._writelines(
(f"{len(chunk):x}\r\n".encode("ascii"), chunk, b"\r\n0\r\n\r\n")
)
else:
self._write(b"0\r\n\r\n")
elif chunk:
self._write(chunk)
await self.drain()
self._eof = True
return

await self.drain()
if chunk:
self._write(chunk)
await self.drain()

self._eof = True

Expand Down
6 changes: 6 additions & 0 deletions aiohttp/web_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class StreamResponse(BaseClass, HeadersMixin, CookieMixin):
_eof_sent: bool = False
_must_be_empty_body: Optional[bool] = None
_body_length = 0
_send_headers_immediately = True

def __init__(
self,
Expand Down Expand Up @@ -441,6 +442,10 @@ async def _write_headers(self) -> None:
status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}"
await writer.write_headers(status_line, self._headers)

# Send headers immediately if not opted into buffering
if self._send_headers_immediately:
writer.send_headers()

async def write(
self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
) -> None:
Expand Down Expand Up @@ -519,6 +524,7 @@ def __bool__(self) -> bool:
class Response(StreamResponse):

_compressed_body: Optional[bytes] = None
_send_headers_immediately = False

def __init__(
self,
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ initializer
inline
intaking
io
IoT
ip
IP
ipdb
Expand Down
Loading
Loading