Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type Server struct {
punchRateMu sync.Mutex // protects punchSourceLast
punchSourceLast map[string]time.Time // source IP → last allowed punch time
lastPunchTime atomic.Int64 // UnixNano of last global punch (rate limit)
discoverRateMu sync.Mutex // protects discoverRateLast
discoverRateLast map[uint32]time.Time // nodeID → last allowed discover endpoint update

// Peer mesh (gossip)
beaconID uint32
Expand Down Expand Up @@ -116,6 +118,7 @@ const (
maxPunchPerSecond = 10 // global hard cap on punch commands per second
punchPerSourceInterval = time.Second // min interval between punches from same source
punchRateCleanupInterval = 5 * time.Minute // how often stale source entries are swept
discoverMinInterval = 30 * time.Second // min interval between endpoint updates from same nodeID
)

func New() *Server {
Expand All @@ -132,7 +135,8 @@ func NewWithPeers(beaconID uint32, peers []string) *Server {
relayCh: make(chan relayJob, relayQueueSize),
beaconID: beaconID,
done: make(chan struct{}),
punchSourceLast: make(map[string]time.Time),
punchSourceLast: make(map[string]time.Time),
discoverRateLast: make(map[uint32]time.Time),
}
emptyPeers := make(map[uint32]*net.UDPAddr)
s.peerNodes.Store(&emptyPeers)
Expand Down Expand Up @@ -500,9 +504,20 @@ func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr) {

nodeID := binary.BigEndian.Uint32(data[0:4])

// Record this node's observed public endpoint. Sharded — no global lock.
if _, atCap := s.nodes.Upsert(nodeID, remote, time.Now(), maxBeaconNodes); atCap {
return // shard at capacity — drop silently
// Per-nodeID endpoint update rate limit — prevents a single
// nodeID from flapping its endpoint via rapid Discover messages.
s.discoverRateMu.Lock()
if last, ok := s.discoverRateLast[nodeID]; ok && time.Since(last) < discoverMinInterval {
s.discoverRateMu.Unlock()
// Rate-limited: skip the Upsert but still reply with the
// observed address so the node learns its public endpoint.
} else {
s.discoverRateLast[nodeID] = time.Now()
s.discoverRateMu.Unlock()
// Record this node's observed public endpoint. Sharded — no global lock.
if _, atCap := s.nodes.Upsert(nodeID, remote, time.Now(), maxBeaconNodes); atCap {
return // shard at capacity — drop silently
}
}

slog.Debug("beacon discover", "node_id", nodeID, "addr", remote)
Expand Down Expand Up @@ -941,6 +956,16 @@ func (s *Server) reapStaleNodes() {
}
}
s.punchRateMu.Unlock()

// Sweep stale discover-rate entries.
s.discoverRateMu.Lock()
discoverCutoff := time.Now().Add(-discoverMinInterval * 2)
for id, last := range s.discoverRateLast {
if last.Before(discoverCutoff) {
delete(s.discoverRateLast, id)
}
}
s.discoverRateMu.Unlock()
}

// --- Gossip ---
Expand Down
Loading