Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/pyodide/internal/serializeJsModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export function createImportProxy(
if (!IS_CREATING_SNAPSHOT) {
return mod;
}
if (!mod || typeof mod !== 'object') {
if (!mod || (typeof mod !== 'object' && typeof mod !== 'function')) {
return mod;
}
return new Proxy(mod, {
Expand Down
90 changes: 39 additions & 51 deletions src/pyodide/internal/workers-api/src/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any

import js
from workers import Context, Request
from workers import Context, Request, wait_until

ASGI = {"spec_version": "2.0", "version": "3.0"}
logger = logging.getLogger("asgi")
Expand Down Expand Up @@ -115,7 +115,12 @@ async def send(got):


async def process_request(
app: Any, req: "Request | js.Request", env: Any, ctx: Context
app: Any,
req: "Request | js.Request",
env: Any,
# added for waitUntil, but not used anymore
# TODO(later): remove this parameter after unvendoring Python SDK from workerd
ctx: Context | None,
) -> js.Response:
from js import Object, Response, TransformStream

Expand All @@ -124,9 +129,11 @@ async def process_request(
status = None
headers = None
result = Future()
is_sse = False
finished_response = Event()

# Streaming state — initialized lazily on first body chunk with more_body=True.
writer = None

receive_queue = Queue()
if req.body:
async for data in req.body:
Expand All @@ -148,60 +155,52 @@ async def receive():
message = {"type": "http.disconnect"}
return message

# Create a transform stream for handling streaming responses
transform_stream = TransformStream.new()
readable = transform_stream.readable
writable = transform_stream.writable
writer = writable.getWriter()

async def send(got):
nonlocal status
nonlocal headers
nonlocal is_sse
nonlocal writer

if got["type"] == "http.response.start":
status = got["status"]
# Like above, we need to convert byte-pairs into string explicitly.
headers = [(k.decode(), v.decode()) for k, v in got["headers"]]
# Check if this is a server-sent events response
for k, v in headers:
if k.lower() == "content-type" and v.lower().startswith(
"text/event-stream"
):
is_sse = True
break
if is_sse:
# For SSE, create and return the response immediately after http.response.start
resp = Response.new(
readable, headers=Object.fromEntries(headers), status=status
)
result.set_result(resp)

elif got["type"] == "http.response.body":
body = got["body"]
more_body = got.get("more_body", False)

# Convert body to JS buffer
px = create_proxy(body)
buf = px.getBuffer()
px.destroy()

if is_sse:
# For SSE, write chunk to the stream
await writer.write(buf.data)
# If this is the last chunk, close the writer
if writer is not None:
# Already in streaming mode — write chunk to the stream.
with acquire_js_buffer(body) as jsbytes:
await writer.write(jsbytes)
if not more_body:
await writer.close()
finished_response.set()
elif more_body:
# First body chunk with more data coming — switch to streaming.
# Create a TransformStream so the runtime can start consuming
# body chunks as they are written.
transform_stream = TransformStream.new()
readable = transform_stream.readable
writer = transform_stream.writable.getWriter()
resp = Response.new(
readable, headers=Object.fromEntries(headers), status=status
)
result.set_result(resp)
with acquire_js_buffer(body) as jsbytes:
await writer.write(jsbytes)
else:
# Complete body in a single chunk
px = create_proxy(body)
buf = px.getBuffer()
px.destroy()
resp = Response.new(
buf.data, headers=Object.fromEntries(headers), status=status
)
result.set_result(resp)
await writer.close()
finished_response.set()

# Run the application in the background to handle SSE
# Run the application in the background
async def run_app():
try:
await app(request_to_scope(req, env), receive, send)
Expand All @@ -212,30 +211,19 @@ async def run_app():
except Exception as e:
if not result.done():
result.set_exception(e)
await writer.close() # Close the writer
if writer is not None:
await writer.close()
finished_response.set()
else:
# Response already sent — exception can't be propagated to the
# client, so log it to avoid silently swallowing errors.
logger.exception("Exception in ASGI application after response started")

# Create task to run the application in the background
app_task = create_task(run_app())

# Wait for the result (the response)
response = await result

# For non-SSE responses, we need to wait for the application to complete
if not is_sse:
await app_task
else: # noqa: PLR5501
if ctx is not None:
ctx.waitUntil(create_proxy(app_task))
else:
raise RuntimeError(
"Server-Side-Events require ctx to be passed to asgi.fetch"
)
return response
app_task = create_proxy(create_task(run_app()))
wait_until(app_task)
app_task.destroy()
return await result


async def process_websocket(app: Any, req: "Request | js.Request") -> js.Response:
Expand Down
3 changes: 2 additions & 1 deletion src/pyodide/internal/workers-api/src/workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@
"python_from_rpc",
"python_to_rpc",
"waitUntil",
"wait_until",
]


def __getattr__(key):
if key == "env":
cloudflare_workers = import_from_javascript("cloudflare:workers")
return cloudflare_workers.env
if key == "waitUntil":
if key in ("wait_until", "waitUntil"):
cloudflare_workers = import_from_javascript("cloudflare:workers")
return cloudflare_workers.waitUntil
raise AttributeError(f"module {__name__!r} has no attribute {key!r}")
5 changes: 4 additions & 1 deletion src/pyodide/python-entrypoint-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ function patchWaitUntil(ctx: {

export type PyodideEntrypointHelper = {
doAnImport: (mod: string) => Promise<any>;
cloudflareWorkersModule: { env: any };
cloudflareWorkersModule: {
env: any;
waitUntil: (p: Promise<void> | PyFuture<void>) => void;
};
cloudflareSocketsModule: any;
workerEntrypoint: any;
patchWaitUntil: typeof patchWaitUntil;
Expand Down
2 changes: 0 additions & 2 deletions src/workerd/server/tests/python/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ py_wd_test("env-param")

py_wd_test("asgi")

py_wd_test("asgi-sse")

py_wd_test("random")

py_wd_test("subdirectory")
Expand Down
17 changes: 0 additions & 17 deletions src/workerd/server/tests/python/asgi-sse/asgi-sse.wd-test

This file was deleted.

102 changes: 0 additions & 102 deletions src/workerd/server/tests/python/asgi-sse/worker.py

This file was deleted.

Loading