Skip to content
43 changes: 36 additions & 7 deletions src/debug_toolbar/litestar/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import gzip
import logging
import re
import time
Expand Down Expand Up @@ -190,20 +191,27 @@ async def _handle_response_body(
async def _send_html_response(self, send: Send, context: RequestContext, state: ResponseState) -> None:
"""Process and send buffered HTML response with toolbar injection."""
full_body = b"".join(state.body_chunks)
content_encoding = state.headers.get("content-encoding", "")

try:
await self.toolbar.process_response(context)
modified_body = self._inject_toolbar(full_body, context)
modified_body, new_encoding = self._inject_toolbar(full_body, context, content_encoding)
server_timing = self.toolbar.get_server_timing_header(context)
except Exception:
logger.debug("Toolbar processing failed, sending original response", exc_info=True)
modified_body = full_body
new_encoding = content_encoding
server_timing = None

# Build headers, excluding content-length (recalculated) and content-encoding (may have changed)
excluded_headers = {"content-length", "content-encoding"}
new_headers: list[tuple[bytes, bytes]] = [
(k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() != "content-length"
(k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() not in excluded_headers
]
new_headers.append((b"content-length", str(len(modified_body)).encode()))
# Only add content-encoding if we still have one (not stripped due to decompression)
if new_encoding:
new_headers.append((b"content-encoding", new_encoding.encode()))
if server_timing:
new_headers.append((b"server-timing", server_timing.encode()))

Expand Down Expand Up @@ -471,20 +479,38 @@ def _populate_routes_metadata(self, request: Request, context: RequestContext) -
context.metadata["routes"] = []
context.metadata["matched_route"] = ""

def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes:
def _inject_toolbar(self, body: bytes, context: RequestContext, content_encoding: str = "") -> tuple[bytes, str]:
"""Inject the toolbar HTML into the response body.

Args:
body: The original response body.
body: The original response body (may be compressed).
context: The request context with collected data.
content_encoding: The content-encoding header value (e.g., "gzip").

Returns:
The modified response body with toolbar injected.
Tuple of (modified body, content_encoding to use).
If gzip was decompressed, returns uncompressed body with empty encoding.
"""
# Handle gzip-compressed responses
# Track whether we successfully decompressed the body
decompressed = False
if content_encoding.lower() == "gzip":
try:
body = gzip.decompress(body)
decompressed = True
except (gzip.BadGzipFile, OSError):
# Not valid gzip, try to decode as-is
pass

try:
html = body.decode("utf-8")
except UnicodeDecodeError:
return body
# Can't decode. If we successfully decompressed gzip, return the
# decompressed body with no content-encoding. Otherwise, return
# the body as-is with the original encoding.
if decompressed:
return body, ""
return body, content_encoding

toolbar_data = self.toolbar.get_toolbar_data(context)
toolbar_html = self._render_toolbar(toolbar_data)
Expand All @@ -496,7 +522,10 @@ def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes:
pattern = re.compile(re.escape(insert_before), re.IGNORECASE)
html = pattern.sub(toolbar_html + insert_before, html, count=1)

return html.encode("utf-8")
# Return body as uncompressed UTF-8 with empty content-encoding.
# This applies to all successful toolbar injections, regardless of whether
# the input was originally compressed (we decompress before processing).
return html.encode("utf-8"), ""

def _render_toolbar(self, data: dict[str, Any]) -> str:
"""Render the toolbar HTML.
Expand Down
160 changes: 159 additions & 1 deletion tests/integration/test_litestar_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

from __future__ import annotations

import gzip

import pytest
from litestar.testing import TestClient

from debug_toolbar.litestar import DebugToolbarPlugin, LitestarDebugToolbarConfig
from litestar import Litestar, MediaType, get
from litestar import Litestar, MediaType, Response, get
from litestar.status_codes import HTTP_200_OK


@get("/", media_type=MediaType.HTML)
Expand Down Expand Up @@ -298,3 +301,158 @@ async def after_request(response: Response) -> Response:
assert response.status_code == 200
assert b"debug-toolbar" in response.content
assert hook_state["before"], "before_request hook was not called"


class TestGzipCompression:
"""Test toolbar injection with gzip-compressed responses."""

def test_toolbar_injected_with_gzip_compression(self) -> None:
"""Test that toolbar is correctly injected when response is gzip-compressed.

This tests the fix for the issue where gzip-compressed responses would
fail to have the toolbar injected because the middleware couldn't decode
the compressed bytes as UTF-8.
"""
from litestar.config.compression import CompressionConfig

config = LitestarDebugToolbarConfig(enabled=True)
app = Litestar(
route_handlers=[html_handler],
plugins=[DebugToolbarPlugin(config)],
compression_config=CompressionConfig(backend="gzip", minimum_size=1),
debug=True,
)
with TestClient(app) as client:
# Request with Accept-Encoding to trigger compression
response = client.get("/", headers={"Accept-Encoding": "gzip"})
assert response.status_code == 200
# At the TestClient level we see an uncompressed body with the toolbar injected.
assert b"debug-toolbar" in response.content
assert b"</body>" in response.content

def test_toolbar_injected_without_compression(self) -> None:
"""Test that toolbar injection still works without compression."""
config = LitestarDebugToolbarConfig(enabled=True)
app = Litestar(
route_handlers=[html_handler],
plugins=[DebugToolbarPlugin(config)],
debug=True,
)
with TestClient(app) as client:
response = client.get("/")
assert response.status_code == 200
assert b"debug-toolbar" in response.content

def test_invalid_gzip_data_with_gzip_header(self) -> None:
"""Test handling of invalid gzip data with content-encoding: gzip header.

When the response claims to be gzipped but contains invalid gzip data,
the middleware should gracefully fall back to treating it as uncompressed.
"""

@get("/invalid-gzip", media_type=MediaType.HTML)
async def invalid_gzip_handler() -> Response:
"""Return invalid gzip data with gzip content-encoding header."""
# This is not valid gzip data
invalid_gzip = b"This is not gzipped data but pretends to be"
return Response(
content=invalid_gzip,
status_code=HTTP_200_OK,
media_type=MediaType.HTML,
headers={"content-encoding": "gzip"},
)

config = LitestarDebugToolbarConfig(enabled=True)
app = Litestar(
route_handlers=[invalid_gzip_handler],
plugins=[DebugToolbarPlugin(config)],
debug=True,
)
with TestClient(app) as client:
response = client.get("/invalid-gzip")
assert response.status_code == 200
# Should return original content since it couldn't be decompressed
assert b"This is not gzipped data but pretends to be" in response.content

def test_gzip_decompressed_data_fails_utf8_decoding(self) -> None:
"""Test handling of valid gzip data that fails UTF-8 decoding after decompression.

When gzipped data decompresses successfully but contains non-UTF-8 bytes,
the middleware should return the original compressed data.
"""

@get("/binary-gzip", media_type=MediaType.HTML)
async def binary_gzip_handler() -> Response:
"""Return gzipped binary data that's not valid UTF-8."""
# Binary data that's not valid UTF-8
binary_data = b"\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89"
gzipped = gzip.compress(binary_data)
return Response(
content=gzipped,
status_code=HTTP_200_OK,
media_type=MediaType.HTML,
headers={"content-encoding": "gzip"},
)

config = LitestarDebugToolbarConfig(enabled=True)
app = Litestar(
route_handlers=[binary_gzip_handler],
plugins=[DebugToolbarPlugin(config)],
debug=True,
)
with TestClient(app) as client:
response = client.get("/binary-gzip")
assert response.status_code == 200
# Should return decompressed binary data since UTF-8 decode failed
# The middleware has removed the gzip encoding, so we check for the raw binary content
assert response.content == b"\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89"

def test_gzip_header_case_insensitive(self) -> None:
"""Test that content-encoding header matching is case-insensitive.

The HTTP spec requires header names to be case-insensitive, so we should
handle various casings of "gzip" (e.g., "GZIP", "Gzip", "GzIp").
"""

@get("/gzip-upper", media_type=MediaType.HTML)
async def gzip_upper_handler() -> Response:
"""Return gzipped HTML with uppercase GZIP encoding."""
html = "<html><body><h1>Test</h1></body></html>"
gzipped = gzip.compress(html.encode("utf-8"))
return Response(
content=gzipped,
status_code=HTTP_200_OK,
media_type=MediaType.HTML,
headers={"content-encoding": "GZIP"},
)

@get("/gzip-mixed", media_type=MediaType.HTML)
async def gzip_mixed_handler() -> Response:
"""Return gzipped HTML with mixed case GzIp encoding."""
html = "<html><body><h1>Test</h1></body></html>"
gzipped = gzip.compress(html.encode("utf-8"))
return Response(
content=gzipped,
status_code=HTTP_200_OK,
media_type=MediaType.HTML,
headers={"content-encoding": "GzIp"},
)

config = LitestarDebugToolbarConfig(enabled=True)
app = Litestar(
route_handlers=[gzip_upper_handler, gzip_mixed_handler],
plugins=[DebugToolbarPlugin(config)],
debug=True,
)
with TestClient(app) as client:
# Test uppercase GZIP
response = client.get("/gzip-upper")
assert response.status_code == 200
assert b"debug-toolbar" in response.content
assert b"</body>" in response.content

# Test mixed case GzIp
response = client.get("/gzip-mixed")
assert response.status_code == 200
assert b"debug-toolbar" in response.content
assert b"</body>" in response.content
Loading