52 commits
1ca4677
smp server: messaging services (#1565)
epoberezkin Nov 7, 2025
3ccf854
servers: maintain xor-hash of all associated queue IDs in PostgreSQL …
epoberezkin Nov 25, 2025
5e9b164
agent: fail when per-connection transport isolation is used with serv…
epoberezkin Nov 25, 2025
38e8999
agent: service subscription events (#1671)
epoberezkin Nov 27, 2025
ff7bdbc
Merge branch 'master' into rcv-services
epoberezkin Dec 3, 2025
2ea9a9a
agent: finalize initial service subscriptions, remove associations on…
epoberezkin Dec 5, 2025
35fe5ac
Merge branch 'master' into rcv-services
epoberezkin Dec 5, 2025
8389407
Merge branch 'master' into rcv-services
epoberezkin Dec 13, 2025
f5eb735
servers: service stats and logging, allow services without option (re…
epoberezkin Dec 14, 2025
568500c
Merge branch 'master' into rcv-services
epoberezkin Dec 14, 2025
c8a7243
Merge branch 'master' into rcv-services
epoberezkin Dec 15, 2025
a1277bf
agent: remove service queue association when service ID changed, proc…
epoberezkin Dec 19, 2025
11ae20e
ntf server: use different client certs for each SMP server, remove su…
epoberezkin Dec 22, 2025
bafdbc1
smp protocol: fix encoding for SOKS/ENDS responses (#1683)
epoberezkin Dec 25, 2025
9e813c2
Merge branch 'master' into rcv-services
epoberezkin Dec 25, 2025
db4b27e
agent: create user with option to enable client service (#1684)
epoberezkin Dec 27, 2025
d908404
Merge branch 'master' into rcv-services
epoberezkin Jan 15, 2026
502d923
agent: minor fixes
epoberezkin Jan 17, 2026
ac825b0
Merge branch 'master' into rcv-services
epoberezkin Jan 24, 2026
84e8b72
docs: update protocol (#1705)
epoberezkin Jan 27, 2026
aebc01b
Merge branch 'master' into rcv-services
epoberezkin Mar 3, 2026
1d30579
Merge branch 'master' into rcv-services
epoberezkin Mar 4, 2026
8518f60
docs: agent threat model
evgeny-simplex Mar 7, 2026
c624a10
Merge branch 'master' into rcv-services
evgeny-simplex Mar 9, 2026
3a25561
Merge branch 'master' into rcv-services
epoberezkin Mar 9, 2026
3c57523
update protocol docs
evgeny-simplex Mar 9, 2026
583f4e0
update RFCs (#1730)
epoberezkin Mar 9, 2026
f745ce5
docs: fix minor issues in protocols
evgeny-simplex Mar 10, 2026
01785d5
docs: add e2e encrypted message wire encoding to PQDR spec
evgeny-simplex Mar 10, 2026
98351cf
docs: add missing encodings and other protocol corrections
evgeny-simplex Mar 10, 2026
b81670c
docs: move implemented rfcs
evgeny-simplex Mar 10, 2026
48eba59
Merge branch 'master' into rcv-services
epoberezkin Mar 12, 2026
8f42747
smp: service fixes (#1737)
epoberezkin Mar 20, 2026
1a255f2
Merge branch 'master' into rcv-services
epoberezkin Mar 20, 2026
5f08457
Merge branch 'master' into rcv-services
epoberezkin Mar 20, 2026
e762e84
prometheus: fix metrics names (#1747)
shumvgolove Mar 23, 2026
a54518a
test: rcv service re-association on restart (#1746)
evgeny-simplex Mar 23, 2026
2012236
agent: correct log message
evgeny-simplex Mar 23, 2026
909c974
docs: update whitepaper
epoberezkin Mar 26, 2026
c3a041a
Merge branch 'master' into rcv-services
epoberezkin Mar 26, 2026
3134d62
smp: fix messaging client service issues (#1751)
epoberezkin Mar 28, 2026
d930bba
Merge branch 'master' into rcv-services
evgeny-simplex Mar 28, 2026
7a3713f
Merge branch 'master' into rcv-services
epoberezkin Mar 31, 2026
0ebea15
agent: refactor cleanup if no pending subs (#1757)
epoberezkin Mar 31, 2026
bd01e78
Merge branch 'master' into rcv-services
evgeny-simplex Apr 1, 2026
fe30d69
smp server: batch processing of subscription messages (#1753)
epoberezkin Apr 1, 2026
99f9de7
Merge branch 'master' into rcv-services
epoberezkin Apr 7, 2026
3e68fa2
Merge branch 'master' into rcv-services
epoberezkin Apr 11, 2026
d2957ff
Merge branch 'master' into rcv-services
epoberezkin Apr 29, 2026
64d5413
Merge branch 'master' into rcv-services
epoberezkin May 3, 2026
ef3339a
Merge branch 'master' into rcv-services
epoberezkin May 8, 2026
8bd3193
smp: batch queue association updates on subscriptions (#1760)
epoberezkin May 8, 2026
152 changes: 152 additions & 0 deletions plans/20260328_01_server_batched_sub_processing.md
@@ -0,0 +1,152 @@
# Server: batched SUB command processing

Implementation plan for Part 1 of [RFC 2026-03-28-subscription-performance](../rfcs/2026-03-28-subscription-performance.md).

## Current state

When a batch of ~135 SUB commands arrives, the server already batches:
- Queue record lookups (`getQueueRecs` in `receive`, Server.hs:1151)
- Command verification (`verifyLoadedQueue`, Server.hs:1152)

But command processing is per-command (`foldrM process` in `client`, Server.hs:1372-1375). Each SUB calls `subscribeQueueAndDeliver` which calls `tryPeekMsg` - one DB query per queue. For Postgres, that's ~135 individual `SELECT ... FROM messages WHERE recipient_id = ? ORDER BY message_id ASC LIMIT 1` queries per batch.

## Goal

Replace ~135 individual message peek queries with 1 batched query per batch. No protocol changes.

## Implementation

### Step 1: Add `tryPeekMsgs` to MsgStoreClass

File: `src/Simplex/Messaging/Server/MsgStore/Types.hs`

Add to `MsgStoreClass`:

```haskell
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
```

Returns a map from recipient ID to earliest pending message for each queue that has one. Queues with no messages are absent from the map.
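As a rough, self-contained sketch of this contract (the list-based `Store`, and the `String` aliases for `RecipientId` and `Message`, are simplified stand-ins for the real SimpleX types, not the actual `MsgStoreClass` interface):

```haskell
{-# LANGUAGE LambdaCase #-}
-- Simplified model: a store maps recipient IDs to FIFO lists of pending
-- messages. This illustrates only the tryPeekMsgs contract: one map entry
-- per queue with a pending message, absent entry otherwise.
import qualified Data.Map.Strict as M

type RecipientId = String
type Message = String
type Store = M.Map RecipientId [Message]

-- Per-queue peek (the existing operation): earliest pending message, if any.
tryPeekMsg :: Store -> RecipientId -> Maybe Message
tryPeekMsg s rId = M.lookup rId s >>= \case
  m : _ -> Just m
  [] -> Nothing

-- Batched peek (the new operation): queues with no pending message
-- are simply absent from the result map.
tryPeekMsgs :: Store -> [RecipientId] -> M.Map RecipientId Message
tryPeekMsgs s rIds = M.fromList [(rId, m) | rId <- rIds, Just m <- [tryPeekMsg s rId]]
```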

### Step 2: Parameterize `deliver` to accept pre-fetched message

File: `src/Simplex/Messaging/Server.hs`

Currently `deliver` (inside `subscribeQueueAndDeliver`, line 1641) calls `tryPeekMsg ms q`. Add a parameter for an optional pre-fetched message:

```haskell
deliver :: Maybe Message -> (Bool, Maybe Sub) -> M s ResponseAndMessage
deliver prefetchedMsg (hasSub, sub_) = do
  stats <- asks serverStats
  fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
    msg_ <- maybe (tryPeekMsg ms q) (pure . Just) prefetchedMsg
    ...
```

When `Nothing` is passed, falls back to individual `tryPeekMsg` (existing behavior). When `Just msg` is passed, uses it directly (batched path).

### Step 3: Pre-fetch messages before the processing loop

File: `src/Simplex/Messaging/Server.hs`

Currently (lines 1372-1375):

```haskell
forever $
  atomically (readTBQueue rcvQ)
    >>= foldrM process ([], [])
    >>= \(rs_, msgs) -> ...
```

Add a pre-fetch step before the existing loop:

```haskell
forever $ do
  batch <- atomically (readTBQueue rcvQ)
  msgMap <- prefetchMsgs batch
  foldrM (process msgMap) ([], []) batch
    >>= \(rs_, msgs) -> ...
```

`prefetchMsgs` scans the batch, collects queues from SUB commands that have a verified queue (`q_ = Just (q, _)`), calls `tryPeekMsgs` once, returns the map. For batches with no SUBs it returns an empty map (no DB call).

`process` passes the looked-up message (or Nothing) through to `processCommand` and down to `deliver`.

The `foldrM process` loop, `processCommand`, `subscribeQueueAndDeliver`, and all other command handlers stay structurally the same. Only `deliver` gains one parameter, and the `client` loop gains one pre-fetch call.
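A minimal sketch of the classification step inside `prefetchMsgs` — the `Verified` and `Cmd` shapes below are hypothetical stand-ins for the server's verified-transmission batch, chosen only to show that verification errors and non-SUB commands contribute nothing to the pre-fetch:

```haskell
-- Hypothetical batch element: Left is a verification error from receive,
-- Right carries the queue ID and the parsed command.
type RecipientId = String
data Cmd = SUB | ACK
type Verified = Either String (RecipientId, Cmd)

-- Collect queue IDs of verified SUB commands only; a batch of all ACKs
-- (or all errors) yields an empty list, so no store call is made.
subQueueIds :: [Verified] -> [RecipientId]
subQueueIds batch = [rId | Right (rId, SUB) <- batch]
```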

### Step 4: Review

Review the typeclass signature and server usage. Confirm the interface has the right shape before implementing store backends.

### Step 5: Implement for each store backend

#### Postgres

File: `src/Simplex/Messaging/Server/MsgStore/Postgres.hs`

Single query using `DISTINCT ON`:

```sql
SELECT DISTINCT ON (recipient_id)
  recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
WHERE recipient_id IN ?
ORDER BY recipient_id, message_id ASC
```

Build `Map RecipientId Message` from results.
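Since `DISTINCT ON (recipient_id)` combined with the `ORDER BY` yields exactly one row per recipient (the one with the smallest `message_id`), the rows can be folded into the map directly. A defensive pure sketch (row shape assumed) that keeps the first row per key, making the earliest-message-wins intent explicit even if duplicates appeared:

```haskell
import qualified Data.Map.Strict as M

type RecipientId = String
type Message = String

-- fromListWith combines duplicates as f newValue oldValue; keeping the
-- old (earlier) value preserves the first row per recipient. With
-- DISTINCT ON there are no duplicates, so this is a no-op safeguard.
rowsToMap :: [(RecipientId, Message)] -> M.Map RecipientId Message
rowsToMap = M.fromListWith (\_new old -> old)
```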

#### STM

File: `src/Simplex/Messaging/Server/MsgStore/STM.hs`

Loop over queues, call `tryPeekMsg` for each, collect into map.

#### Journal

File: `src/Simplex/Messaging/Server/MsgStore/Journal.hs`

Loop over queues, call `tryPeekMsg` for each, collect into map.

### Step 6: Handle edge cases

1. **Mixed batches**: `prefetchMsgs` collects only SUB queues. Non-SUB commands get Nothing for the pre-fetched message and process unchanged.

2. **Already-subscribed queues**: Include in pre-fetch - `deliver` is called for re-SUBs too (delivers pending message).

3. **Service subscriptions**: The pre-fetch doesn't care about service state. `sharedSubscribeQueue` handles service association in STM; message peek is the same.

4. **Error queues**: Verification errors from `receive` are Left values in the batch. `prefetchMsgs` only looks at Right values with SUB commands.

5. **Empty pre-fetch**: If batch has no SUBs (e.g., all ACKs), `prefetchMsgs` returns empty map, no DB call made.

### Step 7: Batch other commands (future, not in scope)

The same pattern (pre-fetch before loop, parameterize handler) can extend to:
- `ACK` with `tryDelPeekMsg` - batch delete+peek
- `GET` with `tryPeekMsg` - same map lookup

Lower priority since these don't have the N-at-once pattern of subscriptions.

## File changes summary

| File | Change |
|---|---|
| `src/Simplex/Messaging/Server/MsgStore/Types.hs` | Add `tryPeekMsgs` to typeclass |
| `src/Simplex/Messaging/Server/MsgStore/Postgres.hs` | Implement `tryPeekMsgs` with batch SQL |
| `src/Simplex/Messaging/Server/MsgStore/STM.hs` | Implement `tryPeekMsgs` as loop |
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Implement `tryPeekMsgs` as loop |
| `src/Simplex/Messaging/Server.hs` | Add `prefetchMsgs`, parameterize `deliver` |

## Testing

1. Existing server tests must pass unchanged (correctness preserved).
2. Add a test that subscribes a batch of queues (some with pending messages, some without) and verifies all get correct SOK + MSG responses.
3. Prometheus metrics: existing `qSub` stat should still increment correctly.

## Performance expectation

For 300K queues across ~2200 batches:
- Before: ~300K individual DB queries
- After: ~2200 batched DB queries (one per batch of ~135)
- ~136x reduction in DB round-trips
126 changes: 126 additions & 0 deletions plans/20260401_01_batch_queue_associations.md
@@ -0,0 +1,126 @@
# Server: batch queue service associations

When a batch of SUB or NSUB commands arrives from a service client, each command that needs a new or removed service association calls `setQueueService` individually - one DB write per command. For 135 commands per batch, that's 135 individual `UPDATE msg_queues` queries.

## Goal

Reduce to at most 2 DB queries per batch (one for rcv associations, one for ntf associations), using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated.

Also fuse message pre-fetch and association batching into a single batch preparation step with a clean contract.

## Contract

```haskell
prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))))
```

`Left e` = batch-level failure (message pre-fetch or association query failed entirely). All SUBs/NSUBs in the batch get this error.

`Right map` = per-queue results as a tuple:
- `Maybe Message` - pre-fetched message for SUB queues, `Nothing` for NSUB or no message
- `Maybe (Either ErrorType ())` - association result. `Nothing` = no update needed. `Just (Right ())` = update succeeded. `Just (Left e)` = update failed for this queue.

One map, one lookup per queue. `processCommand` passes both values to `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`.

Queues not in the map (non-SUB/NSUB commands, failed verification) are not affected.

## prepareBatch implementation

One accumulating fold over the batch, collecting three lists:
- `subMsgQs :: [StoreQueue s]` - SUB queues for message pre-fetch
- `rcvAssocQs :: [StoreQueue s]` - SUB queues needing `rcv_service_id` update (`clntServiceId /= rcvServiceId qr`)
- `ntfAssocQs :: [StoreQueue s]` - NSUB queues needing `ntf_service_id` update (`clntServiceId /= ntfServiceId` from `NtfCreds`)

Classification reads from the already-loaded `QueueRec` in `VerifiedTransmission` - no extra DB query.

Then three store calls (each skipped if its list is empty):
1. `tryPeekMsgs ms subMsgQs` -> `Map RecipientId Message`
2. `setRcvQueueServices (queueStore ms) clntServiceId rcvAssocQs` -> `Set RecipientId`
3. `setNtfQueueServices (queueStore ms) clntServiceId ntfAssocQs` -> `Set RecipientId`

Then one pass to merge results into `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`:
- For each SUB queue: `(M.lookup rId msgMap, assocResult rId rcvUpdated rcvAssocQs)`
- For each NSUB queue: `(Nothing, assocResult rId ntfUpdated ntfAssocQs)`

Where `assocResult rId updated assocQs` = if the queue was in `assocQs` (needed update), then `Just (Right ())` if `rId` is in `updated`, else `Just (Left AUTH)`. If not in `assocQs` (no update needed), `Nothing`.

If any of the three calls fails entirely, return `Left e`.
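The per-queue merge rule can be sketched as a small pure function. Here the real `[q]` of store queues is simplified to a `Set RecipientId` of queues that needed an update; `AUTH` stands in for the real error constructor:

```haskell
import qualified Data.Set as S

type RecipientId = String
data ErrorType = AUTH deriving (Eq, Show)

-- Nothing          -> queue did not need an association update
-- Just (Right ()) -> update requested and the batch UPDATE returned this ID
-- Just (Left AUTH) -> update requested but the ID was not in the returned set
assocResult :: RecipientId -> S.Set RecipientId -> S.Set RecipientId -> Maybe (Either ErrorType ())
assocResult rId updated assocQs
  | not (rId `S.member` assocQs) = Nothing
  | rId `S.member` updated = Just (Right ())
  | otherwise = Just (Left AUTH)
```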

## Store interface

Replace the polymorphic `setQueueServices` with two plain functions in `QueueStoreClass`:

```haskell
setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
```

No `SParty p` polymorphism. Each function knows its column.

### Postgres implementation

`setRcvQueueServices`:
```sql
UPDATE msg_queues SET rcv_service_id = ?
WHERE recipient_id IN ? AND deleted_at IS NULL
RETURNING recipient_id
```

`setNtfQueueServices`:
```sql
UPDATE msg_queues SET ntf_service_id = ?
WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL
RETURNING recipient_id
```

After each batch query, for each queue in the returned set:
1. Read QueueRec TVar, update with new serviceId
2. Write store log entry

### STM implementation

Loop over queues, call existing per-item logic, collect succeeded `RecipientId`s into a Set.

## Downstream changes in Server.hs

### processCommand

Gains one parameter: `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`.

SUB case: `M.lookup entId prepared` gives `Just (msg_, assocResult)` or `Nothing`. Pass both to `subscribeQueueAndDeliver`.

NSUB case: `M.lookup entId prepared` gives `Just (Nothing, assocResult)` or `Nothing`. Pass `assocResult` to `subscribeNotifications`.

Forwarded commands: pass `M.empty`.

### subscribeQueueAndDeliver

Takes `Maybe Message` and `Maybe (Either ErrorType ())` as before. No change in how it uses them.

### sharedSubscribeQueue

Takes `Maybe (Either ErrorType ())`. On paths needing association update:
- `Just (Left e)` -> return error
- `Just (Right ())` -> skip `setQueueService`, proceed with STM work
- `Nothing` -> no update needed, proceed with existing logic
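The three-way dispatch could look like the following sketch, where the `String` results stand in for the real STM continuations and `AUTH` for the real error constructor:

```haskell
{-# LANGUAGE LambdaCase #-}
data ErrorType = AUTH deriving (Eq, Show)

-- Dispatch on the pre-computed association result from prepareBatch.
handleAssoc :: Maybe (Either ErrorType ()) -> Either ErrorType String
handleAssoc = \case
  Just (Left e) -> Left e                       -- batch update failed for this queue
  Just (Right ()) -> Right "subscribe"          -- update done, skip setQueueService
  Nothing -> Right "set-service-then-subscribe" -- no update needed, existing logic
```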

## Implementation order (top-down)

1. Define the `prepareBatch` contract and thread one map through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` (Server.hs)
2. Implement `prepareBatch` with the fold, three calls, and merge (Server.hs)
3. Add `setRcvQueueServices` and `setNtfQueueServices` to `QueueStoreClass` (Types.hs)
4. Implement for Postgres with batch `UPDATE ... RETURNING` (Postgres.hs)
5. Implement for STM as loop (STM.hs)
6. Implement for Journal as delegation (Journal.hs)

At step 2, store functions can initially be stubs returning empty sets. Steps 3-6 fill in the real implementations.

## Files changed

| File | Change |
|---|---|
| `src/Simplex/Messaging/Server.hs` | `prepareBatch` with fold + merge; one map parameter through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` |
| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setRcvQueueServices`, `setNtfQueueServices` to `QueueStoreClass` |
| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement with batch `UPDATE ... RETURNING` + per-item TVar/log updates |
| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement as loop |
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Delegate to underlying store |
@@ -1,16 +1,17 @@
# SMP router message storage

## Problem

Currently SMP routers store all queues in router memory. As the traffic grows, so does the number of undelivered messages. What is worse, Haskell does not avoid heap fragmentation when messages are allocated and then de-allocated: undelivered messages use ByteString, and GC cannot move them around, as they use pinned memory.

## Possible solutions

### Solution 1: solve only GC fragmentation problem

Move from ByteString to some other primitive to store messages in memory long term, e.g. ShortByteString, or manage allocation/de-allocation of stored messages manually in some other way.

Pros: the simplest solution that avoids substantial re-engineering of the router.

Cons:
- not a long term solution, as memory growth still has limits.
Expand All @@ -22,12 +23,12 @@ Use files or RocksDB to store messages.

Pros:
- much lower memory usage.
- no message loss in case of abnormal router termination (important until clients have delivery redundancy).
- this is a long term solution, and at some point it might need to be done anyway.

Cons:
- substantial re-engineering costs and risks.
- metadata privacy. Currently we only save undelivered messages when the router is restarted; with this approach all messages will be stored for some time. This argument is limited, as hosting providers of VMs can make memory snapshots too, though those are harder to analyze than files. On the other hand, with this approach messages will be stored for a shorter time.

#### RocksDB and other key-value stores

@@ -67,7 +68,7 @@
queueLogLine =
%s"write_msg=" digits
```

When queue is first requested by the router:

```c
if queue folder exists:
Expand All @@ -87,7 +88,7 @@ nextReadMsg = read_msg
open write_file in AppendMode
```

When message is added to the queue (assumes that queue state is loaded to router memory, if not the previous section will be done first):

```c
if write_msg > max_queue_messages:
@@ -128,7 +129,7 @@
nextReadByte = current position in file
```

When message delivery is acknowledged, the read queue needs to be advanced, and possibly switched to read from the current write queue:

```c
if nextReadByte == read_byte:
@@ -162,9 +163,9 @@
Most Linux systems use EXT4 filesystem where the file lookup time scales linearly

So storing all queue folders in one folder won't scale.

To solve this problem we could use recipient queue ID in base64url format not as a folder name, but as a folder path, splitting it to path fragments of some length. The number of fragments can be configurable and migration to a different fragment size can be supported as the number of queues on a given router grows.

Currently, queue ID is a 24-byte random number, thus allowing 2^192 possible queue IDs. If we assume that a router must hold 1b queues, it means that we have ~2^162 possible addresses for each existing queue. 24 bytes in base64 is 32 characters that can be split into, say, 8 fragments with 4 characters each, so that the queue folder path for a queue with ID `abcdefghijklmnopqrstuvwxyz012345` would be:

`/var/opt/simplex/messages/abcd/efgh/ijkl/mnop/qrst/uvwx/yz01/2345`

@@ -174,6 +175,6 @@
So we could use an unequal split of the path, two letters each and the last fragment being longer:

`/var/opt/simplex/messages/ab/cd/ef/ghijklmnopqrstuvwxyz012345`

The first three levels in this case can have 4096 subfolders each, which gives 68b possible subfolders ((64^2)^3), so the last level will be sparse in case of 1b queues on the router. So we could make it 4 levels with 2 letters each to never think about it, accounting for a large variance of the random numbers distribution:

`/var/opt/simplex/messages/ab/cd/ef/gh/ijklmnopqrstuvwxyz012345`
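The four-levels-of-two-letters layout above can be sketched as a small pure function (purely illustrative; the real implementation would work on the store's ID and path types):

```haskell
import Data.List (intercalate)

-- Split a base64url queue ID into 4 two-character fragments plus the
-- remainder, and join them under the root messages directory.
queuePath :: String -> String -> String
queuePath root qId = intercalate "/" (root : fragments (4 :: Int) qId)
  where
    fragments 0 s = [s]
    fragments n s = let (h, t) = splitAt 2 s in h : fragments (n - 1) t
```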