Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions common/src/main/java/com/pedro/common/base/BaseSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ abstract class BaseSender(
protected set
@Volatile
protected var bytesSendPerSecond = 0L
@Volatile
protected var videoBytesSendPerSecond = 0L
@Volatile
protected var audioBytesSendPerSecond = 0L
@Volatile
private var currentVideoBitrate = 0L
@Volatile
private var currentAudioBitrate = 0L

abstract fun setVideoInfo(sps: ByteBuffer, pps: ByteBuffer?, vps: ByteBuffer?)
abstract fun setAudioInfo(sampleRate: Int, isStereo: Boolean)
Expand Down Expand Up @@ -70,7 +78,11 @@ abstract class BaseSender(
while (scope.isActive && running) {
//bytes to bits
bitrateManager.calculateBitrate(bytesSendPerSecond * 8)
currentVideoBitrate = videoBytesSendPerSecond * 8
currentAudioBitrate = audioBytesSendPerSecond * 8
bytesSendPerSecond = 0
videoBytesSendPerSecond = 0
audioBytesSendPerSecond = 0
delay(timeMillis = 1000)
}
}
Expand Down Expand Up @@ -151,7 +163,21 @@ abstract class BaseSender(
queue.setCacheTime(delay)
}

/**
* Get the current video bitrate in bits per second.
*/
fun getVideoBitrate(): Long = currentVideoBitrate

/**
* Get the current audio bitrate in bits per second.
*/
fun getAudioBitrate(): Long = currentAudioBitrate

fun resetBytesSend() {
bytesSend = 0
videoBytesSendPerSecond = 0
audioBytesSendPerSecond = 0
currentVideoBitrate = 0
currentAudioBitrate = 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,42 @@ class RtmpStreamClient(
fun shouldFailOnRead(enabled: Boolean) {
rtmpClient.shouldFailOnRead = enabled
}

/**
* Get the current video bitrate in bits per second.
*/
fun getVideoBitrate(): Long = rtmpClient.getVideoBitrate()

/**
* Get the current audio bitrate in bits per second.
*/
fun getAudioBitrate(): Long = rtmpClient.getAudioBitrate()

/**
* Get the current RTT (Round Trip Time) in milliseconds.
* This is measured using RTMP User Control ping-pong messages.
*/
fun getRtt(): Long = rtmpClient.getRtt()

/**
* Get the average RTT (Round Trip Time) in milliseconds.
* This is the average of all RTT measurements since the last reset.
*/
fun getAverageRtt(): Long = rtmpClient.getAverageRtt()

/**
* Send a ping request to measure RTT.
* The RTT will be updated when the server responds with a pong.
* Call getRtt() or getAverageRtt() to get the measured values.
*/
fun measureRtt() {
rtmpClient.measureRtt()
}

/**
* Reset RTT tracking data.
*/
fun resetRtt() {
rtmpClient.resetRtt()
}
}
10 changes: 10 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/CommandsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ abstract class CommandsManager {
}
}

suspend fun sendPing(timestamp: Int, socket: RtmpSocket) {
writeSync.withLock {
val ping = UserControl(Type.PING_REQUEST, Event(timestamp))
ping.writeHeader(socket)
ping.writeBody(socket)
socket.flush()
Log.i(TAG, "send ping with timestamp $timestamp")
}
}

@Throws(IOException::class)
suspend fun sendClose(socket: RtmpSocket) {
writeSync.withLock {
Expand Down
74 changes: 74 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,40 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
var socketTimeout = StreamSocket.DEFAULT_TIMEOUT
var shouldFailOnRead = false

// RTT tracking
@Volatile
private var rtt = 0L // Current RTT in milliseconds
@Volatile
private var rttSum = 0L // Sum of all RTT measurements
@Volatile
private var rttCount = 0L // Number of RTT measurements
@Volatile
private var lastPingTimestamp = 0L // Timestamp when last ping was sent
@Volatile
private var lastPingSequence = 0 // Sequence number for ping-pong matching

/**
* Get the current RTT (Round Trip Time) in milliseconds.
* This is measured using RTMP User Control ping-pong messages.
*/
fun getRtt(): Long = rtt

/**
* Get the average RTT (Round Trip Time) in milliseconds.
* This is the average of all RTT measurements since the last reset.
*/
fun getAverageRtt(): Long = if (rttCount > 0) rttSum / rttCount else 0L

/**
* Get the current video bitrate in bits per second.
*/
fun getVideoBitrate(): Long = rtmpSender.getVideoBitrate()

/**
* Get the current audio bitrate in bits per second.
*/
fun getAudioBitrate(): Long = rtmpSender.getAudioBitrate()

/**
* Add certificates for TLS connection
*/
Expand Down Expand Up @@ -390,6 +424,17 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
Type.PING_REQUEST -> {
commandsManager.sendPong(userControl.event, socket)
}
Type.PONG_REPLY -> {
// Calculate RTT when we receive a pong response that matches our ping sequence
if (lastPingTimestamp > 0 && userControl.event.data == lastPingSequence) {
val currentRtt = TimeUtils.getCurrentTimeMillis() - lastPingTimestamp
rtt = currentRtt
rttSum += currentRtt
rttCount++
lastPingTimestamp = 0
Log.i(TAG, "RTT measured: $currentRtt ms, average: ${getAverageRtt()} ms")
}
}
else -> {
Log.i(TAG, "user control command $type ignored")
}
Expand Down Expand Up @@ -566,6 +611,7 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
scope = CoroutineScope(Dispatchers.IO)
publishPermitted = false
commandsManager.reset()
resetRtt()
}

fun sendVideo(videoBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
Expand Down Expand Up @@ -606,6 +652,17 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
rtmpSender.resetBytesSend()
}

/**
* Reset RTT tracking data.
*/
fun resetRtt() {
rtt = 0
rttSum = 0
rttCount = 0
lastPingTimestamp = 0
lastPingSequence = 0
}

@Throws(RuntimeException::class)
fun resizeCache(newSize: Int) {
rtmpSender.resizeCache(newSize)
Expand Down Expand Up @@ -633,4 +690,21 @@ class RtmpClient(private val connectChecker: ConnectChecker) {
* Get the exponential factor used to calculate the bitrate. Default 1f
*/
fun getBitrateExponentialFactor() = rtmpSender.getBitrateExponentialFactor()

/**
* Send a ping request to measure RTT.
* The RTT will be updated when the server responds with a pong.
* Call getRtt() or getAverageRtt() to get the measured values.
*/
fun measureRtt() {
if (isStreaming && socket != null) {
scope.launch {
val s = socket ?: return@launch
lastPingTimestamp = TimeUtils.getCurrentTimeMillis()
// Increment sequence and wrap to 0 when reaching max value to stay positive
lastPingSequence = if (lastPingSequence >= Int.MAX_VALUE - 1) 0 else lastPingSequence + 1
commandsManager.sendPing(lastPingSequence, s)
}
}
}
}
2 changes: 2 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class RtmpSender(
Log.i(TAG, "wrote Video packet, size $size")
}
}
videoBytesSendPerSecond += size
} else {
audioFramesSent++
socket?.let { socket ->
Expand All @@ -97,6 +98,7 @@ class RtmpSender(
Log.i(TAG, "wrote Audio packet, size $size")
}
}
audioBytesSendPerSecond += size
}
bytesSend += size
bytesSendPerSecond += size
Expand Down