fix(raft): stream receive snapshot to disk + emit EKVT token (memory safety) #747
Changes from all commits: aed129c, 49723f3, 113f7ce, 3424d5c, 944ab86, 3df8983
```diff
@@ -5,6 +5,7 @@ import (
 	"context"
 	"io"
 	"log/slog"
+	"os"
 	"sync"
 	"time"
```
```diff
@@ -378,11 +379,54 @@ func (t *GRPCTransport) SendSnapshot(stream pb.EtcdRaft_SendSnapshotServer) erro
 		return err
 	}
 	if err := t.handle(stream.Context(), msg); err != nil {
+		// If receive finalized the snapshot as a .fsm file (token in
+		// Snapshot.Data), the engine refused to apply it — likely a
+		// transient context cancel or raft error. Remove the on-disk
+		// file so retries at later indexes don't leak orphan .fsm
+		// payloads into fsmSnapDir until the next startup runs
+		// cleanupStaleFSMSnaps. Same-index retries are already safe
+		// because os.Rename atomically replaces the prior file.
+		t.removeOrphanedFSMSnapshot(msg)
 		return err
 	}
 	return errors.WithStack(stream.SendAndClose(&pb.EtcdRaftAck{}))
 }
 
+// removeOrphanedFSMSnapshot deletes the .fsm file that
+// receiveSnapshotStream finalized for `msg`, if any. Used by
+// SendSnapshot when the engine apply (`t.handle`) fails after the
+// receive succeeded — the engine has NOT applied the snapshot (apply is
+// synchronous to t.handle, so a non-nil return means applied_index was
+// not advanced), so the file is unreferenced and safe to remove.
+//
+// Best-effort: a cleanup failure here is logged but not returned because
+// the original apply error is the actionable signal; orphans get swept
+// by cleanupStaleFSMSnaps at the next engine restart even if Remove
+// races with another process.
+func (t *GRPCTransport) removeOrphanedFSMSnapshot(msg raftpb.Message) {
+	if msg.Snapshot == nil || !isSnapshotToken(msg.Snapshot.Data) {
+		return
+	}
+	tok, err := decodeSnapshotToken(msg.Snapshot.Data)
+	if err != nil {
+		return
+	}
+	t.mu.RLock()
+	fsmSnapDir := t.fsmSnapDir
+	t.mu.RUnlock()
+	if fsmSnapDir == "" {
+		return
+	}
+	path := fsmSnapPath(fsmSnapDir, tok.Index)
+	if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
+		slog.Warn("failed to remove orphaned fsm snapshot file after apply failure",
+			"path", path,
+			"index", tok.Index,
+			"err", rmErr,
+		)
+	}
+}
+
 func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb.EtcdRaftAck, error) {
 	if req == nil {
 		return &pb.EtcdRaftAck{}, nil
```
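The helpers `isSnapshotToken`, `decodeSnapshotToken`, and `fsmSnapPath` that `removeOrphanedFSMSnapshot` calls are defined outside this diff. Below is a minimal sketch of what they could look like, assuming the 17-byte token mentioned in the next hunk is laid out as a 4-byte "EKVT" magic, a 1-byte format version, an 8-byte big-endian index, and a 4-byte CRC32C; the layout, the zero-padded filename scheme, and every identifier here are assumptions, not the PR's actual code.

```go
// Hypothetical sketch of the token helpers used above. The real
// implementations live elsewhere in the package; the 17-byte layout
// (4-byte "EKVT" magic + 1 version byte + 8-byte index + 4-byte CRC32C)
// is only inferred from the "17-byte EKVT token" wording in the diff.
package raftsketch

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"path/filepath"
)

const snapshotTokenLen = 17 // 4 magic + 1 version + 8 index + 4 crc32c

var snapshotTokenMagic = []byte("EKVT")

type snapshotToken struct {
	Index  uint64
	CRC32C uint32
}

func encodeSnapshotToken(index uint64, crc32c uint32) []byte {
	buf := make([]byte, snapshotTokenLen)
	copy(buf[0:4], snapshotTokenMagic)
	buf[4] = 1 // token format version
	binary.BigEndian.PutUint64(buf[5:13], index)
	binary.BigEndian.PutUint32(buf[13:17], crc32c)
	return buf
}

// isSnapshotToken distinguishes a tiny EKVT token in Snapshot.Data from a
// legacy inline payload (which would almost never be exactly 17 bytes and
// start with the magic).
func isSnapshotToken(data []byte) bool {
	return len(data) == snapshotTokenLen && bytes.Equal(data[0:4], snapshotTokenMagic)
}

func decodeSnapshotToken(data []byte) (snapshotToken, error) {
	if !isSnapshotToken(data) {
		return snapshotToken{}, errors.New("snapshot data is not an EKVT token")
	}
	return snapshotToken{
		Index:  binary.BigEndian.Uint64(data[5:13]),
		CRC32C: binary.BigEndian.Uint32(data[13:17]),
	}, nil
}

// fsmSnapPath builds the canonical on-disk name for a received snapshot;
// the zero-padded hex naming is likewise a guess.
func fsmSnapPath(dir string, index uint64) string {
	return filepath.Join(dir, fmt.Sprintf("%016x.fsm", index))
}
```

Under these assumptions the round trip `decodeSnapshotToken(encodeSnapshotToken(index, crc))` recovers exactly the index that `removeOrphanedFSMSnapshot` needs to locate the file.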
```diff
@@ -683,55 +727,154 @@ func (t *GRPCTransport) handle(ctx context.Context, msg raftpb.Message) error {
 	return errors.WithStack(handler(ctx, msg))
 }
 
-func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
-	var metadata raftpb.Message
-	seenMetadata := false
+// snapshotSpoolPlacement returns (spoolDir, fsmSnapDir) under the transport
+// lock. When fsmSnapDir is wired, the spool itself is placed inside it so
+// FinalizeAsFSMFile's rename stays intra-filesystem and cannot fail with
+// EXDEV. Standard engine wiring puts both under cfg.DataDir, but the
+// receive code should not assume that. The legacy fallback path
+// (fsmSnapDir == "") keeps the spool in spoolDir because it never renames
+// — Bytes() materializes the payload in place.
+func (t *GRPCTransport) snapshotSpoolPlacement() (placement, fsmSnapDir string) {
 	t.mu.RLock()
-	spoolDir := t.spoolDir
-	t.mu.RUnlock()
+	defer t.mu.RUnlock()
+	fsmSnapDir = t.fsmSnapDir
+	if fsmSnapDir != "" {
+		return fsmSnapDir, fsmSnapDir
+	}
+	return t.spoolDir, ""
+}
 
-	spool, err := newSnapshotSpool(spoolDir)
+func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
+	spoolPlacement, fsmSnapDir := t.snapshotSpoolPlacement()
+	spool, err := newSnapshotSpool(spoolPlacement)
 	if err != nil {
 		return raftpb.Message{}, err
 	}
 	defer func() {
-		_ = spool.Close()
+		// Log rather than swallow: a Close failure here points at a
+		// half-written spool file we couldn't clean up (disk full,
+		// permission flip mid-stream, …). Once FinalizeAsFSMFile has
+		// transferred ownership, Close is a no-op so this only fires on
+		// the unhappy paths that actually need an operator to look.
+		if closeErr := spool.Close(); closeErr != nil {
+			slog.Warn("snapshot spool close failed",
+				"spool_dir", spoolPlacement,
+				"err", closeErr,
+			)
+		}
 	}()
 
+	msg, payloadBytes, err := drainSnapshotChunks(stream, spool, fsmSnapDir)
+	if err != nil {
+		return raftpb.Message{}, err
+	}
+	index := uint64(0)
+	if msg.Snapshot != nil {
+		index = msg.Snapshot.Metadata.Index
+	}
+	slog.Info("etcd raft snapshot stream received",
+		"index", index,
+		"from", msg.From,
+		"payload_bytes", payloadBytes,
+		"format", snapshotDataFormatLabel(msg.Snapshot),
+	)
+	return msg, nil
+}
+
+// drainSnapshotChunks consumes the SendSnapshot stream into spool, computes
+// CRC32C over the payload bytes as they hit disk, and on the final chunk
+// hands off to finalizeReceivedSnapshot — which decides between the
+// streaming-token path (rename to fsmSnapDir/<index>.fsm + 17-byte token
+// in Snapshot.Data) and the legacy materialize fallback. Extracted from
+// receiveSnapshotStream so that function stays under cyclop's complexity
+// budget.
+func drainSnapshotChunks(
+	stream pb.EtcdRaft_SendSnapshotServer,
+	spool *snapshotSpool,
+	fsmSnapDir string,
+) (raftpb.Message, int64, error) {
+	var metadata raftpb.Message
+	seenMetadata := false
+	// Wrap spool with crc32CWriter so the CRC accumulates as bytes hit
+	// disk. The CRC is only meaningful when we have an fsmSnapDir to
+	// finalize into; the legacy fallback path discards it. Cost is
+	// hashing speed (~GB/s on modern x86 with SSE 4.2 PCLMULQDQ), well
+	// above gRPC stream throughput so the wrapper is invisible in
+	// profiles.
+	crcWriter := newCRC32CWriter(spool)
+
 	var payloadBytes int64
 	for {
 		chunk, err := stream.Recv()
 		if err != nil {
 			if errors.Is(err, io.EOF) {
-				return raftpb.Message{}, errors.WithStack(errSnapshotStreamShort)
+				return raftpb.Message{}, 0, errors.WithStack(errSnapshotStreamShort)
 			}
-			return raftpb.Message{}, errors.WithStack(err)
+			return raftpb.Message{}, 0, errors.WithStack(err)
 		}
 		payloadBytes += int64(len(chunk.Chunk))
-		seen, err := appendSnapshotChunk(&metadata, spool, chunk, seenMetadata)
+		seen, err := appendSnapshotChunk(&metadata, crcWriter, chunk, seenMetadata)
 		if err != nil {
-			return raftpb.Message{}, err
+			return raftpb.Message{}, 0, err
 		}
 		seenMetadata = seen
 		if chunk.Final {
-			msg, err := buildSnapshotMessage(metadata, spool, seenMetadata)
+			msg, err := finalizeReceivedSnapshot(metadata, spool, crcWriter.Sum32(), fsmSnapDir, seenMetadata)
 			if err != nil {
-				return raftpb.Message{}, err
-			}
-			index := uint64(0)
-			if msg.Snapshot != nil {
-				index = msg.Snapshot.Metadata.Index
+				return raftpb.Message{}, 0, err
 			}
-			slog.Info("etcd raft snapshot stream received",
-				"index", index,
-				"from", msg.From,
-				"payload_bytes", payloadBytes,
-			)
-			return msg, nil
+			return msg, payloadBytes, nil
 		}
 	}
 }
 
+// finalizeReceivedSnapshot picks between the streaming-token path (when an
+// fsmSnapDir is wired and the snapshot's metadata index is non-zero) and the
+// legacy in-memory path. The streaming path renames the spool file in place
+// to fsmSnapPath(fsmSnapDir, index), embeds a 17-byte EKVT token in
+// Snapshot.Data, and lets restoreSnapshotState read the payload off disk via
+// io.Reader — heap usage stays flat regardless of FSM size, eliminating the
+// 1.35-GiB-FSM × 2.5-GiB-container OOM hazard observed in the 2026-05-08
+// incident. The legacy path is preserved for tests and legacy receivers
+// that have not wired a snapshot directory.
+func finalizeReceivedSnapshot(
+	metadata raftpb.Message,
+	spool *snapshotSpool,
+	crc32c uint32,
+	fsmSnapDir string,
+	seenMetadata bool,
+) (raftpb.Message, error) {
+	if !seenMetadata || metadata.Snapshot == nil {
+		return raftpb.Message{}, errors.WithStack(errSnapshotMetadataNil)
+	}
+	index := metadata.Snapshot.Metadata.Index
+	if fsmSnapDir != "" && index > 0 {
+		if err := spool.FinalizeAsFSMFile(fsmSnapDir, index, crc32c); err != nil {
+			return raftpb.Message{}, err
+		}
+		metadata.Snapshot.Data = encodeSnapshotToken(index, crc32c)
+		return metadata, nil
+	}
+	// Legacy fallback: full materialization. Used by tests that don't wire an
+	// fsmSnapDir and by the index=0 edge case (no canonical filename to
+	// rename to).
+	return buildSnapshotMessage(metadata, spool, seenMetadata)
+}
+
+// snapshotDataFormatLabel exists purely for the structured log line on the
+// receiver — it lets an operator distinguish a streaming-token receive
+// (small heap, payload on disk) from a legacy materialization (heap holds
+// the full payload) at a glance, without grepping for byte counts.
+func snapshotDataFormatLabel(snap *raftpb.Snapshot) string {
+	if snap == nil {
+		return "nil"
+	}
+	if isSnapshotToken(snap.Data) {
+		return "token"
+	}
+	return "inline"
+}
+
 func appendSnapshotChunk(metadata *raftpb.Message, payload io.Writer, chunk *pb.EtcdRaftSnapshotChunk, seenMetadata bool) (bool, error) {
 	if len(chunk.Metadata) > 0 {
 		if seenMetadata {
```
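`newCRC32CWriter` is likewise not shown in the diff. Go's standard `hash/crc32` package already provides the Castagnoli polynomial (hardware-accelerated on amd64, matching the PCLMULQDQ note in the comment above), so a plausible shape is a thin `io.Writer` tee; the type and constructor names are assumptions.

```go
package raftsketch

import (
	"hash"
	"hash/crc32"
	"io"
)

// crc32CWriter tees every write into a CRC32C (Castagnoli) accumulator
// before forwarding it to the underlying writer (the spool file).
type crc32CWriter struct {
	w io.Writer
	h hash.Hash32
}

func newCRC32CWriter(w io.Writer) *crc32CWriter {
	return &crc32CWriter{
		w: w,
		h: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
	}
}

func (c *crc32CWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	// Hash only the bytes that actually reached the spool so the CRC
	// matches what is on disk even after a short write.
	if n > 0 {
		_, _ = c.h.Write(p[:n]) // hash.Hash.Write never returns an error
	}
	return n, err
}

// Sum32 returns the CRC32C of everything written so far.
func (c *crc32CWriter) Sum32() uint32 { return c.h.Sum32() }
```

Wrapping on the write side, rather than re-reading the file afterwards, is what lets `drainSnapshotChunks` hand a finished checksum to `finalizeReceivedSnapshot` without a second pass over the payload.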
Review comment on lines +852 to +855 (the `FinalizeAsFSMFile` and `encodeSnapshotToken` block):

The streaming branch finalizes and renames the spool into …
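`spool.FinalizeAsFSMFile`, which that comment concerns, is also outside this diff. Given the EXDEV note on `snapshotSpoolPlacement`, it plausibly flushes the spool file and renames it to its canonical name on the same filesystem; the sketch below works under those assumptions and reuses the hypothetical `fsmSnapPath` from earlier.

```go
package raftsketch

import "os"

// snapshotSpool is sketched as a thin wrapper over the temp file the
// chunks were streamed into; the real type certainly carries more state.
type snapshotSpool struct {
	f         *os.File
	finalized bool
}

// FinalizeAsFSMFile durably renames the spool file to
// fsmSnapPath(fsmSnapDir, index). Because snapshotSpoolPlacement puts the
// spool inside fsmSnapDir when it is wired, os.Rename stays on one
// filesystem and cannot fail with EXDEV. The crc32c parameter is carried
// only to match the call site's signature here; the real implementation
// may persist it (and a production version would also fsync the parent
// directory to make the rename itself durable).
func (s *snapshotSpool) FinalizeAsFSMFile(fsmSnapDir string, index uint64, crc32c uint32) error {
	if err := s.f.Sync(); err != nil { // flush payload before the rename publishes it
		return err
	}
	if err := s.f.Close(); err != nil {
		return err
	}
	// Atomic within a filesystem: a same-index retry simply replaces the
	// previous file, which is why SendSnapshot's comment calls same-index
	// retries safe.
	if err := os.Rename(s.f.Name(), fsmSnapPath(fsmSnapDir, index)); err != nil {
		return err
	}
	s.finalized = true // later Close calls become no-ops
	return nil
}
```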
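On the apply side, the `finalizeReceivedSnapshot` comment says `restoreSnapshotState` reads the payload off disk through an `io.Reader` rather than a byte slice. A sketch of that consumption pattern, verifying the token's CRC while streaming; it reuses the hypothetical helpers above, and `restoreFromToken` is an invented name.

```go
package raftsketch

import (
	"errors"
	"io"
	"os"
)

// restoreFromToken opens the finalized .fsm file named by tok and streams
// it into restore (e.g. the FSM's restore entry point), hashing bytes as
// they are consumed so a torn or corrupted file is rejected. Heap usage
// stays flat regardless of payload size, which is the point of the PR.
func restoreFromToken(fsmSnapDir string, tok snapshotToken, restore func(io.Reader) error) error {
	f, err := os.Open(fsmSnapPath(fsmSnapDir, tok.Index))
	if err != nil {
		return err
	}
	defer f.Close()

	crc := newCRC32CWriter(io.Discard)
	// TeeReader feeds the CRC accumulator as the FSM reads the payload.
	if err := restore(io.TeeReader(f, crc)); err != nil {
		return err
	}
	if crc.Sum32() != tok.CRC32C {
		return errors.New("fsm snapshot payload failed CRC32C check")
	}
	return nil
}
```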
Review comment on `removeOrphanedFSMSnapshot`:

`removeOrphanedFSMSnapshot` unconditionally deletes `fsmSnapDir/<index>.fsm` whenever `t.handle` returns an error, but in production `t.handle` only enqueues the message and can fail independently of whether another in-flight receive for the same snapshot index has already been accepted. In a duplicate/retry scenario (same index), one RPC can enqueue successfully while a second RPC fails (e.g. `errStepQueueFull`), and this delete removes the file that the successfully queued message still needs, causing a later restore/apply to fail with a missing snapshot file.
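One way to make the reviewer's missing precondition concrete in code; this is purely illustrative and not part of the PR. The transport could track indexes that were enqueued successfully and decline to delete while one is still outstanding. Every identifier below is invented for the sketch.

```go
package raftsketch

import "sync"

// enqueueLedger is a hypothetical guard: SendSnapshot would record an
// index here after t.handle succeeds, the engine would release it once
// the snapshot is applied (or dropped), and removeOrphanedFSMSnapshot
// would skip deletion while any successful enqueue for that index is
// still outstanding. Nothing like this exists in the PR; it only spells
// out the condition the review comment says is missing.
type enqueueLedger struct {
	mu      sync.Mutex
	pending map[uint64]int // snapshot index -> enqueued, not yet applied
}

func (l *enqueueLedger) enqueued(index uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.pending == nil {
		l.pending = make(map[uint64]int)
	}
	l.pending[index]++
}

func (l *enqueueLedger) applied(index uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.pending[index] > 0 {
		l.pending[index]--
	}
}

// safeToRemove reports whether no accepted in-flight message still
// references the .fsm file for index.
func (l *enqueueLedger) safeToRemove(index uint64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.pending[index] == 0
}
```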