Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions internal/rtpbuffer/rtpbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func (r *RTPBuffer) Add(packet *RetainablePacket) {
r.packets[idx] = packet
}

// Clear releases all retained packets in the buffer.
func (r *RTPBuffer) Clear() {
for i, pkt := range r.packets {
if pkt != nil {
pkt.Release()
r.packets[i] = nil
}
}
r.started = false
}

// Get returns the RetainablePacket for the requested sequence number.
func (r *RTPBuffer) Get(seq uint16) *RetainablePacket {
diff := r.highestAdded - seq
Expand Down
84 changes: 84 additions & 0 deletions internal/rtpbuffer/rtpbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,90 @@ func TestRTPBuffer_Overridden_WithRTX_NILPayload(t *testing.T) {
require.Nil(t, sb.Get(1))
}

func TestRTPBuffer_Clear(t *testing.T) {
pm := NewPacketFactoryCopy()
sb, err := NewRTPBuffer(8)
require.NoError(t, err)

// Add packets and verify they exist
for i := uint16(0); i < 8; i++ {
pkt, pktErr := pm.NewPacket(&rtp.Header{SequenceNumber: i}, []byte("payload"), 0, 0)
require.NoError(t, pktErr)
sb.Add(pkt)
}

// Verify packets are in buffer
for i := uint16(0); i < 8; i++ {
pkt := sb.Get(i)
require.NotNil(t, pkt, "packet %d should exist before Clear", i)
pkt.Release()
}

// Clear the buffer
sb.Clear()

// Verify all slots are nil
for _, pkt := range sb.packets {
require.Nil(t, pkt, "all packets should be nil after Clear")
}

// Verify buffer can be reused after Clear
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: 100}, []byte("new"), 0, 0)
require.NoError(t, err)
sb.Add(pkt)

got := sb.Get(100)
require.NotNil(t, got, "buffer should be usable after Clear")
got.Release()
}

func TestRTPBuffer_ClearReleasesPacketsToPool(t *testing.T) {
pm := NewPacketFactoryCopy()
sb, err := NewRTPBuffer(4)
require.NoError(t, err)

// Add packets and track their ref counts
packets := make([]*RetainablePacket, 4)
for i := uint16(0); i < 4; i++ {
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: i}, []byte("data"), 0, 0)
require.NoError(t, err)
packets[i] = pkt
sb.Add(pkt)
}

// All packets should have count 1
for i, pkt := range packets {
require.Equal(t, 1, pkt.count, "packet %d should have count 1 before Clear", i)
}

sb.Clear()

// All packets should have count 0 (released)
for i, pkt := range packets {
require.Equal(t, 0, pkt.count, "packet %d should have count 0 after Clear", i)
}
}

func TestRTPBuffer_ClearPartiallyFilled(t *testing.T) {
pm := NewPacketFactoryCopy()
sb, err := NewRTPBuffer(8)
require.NoError(t, err)

// Only add 3 packets to an 8-slot buffer
for i := uint16(0); i < 3; i++ {
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: i}, []byte("data"), 0, 0)
require.NoError(t, err)
sb.Add(pkt)
}

// Should not panic with nil slots
sb.Clear()

for _, pkt := range sb.packets {
require.Nil(t, pkt)
}
}

func TestRTPBuffer_Padding(t *testing.T) {
pm := NewPacketFactoryCopy()
sb, err := NewRTPBuffer(1)
Expand Down
23 changes: 23 additions & 0 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,31 @@ func (n *ResponderInterceptor) BindLocalStream(
// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (n *ResponderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
n.streamsMu.Lock()
stream, ok := n.streams[info.SSRC]
delete(n.streams, info.SSRC)
n.streamsMu.Unlock()

if ok {
stream.rtpBufferMutex.Lock()
stream.rtpBuffer.Clear()
stream.rtpBufferMutex.Unlock()
}
}

// Close releases all resources held by the ResponderInterceptor.
func (n *ResponderInterceptor) Close() error {
n.streamsMu.Lock()
streams := n.streams
n.streams = map[uint32]*localStream{}
n.streamsMu.Unlock()

for _, stream := range streams {
stream.rtpBufferMutex.Lock()
stream.rtpBuffer.Clear()
stream.rtpBufferMutex.Unlock()
}

return nil
}

func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
Expand Down
138 changes: 138 additions & 0 deletions pkg/nack/responder_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,144 @@ func TestResponderInterceptor_BypassUnknownSSRCs(t *testing.T) {
}
}

func TestResponderInterceptor_UnbindReleasesBufferedPackets(t *testing.T) {
f, err := NewResponderInterceptor(
ResponderSize(8),
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
require.NoError(t, err)

i, err := f.NewInterceptor("")
require.NoError(t, err)

resp, ok := i.(*ResponderInterceptor)
require.True(t, ok)

info := &interceptor.StreamInfo{
SSRC: 1,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}

writer := resp.BindLocalStream(info, interceptor.RTPWriterFunc(
func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
return len(payload), nil
},
))

// Send some packets to fill the buffer
for seqNum := uint16(0); seqNum < 5; seqNum++ {
_, err := writer.Write(&rtp.Header{SequenceNumber: seqNum, SSRC: 1}, []byte("payload"), interceptor.Attributes{})
require.NoError(t, err)
}

// Get a reference to the stream's buffer to check after unbind
resp.streamsMu.Lock()
stream := resp.streams[info.SSRC]
resp.streamsMu.Unlock()
require.NotNil(t, stream)

// Grab references to the packets before unbind
stream.rtpBufferMutex.RLock()
var packets []*rtpbuffer.RetainablePacket
for seqNum := uint16(0); seqNum < 5; seqNum++ {
pkt := stream.rtpBuffer.Get(seqNum)
if pkt != nil {
packets = append(packets, pkt)
}
}
stream.rtpBufferMutex.RUnlock()
require.NotEmpty(t, packets, "should have buffered packets")

// Release our Get() references
for _, pkt := range packets {
pkt.Release()
}

// Unbind should release all buffered packets
resp.UnbindLocalStream(info)

// Verify stream was removed
resp.streamsMu.Lock()
_, exists := resp.streams[info.SSRC]
resp.streamsMu.Unlock()
require.False(t, exists, "stream should be removed after UnbindLocalStream")

// Verify packets were released (Retain should fail)
for _, pkt := range packets {
err := pkt.Retain()
require.Error(t, err, "packet should be released after UnbindLocalStream")
}
}

func TestResponderInterceptor_CloseReleasesAllStreams(t *testing.T) {
f, err := NewResponderInterceptor(
ResponderSize(8),
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
require.NoError(t, err)

i, err := f.NewInterceptor("")
require.NoError(t, err)

resp, ok := i.(*ResponderInterceptor)
require.True(t, ok)

// Bind two streams
var allPackets []*rtpbuffer.RetainablePacket
for ssrc := uint32(1); ssrc <= 2; ssrc++ {
info := &interceptor.StreamInfo{
SSRC: ssrc,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}

writer := resp.BindLocalStream(info, interceptor.RTPWriterFunc(
func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
return len(payload), nil
},
))

for seqNum := uint16(0); seqNum < 4; seqNum++ {
_, err := writer.Write(&rtp.Header{SequenceNumber: seqNum, SSRC: ssrc}, []byte("data"), interceptor.Attributes{})
require.NoError(t, err)
}

// Get references to packets to verify release later
resp.streamsMu.Lock()
stream := resp.streams[ssrc]
resp.streamsMu.Unlock()

stream.rtpBufferMutex.RLock()
for seqNum := uint16(0); seqNum < 4; seqNum++ {
pkt := stream.rtpBuffer.Get(seqNum)
if pkt != nil {
allPackets = append(allPackets, pkt)
}
}
stream.rtpBufferMutex.RUnlock()
}

// Release our Get() references
for _, pkt := range allPackets {
pkt.Release()
}

require.NotEmpty(t, allPackets, "should have packets from both streams")

// Close should release all
require.NoError(t, resp.Close())

// All streams should be gone
resp.streamsMu.Lock()
require.Empty(t, resp.streams, "streams map should be empty after Close")
resp.streamsMu.Unlock()

// All packets should be released
for _, pkt := range allPackets {
err := pkt.Retain()
require.Error(t, err, "packet should be released after Close")
}
}

// reentrantRTPWriter tries to re-acquire localStream.rtpBufferMutex inside Write.
// If BindLocalStream's wrapper calls writer.Write while holding that mutex, this
// will deadlock.
Expand Down
Loading