fix(raft): stream receive snapshot to disk + emit EKVT token (memory safety) #747
Conversation
PR #746 raised the receive-side spool cap to 16 GiB, unblocking follower catch-up against >1 GiB FSM snapshots. The on-disk receive worked, but apply still went through buildSnapshotMessage -> spool.Bytes() -> io.ReadAll(spool.file) -> raftpb.Snapshot.Data, so Snapshot.Data carried the entire payload as a []byte for RawNode.Step. On a 2.5 GiB-container production node, a 1.35 GiB FSM materialization inflates the heap past the OOM-kill line before the FSM ever runs Restore. Followers would receive successfully, get OOM-killed mid-apply, and the leader would retry -- a different loop with the same shape.

Fix: route receive into the existing token-format streaming path.

The codebase already has all the apply-side machinery:

- isSnapshotToken / decodeSnapshotToken / encodeSnapshotToken (fsm_snapshot_file.go) implement the 17-byte EKVT token format (a hypothetical layout is sketched after this description)
- openAndRestoreFSMSnapshot streams a .fsm file's payload into StateMachine.Restore(io.Reader) without materialization
- engine.go::applySnapshot already branches on isSnapshotToken and calls openAndRestoreFSMSnapshot when it sees a token
- restoreSnapshotState (wal_store.go) does the same on the startup-restore path
- e.transport.SetFSMSnapDir(e.fsmSnapDir) is already wired at engine Open

What was missing was the receive side producing tokens. This PR adds:

1. snapshotSpool.FinalizeAsFSMFile(fsmSnapDir, index, crc32c) appends the 4-byte CRC32C footer, fsyncs, closes, and atomically renames the spool file into fsmSnapPath(fsmSnapDir, index). On success, ownership of the on-disk file transfers to fsmSnapDir and snapshotSpool.Close() becomes a no-op, so the renamed file is NOT removed.
2. receiveSnapshotStream wraps the spool with crc32CWriter so the CRC accumulates as bytes hit disk -- no extra read pass is needed to compute the footer. After the final chunk it calls finalizeReceivedSnapshot, which:
   - if fsmSnapDir is wired and metadata.Snapshot.Metadata.Index is non-zero: calls FinalizeAsFSMFile, then sets Snapshot.Data = encodeSnapshotToken(index, crc) (17 bytes)
   - otherwise: falls back to the legacy materialize path, so tests and legacy receivers without a wired fsmSnapDir keep behaving exactly as before this PR.
3. The receive log line gains a `format` field (`token` or `inline`) so an operator can distinguish the streaming path from the legacy path at a glance without grepping byte counts.

Heap usage during receive is now proportional to chunk size, not FSM size. A 16 GiB FSM would write 16 GiB to disk and allocate a 17-byte token plus per-chunk buffers (~MiB) -- container memory limits no longer constrain receive.

Self-review (5 lenses):

1. Data loss -- none. The on-disk format produced by FinalizeAsFSMFile is byte-identical to writeFSMSnapshotFile (payload + 4-byte big-endian CRC32C footer), and openAndRestoreFSMSnapshot reads either source uniformly. Tests prove a sender FSM round-trips through the streaming receive to a fresh receiver FSM with identical Applied().
2. Concurrency -- no new locks. The spool's `path` and `file` fields are written from a single goroutine (the receive loop); FinalizeAsFSMFile nils them on success so deferred Close() becomes a no-op without a separate keep flag. crcWriter wraps the spool linearly; it is never shared across goroutines.
3. Performance -- one CRC32C pass over the received bytes. Hardware-accelerated CRC32C on modern x86 runs at multi-GiB/s, well above gRPC stream throughput, so the overhead is invisible in profiles. Heap allocation for Snapshot.Data drops from O(FSM size) to a constant 17 bytes. The disk write pattern is unchanged (sequential append to spool, single rename, single fsync) -- the rename is metadata-only, so it is essentially free.
4. Data consistency -- the EKVT token CRC is computed by the same crc32CWriter wrapper used for sender-side .fsm file creation (writeFSMSnapshotPayload), so the footer the receiver writes matches what openAndRestoreFSMSnapshot's pre-Restore footer check expects. ApplySnapshot ordering is unchanged; this PR only swaps how Snapshot.Data is shaped before RawNode.Step sees it, and the apply branch on isSnapshotToken was already in place.
5. Test coverage -- TestReceiveSnapshotStream_StreamingTokenWhenFSMSnapDirSet pins:
   - Snapshot.Data is the 17-byte token, not the payload
   - the spool dir is empty post-receive (the file was renamed, not deleted, not left behind)
   - the .fsm file exists at fsmSnapPath with payload + 4-byte CRC footer in the correct on-disk format
   - end-to-end round trip: the sender's Applied() == the receiver's Applied() after restoreSnapshotState reads the token, opens the .fsm file, and streams it into FSM.Restore.

   TestReceiveSnapshotStream_LegacyFallbackWhenNoFSMSnapDir pins the unchanged legacy path: when fsmSnapDir is empty, the payload is still inline, so existing tests keep passing.

Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 11.5s, all green.

Future work (separate PRs):
- Extend the streaming-token receive to the leader-side applyBridgeMode legacy path so an upgrade in progress, where some peers still run the pre-token binary, doesn't fall back to materialization on the leader.
- Add a Prometheus counter for token vs inline receive so an operator can detect a regression where a peer accidentally hits the legacy fallback in production.
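For readers new to the token format: the sketch below shows one plausible shape for a 17-byte token carrying (index, CRC32C). The field order, magic bytes, and version byte are assumptions for illustration only; the real `encodeSnapshotToken` / `decodeSnapshotToken` in fsm_snapshot_file.go define the actual layout.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// Assumed layout (17 bytes): 4-byte "EKVT" magic + 1 version byte +
// 8-byte big-endian index + 4-byte big-endian CRC32C. Illustrative only.
const tokenLen = 17

var tokenMagic = []byte("EKVT")

func encodeToken(index uint64, crc uint32) []byte {
	b := make([]byte, tokenLen)
	copy(b[0:4], tokenMagic)
	b[4] = 1 // format version (assumption)
	binary.BigEndian.PutUint64(b[5:13], index)
	binary.BigEndian.PutUint32(b[13:17], crc)
	return b
}

func decodeToken(b []byte) (index uint64, crc uint32, err error) {
	if len(b) != tokenLen || !bytes.Equal(b[0:4], tokenMagic) {
		return 0, 0, errors.New("not a snapshot token")
	}
	return binary.BigEndian.Uint64(b[5:13]), binary.BigEndian.Uint32(b[13:17]), nil
}

func main() {
	tok := encodeToken(1350, 0xDEADBEEF)
	idx, crc, _ := decodeToken(tok)
	fmt.Println(len(tok), idx, crc) // 17 1350 3735928559
}
```

Whatever the real layout, the point is the same: Snapshot.Data carries this tiny token instead of the payload, and apply resolves the token to an on-disk .fsm file.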
- gemini medium (snapshot_spool.go:75): Switch the cap check from `int64(len(p))+s.size > s.maxSize` to `int64(len(p)) > s.maxSize-s.size`. With the new env override an operator can set maxSize near math.MaxInt64; the previous form would overflow there and silently let the write through. Subtraction stays in [0, maxSize] and rejects the same payloads correctly. Mathematically equivalent for all sane configurations, no caller-visible semantic change.
- gemini medium (snapshot_spool.go:90): Replace `io.ReadAll(s.file)` with `make([]byte, s.size)` + `io.ReadFull`. Write only increments s.size on successful os.File.Write returns, so it is the record of truth for what's on disk. Pre-allocating skips the ~30 reallocs and intermediate copies that ReadAll would incur via power-of-two doublings on a 1.35 GiB receive.
- codex P1 (snapshot_spool.go:28): "Keep snapshot cap within in-memory apply limit" -- a 16 GiB receive cap admits payloads larger than the in-memory materialization can survive on a memory-constrained node. Acknowledged: this PR alone is the on-disk-cap fix; the in-memory hazard is closed by stacked PR #747, which routes receive into the existing token-format streaming path so Snapshot.Data is a 17-byte token (not the payload) and apply reads from disk via io.Reader. #747 keeps the receive cap aligned with disk space (where 16 GiB is realistic) rather than RAM (where it is not). See PR #747 for the streaming-receive change and self-review.

No semantic changes here: Write's reject/accept set is unchanged; Bytes() returns the same content via a faster path. A sketch of both suggested changes follows.

Tests still green: go test -race -count=1 -short ./internal/raftengine/etcd -- 11.6s, all green.
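A minimal sketch of the two suggested changes, assuming spool fields named roughly as in the comment (`file`, `size`, `maxSize`) and an assumed sentinel error; the real snapshot_spool.go may differ in naming and error handling.

```go
package spoolsketch

import (
	"errors"
	"io"
	"os"
)

var errSnapshotTooLarge = errors.New("snapshot exceeds spool cap") // assumed sentinel

// snapshotSpool is a stand-in for the real type in snapshot_spool.go.
type snapshotSpool struct {
	file    *os.File
	size    int64 // bytes successfully written so far
	maxSize int64 // receive cap (16 GiB after PR #746)
}

// Write rejects payloads that would exceed the cap. The subtraction form
// cannot overflow: size never exceeds maxSize, so maxSize-size stays in
// [0, maxSize] even when maxSize is configured near math.MaxInt64.
func (s *snapshotSpool) Write(p []byte) (int, error) {
	if int64(len(p)) > s.maxSize-s.size {
		return 0, errSnapshotTooLarge
	}
	n, err := s.file.Write(p)
	s.size += int64(n) // only counts bytes actually written to disk
	return n, err
}

// Bytes pre-allocates to the known on-disk size instead of letting
// io.ReadAll grow its buffer by power-of-two doublings.
func (s *snapshotSpool) Bytes() ([]byte, error) {
	if _, err := s.file.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	buf := make([]byte, s.size)
	if _, err := io.ReadFull(s.file, buf); err != nil {
		return nil, err
	}
	return buf, nil
}
```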
Code Review
This pull request implements a streaming snapshot receive mechanism to mitigate OOM risks during large state machine transfers. By utilizing a crc32CWriter and the new FinalizeAsFSMFile method, the system now renames spool files directly into the snapshot directory and uses a lightweight token for Raft messages instead of in-memory materialization. Feedback was provided regarding error handling in FinalizeAsFSMFile, specifically noting that failures in syncDir after a file move could lead to misleading errors in spool.Close(), and emphasizing the importance of logging Close() errors rather than ignoring them.
```go
if syErr := syncDir(fsmSnapDir); syErr != nil {
	return errors.WithStack(syErr)
}
```
If syncDir fails, the file has already been moved, which causes the caller's spool.Close() to return a misleading os.ErrNotExist. Crucially, the caller must not silently ignore the error from spool.Close() (e.g., via _ = spool.Close()). Repository rules require that errors from Close() methods be logged to ensure resource leaks or cleanup problems are visible.
References
- Do not silently ignore errors from Close() methods; log them to ensure resource leaks or cleanup problems are visible.
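A small sketch of the caller-side pattern that rule asks for; the surrounding function, the `spool` value, and the log message are illustrative, not the repository's exact code.

```go
package closewarn

import (
	"io"
	"log/slog"
)

// receiveIntoSpool shows the convention: instead of `defer func() { _ = spool.Close() }()`,
// surface Close errors so half-written spool files (disk full, permission flips)
// stay visible to operators.
func receiveIntoSpool(spool io.Closer) error {
	defer func() {
		if closeErr := spool.Close(); closeErr != nil {
			slog.Warn("snapshot spool close failed", "error", closeErr)
		}
	}()
	// ... receive chunks into the spool ...
	return nil
}
```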
…747 r1) Round-1 review on commit aed129c from gemini-code-assist (medium):

- snapshot_spool.go FinalizeAsFSMFile: the original single-defer state-clearing only fired on full success. If os.Rename succeeded but syncDir(fsmSnapDir) then failed, the function returned the syncDir error but left s.path populated, so the caller's deferred spool.Close() would attempt os.Remove(s.path) on a path that no longer pointed at our file (it had been renamed). Close() then surfaced a misleading os.ErrNotExist that would bury the real syncDir error in the operator's view. Switched to per-step inline state clearing (sketched after this comment):
  - s.file = nil after successful os.File.Close()
  - s.path = "" after successful os.Rename

  Now a subsequent Close() does the right thing on every partial-failure path:
  1. file Close OK, Rename fails: s.file=nil, s.path=spoolPath → Close() correctly removes the half-written spool file.
  2. Rename OK, syncDir fails: s.file=nil, s.path="" → Close() is a true no-op; the syncDir error is reported cleanly, with no misleading ErrNotExist.

- grpc_transport.go receiveSnapshotStream: replaced `defer func() { _ = spool.Close() }()` with a slog.Warn-on-error variant. Per the project convention noted in the gemini comment, Close() errors are operationally meaningful (half-written spool files we couldn't clean up = disk full / permission flip / etc.) and should not be silently ignored. After FinalizeAsFSMFile hands ownership off, Close is a no-op; the warning only fires on the unhappy paths that actually need an operator's attention.

Tests added (snapshot_spool_test.go):
- TestFinalizeAsFSMFile_PostFinalizeCloseIsNoop: pins that Close after a successful Finalize is a clean no-op (and idempotent — a second Close also passes), and the renamed .fsm file is still on disk afterwards.
- TestFinalizeAsFSMFile_RenameFailureCleansUpSpool: simulates a pre-rename failure (parent dir made unwritable), verifies the spool file is still at its original path post-Finalize, and that the deferred Close() correctly removes the orphan.

Caller audit (semantic change requires it):
- wal_store.go:372 spool.Close call: legacy path (snapshotBytesAndClose / migrateLegacyState) that does NOT call FinalizeAsFSMFile. Behavior unchanged: Close still removes the spool file if the path is set.
- grpc_transport.go:receiveSnapshotStream: the only other caller; this PR's same diff updates it to log Close errors.
- No other callers in tree.

Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 11.9s, all green.
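A sketch of the stepwise state clearing described above, using the helper names from the PR text (`fsmSnapPath`, `syncDir`); the real FinalizeAsFSMFile also appends the CRC footer, fsyncs the payload, and creates the target directory before this point, and a later commit makes the `s.file = nil` unconditional. Not compilable standalone.

```go
// Sketch only: footer append, payload fsync, and MkdirAll are elided.
func (s *snapshotSpool) FinalizeAsFSMFile(fsmSnapDir string, index uint64, crc uint32) error {
	if err := s.file.Close(); err != nil {
		return err
	}
	s.file = nil // step 1 done: a later Close() must not close the fd again

	finalPath := fsmSnapPath(fsmSnapDir, index)
	if err := os.Rename(s.path, finalPath); err != nil {
		// s.path is still set, so the caller's deferred Close() removes
		// the half-written spool file.
		return err
	}
	s.path = "" // step 2 done: ownership moved; Close() must not remove finalPath

	if err := syncDir(fsmSnapDir); err != nil {
		// The file is already at finalPath; Close() is now a no-op, so the
		// syncDir error is reported cleanly without a misleading ErrNotExist.
		return err
	}
	return nil
}
```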
@claude review |
Claude finished @bootjp's task in 5m 14s — Code Review
This is a well-motivated fix for a real OOM hazard, and the implementation is correct. The r1 commit already addresses the two Gemini review concerns. Detailed findings below by lens.

Gemini review — addressed. Both concerns from the Gemini reviewer are resolved in the r1 commit.

Data Loss — no issues. The write sequence in FinalizeAsFSMFile clears the relevant state field only after each step succeeds, so deferred Close() does the right thing on every partial-failure path.

CRC correctness — no issues.

Concurrency — no issues.

Performance — no issues. Two fsyncs per received snapshot (payload sync + directory sync) matches the sender-side .fsm write path.

Findings — Should Fix: missing test. The PR's comment block explicitly documents "failure mode 2" (Rename OK, syncDir fails), but no test pins it down.

Test coverage assessment

Coverage is solid. The one gap (syncDir failure) is noted above. Overall: approve with the syncDir test suggestion as a follow-up — the core correctness is sound and this PR unblocks the production cluster from its OOM loop.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 49723f395c
```go
if mkErr := os.MkdirAll(fsmSnapDir, defaultDirPerm); mkErr != nil {
	return errors.WithStack(mkErr)
}
if rnErr := os.Rename(s.path, finalPath); rnErr != nil {
```
Handle cross-device spool finalization without os.Rename
FinalizeAsFSMFile moves the received snapshot from spoolDir to fsmSnapDir with os.Rename, but the receive path always creates the spool in spoolDir and propagates this error back through receiveSnapshotStream/SendSnapshot. When those directories are on different filesystems (e.g., separate mounted volumes or a mounted fsm-snap subdir), os.Rename fails with EXDEV, so snapshot delivery is rejected and the leader will keep retrying instead of letting the follower catch up. Please add a cross-device fallback (copy+fsync+atomic replace in destination) or guarantee same-filesystem temp file placement before finalization.
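For context, a generic copy+fsync+rename fallback of the kind this comment suggests might look like the sketch below; the PR ultimately avoids EXDEV by placing the spool inside fsmSnapDir instead, so nothing like this was added, and all names here are illustrative.

```go
package movefile

import (
	"errors"
	"io"
	"os"
	"path/filepath"
	"syscall"
)

// moveFileAtomic renames src into dstDir, falling back to copy+fsync+rename
// when the two paths live on different filesystems (EXDEV).
func moveFileAtomic(src, dstDir, dstName string) error {
	dst := filepath.Join(dstDir, dstName)
	err := os.Rename(src, dst)
	if err == nil || !errors.Is(err, syscall.EXDEV) {
		return err
	}
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()
	// Stage a temp file inside the destination directory so the final
	// rename stays on one filesystem and remains atomic.
	tmp, err := os.CreateTemp(dstDir, ".move-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // harmless after a successful rename
	if _, err := io.Copy(tmp, in); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if err := os.Rename(tmp.Name(), dst); err != nil {
		return err
	}
	return os.Remove(src) // drop the original copy on the source filesystem
}
```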
#747 r2) Round-2 review on commit 49723f3 from claude bot:

- Should Fix: missing test for the "rename succeeded but syncDir failed" path. The PR's comment block in FinalizeAsFSMFile explicitly documents this case as handled by stepwise state clearing, but no test pinned it down. Addressed by:
  1. Indirecting fsync-on-directory through a package var `snapshotSyncDir = syncDir`. Production goes through the unchanged default; tests can fault-inject a failure (a generic illustration of the seam follows this comment).
  2. New test TestFinalizeAsFSMFile_SyncDirFailureCloseIsNoop fault-injects a syncDir error and pins:
     - Finalize returns the syncDir error (not buried).
     - The .fsm file is at its canonical path (durability loss only, not a logical-state regression).
     - The spool dir is empty (the file was renamed out).
     - The deferred Close() returns nil (no misleading os.ErrNotExist that would bury the syncDir signal in the slog warning emitted by receiveSnapshotStream).
- Minor: `s.file.Close()` returning an error left `s.file` non-nil, so the deferred caller-side Close would double-call it and log os.ErrClosed alongside the real error. Set `s.file = nil` unconditionally — os.File.Close releases the fd atomically before the syscall returns even on error, so this matches the documented semantics and stops the cosmetic double-warning.

Caller audit (semantic-adjacent change requires it):
- snapshotSyncDir: new package var whose default value is the syncDir function, so the only production caller (FinalizeAsFSMFile) sees unchanged behavior.
- s.file = nil unconditional: only affects the post-Close double-Close warning; the spool's Close() method branches on `s.file != nil` and correctly skips the second close. No caller reads s.file directly outside snapshot_spool.go.
- All Close paths verified by enumeration:
  - success: s.file=nil, s.path="" → Close no-op
  - Close error: s.file=nil, s.path=spool → Close removes spool
  - Rename error: s.file=nil, s.path=spool → Close removes spool
  - syncDir error: s.file=nil, s.path="" → Close no-op (test above)

Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 11.9s, all green.
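A self-contained illustration of the fault-injection seam described in this commit; the real code lives in snapshot_spool.go and its test, so everything below is a generic sketch rather than the repository's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the real directory-fsync helper.
func syncDir(dir string) error { return nil }

// The seam: production code calls snapshotSyncDir, which defaults to the
// real helper, so behaviour is unchanged; tests can swap it to inject faults.
var snapshotSyncDir = syncDir

func finalize(dir string) error {
	// ... footer write, file close, and rename happen before this point ...
	if err := snapshotSyncDir(dir); err != nil {
		return err // file already renamed: durability loss only, not a logical regression
	}
	return nil
}

func main() {
	// Test-style fault injection.
	orig := snapshotSyncDir
	snapshotSyncDir = func(string) error { return errors.New("injected syncDir failure") }
	defer func() { snapshotSyncDir = orig }()

	fmt.Println(finalize("/tmp/fsm-snap")) // "injected syncDir failure"
}
```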
@claude review |
Claude finished @bootjp's task in 2m 32s — Code Review
Round 2 review (post r1 + r2 commits). Both previous review concerns are closed. One new finding from the Codex review warrants discussion.

r1 + r2 — fully resolved.

Codex P1 — EXDEV (cross-device rename). The Codex reviewer's concern is structurally correct, but the severity in practice needs context. Standard engine wiring keeps spoolDir and fsmSnapDir under the same data directory, so the finalizing rename stays on one filesystem; a non-standard deployment (e.g. fsmSnapDir on a separate mounted volume) is where EXDEV can fire.

Contrast with the sender side — conclusion on EXDEV: the receiver path is less robust than the sender path in this regard. A user who puts fsmSnapDir on a separate volume would hit a hard receive failure with the leader retrying.

Data consistency — verified correct. The CRC chain is airtight.
| Path | Test |
|---|---|
| Streaming token path (fsmSnapDir set) | TestReceiveSnapshotStream_StreamingTokenWhenFSMSnapDirSet ✓ |
| Legacy fallback (fsmSnapDir unset) | TestReceiveSnapshotStream_LegacyFallbackWhenNoFSMSnapDir ✓ |
| FinalizeAsFSMFile success + Close is no-op | TestFinalizeAsFSMFile_PostFinalizeCloseIsNoop ✓ |
| Pre-rename failure (MkdirAll fails) | TestFinalizeAsFSMFile_RenameFailureCleansUpSpool ✓ |
| Post-rename failure (syncDir fails) | TestFinalizeAsFSMFile_SyncDirFailureCloseIsNoop ✓ (new in r2) |
| Full sender→receiver gRPC round-trip | TestStreamFSMSnapshotOverGRPCRestoresFollowerFSM ✓ |
| Trailing partial chunk regression | TestSendSnapshotReaderChunksTrailingPartialChunk ✓ |
| dispatchSnapshot routing | TestDispatchSnapshotTokenRoutesToStream, TestDispatchSnapshotNonTokenRoutesToBridge, TestDispatchSnapshotTokenNoOpenerFallsBackToBridge ✓ |
| index=0 fallback | missing (minor) |
Overall: approve. The core correctness is sound, the memory-safety fix is complete, and all previously flagged gaps (including the syncDir test) are resolved. The EXDEV concern is real but only affects non-standard deployments and has a safe failure mode; a doc comment noting the same-filesystem requirement would close it without a follow-up PR.
r3) Round-2 review on commit 113f7ce from chatgpt-codex (P1):

> FinalizeAsFSMFile moves the received snapshot from spoolDir to fsmSnapDir with os.Rename, but the receive path always creates the spool in spoolDir and propagates this error back through receiveSnapshotStream/SendSnapshot. When those directories are on different filesystems (e.g., separate mounted volumes or a mounted fsm-snap subdir), os.Rename fails with EXDEV, so snapshot delivery is rejected and the leader will keep retrying instead of letting the follower catch up.

Standard engine wiring (engine.go:637-638) keeps spoolDir and fsmSnapDir under cfg.DataDir, so EXDEV cannot fire in default deployments — but the receive code should not assume that. An operator who mounts fsmSnapDir on a separate volume (e.g. for isolation between WAL and snapshot I/O) would otherwise hit a hard receive failure with the leader retrying indefinitely.

Fix: when fsmSnapDir is wired, create the spool inside fsmSnapDir (not spoolDir). FinalizeAsFSMFile's os.Rename then stays within a single filesystem and EXDEV is structurally impossible. The legacy fallback path (fsmSnapDir empty) keeps the spool in spoolDir because it never renames — Bytes() materializes the payload in place.

Cleanup: receiveSnapshotStream's spool can now be orphaned in fsmSnapDir if the process crashes mid-receive, so prepareDataDirs gains a second cleanupStaleSnapshotSpools call against fsmSnapDir at startup. The pattern (elastickv-etcd-snapshot-*) is distinct from the .fsm files that live in fsmSnapDir, so the cleanup glob has no false positives.

Doc: FinalizeAsFSMFile now documents the same-filesystem requirement explicitly so future readers understand why the receive code chose this placement.

Test added (TestReceiveSnapshotStream_SpoolPlacedInFSMSnapDir):
- distinct spoolDir / fsmSnapDir
- inject a syncDir failure to keep partial state observable
- assert spoolDir is empty after receive (no fallthrough to legacy placement)
- assert the .fsm file exists at fsmSnapPath(fsmSnapDir, index)

Caller audit (semantic change requires it):
- newSnapshotSpool: 2 production callers.
  - wal_store.go:375 (snapshotBytes for the legacy hashicorp→etcd migration path): uses a caller-supplied spoolDir, behavior unchanged — the receive side and the migration side are independent.
  - grpc_transport.go:708 (receiveSnapshotStream): updated in this PR to pick fsmSnapDir when set.
- cleanupStaleSnapshotSpools: 2 production callers in prepareDataDirs:
  - the existing call against dataDir: unchanged.
  - the new call against fsmSnapDir: necessary because the new spool placement can leak elastickv-etcd-snapshot-* into fsmSnapDir on a crash mid-receive.
- No other production callers of either function.

Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 12.0s, all green.
@claude review |
Claude finished @bootjp's task in 2m 3s — Code Review (Round 3)
Round 3 review (post r3 commit). This commit closes the Codex P1 EXDEV concern from round 2. No new issues found.

r3 — EXDEV avoidance: fully resolved. The approach is correct and cleaner than the alternative (copy+fsync fallback). By creating the spool inside fsmSnapDir when it is wired, the finalizing rename never crosses a filesystem boundary.

Placement logic:

```go
spoolPlacement := spoolDir
if fsmSnapDir != "" {
	spoolPlacement = fsmSnapDir
}
```

Simple and correct. The legacy path (fsmSnapDir unset) keeps the spool in spoolDir.

Startup cleanup:

```go
if err := cleanupStaleSnapshotSpools(fsmSnapDir); err != nil {
	return errors.Wrap(err, "cleanup stale snapshot spools (fsm-snap dir)")
}
```

Necessary and correct: a crash mid-receive can now leave a stale spool in fsmSnapDir, and prepareDataDirs sweeps it at startup.

Doc: FinalizeAsFSMFile now documents the same-filesystem requirement and explains the spool placement.

Test (TestReceiveSnapshotStream_SpoolPlacedInFSMSnapDir): the test is well-designed and pins the placement guarantee against future refactors. ✓

Overall: approve. All three rounds of review concerns are resolved. The implementation is correct, the memory-safety goal is achieved, and the EXDEV concern is handled structurally rather than defensively. Ready to merge once #746 lands.
…r3) Round-3 lint feedback on commit 3424d5c from reviewdog/golangci:

> calculated cyclomatic complexity for function receiveSnapshotStream is 11, max is 10 (cyclop)

Round 3's EXDEV-avoidance change pushed receiveSnapshotStream's cyclomatic complexity from 10 to 11 (the new spool-placement branch). Reduced complexity by extracting two helpers and narrowing each function's responsibility (a shape sketch follows this comment):

- snapshotSpoolPlacement(): owns the locked read of t.spoolDir / t.fsmSnapDir and the EXDEV-avoidance decision (place the spool inside fsmSnapDir when wired). Returns (placement, fsmSnapDir).
- drainSnapshotChunks(): owns the per-chunk receive loop, CRC accumulation, and the call into finalizeReceivedSnapshot on the final chunk. Returns (msg, payloadBytes, err).

receiveSnapshotStream now owns:
- the spool lifecycle (newSnapshotSpool + deferred Close-with-warn)
- the post-receive structured log line

Behaviour is byte-identical: same spool placement, same finalization path, same log line (index, from, payload_bytes, format). Verified locally: golangci-lint run --enable-only cyclop ./internal/raftengine/etcd/... -- 0 issues.

Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 12.0s, all green.

Claude bot's round-3 review on the prior commit (3424d5c) marked the PR as "approve. All three rounds of review concerns are resolved." This refactor only addresses the cyclop signal raised separately by reviewdog; no semantic changes on the receive path. No caller audit needed.
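The split described above might look roughly like this. The helper signatures follow the commit message, while the transport and stream types, field names, and log message are assumptions rather than the repository's actual definitions, so this is a shape sketch only.

```go
// Shape sketch, not the repository's exact code.
func (t *grpcTransport) receiveSnapshotStream(stream snapshotRecvStream) (raftpb.Message, error) {
	// Locked read of spoolDir/fsmSnapDir plus the EXDEV-avoidance decision.
	spoolPlacement, fsmSnapDir := t.snapshotSpoolPlacement()

	spool, err := newSnapshotSpool(spoolPlacement)
	if err != nil {
		return raftpb.Message{}, err
	}
	defer func() {
		if closeErr := spool.Close(); closeErr != nil {
			slog.Warn("snapshot spool close failed", "spool_dir", spoolPlacement, "error", closeErr)
		}
	}()

	// Per-chunk receive loop, CRC accumulation, finalizeReceivedSnapshot.
	msg, payloadBytes, err := t.drainSnapshotChunks(stream, spool, fsmSnapDir)
	if err != nil {
		return raftpb.Message{}, err
	}

	format := "inline"
	if isSnapshotToken(msg.Snapshot.Data) {
		format = "token"
	}
	slog.Info("received snapshot stream",
		"index", msg.Snapshot.Metadata.Index,
		"from", msg.From,
		"payload_bytes", payloadBytes,
		"format", format,
	)
	return msg, nil
}
```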
@claude review |
Claude finished @bootjp's task in 2m 6s — Code Review (Round 4)
Round 4 review (post r3 refactor commit).

r3 refactor — cyclomatic complexity: resolved correctly. The EXDEV-avoidance placement branch in receiveSnapshotStream pushed it over the cyclop limit; extracting snapshotSpoolPlacement and drainSnapshotChunks brings it back under. Behavior is byte-identical: same spool placement, same CRC accumulation, same finalization path and log line.
| Path | Test |
|---|---|
| Streaming token path (fsmSnapDir set) | TestReceiveSnapshotStream_StreamingTokenWhenFSMSnapDirSet ✓ |
| Spool placed in fsmSnapDir (not spoolDir) | TestReceiveSnapshotStream_SpoolPlacedInFSMSnapDir ✓ |
| Legacy fallback (fsmSnapDir unset) | TestReceiveSnapshotStream_LegacyFallbackWhenNoFSMSnapDir ✓ |
| Premature EOF during drain | TestReceiveSnapshotStreamRejectsPrematureEOF ✓ |
| Duplicate metadata during drain | TestReceiveSnapshotStreamRejectsDuplicateMetadata ✓ |
| FinalizeAsFSMFile success + Close is no-op | TestFinalizeAsFSMFile_PostFinalizeCloseIsNoop ✓ |
| Pre-rename failure | TestFinalizeAsFSMFile_RenameFailureCleansUpSpool ✓ |
| Post-rename / syncDir failure | TestFinalizeAsFSMFile_SyncDirFailureCloseIsNoop ✓ |
| Full sender→receiver gRPC round-trip | TestStreamFSMSnapshotOverGRPCRestoresFollowerFSM ✓ |
| Trailing partial chunk regression | TestSendSnapshotReaderChunksTrailingPartialChunk ✓ |
| dispatchSnapshot routing | TestDispatchSnapshotTokenRoutesToStream + two variants ✓ |
| index=0 fallback | missing (minor, pre-existing) |
drainSnapshotChunks and snapshotSpoolPlacement are not tested in isolation, but they're fully exercised via receiveSnapshotStream — isolation tests would add no new coverage at this complexity level.
Overall: approve. All four rounds of review concerns are closed. The refactor is clean, purely structural, and the cyclop complaint is resolved without any behavior change. Ready to merge once #746 lands.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 944ab86455
```go
if err := spool.FinalizeAsFSMFile(fsmSnapDir, index, crc32c); err != nil {
	return raftpb.Message{}, err
}
metadata.Snapshot.Data = encodeSnapshotToken(index, crc32c)
```
Delete finalized snapshot when post-receive handling fails
The streaming branch finalizes and renames the spool into fsmSnapDir before the message is handed to SendSnapshot’s t.handle path, but FinalizeAsFSMFile clears the spool path so deferred spool.Close() cannot clean it up afterward. If t.handle returns an error (for example during transient engine/raft failures), SendSnapshot returns failure to the sender while leaving the newly written .fsm file behind with no corresponding applied snapshot, which can accumulate orphaned large files across retries with different snapshot indexes.
Round-3 review on commit 944ab86 from chatgpt-codex (P2):

> The streaming branch finalizes and renames the spool into fsmSnapDir before the message is handed to SendSnapshot's t.handle path, but FinalizeAsFSMFile clears the spool path so deferred spool.Close() cannot clean it up afterward. If t.handle returns an error (for example during transient engine/raft failures), SendSnapshot returns failure to the sender while leaving the newly written .fsm file behind with no corresponding applied snapshot, which can accumulate orphaned large files across retries with different snapshot indexes.

Confirmed:
- After receiveSnapshotStream succeeds, msg.Snapshot.Data is a 17-byte EKVT token and the .fsm file lives at fsmSnapPath(fsmSnapDir, index).
- SendSnapshot then calls t.handle(ctx, msg). The engine's applySnapshot is synchronous to t.handle, so a non-nil return guarantees applied_index was NOT advanced — the .fsm file is unreferenced.
- Same-index retries are safe (os.Rename atomically replaces), but the leader can take a fresh snapshot at a higher index before the apply finally succeeds, and each failed attempt at a different index leaves an orphan .fsm.
- cleanupStaleFSMSnaps only runs at startup (prepareDataDirs), so during a long-lived process, orphans accumulate into disk-space pressure.

Fix: SendSnapshot calls a new removeOrphanedFSMSnapshot helper on the apply-failure branch (sketched after this comment). The helper:
1. Decodes the EKVT token from msg.Snapshot.Data; bails if the message used the legacy inline path (no .fsm file to clean).
2. Reads fsmSnapDir under t.mu; bails if it's unset (legacy receivers also use the inline path, so there's nothing on disk).
3. os.Remove(fsmSnapPath(...)) — best-effort, IsNotExist is tolerated, all other errors are slog.Warn'd. The original apply error is the actionable signal returned to the sender; a failed Remove is a secondary concern that startup cleanup still picks up.

Test added (TestSendSnapshot_ApplyFailureRemovesFinalizedFSMFile):
- Wires SetFSMSnapDir + a SetHandler that fails.
- Drives a real testStateMachine-framed payload through SendSnapshot.
- Asserts SendSnapshot surfaces the apply error.
- Asserts fsmSnapPath(fsmSnapDir, index) does NOT exist after the call — the orphan cleanup fired.

Caller audit (semantic change requires it):
- SendSnapshot (this function): the only gRPC service handler, no in-tree non-test callers. Modified.
- removeOrphanedFSMSnapshot: only called from SendSnapshot.
- t.handle (other callers): line 438 in Send (regular RPC for non-snapshot messages, no .fsm involvement) is untouched.
- client.SendSnapshot (lines 267, 455, 481): leader-side client invocations are a different function, unaffected.

The cleanup is gated by isSnapshotToken(msg.Snapshot.Data), so the legacy in-memory fallback path (when fsmSnapDir is unset) sees no behavior change — there's no .fsm file to remove on that path because Bytes() materializes inline.

Test: go test -race -count=1 -short ./internal/raftengine/etcd -- 11.3s, all green. golangci-lint run --enable-only cyclop ./internal/raftengine/etcd/... -- 0 issues.
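A sketch of the cleanup helper as described. The field names (`t.mu`, `t.fsmSnapDir`), the token's `Index` field, and the log message follow the commit message and the review diff below; the receiver type name and other details are assumptions, so this is not the repository's exact code.

```go
// Best-effort: the apply error stays the primary signal returned to the
// sender; a failed Remove is only logged and swept by startup cleanup.
func (t *grpcTransport) removeOrphanedFSMSnapshot(msg raftpb.Message) {
	if !isSnapshotToken(msg.Snapshot.Data) {
		return // legacy inline path: nothing was written to fsmSnapDir
	}
	tok, err := decodeSnapshotToken(msg.Snapshot.Data)
	if err != nil {
		return // malformed token from our own code; startup cleanup will sweep it
	}

	t.mu.Lock()
	fsmSnapDir := t.fsmSnapDir
	t.mu.Unlock()
	if fsmSnapDir == "" {
		return // legacy receiver: inline path, no on-disk file
	}

	path := fsmSnapPath(fsmSnapDir, tok.Index)
	if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
		slog.Warn("failed to remove orphaned fsm snapshot file after apply failure",
			"path", path, "index", tok.Index, "error", rmErr)
	}
}
```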
@claude review |
Claude finished @bootjp's task in 2m 40s — Code Review (Round 5)
Round 5 review (post r4 commit).

r4 — Codex P2: orphaned .fsm files after apply failure — addressed. removeOrphanedFSMSnapshot handles each case:
| Case | Outcome |
|---|---|
| msg.Snapshot.Data is not a token (legacy path) | Early return — no .fsm file was written, no cleanup needed ✓ |
| Token decode fails | Early return — malformed token from our own code, shouldn't happen; orphan swept at startup ✓ |
| fsmSnapDir is empty (concurrent SetFSMSnapDir("") race) | Early return — benign; startup safety net applies ✓ |
| os.Remove succeeds | Orphan cleaned up immediately ✓ |
| os.Remove fails / file already gone | Logged, not returned; cleanupStaleFSMSnaps sweeps on next startup ✓ |
Best-effort semantics are correct: the apply error is the actionable signal; returning a cleanup error on top of it would obscure the root cause. The startup safety net (cleanupStaleFSMSnaps) removes any .fsm file with no matching live .snap, so even if os.Remove fails here, the orphan is bounded to one restart cycle.
Test: TestSendSnapshot_ApplyFailureRemovesFinalizedFSMFile (grpc_transport_test.go:295):
The test correctly exercises the primary scenario:
- Uses a real testStateMachine payload + framed bytes so a syntactically valid .fsm file is produced ✓
- index = 77 (non-zero) → streaming token path is taken ✓
- Handler always returns a synthetic apply error ✓
- Asserts SendSnapshot surfaces the apply error ✓
- Asserts fsmSnapPath(fsmSnapDir, 77) does not exist after the call ✓
Data Loss — still clean
The removeOrphanedFSMSnapshot path is only reachable when t.handle returns non-nil. Since t.handle invokes the raft handler synchronously (handler(ctx, msg)) and the handler does not return until apply is either committed or rejected, a non-nil return definitively means applied_index was NOT advanced for this snapshot. The .fsm file removal is therefore unconditionally safe — there is no window where the file is referenced by a committed raft entry. ✓
Final coverage table (post r4)
| Path | Test |
|---|---|
| Streaming token path (fsmSnapDir set) | TestReceiveSnapshotStream_StreamingTokenWhenFSMSnapDirSet ✓ |
| Spool placed in fsmSnapDir (not spoolDir) | TestReceiveSnapshotStream_SpoolPlacedInFSMSnapDir ✓ |
| Legacy fallback (fsmSnapDir unset) | TestReceiveSnapshotStream_LegacyFallbackWhenNoFSMSnapDir ✓ |
| FinalizeAsFSMFile success + Close no-op | TestFinalizeAsFSMFile_PostFinalizeCloseIsNoop ✓ |
| Pre-rename failure (MkdirAll fails) | TestFinalizeAsFSMFile_RenameFailureCleansUpSpool ✓ |
| Post-rename / syncDir failure | TestFinalizeAsFSMFile_SyncDirFailureCloseIsNoop ✓ |
| Full sender→receiver gRPC round-trip | TestStreamFSMSnapshotOverGRPCRestoresFollowerFSM ✓ |
| Trailing partial chunk regression | TestSendSnapshotReaderChunksTrailingPartialChunk ✓ |
| dispatchSnapshot routing | TestDispatchSnapshotTokenRoutesToStream + two variants ✓ |
| Apply failure → orphaned .fsm removed | TestSendSnapshot_ApplyFailureRemovesFinalizedFSMFile ✓ (new in r4) |
| index=0 fallback | missing (minor, pre-existing) |
Overall: approve. All five rounds of review concerns are resolved — the syncDir test gap (r2), the EXDEV cross-filesystem concern (r3), the cyclop complexity violation (r3 refactor), and the Codex P2 orphaned-file accumulation (r4) are all closed. The implementation is correct, the memory-safety goal is achieved, and the test coverage is comprehensive. Ready to merge once #746 lands.
🧹 Nitpick comments (1)
internal/raftengine/etcd/grpc_transport.go (1)
421-427: ⚡ Quick win — use `error` as the slog key for consistency with the rest of the file.

The two new `slog.Warn` call sites use `"err"` while the existing calls in this file (e.g., `closePeerConnLocked` at line 201, `streamFSMSnapshot`'s deferred close at line 255, plus the "WAL close failed" sites in `wal_store.go`) all use `"error"`. Per coding guideline "Use structured `slog` logging with stable keys", lining up on a single key across the file/codebase keeps log queries (`error=`) reliable.

♻️ Proposed diff:

```diff
@@
 	if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
 		slog.Warn("failed to remove orphaned fsm snapshot file after apply failure",
 			"path", path,
 			"index", tok.Index,
-			"err", rmErr,
+			"error", rmErr,
 		)
 	}
@@
 	if closeErr := spool.Close(); closeErr != nil {
 		slog.Warn("snapshot spool close failed",
 			"spool_dir", spoolPlacement,
-			"err", closeErr,
+			"error", closeErr,
 		)
 	}
```

As per coding guidelines: "Use structured `slog` logging with stable keys (`key`, `commit_ts`, `route_id`, etc.) for logging throughout the codebase."

Also applies to: 759-764
📒 Files selected for processing (5)

- internal/raftengine/etcd/grpc_transport.go
- internal/raftengine/etcd/grpc_transport_test.go
- internal/raftengine/etcd/snapshot_spool.go
- internal/raftengine/etcd/snapshot_spool_test.go
- internal/raftengine/etcd/wal_store.go
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3df8983cbe
```go
path := fsmSnapPath(fsmSnapDir, tok.Index)
if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
```
Avoid deleting shared snapshot file on handler error
removeOrphanedFSMSnapshot unconditionally deletes fsmSnapDir/<index>.fsm whenever t.handle returns an error, but t.handle in production only enqueues the message and can fail independently of whether another in-flight receive for the same snapshot index has already been accepted. In a duplicate/retry scenario (same index) one RPC can enqueue successfully while a second RPC fails (e.g. errStepQueueFull) and this delete removes the file the successful queued message still needs, causing later restore/apply to fail with missing snapshot file.
Summary
PR #746 raised the receive-side spool cap to 16 GiB so a 1.35 GiB FSM transfer no longer fails mid-stream. But apply was still going through `buildSnapshotMessage` → `spool.Bytes()` → `io.ReadAll`, materializing the entire payload as `[]byte` for `raftpb.Snapshot.Data`. On a 2.5 GiB-container production node, a 1.35 GiB allocation in addition to existing process state (Pebble cache, raft state, runtime) inflates the heap past the OOM-kill line before `RawNode.Step` ever runs `Restore`.

This PR routes receive into the existing token-format streaming path so heap usage during receive becomes O(chunk size), not O(FSM size).
What was already in place
The codebase already had all the apply-side machinery for streaming snapshot restore:
- `isSnapshotToken` / `decodeSnapshotToken` / `encodeSnapshotToken` (fsm_snapshot_file.go) — the 17-byte EKVT token format
- `openAndRestoreFSMSnapshot` — streams a `.fsm` file's payload into `StateMachine.Restore(io.Reader)` without materialization
- `engine.go::applySnapshot` already branches on `isSnapshotToken` and calls `openAndRestoreFSMSnapshot` when it sees a token (a rough sketch of that branch follows this list)
- `restoreSnapshotState` (wal_store.go) does the same on the startup-restore path
- `e.transport.SetFSMSnapDir(e.fsmSnapDir)` is wired at engine `Open`

What was missing was the receive side producing tokens.
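A rough sketch of that pre-existing apply-side branch, reconstructed from the list above; the exact signatures, receiver fields, and arguments in engine.go are assumptions, and the real function does more than this.

```go
// Sketch only: ordering, metadata handling, and WAL interaction are elided.
func (e *engine) applySnapshot(snap raftpb.Snapshot) error {
	if isSnapshotToken(snap.Data) {
		tok, err := decodeSnapshotToken(snap.Data)
		if err != nil {
			return err
		}
		// Streams the .fsm payload into StateMachine.Restore(io.Reader)
		// without materializing it in memory.
		return openAndRestoreFSMSnapshot(e.fsmSnapDir, tok.Index, e.fsm)
	}
	// Legacy inline path: Data carries the whole payload.
	return e.fsm.Restore(bytes.NewReader(snap.Data))
}
```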
What this PR adds
1. `snapshotSpool.FinalizeAsFSMFile(fsmSnapDir, index, crc32c)` — appends the 4-byte CRC32C footer, fsyncs, closes, and atomically renames the spool file into `fsmSnapPath(fsmSnapDir, index)`. On success, ownership transfers to `fsmSnapDir` and `Close()` becomes a no-op, so the renamed file is NOT removed.
2. `receiveSnapshotStream` wraps the spool with `crc32CWriter` so the CRC accumulates as bytes hit disk (no extra read pass). After the final chunk, `finalizeReceivedSnapshot`:
   - if `fsmSnapDir` is wired AND `metadata.Snapshot.Metadata.Index > 0`: calls `FinalizeAsFSMFile`, then sets `Snapshot.Data = encodeSnapshotToken(index, crc)` — 17 bytes.
   - otherwise: falls back to the legacy materialize path via `buildSnapshotMessage`, so tests without an `fsmSnapDir` keep behaving exactly as before this PR.
3. The receive log line gains a `format` field (`token` or `inline`) so an operator can distinguish the streaming path from legacy at a glance.

Memory profile change

| | `Snapshot.Data` is set to | `RawNode.Step` peak heap | `fsm.Restore` peak heap |
|---|---|---|---|
| Before | entire payload as `[]byte` | O(FSM size) | O(FSM size) |
| After | 17-byte EKVT token | O(chunk size) | O(read buffer) (streams from the `.fsm` file via `io.Reader`) |

A 16 GiB FSM would write 16 GiB to disk and allocate a 17-byte token plus per-chunk buffers. Container memory limits no longer constrain receive size.
Self-review (5 lenses)
1. Data loss — none. The on-disk format produced by `FinalizeAsFSMFile` is byte-identical to `writeFSMSnapshotFile` (payload + 4-byte big-endian CRC32C footer). `openAndRestoreFSMSnapshot` reads either source uniformly. Tests prove a sender FSM round-trips through streaming receive to a fresh receiver FSM with identical `Applied()`.
2. Concurrency — no new locks. The spool's `path` and `file` fields are written from a single goroutine (the receive loop). `FinalizeAsFSMFile` nils them on success so deferred `Close()` becomes a no-op without a separate `keep` flag. `crc32CWriter` wraps the spool linearly; never shared across goroutines.
3. Performance — one CRC32C pass over the received bytes. CRC32C on modern x86 (PCLMULQDQ) runs at multi-GiB/s, well above gRPC stream throughput; invisible in profiles. Heap allocation for `Snapshot.Data` drops from O(FSM) to a constant 17 bytes. Disk pattern unchanged: sequential append to spool, single rename (metadata-only), single fsync.
4. Data consistency — the EKVT token CRC is computed by the same `crc32CWriter` wrapper used by `writeFSMSnapshotPayload` (sender side), so the footer the receiver writes matches what `openAndRestoreFSMSnapshot`'s pre-Restore footer check expects. `ApplySnapshot` ordering is unchanged; this PR only swaps how `Snapshot.Data` is shaped before `RawNode.Step` sees it, and the apply branch on `isSnapshotToken` was already in place.
5. Test coverage — `TestReceiveSnapshotStream_StreamingTokenWhenFSMSnapDirSet` pins:
   - `Snapshot.Data` is the 17-byte token, not the payload
   - the `.fsm` file exists at `fsmSnapPath` with payload + 4-byte CRC footer
   - `sender.Applied() == receiver.Applied()` after `restoreSnapshotState` reads the token, opens the `.fsm` file, and streams it into `FSM.Restore`.

   `TestReceiveSnapshotStream_LegacyFallbackWhenNoFSMSnapDir` pins the unchanged legacy path: when `fsmSnapDir` is empty, the payload remains inline so existing tests keep passing.

Test plan
- `go test -race -count=1 -short ./internal/raftengine/etcd` — 11.5s, all green
- `restoreSnapshotState` → receiver-FSM round-trip
- `applied_index` advances; container memory peak during receive stays well under the 2.5 GiB limit (the original OOM hazard)

Production incident context — 2026-05-08
Two followers (211, 213) fell behind the leader's log during an earlier OOM cascade, leader truncated past their match indices, snapshot transfer became required. With the pre-PR-746 1 GiB receive cap they failed mid-stream → leader retry loop → 100 MB/s outbound for hours.
PR #746 fixed the on-disk cap. This PR closes the remaining memory hazard so the same followers don't OOM during apply once the receive completes — the cluster genuinely catches up rather than trading one loop for another.
Follow-up (separate PRs)
- Extend the streaming-token receive to the leader-side `applyBridgeMode` legacy path so an upgrade in progress (some peers on the pre-token binary) doesn't fall back to materialization on the leader.