-
Notifications
You must be signed in to change notification settings - Fork 1
fix: handle gzip-compressed responses in toolbar injection #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7f699e0
468ce2a
f867495
6be939f
499dd68
9121fa8
acb3f30
b354e93
5ab416e
0a80ea7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import gzip | ||
| import logging | ||
| import re | ||
| import time | ||
|
|
@@ -190,20 +191,27 @@ async def _handle_response_body( | |
| async def _send_html_response(self, send: Send, context: RequestContext, state: ResponseState) -> None: | ||
| """Process and send buffered HTML response with toolbar injection.""" | ||
| full_body = b"".join(state.body_chunks) | ||
| content_encoding = state.headers.get("content-encoding", "") | ||
|
|
||
| try: | ||
| await self.toolbar.process_response(context) | ||
| modified_body = self._inject_toolbar(full_body, context) | ||
| modified_body, new_encoding = self._inject_toolbar(full_body, context, content_encoding) | ||
| server_timing = self.toolbar.get_server_timing_header(context) | ||
| except Exception: | ||
| logger.debug("Toolbar processing failed, sending original response", exc_info=True) | ||
| modified_body = full_body | ||
| new_encoding = content_encoding | ||
| server_timing = None | ||
|
|
||
| # Build headers, excluding content-length (recalculated) and content-encoding (may have changed) | ||
| excluded_headers = {"content-length", "content-encoding"} | ||
| new_headers: list[tuple[bytes, bytes]] = [ | ||
| (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() != "content-length" | ||
| (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() not in excluded_headers | ||
| ] | ||
| new_headers.append((b"content-length", str(len(modified_body)).encode())) | ||
| # Only add content-encoding if we still have one (not stripped due to decompression) | ||
| if new_encoding: | ||
| new_headers.append((b"content-encoding", new_encoding.encode())) | ||
| if server_timing: | ||
| new_headers.append((b"server-timing", server_timing.encode())) | ||
|
|
||
|
|
@@ -471,20 +479,39 @@ def _populate_routes_metadata(self, request: Request, context: RequestContext) - | |
| context.metadata["routes"] = [] | ||
| context.metadata["matched_route"] = "" | ||
|
|
||
| def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes: | ||
| def _inject_toolbar(self, body: bytes, context: RequestContext, content_encoding: str = "") -> tuple[bytes, str]: | ||
| """Inject the toolbar HTML into the response body. | ||
|
|
||
| Args: | ||
| body: The original response body. | ||
| body: The original response body (may be compressed). | ||
| context: The request context with collected data. | ||
| content_encoding: The content-encoding header value (e.g., "gzip"). | ||
|
|
||
| Returns: | ||
| The modified response body with toolbar injected. | ||
| Tuple of (modified body, content_encoding to use). | ||
| If gzip was decompressed, returns uncompressed body with empty encoding. | ||
| """ | ||
| # Handle gzip-compressed responses | ||
| # Track whether we successfully decompressed the body | ||
| decompressed = False | ||
| encodings = [e.strip() for e in content_encoding.lower().split(",")] if content_encoding else [] | ||
| if "gzip" in encodings: | ||
| try: | ||
| body = gzip.decompress(body) | ||
| decompressed = True | ||
| except gzip.BadGzipFile: | ||
| # Not valid gzip, try to decode as-is | ||
| pass | ||
|
|
||
| try: | ||
| html = body.decode("utf-8") | ||
| except UnicodeDecodeError: | ||
| return body | ||
| # Can't decode. If we successfully decompressed gzip, return the | ||
| # decompressed body with no content-encoding. Otherwise, return | ||
| # the body as-is with the original encoding. | ||
| if decompressed: | ||
| return body, "" | ||
| return body, content_encoding | ||
|
Comment on lines
+509
to
+514
|
||
|
|
||
| toolbar_data = self.toolbar.get_toolbar_data(context) | ||
| toolbar_html = self._render_toolbar(toolbar_data) | ||
|
|
@@ -496,7 +523,10 @@ def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes: | |
| pattern = re.compile(re.escape(insert_before), re.IGNORECASE) | ||
| html = pattern.sub(toolbar_html + insert_before, html, count=1) | ||
|
|
||
| return html.encode("utf-8") | ||
| # Return body as uncompressed UTF-8 with empty content-encoding. | ||
| # This applies to all successful toolbar injections, regardless of whether | ||
| # the input was originally compressed (we decompress before processing). | ||
| return html.encode("utf-8"), "" | ||
|
|
||
| def _render_toolbar(self, data: dict[str, Any]) -> str: | ||
JacobCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Render the toolbar HTML. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.