Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions generate-sfu.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env bash

set -e

REPO_URL="git@github.com:GetStream/protocol.git"
REFERENCE_TYPE="branch"
REFERENCE_VALUE="main"

PROJECT_ROOT="$(dirname "$(realpath "$0")")/"
BUILD_DIR="$PROJECT_ROOT/build"
CLONE_DIR="$BUILD_DIR/protocol-repo"
OUTPUT_CLIENT_PATH="$PROJECT_ROOT/stream-video-android-core/src/main/proto/video/sfu/"

# Step 1: Delete OUTPUT_CLIENT_PATH if exists else create an empty directory
echo "🧹 Preparing output directory: $OUTPUT_CLIENT_PATH"
rm -rf "$OUTPUT_CLIENT_PATH"
mkdir -p "$OUTPUT_CLIENT_PATH"

# Step 2: Clone the repository with shallow depth
echo "🚀 Cloning repository: $REPO_URL (Type: $REFERENCE_TYPE, Value: $REFERENCE_VALUE)..."
rm -rf "$CLONE_DIR"
mkdir -p "$BUILD_DIR"
git clone --depth=1 --branch "$REFERENCE_VALUE" "$REPO_URL" "$CLONE_DIR"

cd "$CLONE_DIR"

# Step 3: Checkout to the correct branch, tag, or commit
if [ "$REFERENCE_TYPE" == "branch" ]; then
git checkout "$REFERENCE_VALUE"
elif [ "$REFERENCE_TYPE" == "tag" ]; then
git fetch --tags
git checkout "tags/$REFERENCE_VALUE"
elif [ "$REFERENCE_TYPE" == "commit" ]; then
git fetch --depth=1 origin "$REFERENCE_VALUE"
git checkout "$REFERENCE_VALUE"
Comment thread
rahul-lohra marked this conversation as resolved.
else
echo "❌ ERROR: Invalid reference type '$REFERENCE_TYPE'. Use 'branch', 'tag', or 'commit'."
exit 1
fi

# Step 4: Copy content from CLONE_DIR/protobuf/video/sfu to OUTPUT_CLIENT_PATH
echo "📦 Copying proto files..."
SOURCE_PROTO_PATH="$CLONE_DIR/protobuf/video/sfu"

if [ ! -d "$SOURCE_PROTO_PATH" ]; then
echo "❌ ERROR: Source proto directory does not exist: $SOURCE_PROTO_PATH"
exit 1
fi

cp -R "$SOURCE_PROTO_PATH/"* "$OUTPUT_CLIENT_PATH"

# Step 5: Run Spotless (from project root, not inside cloned repo)
echo "✨ Running Spotless..."
cd "$PROJECT_ROOT"
./gradlew spotlessApply

# Step 6: Delete CLONE_DIR
echo "🗑 Cleaning up cloned repo..."
rm -rf "$CLONE_DIR"

echo "✅ Done."
205 changes: 186 additions & 19 deletions stream-video-android-core/api/stream-video-android-core.api

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.map
import kotlin.collections.none
import kotlin.collections.toMutableList
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration
Expand Down Expand Up @@ -208,6 +211,8 @@
public data object Disconnected : RealtimeConnection // normal disconnect by the app
}

private typealias SessionId = String

/**
* The CallState class keeps all state for a call
* It's available on every call object
Expand Down Expand Up @@ -309,12 +314,12 @@
private val _dominantSpeaker: MutableStateFlow<ParticipantState?> = MutableStateFlow(null)
public val dominantSpeaker: StateFlow<ParticipantState?> = _dominantSpeaker

internal val _localPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
internal val _localPins: MutableStateFlow<Map<SessionId, PinUpdateAtTime>> =
MutableStateFlow(emptyMap())

Check warning on line 318 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Don't expose mutable flow types.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3ayUrR6yxXw8Uoybwt&open=AZ3ayUrR6yxXw8Uoybwt&pullRequest=1644
internal val _serverPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
internal val _serverPins: MutableStateFlow<Map<SessionId, PinUpdateAtTime>> =
MutableStateFlow(emptyMap())

Check warning on line 320 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Don't expose mutable flow types.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3ayUrR6yxXw8Uoybwu&open=AZ3ayUrR6yxXw8Uoybwu&pullRequest=1644

internal val _pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> =
internal val _pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> =
combine(_localPins, _serverPins) { local, server ->
val combined = mutableMapOf<String, PinUpdateAtTime>()
combined.putAll(local)
Expand All @@ -327,7 +332,7 @@
/**
* Pinned participants, combined value both from server and local pins.
*/
val pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> = _pinnedParticipants
val pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> = _pinnedParticipants

val stats = CallStats(call, scope)

Expand Down Expand Up @@ -1075,6 +1080,7 @@
getOrCreateParticipants(pendingParticipantsJoined.values.toList())
}
}
updateServerSidePins(internalParticipants, event)
} catch (e: Exception) {
logger.e(e) {
"[ParticipantJoinedEvent] #participants; #debounce; Failed to debounce, processing as usual."
Expand All @@ -1099,6 +1105,7 @@
}

if (_serverPins.value.containsKey(sessionId)) {
_serverPins.value = _serverPins.value.filter { it.key != event.participant.session_id }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be the actual root cause of the "Failed: User B rejoin" scenario in the PR description, rather than the sessionId mismatch? JS doesn't do this — its leave handler just removes the participant locally, leaving the server-side pin alone, which is why JS rejoin works. Android calls unpinForEveryone here, so when User B leaves the server is told to unpin them; by the time they rejoin, is_pinned on the server is false and the new event handler has nothing to act on.

scope.launch {
call.unpinForEveryone(sessionId, event.participant.user_id)
}
Expand Down Expand Up @@ -1265,6 +1272,27 @@
}
}

internal fun updateServerSidePins(internalParticipants: Map<SessionId, ParticipantState>, event: ParticipantJoinedEvent) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new method is internal so directly testable. Worth adding a few cases to test the new behaviour?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

val participantSessionId = event.participant.session_id
if (internalParticipants.containsKey(participantSessionId)) {
if (event.isPinned) {
val pinUpdate =
PinUpdate(event.participant.user_id, participantSessionId)
val tempPinUpdateList = _serverPins.value.map { it.value.it }
val participantIsNotPresent =
tempPinUpdateList.none { it.sessionId == participantSessionId }
if (participantIsNotPresent) {
val updatedList = tempPinUpdateList.toMutableList().apply {
add(pinUpdate)
}

Check warning on line 1287 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Make this collection immutable.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3ayUrR6yxXw8Uoybwv&open=AZ3ayUrR6yxXw8Uoybwv&pullRequest=1644
updateServerSidePins(updatedList)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateServerSidePins(updatedList) rebuilds the entire _serverPins map with OffsetDateTime.now() for every entry — so when one new pinned participant joins, every other pinned user's timestamp also gets refreshed. Is that intentional?

}
} else {
_serverPins.value = _serverPins.value.filter { it.key != participantSessionId }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new participantSessionId can't already be in _serverPins (sessionIds are unique per session) — so this filter looks like a no-op for the isPinned == false branch. Defensive, or am I missing a case where the sessionId could repeat?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove this code

}
}
}

private fun updateRingingState(rejectReason: RejectReason? = null) {
when (ringingState.value) {
RingingState.TimeoutNoAnswer, RingingState.RejectedByAll -> {
Expand All @@ -1288,7 +1316,7 @@
_session.value?.participants?.find { it.user.id == client.userId } != null
val outgoingMembersCount = _members.value.filter { it.value.user.id != client.userId }.size
val isCallEnded: Boolean = _endedAt.value != null
val createdBySelf = createdBy?.id == client.userId

Check warning on line 1319 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused "createdBySelf" local variable.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3N40bRP2HSqMZhWMYg&open=AZ3N40bRP2HSqMZhWMYg&pullRequest=1644

ringingLogger.d { "Current: ${_ringingState.value}, call_id: ${call.cid}" }

Expand All @@ -1305,7 +1333,7 @@
ringingLogger.d { "call_id: ${call.cid}, Flags: $ringingStateLogs" }

// no members - call is empty, we can join
val state: RingingState = if (hasActiveCall && !isJoinAndRingInProgress.get()) {

Check warning on line 1336 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Merge chained "if" statements into a single "when" statement.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3N40bRP2HSqMZhWMYh&open=AZ3N40bRP2HSqMZhWMYh&pullRequest=1644
/**
* Normal join, not joinAndRing
*/
Expand Down Expand Up @@ -1792,7 +1820,7 @@
}

@Deprecated("Use updateNotification(Int, Notification) instead")
fun updateNotification(notification: Notification) {

Check warning on line 1823 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do not forget to remove this deprecated code someday.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3N40bRP2HSqMZhWMYf&open=AZ3N40bRP2HSqMZhWMYf&pullRequest=1644
atomicNotification.set(notification)
}

Expand All @@ -1813,12 +1841,12 @@
private fun observeTelecomHold(repo: JetpackTelecomRepository) {
telecomHoldObserverJob?.cancel()

telecomHoldObserverJob = scope.launch(Dispatchers.Default) {

Check warning on line 1844 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Avoid hardcoded dispatchers.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3N40bRP2HSqMZhWMYe&open=AZ3N40bRP2HSqMZhWMYe&pullRequest=1644
repo.currentCall
.map { (it as? TelecomCall.Registered)?.isOnHold == true }
.distinctUntilChanged()
.filter { it }
.collect { isOnHold ->

Check warning on line 1849 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "_" instead of this unused lambda parameter "isOnHold".

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ3N40bRP2HSqMZhWMYd&open=AZ3N40bRP2HSqMZhWMYd&pullRequest=1644
when (ringingState.value) {
is RingingState.Active -> {
call.leave("call-on-hold")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public object RTCEventMapper {
}

event.participant_joined != null -> with(event.participant_joined) {
ParticipantJoinedEvent(participant!!, call_cid)
ParticipantJoinedEvent(participant!!, call_cid, is_pinned)
}

event.participant_left != null -> with(event.participant_left) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import stream.video.sfu.signal.ICERestartResponse
import stream.video.sfu.signal.ICETrickleResponse
import stream.video.sfu.signal.SendAnswerRequest
import stream.video.sfu.signal.SendAnswerResponse
import stream.video.sfu.signal.SendMetricsRequest
import stream.video.sfu.signal.SendMetricsResponse
import stream.video.sfu.signal.SendStatsRequest
import stream.video.sfu.signal.SendStatsResponse
import stream.video.sfu.signal.SetPublisherRequest
Expand Down Expand Up @@ -117,6 +119,9 @@ internal class RetryableSignalingServiceDecorator(
override suspend fun sendStats(sendStatsRequest: SendStatsRequest): SendStatsResponse =
retryCall({ it.error }) { decorated.sendStats(sendStatsRequest) }

override suspend fun sendMetrics(sendMetricsRequest: SendMetricsRequest): SendMetricsResponse =
retryCall({ null }) { decorated.sendMetrics(sendMetricsRequest) }

override suspend fun startNoiseCancellation(startNoiseCancellationRequest: StartNoiseCancellationRequest): StartNoiseCancellationResponse =
retryCall({ it.error }) { decorated.startNoiseCancellation(startNoiseCancellationRequest) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ public data class TrackUnpublishedEvent(
) : SfuDataEvent()

public data class ParticipantJoinedEvent(

val participant: Participant,
val callCid: String,
val isPinned: Boolean,
) : SfuDataEvent()

public data class ParticipantLeftEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import stream.video.sfu.signal.ICERestartResponse
import stream.video.sfu.signal.ICETrickleResponse
import stream.video.sfu.signal.SendAnswerRequest
import stream.video.sfu.signal.SendAnswerResponse
import stream.video.sfu.signal.SendMetricsRequest
import stream.video.sfu.signal.SendMetricsResponse
import stream.video.sfu.signal.SendStatsRequest
import stream.video.sfu.signal.SendStatsResponse
import stream.video.sfu.signal.SetPublisherRequest
Expand Down Expand Up @@ -97,6 +99,9 @@ internal class SignalingServiceTracerDecorator<T : SignalServerService>(
override suspend fun sendStats(sendStatsRequest: SendStatsRequest): SendStatsResponse =
target.sendStats(sendStatsRequest)

override suspend fun sendMetrics(sendMetricsRequest: SendMetricsRequest): SendMetricsResponse =
target.sendMetrics(sendMetricsRequest)

override suspend fun startNoiseCancellation(startNoiseCancellationRequest: StartNoiseCancellationRequest): StartNoiseCancellationResponse =
traced("startNoiseCancellation", startNoiseCancellationRequest, { it.error }) {
target.startNoiseCancellation(startNoiseCancellationRequest)
Expand Down
Loading
Loading