4242import os
4343import sys
4444import time
45- from typing import Any , Callable , Optional , Tuple
45+ from typing import Optional , Tuple
4646
4747import numpy as np
4848import pytest
4949
5050from livekit import rtc
5151from livekit .rtc ._proto .track_publication_pb2 import VideoQuality
52- from livekit .rtc .room import EventTypes
5352
54- from utils import create_token , skip_if_no_credentials , unique_room_name # type: ignore[import-not-found]
53+ from utils import ( # type: ignore[import-not-found]
54+ await_event ,
55+ create_token ,
56+ ensure_rooms_all_connected ,
57+ ensure_track_subscribed ,
58+ expect_room_event ,
59+ skip_if_no_credentials ,
60+ unique_room_name ,
61+ )
5562
5663
57- WAIT_TIMEOUT = 30.0
58- WAIT_INTERVAL = 0.1
5964PUBLISH_WIDTH = 1280
6065PUBLISH_HEIGHT = 720
6166PUBLISH_FPS = 15
7479]
7580
7681
77- async def _wait_until (
78- predicate : Callable [[], bool ],
79- * ,
80- timeout : float = WAIT_TIMEOUT ,
81- interval : float = WAIT_INTERVAL ,
82- message : str = "condition not met" ,
83- ) -> None :
84- loop = asyncio .get_running_loop ()
85- deadline = loop .time () + timeout
86- while loop .time () < deadline :
87- if predicate ():
88- return
89- await asyncio .sleep (interval )
90- raise AssertionError (f"timeout waiting: { message } " )
91-
92-
93- async def _ensure_all_connected (rooms : list [rtc .Room ]) -> None :
94- await _wait_until (
95- lambda : all (r .connection_state == rtc .ConnectionState .CONN_CONNECTED for r in rooms ),
96- message = "not all rooms reached CONN_CONNECTED" ,
97- )
98-
99-
100- async def _ensure_track_subscribed (room : rtc .Room , track_sid : str ) -> rtc .RemoteTrackPublication :
101- holder : dict [str , rtc .RemoteTrackPublication ] = {}
102-
103- def _has_subscribed () -> bool :
104- for participant in room .remote_participants .values ():
105- pub = participant .track_publications .get (track_sid )
106- if pub is not None and pub .subscribed and pub .track is not None :
107- holder ["pub" ] = pub
108- return True
109- return False
110-
111- await _wait_until (
112- _has_subscribed ,
113- message = f"room did not subscribe to track { track_sid } " ,
114- )
115- return holder ["pub" ]
116-
117-
118- def _expect_event (
119- room : rtc .Room ,
120- event : EventTypes ,
121- predicate : Optional [Callable [..., bool ]] = None ,
122- ) -> asyncio .Future :
123- loop = asyncio .get_running_loop ()
124- fut : asyncio .Future = loop .create_future ()
125-
126- def _on_event (* args : Any , ** kwargs : Any ) -> None :
127- if fut .done ():
128- return
129- if predicate is None or predicate (* args , ** kwargs ):
130- fut .set_result (args )
131- room .off (event , _on_event )
132-
133- room .on (event , _on_event )
134- return fut
135-
136-
137- async def _await_event (fut : asyncio .Future , timeout : float = WAIT_TIMEOUT ) -> None :
138- try :
139- await asyncio .wait_for (fut , timeout = timeout )
140- except asyncio .TimeoutError as e :
141- raise AssertionError ("timed out waiting for event" ) from e
142-
143-
14482def _make_rolling_i420 (width : int , height : int , t : float ) -> rtc .VideoFrame :
14583 """Build a 1280x720 I420 frame containing 8 vertical color bars that scroll
14684 horizontally over time, so the encoder always sees motion."""
@@ -259,7 +197,7 @@ async def test_simulcast_quality_layers(
259197 sender , receiver = rtc .Room (), rtc .Room ()
260198 await sender .connect (url , create_token ("sender" , room_name ))
261199 await receiver .connect (url , create_token ("receiver" , room_name ))
262- await _ensure_all_connected ([sender , receiver ])
200+ await ensure_rooms_all_connected ([sender , receiver ])
263201
264202 source = rtc .VideoSource (PUBLISH_WIDTH , PUBLISH_HEIGHT )
265203 track = rtc .LocalVideoTrack .create_video_track (f"{ mode } -{ codec_name } " , source )
@@ -282,21 +220,21 @@ async def test_simulcast_quality_layers(
282220
283221 stream : Optional [rtc .VideoStream ] = None
284222 try :
285- track_published = _expect_event (
223+ track_published = expect_room_event (
286224 receiver ,
287225 "track_published" ,
288226 predicate = lambda pub , _p : pub .kind == rtc .TrackKind .KIND_VIDEO ,
289227 )
290228 local_pub = await sender .local_participant .publish_track (track , options )
291- await _await_event (track_published )
229+ await await_event (track_published )
292230
293231 print (
294232 f"[{ codec_name } ] local_pub: sid={ local_pub .sid } "
295233 f"simulcasted={ local_pub .simulcasted } "
296234 f"mime_type={ local_pub .mime_type } "
297235 f"{ local_pub .width } x{ local_pub .height } "
298236 )
299- remote_pub = await _ensure_track_subscribed (receiver , local_pub .sid )
237+ remote_pub = await ensure_track_subscribed (receiver , local_pub .sid )
300238 assert remote_pub .track is not None
301239 print (
302240 f"[{ codec_name } ] remote_pub: sid={ remote_pub .sid } "
0 commit comments