Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/rtpbuffer/rtpbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ func NewRTPBuffer(size uint16) (*RTPBuffer, error) {
}, nil
}

// Reset clears the RTPBuffer of all packets and resets its state.
func (r *RTPBuffer) Reset() {
for i := range r.packets {
r.packets[i] = nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it sets packets to nil without calling Release():

func (r *RTPBuffer) Reset() {
    for i := range r.packets {
        r.packets[i] = nil // Release() not called
    }
}

RetainablePacket uses reference counting — when Release() decrements count to 0, it calls onRelease to return the header and payload buffer back to sync.Pool. Without calling Release(), the header (func1) and payload buffer (func2) allocated by PacketFactoryCopy are never returned to the pool.

This is consistent with how RTPBuffer.Add() already handles slot replacement:

prevPacket := r.packets[idx]
if prevPacket != nil {
    prevPacket.Release() // existing code properly releases
}
r.packets[idx] = packet

Suggested fix:

func (r *RTPBuffer) Reset() {
    for i := range r.packets {
        if r.packets[i] != nil {
            r.packets[i].Release()
            r.packets[i] = nil
        }
    }
    r.highestAdded = 0
    r.started = false
}

We hit the same issue described in #404 in production — heap profiles show steady accumulation of NewPacketFactoryCopy.func1 (headerPool) and func2 (payloadPool), which would not be resolved without the Release() call.

}
r.highestAdded = 0
r.started = false
}

// Add places the RetainablePacket in the RTPBuffer.
func (r *RTPBuffer) Add(packet *RetainablePacket) {
seq := packet.sequenceNumber
Expand Down
5 changes: 5 additions & 0 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (n *ResponderInterceptor) BindLocalStream(
// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (n *ResponderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
n.streamsMu.Lock()
if stream, ok := n.streams[info.SSRC]; ok {
stream.rtpBufferMutex.Lock()
stream.rtpBuffer.Reset()
stream.rtpBufferMutex.Unlock()
}
delete(n.streams, info.SSRC)
n.streamsMu.Unlock()
}
Expand Down
Loading