-
Notifications
You must be signed in to change notification settings - Fork 1
Add support for deflate, brotli, and zstd compression formats #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7f699e0
468ce2a
f867495
6be939f
499dd68
9121fa8
acb3f30
b354e93
5ab416e
0a80ea7
80c96d2
c388641
59f5c23
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,9 +2,11 @@ | |||||||||||
|
|
||||||||||||
| from __future__ import annotations | ||||||||||||
|
|
||||||||||||
| import gzip | ||||||||||||
| import logging | ||||||||||||
| import re | ||||||||||||
| import time | ||||||||||||
| import zlib | ||||||||||||
| from dataclasses import dataclass, field | ||||||||||||
| from typing import TYPE_CHECKING, Any, Literal, cast | ||||||||||||
| from uuid import uuid4 | ||||||||||||
|
|
@@ -15,6 +17,17 @@ | |||||||||||
| from debug_toolbar.litestar.panels.events import collect_events_metadata | ||||||||||||
| from litestar.middleware import AbstractMiddleware | ||||||||||||
|
|
||||||||||||
| # Optional compression libraries | ||||||||||||
| try: | ||||||||||||
| import brotli | ||||||||||||
| except ImportError: | ||||||||||||
| brotli = None # type: ignore[assignment] | ||||||||||||
|
|
||||||||||||
| try: | ||||||||||||
| import zstandard | ||||||||||||
| except ImportError: | ||||||||||||
| zstandard = None # type: ignore[assignment] | ||||||||||||
|
|
||||||||||||
| if TYPE_CHECKING: | ||||||||||||
| from litestar.types import ( | ||||||||||||
| ASGIApp, | ||||||||||||
|
|
@@ -190,20 +203,27 @@ async def _handle_response_body( | |||||||||||
| async def _send_html_response(self, send: Send, context: RequestContext, state: ResponseState) -> None: | ||||||||||||
| """Process and send buffered HTML response with toolbar injection.""" | ||||||||||||
| full_body = b"".join(state.body_chunks) | ||||||||||||
| content_encoding = state.headers.get("content-encoding", "") | ||||||||||||
|
|
||||||||||||
| try: | ||||||||||||
| await self.toolbar.process_response(context) | ||||||||||||
| modified_body = self._inject_toolbar(full_body, context) | ||||||||||||
| modified_body, new_encoding = self._inject_toolbar(full_body, context, content_encoding) | ||||||||||||
| server_timing = self.toolbar.get_server_timing_header(context) | ||||||||||||
| except Exception: | ||||||||||||
| logger.debug("Toolbar processing failed, sending original response", exc_info=True) | ||||||||||||
| modified_body = full_body | ||||||||||||
| new_encoding = content_encoding | ||||||||||||
| server_timing = None | ||||||||||||
|
|
||||||||||||
| # Build headers, excluding content-length (recalculated) and content-encoding (may have changed) | ||||||||||||
| excluded_headers = {"content-length", "content-encoding"} | ||||||||||||
| new_headers: list[tuple[bytes, bytes]] = [ | ||||||||||||
| (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() != "content-length" | ||||||||||||
| (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() not in excluded_headers | ||||||||||||
| ] | ||||||||||||
| new_headers.append((b"content-length", str(len(modified_body)).encode())) | ||||||||||||
| # Only add content-encoding if we still have one (not stripped due to decompression) | ||||||||||||
| if new_encoding: | ||||||||||||
| new_headers.append((b"content-encoding", new_encoding.encode())) | ||||||||||||
| if server_timing: | ||||||||||||
| new_headers.append((b"server-timing", server_timing.encode())) | ||||||||||||
|
|
||||||||||||
|
|
@@ -471,20 +491,86 @@ def _populate_routes_metadata(self, request: Request, context: RequestContext) - | |||||||||||
| context.metadata["routes"] = [] | ||||||||||||
| context.metadata["matched_route"] = "" | ||||||||||||
|
|
||||||||||||
| def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes: | ||||||||||||
| def _decompress_body(self, body: bytes, encodings: list[str]) -> tuple[bytes, bool]: # noqa: PLR0912 | ||||||||||||
| """Decompress response body based on content-encoding. | ||||||||||||
|
|
||||||||||||
| Args: | ||||||||||||
| body: The compressed response body. | ||||||||||||
| encodings: List of encoding formats (e.g., ["gzip", "deflate"]). | ||||||||||||
|
|
||||||||||||
| Returns: | ||||||||||||
| Tuple of (decompressed body, success flag). | ||||||||||||
| Success flag is True if decompression succeeded, False otherwise. | ||||||||||||
| """ | ||||||||||||
| decompressed = False | ||||||||||||
| # Process encodings in reverse order (last applied encoding is first to remove) | ||||||||||||
| for encoding in reversed(encodings): | ||||||||||||
| if encoding == "gzip": | ||||||||||||
| try: | ||||||||||||
| body = gzip.decompress(body) | ||||||||||||
| decompressed = True | ||||||||||||
| except (gzip.BadGzipFile, OSError): | ||||||||||||
| # Not valid gzip, try next encoding or decode as-is | ||||||||||||
| break | ||||||||||||
| elif encoding == "deflate": | ||||||||||||
| try: | ||||||||||||
| body = zlib.decompress(body) | ||||||||||||
| decompressed = True | ||||||||||||
| except zlib.error: | ||||||||||||
| # Not valid deflate, try next encoding or decode as-is | ||||||||||||
| break | ||||||||||||
| elif encoding == "br": | ||||||||||||
| if brotli is not None: | ||||||||||||
| try: | ||||||||||||
| body = brotli.decompress(body) | ||||||||||||
| decompressed = True | ||||||||||||
| except brotli.error: # type: ignore[attr-defined] | ||||||||||||
| # Not valid brotli, try next encoding or decode as-is | ||||||||||||
| break | ||||||||||||
| else: | ||||||||||||
| # Brotli not available, can't decompress | ||||||||||||
| logger.debug("Brotli compression detected but brotli library not installed") | ||||||||||||
| break | ||||||||||||
| elif encoding == "zstd": | ||||||||||||
| if zstandard is not None: | ||||||||||||
| try: | ||||||||||||
| dctx = zstandard.ZstdDecompressor() | ||||||||||||
| body = dctx.decompress(body) | ||||||||||||
| decompressed = True | ||||||||||||
| except zstandard.ZstdError: | ||||||||||||
| # Not valid zstd, try next encoding or decode as-is | ||||||||||||
| break | ||||||||||||
|
Comment on lines
+507
to
+542
|
||||||||||||
| else: | ||||||||||||
| # Zstandard not available, can't decompress | ||||||||||||
| logger.debug("Zstandard compression detected but zstandard library not installed") | ||||||||||||
| break | ||||||||||||
|
||||||||||||
| break | |
| break | |
| elif encoding == "identity": | |
| # "identity" means no encoding; body is already uncompressed | |
| continue |
Copilot
AI
Dec 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a compression library is not available (brotli or zstandard), the code logs a debug message and breaks from the decompression loop. However, after breaking, the decompressed flag remains False even though the method returns the original body unchanged.
This could be confusing because the function signature suggests decompressed=False means "decompression failed", but in this case it means "decompression wasn't attempted". The calling code in _inject_toolbar will then try to decode the body as UTF-8 and potentially inject the toolbar into still-compressed data. Consider logging a warning instead of debug, or returning the original encoding so the response can be passed through unchanged.
Copilot
AI
Dec 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The encodings list includes empty strings when the content-encoding header value ends with a comma (e.g., "gzip,") or contains double commas (e.g., "gzip,,deflate"). The strip() operation on line 562 removes whitespace but not empty strings resulting from split.
Consider filtering out empty strings from the encodings list to avoid unnecessary loop iterations: encodings = [e.strip() for e in content_encoding.lower().split(",") if e.strip()]
| encodings = [e.strip() for e in content_encoding.lower().split(",")] if content_encoding else [] | |
| encodings = [e.strip() for e in content_encoding.lower().split(",") if e.strip()] if content_encoding else [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A new
ZstdDecompressorinstance is created for every zstd decompression operation. For frequently accessed compressed responses, this could have a performance impact.Consider instantiating the decompressor once at the class or module level (when zstandard is available) and reusing it, or verify if the overhead is negligible. The brotli library uses
brotli.decompress()as a module-level function, which is more efficient.