Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Server struct {
punchSourceLast map[string]time.Time // source IP → last allowed punch time
lastPunchTime atomic.Int64 // UnixNano of last global punch (rate limit)

// Per-source relay rate limiters (SEC-037).
relayRateMu sync.Mutex // protects relaySourceCount
relaySourceCount map[uint32]*relaySourceWindow // senderID → sliding window state

// Peer mesh (gossip)
beaconID uint32
peers []*net.UDPAddr // peer beacon addresses (slow path, peerMu)
Expand Down Expand Up @@ -118,6 +122,21 @@ const (
punchRateCleanupInterval = 5 * time.Minute // how often stale source entries are swept
)

// relaySourceWindow tracks a single source's relay count in the current 1-second window.
type relaySourceWindow struct {
windowStart int64 // UnixNano of current 1-second window start
count uint32 // relay count in current window
}

// maxRelaysPerSourcePerSecond caps relays a single sender can push through
// dispatchRelay per second. Set generously (1000 relays/sec) — a legitimate
// source behind NAT may relay traffic for many agents, but a DoS source
// flooding relays to a known target can saturate the 524288-deep relayCh
// and cause queue-full drops for everyone. This cap prevents one source
// from consuming more than ~0.2% of total queue capacity per second.
const maxRelaysPerSourcePerSecond = 1000
const relaySourceCleanupInterval = 5 * time.Minute

func New() *Server {
return NewWithPeers(0, nil)
}
Expand All @@ -132,7 +151,8 @@ func NewWithPeers(beaconID uint32, peers []string) *Server {
relayCh: make(chan relayJob, relayQueueSize),
beaconID: beaconID,
done: make(chan struct{}),
punchSourceLast: make(map[string]time.Time),
punchSourceLast: make(map[string]time.Time),
relaySourceCount: make(map[uint32]*relaySourceWindow),
}
emptyPeers := make(map[uint32]*net.UDPAddr)
s.peerNodes.Store(&emptyPeers)
Expand Down Expand Up @@ -653,6 +673,29 @@ func (s *Server) dispatchRelay(data []byte) {
}
}

// Per-source rate limit: prevent one sender from flooding the relay
// queue and squeezing out legitimate traffic. A DoS source targeting
// a known destination can saturate the 524288-deep relayCh at rates
// far above normal — this cap gives each source a fixed share.
now := time.Now().UnixNano()
s.relayRateMu.Lock()
w, ok := s.relaySourceCount[senderID]
if !ok || now-w.windowStart >= int64(time.Second) {
// New 1-second window.
s.relaySourceCount[senderID] = &relaySourceWindow{windowStart: now, count: 1}
s.relayRateMu.Unlock()
} else if w.count >= maxRelaysPerSourcePerSecond {
// Source exceeded per-second budget — silently drop.
// The sender's daemon retries (3-attempt path in
// pkg/daemon/daemon.go relay branch), so a drop here
// is eventually self-healing for honest senders.
s.relayRateMu.Unlock()
return
} else {
w.count++
s.relayRateMu.Unlock()
}

// Copy payload into a pooled buffer so we don't hold the read buffer
payload := data[8:]
if len(payload) > maxRelayPayload {
Expand Down Expand Up @@ -941,6 +984,16 @@ func (s *Server) reapStaleNodes() {
}
}
s.punchRateMu.Unlock()

// Sweep stale relay-source entries.
s.relayRateMu.Lock()
cutoffNs := time.Now().Add(-relaySourceCleanupInterval).UnixNano()
for id, w := range s.relaySourceCount {
if w.windowStart < cutoffNs {
delete(s.relaySourceCount, id)
}
}
s.relayRateMu.Unlock()
}

// --- Gossip ---
Expand Down
Loading