-
Notifications
You must be signed in to change notification settings - Fork 58
Improve pinned participant state on rejoin with new session #1644
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
97f19f4
8c09bdf
56a0249
a8921c7
70bfcc8
1c763bf
dada793
24ad3a4
bd10d0e
350a430
4056ffd
c72056d
6530dcf
cd0b197
63b30fe
c5e5c00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| #!/usr/bin/env bash | ||
|
|
||
| set -e | ||
|
|
||
| REPO_URL="git@github.com:GetStream/protocol.git" | ||
| REFERENCE_TYPE="branch" | ||
| REFERENCE_VALUE="main" | ||
|
|
||
| PROJECT_ROOT="$(dirname "$(realpath "$0")")/" | ||
| BUILD_DIR="$PROJECT_ROOT/build" | ||
| CLONE_DIR="$BUILD_DIR/protocol-repo" | ||
| OUTPUT_CLIENT_PATH="$PROJECT_ROOT/stream-video-android-core/src/main/proto/video/sfu/" | ||
|
|
||
| # Step 1: Delete OUTPUT_CLIENT_PATH if exists else create an empty directory | ||
| echo "🧹 Preparing output directory: $OUTPUT_CLIENT_PATH" | ||
| rm -rf "$OUTPUT_CLIENT_PATH" | ||
| mkdir -p "$OUTPUT_CLIENT_PATH" | ||
|
|
||
| # Step 2: Clone the repository with shallow depth | ||
| echo "🚀 Cloning repository: $REPO_URL (Type: $REFERENCE_TYPE, Value: $REFERENCE_VALUE)..." | ||
| rm -rf "$CLONE_DIR" | ||
| mkdir -p "$BUILD_DIR" | ||
| git clone --depth=1 --branch "$REFERENCE_VALUE" "$REPO_URL" "$CLONE_DIR" | ||
|
|
||
| cd "$CLONE_DIR" | ||
|
|
||
| # Step 3: Checkout to the correct branch, tag, or commit | ||
| if [ "$REFERENCE_TYPE" == "branch" ]; then | ||
| git checkout "$REFERENCE_VALUE" | ||
| elif [ "$REFERENCE_TYPE" == "tag" ]; then | ||
| git fetch --tags | ||
| git checkout "tags/$REFERENCE_VALUE" | ||
| elif [ "$REFERENCE_TYPE" == "commit" ]; then | ||
| git fetch --depth=1 origin "$REFERENCE_VALUE" | ||
| git checkout "$REFERENCE_VALUE" | ||
| else | ||
| echo "❌ ERROR: Invalid reference type '$REFERENCE_TYPE'. Use 'branch', 'tag', or 'commit'." | ||
| exit 1 | ||
| fi | ||
|
|
||
| # Step 4: Copy content from CLONE_DIR/protobuf/video/sfu to OUTPUT_CLIENT_PATH | ||
| echo "📦 Copying proto files..." | ||
| SOURCE_PROTO_PATH="$CLONE_DIR/protobuf/video/sfu" | ||
|
|
||
| if [ ! -d "$SOURCE_PROTO_PATH" ]; then | ||
| echo "❌ ERROR: Source proto directory does not exist: $SOURCE_PROTO_PATH" | ||
| exit 1 | ||
| fi | ||
|
|
||
| cp -R "$SOURCE_PROTO_PATH/"* "$OUTPUT_CLIENT_PATH" | ||
|
|
||
| # Step 5: Run Spotless (from project root, not inside cloned repo) | ||
| echo "✨ Running Spotless..." | ||
| cd "$PROJECT_ROOT" | ||
| ./gradlew spotlessApply | ||
|
|
||
| # Step 6: Delete CLONE_DIR | ||
| echo "🗑 Cleaning up cloned repo..." | ||
| rm -rf "$CLONE_DIR" | ||
|
|
||
| echo "✅ Done." | ||
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -165,6 +165,9 @@ | |
| import java.util.concurrent.ConcurrentHashMap | ||
| import java.util.concurrent.atomic.AtomicBoolean | ||
| import java.util.concurrent.atomic.AtomicReference | ||
| import kotlin.collections.map | ||
| import kotlin.collections.none | ||
| import kotlin.collections.toMutableList | ||
| import kotlin.time.Duration | ||
| import kotlin.time.DurationUnit | ||
| import kotlin.time.toDuration | ||
|
|
@@ -208,6 +211,8 @@ | |
| public data object Disconnected : RealtimeConnection // normal disconnect by the app | ||
| } | ||
|
|
||
| private typealias SessionId = String | ||
|
|
||
| /** | ||
| * The CallState class keeps all state for a call | ||
| * It's available on every call object | ||
|
|
@@ -309,12 +314,12 @@ | |
| private val _dominantSpeaker: MutableStateFlow<ParticipantState?> = MutableStateFlow(null) | ||
| public val dominantSpeaker: StateFlow<ParticipantState?> = _dominantSpeaker | ||
|
|
||
| internal val _localPins: MutableStateFlow<Map<String, PinUpdateAtTime>> = | ||
| internal val _localPins: MutableStateFlow<Map<SessionId, PinUpdateAtTime>> = | ||
| MutableStateFlow(emptyMap()) | ||
|
Check warning on line 318 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
| internal val _serverPins: MutableStateFlow<Map<String, PinUpdateAtTime>> = | ||
| internal val _serverPins: MutableStateFlow<Map<SessionId, PinUpdateAtTime>> = | ||
| MutableStateFlow(emptyMap()) | ||
|
Check warning on line 320 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
|
|
||
| internal val _pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> = | ||
| internal val _pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> = | ||
| combine(_localPins, _serverPins) { local, server -> | ||
| val combined = mutableMapOf<String, PinUpdateAtTime>() | ||
| combined.putAll(local) | ||
|
|
@@ -327,7 +332,7 @@ | |
| /** | ||
| * Pinned participants, combined value both from server and local pins. | ||
| */ | ||
| val pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> = _pinnedParticipants | ||
| val pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> = _pinnedParticipants | ||
|
|
||
| val stats = CallStats(call, scope) | ||
|
|
||
|
|
@@ -1075,6 +1080,7 @@ | |
| getOrCreateParticipants(pendingParticipantsJoined.values.toList()) | ||
| } | ||
| } | ||
| updateServerSidePins(internalParticipants, event) | ||
| } catch (e: Exception) { | ||
| logger.e(e) { | ||
| "[ParticipantJoinedEvent] #participants; #debounce; Failed to debounce, processing as usual." | ||
|
|
@@ -1099,6 +1105,7 @@ | |
| } | ||
|
|
||
| if (_serverPins.value.containsKey(sessionId)) { | ||
| _serverPins.value = _serverPins.value.filter { it.key != event.participant.session_id } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be the actual root cause of the "Failed: User B rejoin" scenario in the PR description, rather than the sessionId mismatch? JS doesn't do this — its leave handler just removes the participant locally, leaving the server-side pin alone, which is why JS rejoin works. Android calls |
||
| scope.launch { | ||
| call.unpinForEveryone(sessionId, event.participant.user_id) | ||
| } | ||
|
|
@@ -1265,6 +1272,27 @@ | |
| } | ||
| } | ||
|
|
||
| internal fun updateServerSidePins(internalParticipants: Map<SessionId, ParticipantState>, event: ParticipantJoinedEvent) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new method is internal so directly testable. Worth adding a few cases to test the new behaviour?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes |
||
| val participantSessionId = event.participant.session_id | ||
| if (internalParticipants.containsKey(participantSessionId)) { | ||
| if (event.isPinned) { | ||
| val pinUpdate = | ||
| PinUpdate(event.participant.user_id, participantSessionId) | ||
| val tempPinUpdateList = _serverPins.value.map { it.value.it } | ||
| val participantIsNotPresent = | ||
| tempPinUpdateList.none { it.sessionId == participantSessionId } | ||
| if (participantIsNotPresent) { | ||
| val updatedList = tempPinUpdateList.toMutableList().apply { | ||
| add(pinUpdate) | ||
| } | ||
|
Check warning on line 1287 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
| updateServerSidePins(updatedList) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } else { | ||
| _serverPins.value = _serverPins.value.filter { it.key != participantSessionId } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will remove this code |
||
| } | ||
| } | ||
| } | ||
|
|
||
| private fun updateRingingState(rejectReason: RejectReason? = null) { | ||
| when (ringingState.value) { | ||
| RingingState.TimeoutNoAnswer, RingingState.RejectedByAll -> { | ||
|
|
@@ -1288,7 +1316,7 @@ | |
| _session.value?.participants?.find { it.user.id == client.userId } != null | ||
| val outgoingMembersCount = _members.value.filter { it.value.user.id != client.userId }.size | ||
| val isCallEnded: Boolean = _endedAt.value != null | ||
| val createdBySelf = createdBy?.id == client.userId | ||
|
Check warning on line 1319 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
|
|
||
| ringingLogger.d { "Current: ${_ringingState.value}, call_id: ${call.cid}" } | ||
|
|
||
|
|
@@ -1305,7 +1333,7 @@ | |
| ringingLogger.d { "call_id: ${call.cid}, Flags: $ringingStateLogs" } | ||
|
|
||
| // no members - call is empty, we can join | ||
| val state: RingingState = if (hasActiveCall && !isJoinAndRingInProgress.get()) { | ||
|
Check warning on line 1336 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
| /** | ||
| * Normal join, not joinAndRing | ||
| */ | ||
|
|
@@ -1792,7 +1820,7 @@ | |
| } | ||
|
|
||
| @Deprecated("Use updateNotification(Int, Notification) instead") | ||
| fun updateNotification(notification: Notification) { | ||
|
Check warning on line 1823 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
| atomicNotification.set(notification) | ||
| } | ||
|
|
||
|
|
@@ -1813,12 +1841,12 @@ | |
| private fun observeTelecomHold(repo: JetpackTelecomRepository) { | ||
| telecomHoldObserverJob?.cancel() | ||
|
|
||
| telecomHoldObserverJob = scope.launch(Dispatchers.Default) { | ||
|
Check warning on line 1844 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
| repo.currentCall | ||
| .map { (it as? TelecomCall.Registered)?.isOnHold == true } | ||
| .distinctUntilChanged() | ||
| .filter { it } | ||
| .collect { isOnHold -> | ||
|
Check warning on line 1849 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
|
||
| when (ringingState.value) { | ||
| is RingingState.Active -> { | ||
| call.leave("call-on-hold") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.