Merged
379 changes: 379 additions & 0 deletions doc/developer/design/20260210_incremental_occ_read_then_write.md
@@ -0,0 +1,379 @@
# Incremental, OCC-based Read-Then-Write for Concurrent Writers

## Context

Read-then-write operations (DELETE, UPDATE, INSERT...SELECT) in Materialize
currently rely on in-process pessimistic locking. The Coordinator acquires
write locks before reading, holds them through the write, and releases them
only after the write has been applied to the timestamp oracle. This ensures
that no other write can interleave between the read and write phases of a
single operation.

This approach is correct for a single `environmentd` process, but it does not
extend to multiple processes: the locks are in-memory mutexes that cannot be
shared across process boundaries. We need concurrent multi-process writes for:

- **Zero-downtime upgrades v2**: the old and new `environmentd` processes must
both be able to serve writes during the handover window
- **High availability**: multiple `environmentd` processes must be able to
serve queries concurrently
- **Physical isolation**: separate serving-layer processes (a.k.a.
`environmentd`) for different workloads

This design doc proposes replacing the pessimistic locking approach with
optimistic concurrency control (OCC) for read-then-write operations, backed by
a subscribe that continually tracks the current state of the data.

## Goals

- Make read-then-write operations correct with concurrent writers, including
writers in different `environmentd` processes
- Provide the same user-visible semantics as the current implementation: writes
are based on the latest committed state of the table at the time the write is
applied
- Don't regress performance beyond reasonable bounds

## Non-Goals

- High-performance writes under heavy contention. The current implementation
serializes writes behind a global lock; the new implementation serializes
them via OCC retries. Neither is designed for high write throughput.
- Removing the in-process locks immediately. During rollout, the old lock-based
path and the new OCC path coexist behind a feature flag. The locks can be
removed once the OCC path is fully rolled out.
- Multi-statement transactions. The OCC approach as described here applies to
single-statement implicit transactions. Explicit multi-statement write
transactions continue to use the existing path. There are no plans to
support mixed read/write transactions.

## Overview

The core idea is to replace the "lock, peek, write" sequence with a
subscribe-based OCC loop:

1. Open a subscribe on the read expression (the `selection` from the
`ReadThenWrite` plan), starting at the timestamp determined by the oracle
2. Accumulate diffs from the subscribe
3. When the subscribe frontier advances to T (meaning we have a consistent
snapshot), attempt to write the accumulated diffs at timestamp T
4. If the write succeeds, done
5. If the write fails because another writer already committed at timestamp T,
the subscribe will deliver the new state; go back to step 3 with the updated
diffs

This approach is correct by construction: the subscribe always reflects the
committed state of the data, and the timestamped write mechanism ensures that
the write is applied at exactly the timestamp the diffs were computed for. If
anything changes between the read and the write, the write fails and is retried
with fresh data.
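
The loop above can be sketched in a few lines. This is a minimal, single-threaded model under stated assumptions, not Materialize's implementation: `Table`, `write_at`, and `occ_delete` are hypothetical names, the "subscribe" is simulated by re-reading the table at the current frontier, and the `before_write` hook exists only so that a concurrent writer can be injected between the read and the write.

```rust
use std::collections::BTreeMap;

/// A diff is a row together with a signed multiplicity.
type Diff = (String, i64);

/// Toy stand-in for a table plus its timestamp oracle (hypothetical).
struct Table {
    /// Next timestamp at which a write may commit.
    write_ts: u64,
    /// Current contents: row -> multiplicity.
    rows: BTreeMap<String, i64>,
}

impl Table {
    /// Commit `diffs` at exactly `ts`. Fails with the current write timestamp
    /// if another write has already committed at `ts`.
    fn write_at(&mut self, ts: u64, diffs: &[Diff]) -> Result<(), u64> {
        if ts != self.write_ts {
            return Err(self.write_ts);
        }
        for (row, d) in diffs {
            let now_zero = {
                let m = self.rows.entry(row.clone()).or_insert(0);
                *m += *d;
                *m == 0
            };
            if now_zero {
                self.rows.remove(row);
            }
        }
        self.write_ts += 1;
        Ok(())
    }
}

/// OCC loop for a DELETE: compute diffs as of the frontier, attempt the
/// write at that timestamp, and retry with fresh diffs on conflict.
/// Returns the commit timestamp and the number of attempts.
fn occ_delete<P, F>(table: &mut Table, matches: P, mut before_write: F) -> (u64, u32)
where
    P: Fn(&str) -> bool,
    F: FnMut(&mut Table, u32),
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        // "Frontier advanced to T": the diffs below are consistent as of T.
        let ts = table.write_ts;
        let diffs: Vec<Diff> = table
            .rows
            .iter()
            .filter(|(r, _)| matches(r.as_str()))
            .map(|(r, m)| (r.clone(), -*m))
            .collect();
        // Hook that lets a caller simulate a concurrent writer.
        before_write(&mut *table, attempts);
        match table.write_at(ts, &diffs) {
            Ok(()) => return (ts, attempts),
            // Someone else committed at `ts`; loop and recompute.
            Err(_current) => continue,
        }
    }
}
```

If a concurrent writer commits between the read and the write, the first attempt fails and the second attempt deletes the rows that writer added as well, which is the "latest committed state" behavior the design aims for.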

Below we describe the current approach, the proposed approach, correctness
arguments, and performance implications.

## The current lock-based approach

The current `sequence_read_then_write` in the Coordinator works as follows:

1. **Acquire write locks**: For each table involved in the read-then-write, an
in-process `OwnedMutexGuard` is acquired. All locks are acquired atomically
(all-or-nothing) to prevent deadlocks. If any lock is unavailable, the
entire operation is deferred until the lock becomes available.
2. **Peek**: A one-shot peek is executed at `QueryWhen::FreshestTableWrite`,
reading the current state of the data.
3. **Compute diffs**: The adapter computes retractions and additions from the
peek results (e.g., for DELETE: negate each row; for UPDATE: negate old rows
and add new rows).
4. **Linearize** (for strict serializable isolation): Wait for the timestamp
oracle to confirm that the read timestamp has been linearized.
5. **Write**: Submit the diffs via `send_diffs`, which adds them to the pending
group commit queue.
6. **Release locks**: Locks are held until after the group commit applies the
write to the timestamp oracle, ensuring no other write can sneak in between
the read and the write.

The correctness of this approach depends entirely on the in-process locks. If
the locks were removed or if a second `environmentd` process were to execute a
concurrent write, the read-then-write would be susceptible to lost updates.

The complexity of this locking mechanism is significant:

- `WriteLocks` uses an all-or-nothing builder pattern to prevent deadlocks
- `GroupCommitWriteLocks` merges compatible locks across concurrent blind
writes
- Deferred write operations must be carefully managed when locks aren't
immediately available
- The lock validation code in `group_commit()` has multiple branches for
different lock states (pre-validated, no locks needed, missing locks)

## The proposed OCC approach

### Architecture

The new approach moves read-then-write sequencing from the Coordinator to the
_session task_ (the per-connection async task), similar to how frontend peek
sequencing already works. The session task does the planning, optimization, and
OCC retry loop. It communicates with the Coordinator only for specific
operations that require Coordinator state:

- Creating and dropping the internal subscribe (which needs Coordinator
bookkeeping for the compute sink)
- Submitting timestamped writes (which go through group commit)

### The OCC loop

```
Session Task                           Coordinator
|                                           |
|-- plan & optimize MIR/LIR                 |
|                                           |
|-- acquire OCC semaphore                   |
|                                           |
|-- CreateReadThenWriteSubscribe ---------> |
| <------------ subscribe channel ----------|
|                                           |
|  +-- OCC Loop ------------------+         |
|  | receive diffs from subscribe |         |
|  | on frontier advance:         |         |
|  |   consolidate diffs          |         |
|  |   AttemptTimestampedWrite -> |-------->|-- group_commit()
|  |   <-- Success/Failed --------|<--------|
|  |   if Failed: continue loop   |         |
|  |   if Success: break          |         |
|  +------------------------------+         |
|                                           |
|-- DropReadThenWriteSubscribe -----------> |
|                                           |
```

### Timestamped writes

A timestamped write is a write that must be committed at a specific timestamp.
The group commit machinery has to be extended to support this by:

1. Checking if the target timestamp is still valid (hasn't been passed by the
oracle)
2. Using the target timestamp directly instead of allocating a new one from the
oracle
3. Advancing the oracle past the target timestamp after the write

Only one timestamped write is processed per group commit round. If multiple
timestamped writes target the same timestamp, one is selected and the others
are failed with a _timestamp passed_ error. This is necessary because
independently computed timestamped writes may be inconsistent with each other:
they were each computed from the state at their respective read timestamps and
could conflict if applied together.
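
As an illustration of the three steps and the one-write-per-round rule, here is a minimal sketch. The names (`Oracle`, `WriteResult`, `group_commit_round`) are hypothetical, and the model makes two simplifying assumptions that the text does not specify: a target is "still valid" when it is greater than the last committed timestamp, and every write in the round that is not selected is failed.

```rust
/// Outcome of a timestamped write attempt (illustrative names).
#[derive(Debug, PartialEq)]
enum WriteResult {
    Success { committed_at: u64 },
    TimestampPassed { oracle_ts: u64 },
}

/// Toy oracle: tracks the last timestamp at which a write was committed.
struct Oracle {
    last_committed: u64,
}

/// Process one group commit round. `targets[i]` is the timestamp requested
/// by pending timestamped write `i`; the result has one entry per write.
fn group_commit_round(oracle: &mut Oracle, targets: &[u64]) -> Vec<WriteResult> {
    // Step 1: select the first write whose target has not been passed.
    let winner = targets.iter().position(|&t| t > oracle.last_committed);
    if let Some(i) = winner {
        // Steps 2 and 3: commit at the target timestamp itself (no fresh
        // allocation) and advance the oracle so the timestamp cannot be reused.
        oracle.last_committed = targets[i];
    }
    targets
        .iter()
        .enumerate()
        .map(|(i, &t)| {
            if Some(i) == winner {
                WriteResult::Success { committed_at: t }
            } else {
                // Assumption: all non-selected writes fail in this round.
                WriteResult::TimestampPassed { oracle_ts: oracle.last_committed }
            }
        })
        .collect()
}
```

Two writes that target the same timestamp therefore never both succeed: the second sees `TimestampPassed` and has to recompute its diffs from the subscribe before trying again.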

### MIR transformations

The subscribe needs to produce the right diffs directly, rather than raw rows
that the adapter then transforms. We apply the mutation transformation at the
MIR level:

- **DELETE**: Wraps the selection expression in a `Negate`, producing `(row,
-1)` diffs
- **UPDATE**: Uses a `Let` binding to share the selection. The body unions a
negated `Get` (old rows with diff -1) with a mapped `Get` (new rows with diff
+1, applying the assignment expressions)
- **INSERT...SELECT**: The selection passes through unchanged; the subscribe
naturally emits each row with diff +1
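
To make the three shapes concrete, the following sketch applies them to a plain collection of `(row, diff)` updates instead of a MIR expression. It is only an analogy: `Row`, `Update`, and the three functions are hypothetical and do not correspond to Materialize's MIR types.

```rust
/// A row is a vector of column values; an update pairs a row with a signed diff.
type Row = Vec<i64>;
type Update = (Row, i64);

/// DELETE: negate the selection, so each matching row is retracted.
fn delete_diffs(selection: &[Update]) -> Vec<Update> {
    selection.iter().map(|(row, d)| (row.clone(), -d)).collect()
}

/// UPDATE: union of the negated selection (old rows) and the selection
/// mapped through the assignment expressions (new rows).
fn update_diffs(selection: &[Update], assign: impl Fn(&Row) -> Row) -> Vec<Update> {
    let mut out = delete_diffs(selection);
    out.extend(selection.iter().map(|(row, d)| (assign(row), *d)));
    out
}

/// INSERT...SELECT: the selection passes through unchanged.
fn insert_select_diffs(selection: &[Update]) -> Vec<Update> {
    selection.to_vec()
}
```

For example, `UPDATE t SET x = x + 1` on a selection containing the row `[1, 10]` once yields `([1, 10], -1)` and `([1, 11], +1)`.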

### Concurrency limiting

When multiple read-then-write operations run concurrently, each maintains a
subscribe that continuously receives and processes updates. With N concurrent
OCC loops, whenever one loop succeeds, the other N-1 loops must process the
resulting updates and retry. This leads to O(N^2) total work.

To bound this, a semaphore has to limit the number of concurrent OCC operations
(default: 4). Additional operations wait for a permit before starting their
subscribe.
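
A permit-based limiter of this kind could look like the sketch below. It is a standard-library-only stand-in written for illustration (the Rust standard library has no semaphore type, and the actual implementation may well use an async one); `Semaphore`, `Permit`, and `available` are hypothetical names.

```rust
use std::sync::{Condvar, Mutex};

/// Minimal counting semaphore, used here to cap concurrent OCC loops.
pub struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

/// Holding a `Permit` means one OCC loop slot is in use; dropping it frees the slot.
pub struct Permit<'a> {
    sem: &'a Semaphore,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cv: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    pub fn acquire(&self) -> Permit<'_> {
        let mut free = self.permits.lock().unwrap();
        while *free == 0 {
            free = self.cv.wait(free).unwrap();
        }
        *free -= 1;
        Permit { sem: self }
    }

    /// Number of permits currently free.
    pub fn available(&self) -> usize {
        *self.permits.lock().unwrap()
    }
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        *self.sem.permits.lock().unwrap() += 1;
        self.sem.cv.notify_one();
    }
}
```

An operation would acquire a permit before creating its subscribe and hold it until the subscribe is dropped, so with the default of 4 permits at most four loops pay the retry cost at any time.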

### Internal subscribes

The subscribes created for read-then-write are internal: they do not appear in
`mz_subscriptions` or other introspection tables, and they don't increment the
active subscribes metric. They are created and dropped via dedicated `Command`
variants (`CreateReadThenWriteSubscribe`, `DropReadThenWriteSubscribe`).

## Correctness

The correctness argument has two parts: (1) the OCC loop produces the right
diffs, and (2) the timestamped write mechanism ensures they are applied at the
right timestamp.

### The subscribe produces the right diffs

The subscribe starts at the oracle read timestamp and emits the current state
of the selection expression as its initial snapshot. As other writes commit,
the subscribe emits updates that reflect those writes. At any point, if we
consolidate all diffs received so far, we get the current state of the
expression.

The MIR transformations (Negate for DELETE, Let/Union for UPDATE) ensure that
the diffs represent the correct mutation. For example, after consolidation, a
DELETE subscribe contains `(row, -1)` for each row currently matching the
selection.
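
Consolidation itself is just summing diffs per row and dropping rows whose diffs cancel. A minimal sketch, using strings as rows and a hypothetical `consolidate` helper rather than Materialize's own consolidation routine:

```rust
use std::collections::BTreeMap;

/// Sum the diffs per row and drop rows whose diffs cancel out.
fn consolidate(updates: Vec<(String, i64)>) -> Vec<(String, i64)> {
    let mut acc: BTreeMap<String, i64> = BTreeMap::new();
    for (row, diff) in updates {
        *acc.entry(row).or_insert(0) += diff;
    }
    acc.into_iter().filter(|(_, diff)| *diff != 0).collect()
}
```

Consider a DELETE subscribe whose snapshot emitted `("a", -1)` and `("b", -1)`. If a concurrent writer then deletes `b` and inserts a matching row `c`, the subscribe emits `("b", +1)` and `("c", -1)`. After consolidation the pending write is `("a", -1)` and `("c", -1)`, which is exactly the set of rows that currently match.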

### The timestamped write ensures atomicity

The write is submitted at the timestamp corresponding to the subscribe's
frontier. The group commit machinery checks that this timestamp hasn't been
passed by the oracle:

- If the timestamp is still valid: the write is committed at exactly that
timestamp, and the oracle is advanced past it. Any concurrent OCC loops that
were targeting the same timestamp will fail and retry.

Contributor

Do we have to worry about how table commit advances the logical frontier for all tables? At first blush it seems like you might get spurious conflicts. (Where, e.g., my write to table A advanced the frontier for table B, so my write attempt to B fails even though it hasn't changed.)

In theory the txn mechanism has enough metadata in its shard to know that there are not actually any changes in B between my read time and the new frontier, so it was safe to advance the write timestamp further. But I suppose we're going to find that out via the subscribe pretty quickly anyways...

Contributor Author

Yeah, any table write will clobber your write attempt. And as is in the prototype, you would learn through the subscribe that nothing changed and re-try. Not ideal, but also ... 🤷

- If the timestamp has already passed (another write committed first): the
write fails. The OCC loop continues, the subscribe delivers the updates
from the intervening write, and the loop retries at the new frontier.

This ensures that the write is always based on the state of the data at exactly
the write timestamp. There is no window for lost updates: either the write
succeeds because nothing changed since the read, or it fails and retries with
fresh data.

### Linearization

Semantically, a read-then-write is a SELECT followed by a write. Normally we
have to linearize reads, ensuring that the oracle read timestamp is at least
the timestamp chosen for a peek, so that results can't "go backwards". With the
subscribe-based OCC loop, we might observe data timestamped beyond the current
oracle read timestamp. However, actually applying the write bumps the oracle
read timestamp to at least the write timestamp, so at write time it holds that
`write_ts <= oracle_read_ts`. The linearization invariant is maintained.

### Single timestamped write per group commit round

Only one timestamped write is processed per group commit round. This is correct
because:

1. Each timestamped write is computed independently, based on the state at its
own read timestamp
2. Two independently computed timestamped writes could be inconsistent if
applied at the same timestamp (e.g., both try to delete the same row, but
after one succeeds the other's diff is stale)
3. After committing at timestamp T, the oracle advances past T, so additional
writes at T would fail anyway. We fail them early to avoid unnecessary work.

### Timeouts

We have to be careful about bounding the lifetime of the OCC loop, both in
wall-clock time and in number of retries. With the old approach, a
read-then-write could take arbitrarily long and block the rest of the system.
With the new approach, the OCC loop might retry arbitrarily long without ever
succeeding. It will not block the rest of the system, though, which is a big
benefit.

As a safety net, we should bound the lifetime of the OCC loop with our existing
statement timeout, and potentially add a hard upper limit on the number of
attempts per OCC loop.
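
A sketch of such a safety net is shown below. The names (`OccError`, `run_bounded`) and the shape of the `attempt` callback are assumptions made for illustration; in particular, the real loop would wait on the subscribe between attempts rather than call a closure.

```rust
use std::time::{Duration, Instant};

/// Why a bounded OCC loop gave up (illustrative names).
#[derive(Debug, PartialEq)]
enum OccError {
    Timeout,
    TooManyAttempts,
}

/// Run `attempt` until it returns `Some`, the deadline passes, or
/// `max_attempts` is exhausted. `attempt` receives the 1-based attempt number.
fn run_bounded<T>(
    timeout: Duration,
    max_attempts: u32,
    mut attempt: impl FnMut(u32) -> Option<T>,
) -> Result<T, OccError> {
    let deadline = Instant::now() + timeout;
    for n in 1..=max_attempts {
        // Check the wall-clock bound before doing any work for this attempt.
        if Instant::now() >= deadline {
            return Err(OccError::Timeout);
        }
        if let Some(value) = attempt(n) {
            return Ok(value);
        }
    }
    Err(OccError::TooManyAttempts)
}
```

Returning distinct errors for the two bounds keeps it possible to tell a slow subscribe (timeout) apart from heavy contention (attempt limit) when reporting the failure to the user.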

### Comparison with the old approach

Contributor

Maybe somewhat of a corner case, but a slight concern is what happens when the subscribe can't keep up with input changes, and thus the operation can never complete. With the old approach, the operation would usually either complete after a while or OOM the cluster, making some noise. In contrast, the new approach can silently get into a state where it can never finish, e.g., if there is some window function or cross join that keeps rewriting a significant portion of the subscribe's output at every input change. Maybe we could try to detect if the subscribe just keeps falling behind, and show a notice to the user, or even entirely fail the operation.

Contributor Author

In my prototype I have a timeout and a maximum number of retries that the OCC loop does. This should work okay, I guess?

Contributor

Sounds fine for now! Maybe much later we could add a detection specifically for the falling behind case, but certainly not urgent.


In the old approach, correctness depends on:
1. No other writer interleaving between read and write (ensured by in-process
locks)
2. Leadership confirmation via the catalog fencing mechanism

In the new approach, correctness depends on:
1. The subscribe reflecting committed state accurately (guaranteed by the
compute/storage layers)
2. The timestamped write succeeding only if the target timestamp is still valid
(guaranteed by the group commit / timestamp oracle)

The new approach is arguably easier to reason about: there is no global lock
state to consider, no deferred operations, no lock merging. The correctness
argument is local to the OCC loop and the group commit mechanism.

## Performance

The goal is not to make writes faster, but to not regress significantly.
Benchmarking a PoC-level implementation of the OCC approach against `main` for
`UPDATE t SET x = x + 1` shows the following:

Contributor @ggevay, Feb 16, 2026

Peeks can be faster than subscribes in various situations:

  • If there is an index, then a fast-path peek can avoid building a dataflow.
  • We can do monotonic TopKs and min/max.

Anyhow, if it turns out that this matters for some users, then a follow-up optimization could be to first try a peek-and-write-at-the-same-timestamp instead of a subscribe, and fall back to the subscribe-based approach only if the first write doesn't complete fast enough and fails.


![update-benchmark](./static/occ_read_then_write/update-benchmark.png)

The benchmark varies concurrency (number of workers) on the x-axis and shows
throughput (left) and latency (right). Key observations:

- At low concurrency (1-7 workers), the OCC approach is comparable to or
_better_ than `main`. This is because the OCC path begins preparing the write (opening
the subscribe, receiving the snapshot) before the write timestamp is claimed,
whereas the old path only starts the peek after acquiring the lock.
- At higher concurrency, performance degrades as expected due to the O(N^2)
retry behavior: with more concurrent writers, more retries are needed. The
concurrency semaphore (default 4 permits) bounds this in practice.
- The benchmark is for a worst-case workload (all writers updating the same
table). Real workloads with writes to different tables won't experience the
contention.

## Rollout

The new path is controlled by an `enable_adapter_frontend_occ_read_then_write`
dyncfg (default: disabled).

If we did a partial rollout where we check the dyncfg per read-then-write
operation, an OCC write could slip between an old-path reader's read and write
phases without the old path detecting it (since the OCC path doesn't acquire
write locks). We therefore must make the flag sticky per `environmentd` process
lifetime (check on bootstrap only) to avoid this, and keep the current
`confirm_leadership` checks.
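
One way to make a flag sticky for the process lifetime is to latch its value on first read. The sketch below uses `std::sync::OnceLock` for that; `StickyFlags`, `bootstrap`, and `use_occ_path` are hypothetical names, and how the dyncfg value is actually read is left out.

```rust
use std::sync::OnceLock;

/// Process-wide feature state that is decided once, at bootstrap.
struct StickyFlags {
    occ_read_then_write: OnceLock<bool>,
}

impl StickyFlags {
    const fn new() -> Self {
        StickyFlags { occ_read_then_write: OnceLock::new() }
    }

    /// Latch the dyncfg value. Later calls return the first value,
    /// even if the dyncfg has changed in the meantime.
    fn bootstrap(&self, dyncfg_value: bool) -> bool {
        *self.occ_read_then_write.get_or_init(|| dyncfg_value)
    }

    /// Which path to use for a read-then-write; the old path until bootstrapped.
    fn use_occ_path(&self) -> bool {
        self.occ_read_then_write.get().copied().unwrap_or(false)
    }
}
```

With this shape, flipping the dyncfg at runtime has no effect until the process restarts, so a single process never mixes old-path and OCC-path writers.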

Once the OCC path is fully rolled out and validated:

1. Remove the old `sequence_read_then_write` code path
2. Remove the in-process write lock machinery (`WriteLocks`,
`WriteLockBuilder`, `GroupCommitWriteLocks`, deferred write operations)
3. Remove the `confirm_leadership()`-style lock validation in group commit

This removes a significant amount of complexity and uncertainty from the
codebase.

## Alternative: distributed locking service

Contributor

An alternative I'd like to see discussed is having the OCC loop run in clusterd as a dataflow export. On first glance, that seems attractive because it avoids having to stash all the data read in environmentd (which might oom or abort the read because its response is too large). I imagine it might not work because table writes have to be performed by envd, possibly due to some txn-wal reason?

Contributor @ggevay, Feb 16, 2026

Well, I think we eventually want to make multiple envds able to submit read-then-writes concurrently, in which case whatever multiple envds can cooperate on, a clusterd should be able to do as well, right? E.g., clusterds should also be able to access the timestamp oracle.

Contributor Author

I added a section at the end that discusses running the OCC loop on `clusterd`.


Instead of OCC, we could extend the current pessimistic locking approach to
work across processes by using a distributed locking service.

The flow would be:

1. Acquire a distributed lock for the target table
2. Peek at `FreshestTableWrite`
3. Compute diffs
4. Write
5. Release the distributed lock

This preserves the familiar lock-based model but has significant drawbacks:

- **Latency**: Every read-then-write would pay the cost of a CRDB round-trip
(or equivalent) to acquire and release the lock, adding milliseconds to every
single write operation. With OCC, the common case (no contention) succeeds on
the first try without extra round-trips.
- **Brittleness**: Distributed locks require careful handling of lock expiry,
holder crashes, and network partitions. A process that acquires a lock and
then crashes (or becomes slow) must be fenced out, adding the same kind of
complexity we already deal with for the `confirm_leadership()` check.
- **Complexity preservation**: The fundamental complexity of the lock-based
approach remains: deferred operations when locks aren't available, all-or-
nothing lock acquisition to prevent deadlocks, lock merging for concurrent
blind writes. We would add distributed systems complexity on top of the
existing in-process lock complexity, rather than replacing it.
- **Scalability**: The distributed lock would serialize all writes to a table
across all processes, even when they don't conflict. OCC allows concurrent
non-conflicting writes to proceed without coordination.

The OCC approach avoids all of these issues. Contention is handled by retrying,
which is simple and local. The cost is paid only when there _is_ actual
contention, and the subscribe ensures that retries are based on fresh data.

## Alternative: an OCC loop running on `clusterd`

Instead of sending subscribe results back to `environmentd` and running the OCC
loop there, we could run the OCC loop right on the cluster. This should work,
if we give `clusterd` access to the timestamp oracle. A benefit of this
approach is that we take `environmentd` as much out of the processing path as
possible, and so we get better distribution of work.

Another school of thought will say that we _want_ `environmentd` to be in the
path, because we can maybe be smarter about how we commit data to persist.
There's a separation between the data layer, which comes up with the changes and
runs the dataflow, and the control layer, which takes pointers to the changes
and appends them durably, with maybe some smarts in the middle.