Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 166 additions & 23 deletions internal/raftengine/etcd/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"io"
"log/slog"
"os"
"sync"
"time"

Expand Down Expand Up @@ -378,11 +379,54 @@ func (t *GRPCTransport) SendSnapshot(stream pb.EtcdRaft_SendSnapshotServer) erro
return err
}
if err := t.handle(stream.Context(), msg); err != nil {
// If receive finalized the snapshot as a .fsm file (token in
// Snapshot.Data), the engine refused to apply it — likely a
// transient context cancel or raft error. Remove the on-disk
// file so retries at later indexes don't leak orphan .fsm
// payloads into fsmSnapDir until the next startup runs
// cleanupStaleFSMSnaps. Same-index retries are already safe
// because os.Rename atomically replaces the prior file.
t.removeOrphanedFSMSnapshot(msg)
return err
}
return errors.WithStack(stream.SendAndClose(&pb.EtcdRaftAck{}))
}

// removeOrphanedFSMSnapshot deletes the .fsm file that
// receiveSnapshotStream finalized for `msg`, if any. Used by
// SendSnapshot when the engine apply (`t.handle`) fails after the
// receive succeeded — the engine has NOT applied the snapshot (apply is
// synchronous to t.handle, so a non-nil return means applied_index was
// not advanced), so the file is unreferenced and safe to remove.
//
// Best-effort: a cleanup failure here is logged but not returned because
// the original apply error is the actionable signal; orphans get swept
// by cleanupStaleFSMSnaps at the next engine restart even if Remove
// races with another process.
func (t *GRPCTransport) removeOrphanedFSMSnapshot(msg raftpb.Message) {
if msg.Snapshot == nil || !isSnapshotToken(msg.Snapshot.Data) {
return
}
tok, err := decodeSnapshotToken(msg.Snapshot.Data)
if err != nil {
return
}
t.mu.RLock()
fsmSnapDir := t.fsmSnapDir
t.mu.RUnlock()
if fsmSnapDir == "" {
return
}
path := fsmSnapPath(fsmSnapDir, tok.Index)
if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
Comment on lines +420 to +421
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid deleting shared snapshot file on handler error

removeOrphanedFSMSnapshot unconditionally deletes fsmSnapDir/<index>.fsm whenever t.handle returns an error, but t.handle in production only enqueues the message and can fail independently of whether another in-flight receive for the same snapshot index has already been accepted. In a duplicate/retry scenario (same index) one RPC can enqueue successfully while a second RPC fails (e.g. errStepQueueFull) and this delete removes the file the successful queued message still needs, causing later restore/apply to fail with missing snapshot file.

Useful? React with 👍 / 👎.

slog.Warn("failed to remove orphaned fsm snapshot file after apply failure",
"path", path,
"index", tok.Index,
"err", rmErr,
)
}
}

func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb.EtcdRaftAck, error) {
if req == nil {
return &pb.EtcdRaftAck{}, nil
Expand Down Expand Up @@ -683,55 +727,154 @@ func (t *GRPCTransport) handle(ctx context.Context, msg raftpb.Message) error {
return errors.WithStack(handler(ctx, msg))
}

func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
var metadata raftpb.Message
seenMetadata := false
// snapshotSpoolPlacement returns (spoolDir, fsmSnapDir) under the transport
// lock. When fsmSnapDir is wired, the spool itself is placed inside it so
// FinalizeAsFSMFile's rename stays intra-filesystem and cannot fail with
// EXDEV. Standard engine wiring puts both under cfg.DataDir, but the
// receive code should not assume that. The legacy fallback path
// (fsmSnapDir == "") keeps the spool in spoolDir because it never renames
// — Bytes() materializes the payload in place.
func (t *GRPCTransport) snapshotSpoolPlacement() (placement, fsmSnapDir string) {
t.mu.RLock()
spoolDir := t.spoolDir
t.mu.RUnlock()
defer t.mu.RUnlock()
fsmSnapDir = t.fsmSnapDir
if fsmSnapDir != "" {
return fsmSnapDir, fsmSnapDir
}
return t.spoolDir, ""
}

spool, err := newSnapshotSpool(spoolDir)
func (t *GRPCTransport) receiveSnapshotStream(stream pb.EtcdRaft_SendSnapshotServer) (raftpb.Message, error) {
spoolPlacement, fsmSnapDir := t.snapshotSpoolPlacement()
spool, err := newSnapshotSpool(spoolPlacement)
if err != nil {
return raftpb.Message{}, err
}
defer func() {
_ = spool.Close()
// Log rather than swallow: a Close failure here points at a
// half-written spool file we couldn't clean up (disk full,
// permission flip mid-stream, …). Once FinalizeAsFSMFile has
// transferred ownership, Close is a no-op so this only fires on
// the unhappy paths that actually need an operator to look.
if closeErr := spool.Close(); closeErr != nil {
slog.Warn("snapshot spool close failed",
"spool_dir", spoolPlacement,
"err", closeErr,
)
}
}()

msg, payloadBytes, err := drainSnapshotChunks(stream, spool, fsmSnapDir)
if err != nil {
return raftpb.Message{}, err
}
index := uint64(0)
if msg.Snapshot != nil {
index = msg.Snapshot.Metadata.Index
}
slog.Info("etcd raft snapshot stream received",
"index", index,
"from", msg.From,
"payload_bytes", payloadBytes,
"format", snapshotDataFormatLabel(msg.Snapshot),
)
return msg, nil
}

// drainSnapshotChunks consumes the SendSnapshot stream into spool, computes
// CRC32C over the payload bytes as they hit disk, and on the final chunk
// hands off to finalizeReceivedSnapshot — which decides between the
// streaming-token path (rename to fsmSnapDir/<index>.fsm + 17-byte token
// in Snapshot.Data) and the legacy materialize fallback. Extracted from
// receiveSnapshotStream so that function stays under cyclop's complexity
// budget.
//
// Returns the reassembled message, the number of payload bytes written to
// the spool, or an error (errSnapshotStreamShort if the stream ends
// before a Final chunk).
func drainSnapshotChunks(
	stream pb.EtcdRaft_SendSnapshotServer,
	spool *snapshotSpool,
	fsmSnapDir string,
) (raftpb.Message, int64, error) {
	var metadata raftpb.Message
	seenMetadata := false
	// Wrap spool with crc32CWriter so the CRC accumulates as bytes hit
	// disk. The CRC is only meaningful when we have an fsmSnapDir to
	// finalize into; the legacy fallback path discards it. Cost is
	// hashing speed (~GB/s on modern x86 with SSE 4.2 PCLMULQDQ), well
	// above gRPC stream throughput so the wrapper is invisible in
	// profiles.
	crcWriter := newCRC32CWriter(spool)

	var payloadBytes int64
	for {
		chunk, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				// EOF before a Final chunk means the sender hung up
				// mid-snapshot; surface it as a short-stream error.
				return raftpb.Message{}, 0, errors.WithStack(errSnapshotStreamShort)
			}
			return raftpb.Message{}, 0, errors.WithStack(err)
		}
		payloadBytes += int64(len(chunk.Chunk))
		seen, err := appendSnapshotChunk(&metadata, crcWriter, chunk, seenMetadata)
		if err != nil {
			return raftpb.Message{}, 0, err
		}
		seenMetadata = seen
		if chunk.Final {
			msg, err := finalizeReceivedSnapshot(metadata, spool, crcWriter.Sum32(), fsmSnapDir, seenMetadata)
			if err != nil {
				return raftpb.Message{}, 0, err
			}
			return msg, payloadBytes, nil
		}
	}
}

// finalizeReceivedSnapshot picks between the streaming-token path (when an
// fsmSnapDir is wired and the snapshot's metadata index is non-zero) and the
// legacy in-memory path. The streaming path renames the spool file in place
// to fsmSnapPath(fsmSnapDir, index), embeds a 17-byte EKVT token in
// Snapshot.Data, and lets restoreSnapshotState read the payload off disk via
// io.Reader — heap usage stays flat regardless of FSM size, eliminating the
// 1.35-GiB-FSM × 2.5-GiB-container OOM hazard observed in the 2026-05-08
// incident. The legacy path is preserved for tests and legacy receivers
// that have not wired a snapshot directory.
func finalizeReceivedSnapshot(
metadata raftpb.Message,
spool *snapshotSpool,
crc32c uint32,
fsmSnapDir string,
seenMetadata bool,
) (raftpb.Message, error) {
if !seenMetadata || metadata.Snapshot == nil {
return raftpb.Message{}, errors.WithStack(errSnapshotMetadataNil)
}
index := metadata.Snapshot.Metadata.Index
if fsmSnapDir != "" && index > 0 {
if err := spool.FinalizeAsFSMFile(fsmSnapDir, index, crc32c); err != nil {
return raftpb.Message{}, err
}
metadata.Snapshot.Data = encodeSnapshotToken(index, crc32c)
Comment on lines +852 to +855
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Delete finalized snapshot when post-receive handling fails

The streaming branch finalizes and renames the spool into fsmSnapDir before the message is handed to SendSnapshot’s t.handle path, but FinalizeAsFSMFile clears the spool path so deferred spool.Close() cannot clean it up afterward. If t.handle returns an error (for example during transient engine/raft failures), SendSnapshot returns failure to the sender while leaving the newly written .fsm file behind with no corresponding applied snapshot, which can accumulate orphaned large files across retries with different snapshot indexes.

Useful? React with 👍 / 👎.

return metadata, nil
}
// Legacy fallback: full materialization. Used by tests that don't wire an
// fsmSnapDir and by the index=0 edge case (no canonical filename to
// rename to).
return buildSnapshotMessage(metadata, spool, seenMetadata)
}

// snapshotDataFormatLabel exists purely for the structured log line on the
// receiver — it lets an operator distinguish a streaming-token receive
// (small heap, payload on disk) from a legacy materialization (heap holds
// the full payload) at a glance, without grepping for byte counts.
func snapshotDataFormatLabel(snap *raftpb.Snapshot) string {
	switch {
	case snap == nil:
		return "nil"
	case isSnapshotToken(snap.Data):
		return "token"
	default:
		return "inline"
	}
}

func appendSnapshotChunk(metadata *raftpb.Message, payload io.Writer, chunk *pb.EtcdRaftSnapshotChunk, seenMetadata bool) (bool, error) {
if len(chunk.Metadata) > 0 {
if seenMetadata {
Expand Down
Loading
Loading