Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions stream-video-android-core/api/stream-video-android-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -8593,8 +8593,8 @@ public final class io/getstream/video/android/core/Call {
public final fun isPinnedParticipant (Ljava/lang/String;)Z
public final fun isServerPin (Ljava/lang/String;)Z
public final fun isVideoEnabled ()Z
public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallActivationInterceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallActivationInterceptor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun joinAndRing (Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun joinAndRing$default (Lio/getstream/video/android/core/Call;Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun kickUser (Ljava/lang/String;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -9317,6 +9317,10 @@ public final class io/getstream/video/android/core/RealtimeConnection$Reconnecti
public fun toString ()Ljava/lang/String;
}

public abstract interface class io/getstream/video/android/core/RingingCallActivationInterceptor {
public abstract fun callReadyToActivateWithTimeout (Lio/getstream/video/android/core/Call;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/getstream/video/android/core/RingingState {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package io.getstream.video.android.core

import io.getstream.log.taggedLogger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
Expand All @@ -28,9 +30,10 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import org.webrtc.PeerConnection
import org.webrtc.PeerConnection.PeerConnectionState

private const val PEER_CONNECTION_OBSERVER_TIMEOUT = 5_000L
private const val INTERCEPTOR_TIMEOUT_MS = 5_000L

internal class ActiveStateGate(
private val coroutineScope: CoroutineScope,
Expand All @@ -44,79 +47,90 @@ internal class ActiveStateGate(
internal fun awaitAndTransition(
currentRingingState: RingingState,
call: Call,
interceptor: RingingCallActivationInterceptor?,
onReady: () -> Unit,
) {
logger.d { "[awaitAndTransition], ringingState: $currentRingingState" }
when (strategy) {
TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> {
onReady()
}

else -> {
val isIncomingOrOutgoing =
previousRingingStates.any { it is RingingState.Incoming || it is RingingState.Outgoing }

if (isIncomingOrOutgoing && currentRingingState !is RingingState.Active) {
observePeerConnection(
call,
onReady,
strategy,
)
} else if (!isIncomingOrOutgoing) {
onReady()
}
}
if (strategy == TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR) {
onReady()
return
}

val isRingingCall = previousRingingStates.any {
it is RingingState.Incoming || it is RingingState.Outgoing
}

when {
!isRingingCall -> onReady()
currentRingingState !is RingingState.Active -> observePeerConnection(
call,
interceptor,
onReady,
)
}
}

private fun observePeerConnection(call: Call, onReady: () -> Unit, strategy: TransitionToRingingStateStrategy) {
private fun observePeerConnection(
call: Call,
interceptor: RingingCallActivationInterceptor?,
onReady: () -> Unit,
) {
if (peerConnectionObserverJob?.isActive == true) return

peerConnectionObserverJob = coroutineScope.launch {
val start = System.currentTimeMillis()

val result = withTimeoutOrNull(timeoutMs) {
call.session
.filterNotNull()
.flatMapLatest { session ->

val publisherFlow = session.publisher
.filterNotNull()
.flatMapLatest { it.state }

when (strategy) {
TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> {
emptyFlow<Int>()
.map { "none" to it }
}
TransitionToRingingStateStrategy.PUBLISHER_CONNECTED -> {
publisherFlow.filter { it == PeerConnection.PeerConnectionState.CONNECTED }
.map { "publisher" to it }
}
}
}
.first()
val peerWait = async {
withTimeoutOrNull(timeoutMs) { buildConnectionFlow(call).first() }
}

val duration = System.currentTimeMillis() - start

if (result != null) {
val (source, state) = result
logger.d {
"[observeConnection-$strategy] $source reached $state in ${duration}ms"
}
} else {
logger.w {
"[observeConnection-$strategy] Timeout after ${duration}ms"
}
val interceptorWait = interceptor?.let {
async { invokeInterceptorSafely(call, it) }
}
val peerResult = peerWait.await()
interceptorWait?.await()
logConnectionResult(peerResult, System.currentTimeMillis() - start)

if (isActive) {
onReady()
cleanup()
}
}
}

private fun buildConnectionFlow(call: Call): Flow<Unit> =
call.session
.filterNotNull()
.flatMapLatest { session ->
session.publisher
.filterNotNull()
.flatMapLatest { it.state }
.filter { it == PeerConnectionState.CONNECTED }
.map { }
}

private suspend fun invokeInterceptorSafely(call: Call, interceptor: RingingCallActivationInterceptor) {
try {
withTimeoutOrNull(INTERCEPTOR_TIMEOUT_MS) {
interceptor.callReadyToActivateWithTimeout(call)
}
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.e(
e,
) { "[RingingCallActivationInterceptor] interceptor threw, proceeding to Active" }
}
}

private fun logConnectionResult(result: Unit?, duration: Long) {
if (result != null) {
logger.d { "[observeConnection] Connected in ${duration}ms" }
} else {
logger.w { "[observeConnection] Timeout after ${duration}ms" }
}
}

fun cleanup() {
peerConnectionObserverJob?.cancel()
peerConnectionObserverJob = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"fastReconnectDeadlineSeconds. This constant will be removed in a future release.",
level = DeprecationLevel.WARNING,
)
const val sfuReconnectTimeoutMillis = 30_000

Check warning on line 140 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdVE2c9dKLUejOz8&open=AZ33XdVE2c9dKLUejOz8&pullRequest=1669

/**
* Outcome of a single reconnect attempt. Each reconnect method returns one of
Expand Down Expand Up @@ -264,8 +264,8 @@
*/
private var isDestroyed = false

/** Session handles all real time communication for video and audio */
internal val session: MutableStateFlow<RtcSession?> = MutableStateFlow(null)

Check warning on line 268 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Don't expose mutable flow types.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdVE2c9dKLUejOz7&open=AZ33XdVE2c9dKLUejOz7&pullRequest=1669

var sessionId = UUID.randomUUID().toString()
internal val unifiedSessionId = UUID.randomUUID().toString()
Expand Down Expand Up @@ -544,6 +544,7 @@
ring: Boolean = false,
notify: Boolean = false,
hintHighScaleLivestreamPublisher: Boolean? = null,
ringingCallActivationInterceptor: RingingCallActivationInterceptor? = null,
): Result<RtcSession> {
logger.d {
"[join] #ringing; #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions"
Expand All @@ -567,6 +568,8 @@
// Ensure factory is created with the current audioBitrateProfile before joining
ensureFactoryMatchesAudioProfile()

this.state.ringingCallActivationInterceptor = ringingCallActivationInterceptor

// the join flow should retry up to 3 times
// if the error is not permanent
// and fail immediately on permanent errors
Expand Down Expand Up @@ -845,7 +848,7 @@
* @param strategy the initial reconnection strategy requested by the caller.
* @param reason a human-readable reason for logging / tracing.
*/
internal suspend fun reconnect(

Check failure on line 851 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 31 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdVE2c9dKLUejOz-&open=AZ33XdVE2c9dKLUejOz-&pullRequest=1669
strategy: WebsocketReconnectStrategy,
reason: String,
) {
Expand Down Expand Up @@ -1862,7 +1865,7 @@
/**
* Should outlive both the call scope and the service scope and needs to be executed in the client-level scope.
* Because the call scope or service scope may be cancelled or finished while the network request is still in flight
* TODO: Run this in clientImpl.scope internally

Check warning on line 1868 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdVE2c9dKLUejOz9&open=AZ33XdVE2c9dKLUejOz9&pullRequest=1669
*/
suspend fun reject(reason: RejectReason? = null): Result<RejectCallResponse> {
logger.d { "[reject] #ringing; rejectReason: $reason, call_id:$id" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,8 @@
internal var incomingNotificationData = IncomingNotificationData(emptyMap())
private val ringingLogger by taggedLogger("RingingState")

internal var ringingCallActivationInterceptor: RingingCallActivationInterceptor? = null

fun handleEvent(event: VideoEvent) {
logger.d { "[handleEvent] ${event::class.java.name.split(".").last()}" }

Expand Down Expand Up @@ -1288,7 +1290,7 @@
_session.value?.participants?.find { it.user.id == client.userId } != null
val outgoingMembersCount = _members.value.filter { it.value.user.id != client.userId }.size
val isCallEnded: Boolean = _endedAt.value != null
val createdBySelf = createdBy?.id == client.userId

Check warning on line 1293 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused "createdBySelf" local variable.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdWg2c9dKLUejO0C&open=AZ33XdWg2c9dKLUejO0C&pullRequest=1669

ringingLogger.d { "Current: ${_ringingState.value}, call_id: ${call.cid}" }

Expand All @@ -1305,7 +1307,7 @@
ringingLogger.d { "call_id: ${call.cid}, Flags: $ringingStateLogs" }

// no members - call is empty, we can join
val state: RingingState = if (hasActiveCall && !isJoinAndRingInProgress.get()) {

Check warning on line 1310 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Merge chained "if" statements into a single "when" statement.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdWg2c9dKLUejO0D&open=AZ33XdWg2c9dKLUejO0D&pullRequest=1669
/**
* Normal join, not joinAndRing
*/
Expand Down Expand Up @@ -1381,7 +1383,11 @@
ringingLogger.d { "Update: $state" }

if (state is RingingState.Active) {
activeStateGate.awaitAndTransition(ringingState.value, call) {
activeStateGate.awaitAndTransition(
ringingState.value,
call,
ringingCallActivationInterceptor,
) {
_ringingState.value = state
activeStateGate.cleanup()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call.join() writes ringingCallJoinInterceptor on CallState, but it's never cleared after the transition completes. Integrator interceptors typically capture UI/Activity context in lambdas — for a long-lived Call this is a real retention path past the join.

Suggested change
activeStateGate.cleanup()
activeStateGate.cleanup()
ringingCallJoinInterceptor = null

}
Expand Down Expand Up @@ -1792,7 +1798,7 @@
}

@Deprecated("Use updateNotification(Int, Notification) instead")
fun updateNotification(notification: Notification) {

Check warning on line 1801 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdWg2c9dKLUejO0B&open=AZ33XdWg2c9dKLUejO0B&pullRequest=1669
atomicNotification.set(notification)
}

Expand All @@ -1813,12 +1819,12 @@
private fun observeTelecomHold(repo: JetpackTelecomRepository) {
telecomHoldObserverJob?.cancel()

telecomHoldObserverJob = scope.launch(Dispatchers.Default) {

Check warning on line 1822 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Avoid hardcoded dispatchers.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdWg2c9dKLUejO0A&open=AZ33XdWg2c9dKLUejO0A&pullRequest=1669
repo.currentCall
.map { (it as? TelecomCall.Registered)?.isOnHold == true }
.distinctUntilChanged()
.filter { it }
.collect { isOnHold ->

Check warning on line 1827 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "_" instead of this unused lambda parameter "isOnHold".

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdWg2c9dKLUejOz_&open=AZ33XdWg2c9dKLUejOz_&pullRequest=1669
when (ringingState.value) {
is RingingState.Active -> {
call.leave("call-on-hold")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-video-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.video.android.core

/**
* Controls when a ringing call transitions to [RingingState.Active].
*
* Implement this to insert custom logic (e.g. waiting for user confirmation) between
* the publisher peer connection becoming ready and the call going active.
* Has no effect on non-ringing joins (livestream, direct join).
*/
public interface RingingCallActivationInterceptor {

Check warning on line 26 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallActivationInterceptor.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Make this interface functional or replace it with a function type.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ33XdRs2c9dKLUejOz6&open=AZ33XdRs2c9dKLUejOz6&pullRequest=1669

/**
* Called when the SDK is ready to transition to [RingingState.Active].
* Suspend here to delay the transition; return to allow it to proceed.
*
* The SDK enforces a 5-second maximum — the transition proceeds automatically on timeout.
*/
public suspend fun callReadyToActivateWithTimeout(call: Call)
}
Loading
Loading