Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions pkg/dmsg/server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,50 @@ func (ss *ServerSession) bridgeStream(log logrus.FieldLogger, yStr io.ReadWriteC
}
log.Debug("Forwarded stream response.")

// Clear the read deadline before the long-lived bidirectional copy.
if conn, ok := yStr.(net.Conn); ok {
conn.SetReadDeadline(time.Time{}) //nolint:errcheck,gosec
}
// Set an idle timeout on both sides of the bridge. If no data flows
// in either direction for this duration, both streams are closed.
// Without this, half-dead connections (client disconnected without
// sending FIN) cause goroutines to leak indefinitely — observed as
// 55K+ stuck goroutines in production dmsg servers.
const streamIdleTimeout = 5 * time.Minute

// Wrap both streams with idle-timeout deadlines.
yStr = &idleTimeoutConn{rwc: yStr, timeout: streamIdleTimeout}
yStr2 = &idleTimeoutConn{rwc: yStr2, timeout: streamIdleTimeout}

log.Info("Serving stream.")
ss.m.RecordStream(metrics.DeltaConnect)
defer ss.m.RecordStream(metrics.DeltaDisconnect)
return netutil.CopyReadWriteCloser(yStr, yStr2)
}

// idleTimeoutConn wraps a ReadWriteCloser with per-operation deadlines.
// If the underlying connection supports SetReadDeadline/SetWriteDeadline,
// each Read/Write resets the deadline. If the connection goes idle (no data
// in either direction), the deadline fires and the blocked io.Copy returns.
type idleTimeoutConn struct {
rwc io.ReadWriteCloser
timeout time.Duration
}

func (c *idleTimeoutConn) Read(p []byte) (int, error) {
if conn, ok := c.rwc.(net.Conn); ok {
conn.SetReadDeadline(time.Now().Add(c.timeout)) //nolint:errcheck,gosec
}
return c.rwc.Read(p)
}

func (c *idleTimeoutConn) Write(p []byte) (int, error) {
if conn, ok := c.rwc.(net.Conn); ok {
conn.SetWriteDeadline(time.Now().Add(c.timeout)) //nolint:errcheck,gosec
}
return c.rwc.Write(p)
}

func (c *idleTimeoutConn) Close() error {
return c.rwc.Close()
}

// forwardViaPeer tries to forward a stream request through peer server sessions.
// This is only called for client-originated requests (not peer-originated, enforcing 1-hop max).
func (ss *ServerSession) forwardViaPeer(log logrus.FieldLogger, yStr io.ReadWriteCloser, req StreamRequest) error {
Expand Down