Skip to content

Commit a1ccb74

Browse files
Merge pull request #583 from DefangLabs/jordan/rename-long-sample-names
Shortening some long sample names
0 parents  commit a1ccb74

11 files changed

Lines changed: 638 additions & 0 deletions

File tree

.devcontainer/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
FROM mcr.microsoft.com/devcontainers/python:3.12-bookworm

.devcontainer/devcontainer.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"build": {
3+
"dockerfile": "Dockerfile",
4+
"context": ".."
5+
},
6+
"features": {
7+
"ghcr.io/defanglabs/devcontainer-feature/defang-cli:1.0.4": {},
8+
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
9+
"ghcr.io/devcontainers/features/aws-cli:1": {}
10+
}
11+
}

.github/workflows/deploy.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
name: Deploy
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
8+
jobs:
9+
deploy:
10+
environment: playground
11+
runs-on: ubuntu-latest
12+
permissions:
13+
contents: read
14+
id-token: write
15+
16+
steps:
17+
- name: Checkout Repo
18+
uses: actions/checkout@v4
19+
20+
- name: Deploy
21+
uses: DefangLabs/defang-github-action@v1.1.3
22+
with:
23+
config-env-vars: POSTGRES_PASSWORD SSL_MODE
24+
env:
25+
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
26+
SSL_MODE: ${{ secrets.SSL_MODE }}

README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# FastAPI Postgres Pub/Sub Chat
2+
3+
[![1-click-deploy](https://raw.githubusercontent.com/DefangLabs/defang-assets/main/Logos/Buttons/SVG/deploy-with-defang.svg)](https://portal.defang.dev/redirect?url=https%3A%2F%2Fgithub.com%2Fnew%3Ftemplate_name%3Dsample-fastapi-postgres-pubsub-template%26template_owner%3DDefangSamples)
4+
5+
This sample pairs FastAPI with PostgreSQL `LISTEN/NOTIFY` to demonstrate real-time updates between two application containers. A minimal chat UI sends messages with a REST request, and both FastAPI instances broadcast the new message over WebSockets after Postgres notifies them.
6+
7+
## Prerequisites
8+
9+
1. Download [Defang CLI](https://github.com/DefangLabs/defang)
10+
2. (Optional) If you are using [Defang BYOC](https://docs.defang.io/docs/concepts/defang-byoc) authenticate with your cloud provider account
11+
3. (Optional for local development) [Docker CLI](https://docs.docker.com/engine/install/)
12+
13+
## Development
14+
15+
To run the application locally, you can use the following command:
16+
17+
```bash
18+
docker compose -f compose.dev.yaml up --build
19+
```
20+
21+
Once everything is running:
22+
- Visit [http://localhost:8000](http://localhost:8000) for the first FastAPI service.
23+
- Visit [http://localhost:8001](http://localhost:8001) for the second service.
24+
- Send a chat message in either window. Both pages should update immediately, proving Postgres `LISTEN/NOTIFY` fans the event across containers.
25+
26+
Stop the stack with `Ctrl+C`, then run `docker compose -f compose.dev.yaml down`.
27+
28+
## Configuration
29+
30+
For this sample, you can rely on the defaults. Override them with environment variables if needed:
31+
32+
> Note that if you are using the 1-click deploy option, you can set these values as secrets in your GitHub repository and the action will automatically deploy them for you.
33+
34+
### `POSTGRES_PASSWORD`
35+
Database password (default `chat_password`).
36+
```bash
37+
defang config set POSTGRES_PASSWORD --random
38+
```
39+
40+
### `SSL_MODE`
41+
42+
Postgres SSL mode (default `disable`, should set to `require` in production).
43+
```bash
44+
defang config set SSL_MODE=require
45+
```
46+
47+
## Deployment
48+
49+
> [!NOTE]
50+
> Download [Defang CLI](https://github.com/DefangLabs/defang)
51+
52+
### Defang Playground
53+
54+
Deploy your application to the Defang Playground by opening up your terminal and typing:
55+
```bash
56+
defang compose up
57+
```
58+
59+
### BYOC
60+
61+
If you want to deploy to your own cloud account, you can [use Defang BYOC](https://docs.defang.io/docs/tutorials/deploy-to-your-cloud).
62+
63+
---
64+
65+
Title: FastAPI Postgres Pub/Sub
66+
67+
Short Description: FastAPI sample that stores messages in Postgres and streams them to two app instances via LISTEN/NOTIFY.
68+
69+
Tags: FastAPI, PostgreSQL, WebSockets, PubSub
70+
71+
Languages: Python, SQL

app/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.venv
2+
.env
3+

app/Dockerfile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
FROM python:3.11-slim
2+
3+
ENV PYTHONUNBUFFERED=1
4+
ENV PYTHONDONTWRITEBYTECODE=1
5+
6+
WORKDIR /app
7+
8+
RUN apt-get update \
9+
&& apt-get install -y --no-install-recommends build-essential libpq-dev curl \
10+
&& rm -rf /var/lib/apt/lists/*
11+
12+
COPY requirements.txt ./
13+
RUN pip install --no-cache-dir --upgrade pip \
14+
&& pip install --no-cache-dir -r requirements.txt
15+
16+
COPY . .
17+
18+
EXPOSE 8000
19+
20+
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

app/main.py

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
"""Simplest possible FastAPI chat demo using PostgreSQL LISTEN/NOTIFY.
2+
3+
The goal of this file is to show the basic flow end to end:
4+
5+
1. A user submits a chat message with ``POST /messages``.
6+
2. The API stores the message in PostgreSQL and issues ``pg_notify``.
7+
3. Every running API instance listens for that notification and forwards it
8+
to connected WebSocket clients.
9+
4. The browser keeps a WebSocket open so it can display new messages instantly.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import asyncio
15+
import json
16+
import logging
17+
import os
18+
from contextlib import asynccontextmanager
19+
from datetime import datetime, timezone
20+
from pathlib import Path
21+
from typing import Any
22+
23+
import asyncpg
24+
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
25+
from fastapi.responses import HTMLResponse
26+
from pydantic import BaseModel, Field
27+
28+
logger = logging.getLogger("chat")
29+
logging.basicConfig(level=logging.INFO)
30+
31+
CHAT_CHANNEL = "chat_messages"
32+
MIGRATION_LOCK_ID = 101
33+
TEMPLATE_PATH = Path(__file__).resolve().parent / "templates" / "index.html"
34+
35+
36+
class MessageCreate(BaseModel):
37+
"""Request body for ``POST /messages``."""
38+
39+
message: str = Field(..., min_length=1, max_length=500)
40+
41+
42+
class WebSocketRegistry:
43+
"""Keeps track of the browsers that are connected via WebSocket."""
44+
45+
def __init__(self) -> None:
46+
self._sockets: set[WebSocket] = set()
47+
self._lock = asyncio.Lock()
48+
49+
async def add(self, socket: WebSocket) -> None:
50+
async with self._lock:
51+
self._sockets.add(socket)
52+
53+
async def remove(self, socket: WebSocket) -> None:
54+
async with self._lock:
55+
self._sockets.discard(socket)
56+
57+
async def broadcast(self, payload: dict[str, Any]) -> None:
58+
async with self._lock:
59+
sockets = list(self._sockets)
60+
for socket in sockets:
61+
try:
62+
await socket.send_json(payload)
63+
except Exception:
64+
await self.remove(socket)
65+
66+
67+
def build_database_dsn() -> str:
68+
"""Build a PostgreSQL connection string from environment variables."""
69+
70+
url = os.getenv("DATABASE_URL")
71+
if url:
72+
return url
73+
raise ValueError("DATABASE_URL environment variable is not set")
74+
75+
async def create_tables(pool: asyncpg.Pool) -> None:
76+
"""Make sure the ``messages`` table exists."""
77+
78+
async with pool.acquire() as conn:
79+
await conn.execute("SELECT pg_advisory_lock($1)", MIGRATION_LOCK_ID)
80+
try:
81+
await conn.execute(
82+
"""
83+
CREATE TABLE IF NOT EXISTS messages (
84+
id BIGSERIAL PRIMARY KEY,
85+
message TEXT NOT NULL,
86+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
87+
);
88+
"""
89+
)
90+
finally:
91+
await conn.execute("SELECT pg_advisory_unlock($1)", MIGRATION_LOCK_ID)
92+
93+
94+
async def fetch_recent_messages(pool: asyncpg.Pool, limit: int = 50) -> list[dict[str, Any]]:
95+
"""Return the most recent chat messages."""
96+
97+
async with pool.acquire() as conn:
98+
rows = await conn.fetch(
99+
"""
100+
SELECT id, message, created_at
101+
FROM messages
102+
ORDER BY created_at ASC
103+
LIMIT $1
104+
""",
105+
limit,
106+
)
107+
return [
108+
{
109+
"id": row["id"],
110+
"message": row["message"],
111+
"created_at": row["created_at"].isoformat(),
112+
}
113+
for row in rows
114+
]
115+
116+
117+
async def save_message(pool: asyncpg.Pool, text: str) -> dict[str, Any]:
118+
"""Insert a message and notify other app instances about it."""
119+
120+
message = text.strip()
121+
if not message:
122+
raise HTTPException(status_code=400, detail="Message must not be empty")
123+
124+
async with pool.acquire() as conn:
125+
async with conn.transaction():
126+
record = await conn.fetchrow(
127+
"""
128+
INSERT INTO messages (message)
129+
VALUES ($1)
130+
RETURNING id, message, created_at
131+
""",
132+
message,
133+
)
134+
body = {
135+
"type": "message",
136+
"data": {
137+
"id": record["id"],
138+
"message": record["message"],
139+
"created_at": record["created_at"].isoformat(),
140+
},
141+
}
142+
await conn.execute("SELECT pg_notify($1, $2)", CHAT_CHANNEL, json.dumps(body))
143+
return body["data"]
144+
145+
146+
async def forward_notification(payload: str) -> None:
147+
"""Handle the JSON payload coming from PostgreSQL."""
148+
149+
try:
150+
data = json.loads(payload)
151+
except json.JSONDecodeError:
152+
logger.warning("Ignoring unexpected payload: %s", payload)
153+
return
154+
await app.state.connections.broadcast(data)
155+
156+
157+
async def notification_listener(stop_event: asyncio.Event) -> None:
158+
"""Listen for ``pg_notify`` events until the app shuts down."""
159+
160+
loop = asyncio.get_running_loop()
161+
database_dsn = build_database_dsn()
162+
conn = await asyncpg.connect(dsn=database_dsn)
163+
164+
def _listener(_connection: asyncpg.Connection, _pid: int, _channel: str, payload: str) -> None:
165+
loop.create_task(forward_notification(payload))
166+
167+
await conn.add_listener(CHAT_CHANNEL, _listener)
168+
try:
169+
await stop_event.wait()
170+
finally:
171+
await conn.remove_listener(CHAT_CHANNEL, _listener)
172+
await conn.close()
173+
174+
175+
@asynccontextmanager
176+
async def lifespan(app: FastAPI):
177+
"""Manage application lifespan events."""
178+
# Startup
179+
database_dsn = build_database_dsn()
180+
pool = await asyncpg.create_pool(dsn=database_dsn, min_size=1, max_size=5)
181+
app.state.db_pool = pool
182+
await create_tables(pool)
183+
184+
stop_event = asyncio.Event()
185+
app.state.listener_stop = stop_event
186+
app.state.listener_task = asyncio.create_task(notification_listener(stop_event))
187+
188+
yield
189+
190+
# Shutdown
191+
stop_event = app.state.listener_stop
192+
if stop_event:
193+
stop_event.set()
194+
listener_task = app.state.listener_task
195+
if listener_task:
196+
await listener_task
197+
198+
pool = app.state.db_pool
199+
if pool:
200+
await pool.close()
201+
202+
203+
app = FastAPI(title="FastAPI Postgres Pub/Sub Chat", lifespan=lifespan)
204+
app.state.connections = WebSocketRegistry()
205+
app.state.listener_stop: asyncio.Event | None = None
206+
app.state.listener_task: asyncio.Task | None = None
207+
app.state.db_pool: asyncpg.Pool | None = None
208+
209+
210+
def get_pool() -> asyncpg.Pool:
211+
pool = app.state.db_pool
212+
if pool is None:
213+
raise HTTPException(status_code=500, detail="Database connection not ready")
214+
return pool
215+
216+
217+
218+
219+
@app.get("/", response_class=HTMLResponse)
220+
async def index() -> HTMLResponse:
221+
if not TEMPLATE_PATH.exists():
222+
raise HTTPException(status_code=500, detail="Template not found")
223+
return HTMLResponse(TEMPLATE_PATH.read_text(encoding="utf-8"))
224+
225+
226+
@app.post("/messages")
227+
async def create_message(payload: MessageCreate) -> dict[str, Any]:
228+
pool = get_pool()
229+
message = await save_message(pool, payload.message)
230+
return message
231+
232+
233+
@app.websocket("/ws")
234+
async def websocket_endpoint(websocket: WebSocket) -> None:
235+
pool = get_pool()
236+
await websocket.accept()
237+
await app.state.connections.add(websocket)
238+
239+
try:
240+
history = await fetch_recent_messages(pool)
241+
await websocket.send_json({"type": "history", "messages": history})
242+
243+
while True:
244+
await websocket.receive_text()
245+
except WebSocketDisconnect:
246+
pass
247+
finally:
248+
await app.state.connections.remove(websocket)
249+
250+
251+
@app.get("/healthz")
252+
async def healthcheck() -> dict[str, str]:
253+
pool = get_pool()
254+
async with pool.acquire() as conn:
255+
await conn.execute("SELECT 1")
256+
return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()}

0 commit comments

Comments
 (0)