Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Config struct {

MediaTimeout time.Duration `yaml:"media_timeout"`
MediaTimeoutInitial time.Duration `yaml:"media_timeout_initial"`
SymmetricRTP bool `yaml:"symmetric_rtp"`
Codecs map[string]bool `yaml:"codecs"`

// HideInboundPort controls how SIP endpoint responds to unverified inbound requests.
Expand Down
1 change: 1 addition & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
MediaTimeout: c.s.conf.MediaTimeout,
SymmetricRTP: conf.SymmetricRTP,
EnableJitterBuffer: c.jitterBuf,
LogSignalChanges: logSignalChanges,
Stats: &c.stats.Port,
Expand Down
14 changes: 11 additions & 3 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,16 @@ type UDPConn interface {
WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error)
}

func newUDPConn(log logger.Logger, conn UDPConn) *udpConn {
return &udpConn{UDPConn: conn, log: log, stopped: make(chan struct{})}
func newUDPConn(log logger.Logger, conn UDPConn, symmetricRTP bool) *udpConn {
return &udpConn{UDPConn: conn, log: log, stopped: make(chan struct{}), symmetricRTP: symmetricRTP}
}

type udpConn struct {
UDPConn
stopping core.Fuse
stopped chan struct{}
log logger.Logger
symmetricRTP bool
src atomic.Pointer[netip.AddrPort]
dst atomic.Pointer[netip.AddrPort]
}
Expand Down Expand Up @@ -231,6 +232,12 @@ func (c *udpConn) Read(b []byte) (n int, err error) {
} else if *prev != addr {
c.log.Infow("changing media source", "addr", addr.String())
}
if c.symmetricRTP {
dst := c.dst.Load()
if dst == nil || !dst.IsValid() || *dst != addr {
c.SetDst(addr)
}
}
return n, err
}

Expand Down Expand Up @@ -289,6 +296,7 @@ type MediaOptions struct {
Ports rtcconfig.PortRange
MediaTimeoutInitial time.Duration
MediaTimeout time.Duration
SymmetricRTP bool
Stats *PortStats
EnableJitterBuffer bool
NoInputResample bool
Expand Down Expand Up @@ -335,7 +343,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
timeoutResetTick: make(chan time.Duration, 1),
jitterEnabled: opts.EnableJitterBuffer,
logSignalChanges: opts.LogSignalChanges,
port: newUDPConn(log, conn),
port: newUDPConn(log, conn, opts.SymmetricRTP),
audioOut: msdk.NewSwitchWriter(sampleRate),
audioIn: msdk.NewSwitchWriter(inSampleRate),
stats: opts.Stats,
Expand Down
51 changes: 51 additions & 0 deletions pkg/sip/media_port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,54 @@ func TestMediaTimeout(t *testing.T) {
}
})
}

func TestSymmetricRTP(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
m1, m2 := newMediaPair(t, &MediaOptions{SymmetricRTP: false}, nil)
dstPtr := m1.port.dst.Load()
require.NotNil(t, dstPtr)
dst := *dstPtr
require.True(t, dst.IsValid())

c2 := m2.port.UDPConn.(*testUDPConn)
newAddr := netip.AddrPortFrom(newIP("9.9.9.9"), 9999)
c2.addr = newAddr

err := m2.GetAudioWriter().WriteSample(msdk.PCM16Sample{0, 0})
require.NoError(t, err)

select {
case <-m1.Received():
case <-time.After(time.Second):
t.Fatal("no media received")
}

curDstPtr := m1.port.dst.Load()
require.NotNil(t, curDstPtr)
require.Equal(t, dst, *curDstPtr)
})

t.Run("enabled", func(t *testing.T) {
m1, m2 := newMediaPair(t, &MediaOptions{SymmetricRTP: true}, nil)
dstPtr := m1.port.dst.Load()
require.NotNil(t, dstPtr)
require.True(t, dstPtr.IsValid())

c2 := m2.port.UDPConn.(*testUDPConn)
newAddr := netip.AddrPortFrom(newIP("9.9.9.9"), 9999)
c2.addr = newAddr

err := m2.GetAudioWriter().WriteSample(msdk.PCM16Sample{0, 0})
require.NoError(t, err)

select {
case <-m1.Received():
case <-time.After(time.Second):
t.Fatal("no media received")
}

curDstPtr := m1.port.dst.Load()
require.NotNil(t, curDstPtr)
require.Equal(t, newAddr, *curDstPtr)
})
}
1 change: 1 addition & 0 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi
Ports: conf.RTPPort,
MediaTimeoutInitial: c.conf.MediaTimeoutInitial,
MediaTimeout: c.conf.MediaTimeout,
SymmetricRTP: c.conf.SymmetricRTP,
EnableJitterBuffer: call.jitterBuf,
LogSignalChanges: signalLoggingEnabled,
Stats: &call.stats.Port,
Expand Down
Loading