Skip to content

Commit bd269b0

Browse files
authored
added websocket and publish new_msg (#19)
1 parent ee3e20b commit bd269b0

10 files changed

Lines changed: 706 additions & 50 deletions

File tree

README.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,13 @@ The server starts on port `8000` by default. Override with `FMSG_API_PORT`.
126126
The HTTP server is configured with `ReadHeaderTimeout: 10s`, `WriteTimeout: 65s`,
127127
and `IdleTimeout: 120s`. The write timeout exceeds the `/wait` endpoint's
128128
maximum long-poll duration (60 s) so connections are not dropped prematurely.
129+
These timeouts do not apply to `/fmsg/ws` connections: once upgraded, a
130+
WebSocket connection is hijacked from the HTTP server and kept alive by its own
131+
ping/pong heartbeat.
129132

130133
## API Routes
131134

132-
All routes are prefixed with `/fmsg` and require a valid `Authorization: Bearer <token>` header.
135+
All routes are prefixed with `/fmsg` and require a valid `Authorization: Bearer <token>` header. The one exception is the WebSocket route `/fmsg/ws`, which additionally accepts the token via an `access_token` query parameter (browsers cannot set headers on a WebSocket).
133136

134137
Rate limiting is enforced at the host level (e.g. `nftables`) rather than in
135138
the application.
@@ -139,6 +142,7 @@ the application.
139142
| `GET` | `/fmsg` | List messages for user |
140143
| `GET` | `/fmsg/sent` | List authored messages (sent + drafts) |
141144
| `GET` | `/fmsg/wait` | Long-poll for new messages |
145+
| `GET` | `/fmsg/ws` | WebSocket for pushed event notifications |
142146
| `POST` | `/fmsg` | Create a draft message |
143147
| `GET` | `/fmsg/:id` | Retrieve a message |
144148
| `PUT` | `/fmsg/:id` | Update a draft message |
@@ -189,6 +193,45 @@ loop:
189193
# on 204 or transient error: loop immediately (with brief back-off on error)
190194
```
191195

196+
### GET `/fmsg/ws`
197+
198+
Upgrades the connection to a WebSocket over which the server pushes events that
199+
pertain to the authenticated user. Intended for always-connected clients
200+
(browsers, desktop apps) as a more scalable alternative to long-polling
201+
`/fmsg/wait`: a single shared PostgreSQL listener fans events out to all
202+
connected clients, so the number of connections does not consume database
203+
connection-pool capacity.
204+
205+
**Authentication:** the JWT is verified exactly as for the REST API. Supply it
206+
either as an `Authorization: Bearer <token>` header (non-browser clients) or as
207+
an `access_token` query parameter (browsers, which cannot set headers on a
208+
WebSocket). The handshake fails with `401`/`400`/`403`/`503` — the same statuses
209+
as the REST middleware — before the connection is upgraded.
210+
211+
**Events:** every frame is a JSON envelope with a `type` discriminator so new
212+
event types can be added without breaking clients:
213+
214+
```json
215+
{ "type": "new_msg", "data": { ...message... } }
216+
```
217+
218+
| `type` | `data` | Sent when |
219+
| ---------- | ------ | --------- |
220+
| `new_msg` | A message object, same shape as an item in the `GET /fmsg` list response (includes `id`). | A new message arrives for the authenticated user. |
221+
222+
A client only ever receives events for messages it is a participant on. The
223+
server sends periodic WebSocket pings; clients should respond with pongs (most
224+
WebSocket libraries do this automatically) to keep the connection alive.
225+
226+
**Browser example:**
227+
```js
228+
const ws = new WebSocket(`wss://api.example.com/fmsg/ws?access_token=${jwt}`);
229+
ws.onmessage = (e) => {
230+
const event = JSON.parse(e.data);
231+
if (event.type === "new_msg") displayMessage(event.data);
232+
};
233+
```
234+
192235
### GET `/fmsg`
193236

194237
Returns messages where the authenticated user is a recipient (listed in `msg_to` or `msg_add_to`), ordered by message ID descending.

src/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ require (
66
github.com/MicahParks/keyfunc/v3 v3.8.0
77
github.com/gin-gonic/gin v1.12.0
88
github.com/golang-jwt/jwt/v5 v5.3.1
9+
github.com/gorilla/websocket v1.5.3
910
github.com/jackc/pgx/v5 v5.8.0
1011
github.com/joho/godotenv v1.5.1
12+
golang.org/x/sync v0.20.0
1113
)
1214

1315
require (
@@ -41,7 +43,6 @@ require (
4143
golang.org/x/arch v0.22.0 // indirect
4244
golang.org/x/crypto v0.48.0 // indirect
4345
golang.org/x/net v0.51.0 // indirect
44-
golang.org/x/sync v0.20.0 // indirect
4546
golang.org/x/sys v0.41.0 // indirect
4647
golang.org/x/text v0.34.0 // indirect
4748
golang.org/x/time v0.15.0 // indirect

src/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArs
3636
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
3737
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
3838
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
39+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
40+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
3941
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
4042
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
4143
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -93,8 +95,6 @@ golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
9395
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
9496
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
9597
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
96-
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
97-
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
9898
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
9999
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
100100
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

src/handlers/hub.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package handlers
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"strconv"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/jackc/pgx/v5"
13+
)
14+
15+
// Event type discriminators for the WebSocket envelope. Adding a new event
16+
// type means adding a constant here and a producer that dispatches it.
17+
const (
18+
eventNewMsg = "new_msg"
19+
)
20+
21+
// wsEnvelope is the JSON shape of every frame pushed over a WebSocket. The
22+
// Type field lets clients route events; Data carries the event-specific body.
23+
type wsEnvelope struct {
24+
Type string `json:"type"`
25+
Data interface{} `json:"data"`
26+
}
27+
28+
// Hub maintains the set of connected WebSocket clients and fans out database
29+
// notifications to the clients they pertain to. A single dedicated PostgreSQL
30+
// connection LISTENs on new_msg_to for the whole process, so the number of
31+
// connected clients does not affect the size of the database connection pool.
32+
type Hub struct {
33+
// buildItem produces the message payload pushed for a notification. It is
34+
// a field (rather than a *MessageHandler call) so dispatch can be unit
35+
// tested without a database.
36+
buildItem func(ctx context.Context, msgID int64, recipient string) (*messageListItem, error)
37+
38+
mu sync.RWMutex
39+
// registry maps a lower-cased user address to the set of that user's
40+
// currently connected clients (a user may have several connections).
41+
registry map[string]map[*wsClient]struct{}
42+
}
43+
44+
// NewHub creates a Hub that builds pushed message payloads via msgs.
45+
func NewHub(msgs *MessageHandler) *Hub {
46+
return &Hub{
47+
buildItem: msgs.messageItemFor,
48+
registry: make(map[string]map[*wsClient]struct{}),
49+
}
50+
}
51+
52+
// Register adds a client to the registry under its authenticated address.
53+
func (h *Hub) Register(c *wsClient) {
54+
h.mu.Lock()
55+
defer h.mu.Unlock()
56+
set := h.registry[c.addr]
57+
if set == nil {
58+
set = make(map[*wsClient]struct{})
59+
h.registry[c.addr] = set
60+
}
61+
set[c] = struct{}{}
62+
}
63+
64+
// Unregister removes a client from the registry.
65+
func (h *Hub) Unregister(c *wsClient) {
66+
h.mu.Lock()
67+
defer h.mu.Unlock()
68+
set := h.registry[c.addr]
69+
if set == nil {
70+
return
71+
}
72+
delete(set, c)
73+
if len(set) == 0 {
74+
delete(h.registry, c.addr)
75+
}
76+
}
77+
78+
// Run owns the dedicated listener connection. It blocks until ctx is cancelled,
79+
// reconnecting with capped exponential backoff if the connection drops.
80+
func (h *Hub) Run(ctx context.Context) {
81+
const maxBackoff = 30 * time.Second
82+
backoff := time.Second
83+
for ctx.Err() == nil {
84+
err := h.listen(ctx, func() { backoff = time.Second })
85+
if ctx.Err() != nil {
86+
return
87+
}
88+
log.Printf("ws hub: listener stopped (%v); reconnecting in %s", err, backoff)
89+
select {
90+
case <-ctx.Done():
91+
return
92+
case <-time.After(backoff):
93+
}
94+
if backoff < maxBackoff {
95+
backoff *= 2
96+
}
97+
}
98+
}
99+
100+
// listen opens a dedicated connection, LISTENs on new_msg_to, and dispatches
101+
// every notification until the connection fails or ctx is cancelled. onConnected
102+
// is invoked once the LISTEN has succeeded so the caller can reset its backoff.
103+
func (h *Hub) listen(ctx context.Context, onConnected func()) error {
104+
// An empty connection string makes pgx read the standard PG* environment
105+
// variables, exactly as the pgxpool in db.New does.
106+
conn, err := pgx.Connect(ctx, "")
107+
if err != nil {
108+
return err
109+
}
110+
defer conn.Close(context.Background())
111+
112+
if _, err := conn.Exec(ctx, "LISTEN new_msg_to"); err != nil {
113+
return err
114+
}
115+
log.Println("ws hub: listening on new_msg_to")
116+
onConnected()
117+
118+
for {
119+
n, err := conn.WaitForNotification(ctx)
120+
if err != nil {
121+
return err
122+
}
123+
msgID, addr, ok := parseNotifyPayload(n.Payload)
124+
if !ok {
125+
log.Printf("ws hub: ignoring malformed notification payload %q", n.Payload)
126+
continue
127+
}
128+
h.dispatch(ctx, msgID, addr)
129+
}
130+
}
131+
132+
// parseNotifyPayload parses a new_msg_to payload of the form "msgID,addr".
133+
func parseNotifyPayload(payload string) (msgID int64, addr string, ok bool) {
134+
comma := strings.IndexByte(payload, ',')
135+
if comma < 0 {
136+
return 0, "", false
137+
}
138+
id, err := strconv.ParseInt(payload[:comma], 10, 64)
139+
if err != nil {
140+
return 0, "", false
141+
}
142+
addr = payload[comma+1:]
143+
if addr == "" {
144+
return 0, "", false
145+
}
146+
return id, addr, true
147+
}
148+
149+
// dispatch pushes message msgID to every client connected as addr. The message
150+
// is fetched and marshalled only when at least one such client is connected, so
151+
// notifications for addresses with no live WebSocket cost nothing beyond a map
152+
// lookup. addr originates from a msg_to/msg_add_to row, so any client connected
153+
// as addr is by definition a participant of the message.
154+
func (h *Hub) dispatch(ctx context.Context, msgID int64, addr string) {
155+
h.mu.RLock()
156+
set := h.registry[strings.ToLower(addr)]
157+
clients := make([]*wsClient, 0, len(set))
158+
for c := range set {
159+
clients = append(clients, c)
160+
}
161+
h.mu.RUnlock()
162+
if len(clients) == 0 {
163+
return
164+
}
165+
166+
item, err := h.buildItem(ctx, msgID, addr)
167+
if err != nil {
168+
log.Printf("ws hub: build message %d for %s: %v", msgID, addr, err)
169+
return
170+
}
171+
payload, err := json.Marshal(wsEnvelope{Type: eventNewMsg, Data: item})
172+
if err != nil {
173+
log.Printf("ws hub: marshal message %d: %v", msgID, err)
174+
return
175+
}
176+
177+
for _, c := range clients {
178+
select {
179+
case c.send <- payload:
180+
default:
181+
// Slow client: drop the connection rather than stall the
182+
// shared fan-out for every other client.
183+
log.Printf("ws hub: client %s send buffer full, closing", c.addr)
184+
c.close()
185+
}
186+
}
187+
}

0 commit comments

Comments
 (0)