Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ To allow connections to remain open indefinitely, set both `GRPC_SERVER_MAX_CONN
| `DISABLE_DEBUG` | bool | `false` | Disable pprof debug endpoints at `/debug/` |
| `USE_JSON_BUILTIN_MARSHALLER` | bool | `false` | Use `encoding/json` instead of the default protojson marshaller for `application/json` |
| `JSON_BUILTIN_MARSHALLER_MIME` | string | `application/json` | Content-Type for the JSON builtin marshaller |
| `DISABLE_SSE_MARSHALER` | bool | `false` | Disable the auto-registered `text/event-stream` marshaler. When `false` (default), server-streaming RPCs are consumable as Server-Sent Events by clients that send `Accept: text/event-stream` — see [Server-Sent Events](/howto/server-sent-events/) |
| `HTTP_HEADER_PREFIXES` | []string | `""` | HTTP header prefixes to forward as gRPC metadata (comma-separated) |
| `TRACE_HEADER_NAME` | string | `x-trace-id` | HTTP header name for trace ID propagation to log/trace contexts |
| `DISABLE_HTTP_COMPRESSION` | bool | `false` | Disable gzip/zstd compression for HTTP gateway responses |
Expand Down
2 changes: 1 addition & 1 deletion howto/cache.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
layout: default
title: "Cache"
parent: "How To"
nav_order: 23
nav_order: 24
description: "How to wire a Redis or Valkey cache into a ColdBrew service: client init in PreStart, drain in Stop, cache-aside, and tracing via NewDatastoreSpan"
---
# Cache
Expand Down
2 changes: 1 addition & 1 deletion howto/database.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
layout: default
title: "Database"
parent: "How To"
nav_order: 22
nav_order: 23
description: "How to wire a database connection pool into a ColdBrew service: pool init in PreStart, drain in Stop, and tracing via NewDatastoreSpan. Library-agnostic with a pgx Postgres example"
---
# Database
Expand Down
1 change: 1 addition & 0 deletions howto/gateway-extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Registered options are applied **after** ColdBrew's built-ins. Built-ins include
- The incoming-header matcher derived from `HTTP_HEADER_PREFIXES`
- Marshalers for `application/proto` and `application/protobuf`
- The internal `spanRouteMiddleware` (sets the OTEL span name + `http.route` attribute)
- A `text/event-stream` marshaler (`core.SSEMarshaler`) so server-streaming RPCs are browser `EventSource`-consumable out of the box. Set `DISABLE_SSE_MARSHALER=true` to suppress, or register your own marshaler for `text/event-stream` to override the default. See [Server-Sent Events](/howto/server-sent-events/) for the framing details
- Optionally the JSON builtin marshaler when `USE_JSON_BUILTIN_MARSHALLER=true`

Because grpc-gateway's option model is last-write-wins for some options and additive for others, the practical effect is:
Expand Down
2 changes: 1 addition & 1 deletion howto/messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
layout: default
title: "Messaging"
parent: "How To"
nav_order: 24
nav_order: 25
description: "How to run Kafka or NATS consumers in a ColdBrew service via workers.Worker, with graceful drain on shutdown and built-in tracing"
---
# Messaging
Expand Down
156 changes: 156 additions & 0 deletions howto/server-sent-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
---
layout: default
title: "Server-Sent Events (SSE)"
parent: "How To"
nav_order: 22
description: "Server-Sent Events over the ColdBrew HTTP gateway — browser-consumable streaming for AI/LLM tokens, progress feeds, and live updates with built-in EventSource support and per-token cancellation"
---
# Server-Sent Events (SSE)

## Table of contents
{: .no_toc .text-delta }

1. TOC
{:toc}

---

ColdBrew exposes every server-streaming gRPC method as Server-Sent Events for free. A browser `EventSource(...)` can consume any `rpc Foo(Req) returns (stream Resp)` endpoint directly — no per-service wiring, no proto changes, no custom HTTP handler. This is the path of least resistance for AI/LLM token streams, progress feeds, change notifications, and any other server → client push that benefits from staying on plain HTTP.

The marshaler is registered by default. There is nothing to import in your service code. Clients pick SSE by sending `Accept: text/event-stream`; everything else continues to receive newline-delimited JSON as before.

## When to use SSE

| Use SSE for | Use something else for |
|---|---|
| AI/LLM token-by-token streaming | One-shot responses (unary RPC) |
| Server → client push (notifications, live counters, progress) | Bidirectional or high-frequency client → server messaging (WebSocket) |
| Browser clients you don't want to ship a gRPC-web library for | Server-to-server streams (use native gRPC) |
| Anything where a `curl -N` or `EventSource(...)` consumer is good enough | Binary streams (use the proto/protobuf gateway marshaler) |

If you need true bidi over HTTP, SSE is the wrong primitive — register a WebSocket handler via [HTTP Gateway Extensions](/howto/gateway-extensions). For server-only push, prefer SSE: it reuses your existing gRPC stream + gateway plumbing instead of doubling the surface area.

## Wire format

Each streamed gRPC message becomes one SSE frame:

```
data: {"result":{"token":"hello","index":0}}

data: {"result":{"token":"world","index":1}}

```

Two newlines (`\n\n`) terminate a frame, matching the SSE spec. The JSON payload uses protojson (same field naming and well-known-type handling as the gateway's default `application/json` responses).

{: .note }
grpc-gateway wraps server-streaming responses in `{"result": <message>}` over HTTP — this is the documented gateway convention, not an SSE artifact. Native gRPC clients still see the unwrapped message. If you need full control over the wire bytes (no `"result"` wrapper, custom `event:`/`id:` fields), use `google.api.HttpBody` as the stream's response type and marshal the SSE frame yourself in the handler.

## Defining a streaming endpoint

A streaming method is just a `stream` response in your `.proto`. Nothing changes for SSE specifically:

```protobuf
rpc StreamTokens(StreamTokensRequest) returns (stream Token) {
option (google.api.http) = {
post: "/api/v1/stream/tokens"
body: "*"
};
}

message Token {
string text = 1;
int32 index = 2;
}
```

Implement using `grpc.ServerStreamingServer[Token]`:

```go
func (s *svc) StreamTokens(req *proto.StreamTokensRequest, stream grpc.ServerStreamingServer[proto.Token]) error {
ctx := stream.Context()
start := time.Now()

for i, tok := range produce(ctx, req) {
// Stop generating (and stop paying for) tokens when the client disconnects.
// ctx.Err() goes non-nil when the HTTP connection drops — this is the
// load-bearing safety property for AI/LLM workloads.
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "stream canceled")
}
if err := stream.Send(&proto.Token{Text: tok, Index: int32(i)}); err != nil {
return errors.Wrap(err, "stream send")
}
if i == 0 {
metrics.ObserveTTFT(time.Since(start))
}
}
return nil
}
```

Two things matter for production:

1. **Check `stream.Context().Err()` before every `Send`.** A browser tab closing cancels the HTTP context, which cancels the gRPC stream context. Pass the same context into your LLM SDK call so cancellation propagates to the upstream provider — otherwise the model keeps generating (and billing) after the user is gone.
2. **Record time-to-first-token (TTFT) as a distinct metric.** Total stream duration mixes upstream latency with generation throughput. Separating TTFT surfaces which one is degrading. Record it once per stream, on the first successful `Send`.

## Calling a streaming endpoint

### Browser (`EventSource`)

```javascript
const events = new EventSource("/api/v1/stream/tokens");
events.onmessage = (e) => {
const frame = JSON.parse(e.data);
console.log(frame.result.text); // unwrap the gateway envelope
};
events.onerror = () => events.close();
```

`EventSource` is the standard browser API — auto-reconnects on transient network failures, available in every modern browser, no dependencies. Note: it always sends `GET`. For `POST` streams (most non-trivial endpoints), use `fetch(..., { method: "POST" })` and a streaming response reader, or use a small library like [@microsoft/fetch-event-source](https://github.com/Azure/fetch-event-source) that handles POST + SSE parsing.

### curl

```console
$ # Default — newline-delimited JSON:
$ curl -N -X POST -H 'Content-Type: application/json' \
-d '{"msg":"hello world"}' \
http://localhost:9091/api/v1/stream/tokens

$ # SSE — request text/event-stream:
$ curl -N -X POST -H 'Content-Type: application/json' -H 'Accept: text/event-stream' \
-d '{"msg":"hello world"}' \
http://localhost:9091/api/v1/stream/tokens
```

`-N` (no-buffer) is required — without it, curl will hold the response until the stream completes.

### Native gRPC

The same method is reachable as a native gRPC server-streaming call. SSE is purely a gateway concern; gRPC clients see plain proto-encoded `Token` messages with no `{"result": ...}` wrapping.

## Disabling or replacing the default marshaler

Two opt-out paths:

| Goal | How |
|---|---|
| Turn off SSE entirely (force JSON for all streams) | Set `DISABLE_SSE_MARSHALER=true` |
| Keep SSE but customize the framing | Register your own `text/event-stream` marshaler from `PreStart` — see [HTTP Gateway Extensions](/howto/gateway-extensions). Service-registered marshalers win over ColdBrew's defaults on the same MIME |

A custom marshaler is the right answer when you need richer SSE features (named events via `event:`, IDs for client-side dedup via `id:`, multi-line `data:` fields). Embed `core.SSEMarshaler` and override `Marshal` to add the extra fields.

## Common pitfalls

- **Compression buffers SSE.** gzip/zstd over an event stream stalls frame delivery because compressors hold bytes until they have enough to flush. ColdBrew's HTTP compression wrapper automatically excludes `text/event-stream`, so this Just Works — but if you put a reverse proxy in front (Nginx, Cloudflare, an in-house CDN) and it re-applies compression, you'll see stalls. Send `Content-Encoding: identity` or configure the proxy to skip SSE.
- **Reverse proxies also buffer responses.** Nginx in particular holds chunks until ~4KB by default. Set `X-Accel-Buffering: no` on the response, or `proxy_buffering off;` for the upstream block. Cloudflare typically passes SSE through but check your zone settings.
- **`EventSource` can only `GET`.** If your RPC's HTTP annotation is `post`, use `fetch` with a streaming reader, or [microsoft/fetch-event-source](https://github.com/Azure/fetch-event-source). The streaming format on the wire is identical.
- **Mid-stream errors render in the trailers.** gRPC stream errors arrive after the last `Send`, encoded as a final SSE frame (or as HTTP trailers, depending on gateway config). For nicer client behavior — explicit error events the JS side can handle — use `runtime.WithStreamErrorHandler` to control the format.
- **The `{"result": ...}` wrapper is gateway-imposed.** It applies to every streaming RPC over HTTP, not just SSE. Either unwrap on the client (`JSON.parse(e.data).result`) or use `google.api.HttpBody` as the response type for full control.

## Related

- [Streaming RPCs](/howto/streaming-rpcs/) — Proto definitions, handler patterns, deadline propagation, and the gateway's behavior for every gRPC method shape.
- [HTTP Gateway Extensions](/howto/gateway-extensions/) — Registering custom marshalers, error handlers, middleware, and additional routes on the gateway.
- [Configuration Reference](/config-reference/) — `DISABLE_SSE_MARSHALER` and related HTTP gateway options.
- [Metrics](/howto/Metrics/) — Where to surface TTFT and per-stream counters alongside ColdBrew's default Prometheus metrics.
Comment on lines +151 to +156
6 changes: 4 additions & 2 deletions howto/streaming-rpcs.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,21 +184,23 @@ grpc-gateway translates streaming RPCs to HTTP, but the four shapes don't map cl
| gRPC shape | HTTP behaviour through the gateway |
|---|---|
| Unary | Standard request/response. |
| Server-streaming | Newline-delimited JSON (NDJSON) over a chunked HTTP response. Clients read line-by-line. |
| Server-streaming | Newline-delimited JSON (NDJSON) over a chunked HTTP response by default, or Server-Sent Events when the client sends `Accept: text/event-stream`. |
| Client-streaming | Limited — the gateway buffers and forwards as a unary gRPC stream. Don't rely on this for large or long-lived uploads. |
| Bidirectional | Limited — no true concurrent interleaving over HTTP/1.1. Avoid for HTTP clients. |

Practical things to know:

- **Server-Sent Events work out of the box.** ColdBrew registers a `text/event-stream` marshaler by default so any server-streaming RPC is consumable by a browser `EventSource(...)` without extra code. See [Server-Sent Events](/howto/server-sent-events/) for the framing, opt-out, and AI/LLM patterns; set `DISABLE_SSE_MARSHALER=true` to suppress.
- **Reverse proxies buffer streamed responses.** Nginx, Cloudflare, and similar will hold chunks until they have "enough." Set `X-Accel-Buffering: no` on the response (or the upstream config) to disable buffering when you actually need server-streaming over HTTP.
- **Stream errors need a handler.** Use `runtime.WithStreamErrorHandler` when registering the gateway to control how mid-stream errors render in the HTTP response. The default trailers-only behaviour is awkward to consume from a JSON client.
- **Native HTTP alternatives.** If your HTTP clients need true bidirectional or high-frequency push, consider a separate WebSocket or Server-Sent Events endpoint registered via [HTTP Gateway Extensions](/howto/gateway-extensions). Keep the gRPC stream as the canonical implementation and have the WebSocket handler delegate to it.
- **Bidirectional or push-style HTTP.** True bidi over HTTP/1.1 isn't viable through the gateway. For server → client push, prefer Server-Sent Events (above) over a separate WebSocket endpoint — it reuses the existing gRPC stream + gateway plumbing. WebSockets are still the right answer when you need client → server messages on the same connection; see [HTTP Gateway Extensions](/howto/gateway-extensions) for the registration recipe.

See the [grpc-gateway streaming examples](https://github.com/grpc-ecosystem/grpc-gateway/tree/main/examples/internal/proto/examplepb) for the full HTTP semantics.

## Related

- [APIs how-to](/howto/APIs) — Defining gRPC and HTTP endpoints from proto.
- [Interceptors](/howto/interceptors) — The stream interceptor chain and how to add your own.
- [Server-Sent Events](/howto/server-sent-events/) — Browser-consumable streams over the gateway, ideal for AI/LLM token streaming.
- [HTTP Gateway Extensions](/howto/gateway-extensions) — Custom marshalers and routes when the gateway alone isn't enough.
- [Tracing](/howto/Tracing) — Trace IDs propagate through stream contexts the same way they do for unary.
7 changes: 4 additions & 3 deletions tests/pages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ export const allHowtoPages = [
"/howto/auth/", // 19
"/howto/gateway-extensions/", // 20
"/howto/streaming-rpcs/", // 21
"/howto/database/", // 22
"/howto/cache/", // 23
"/howto/messaging/", // 24
"/howto/server-sent-events/", // 22
"/howto/database/", // 23
"/howto/cache/", // 24
"/howto/messaging/", // 25
];
Loading