Skip to content

Commit b231592

Browse files
committed
E2E test for audio/video publish/subscribe.
1 parent fbbe6f3 commit b231592

2 files changed

Lines changed: 568 additions & 0 deletions

File tree

livekit-rtc/tests/test_audio.py

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
# Copyright 2026 LiveKit, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""End-to-end audio and video publish/subscribe tests."""
16+
17+
import asyncio
18+
import ctypes
19+
import math
20+
import os
21+
import shutil
22+
import subprocess
23+
import uuid
24+
import wave
25+
from pathlib import Path
26+
27+
import numpy as np
28+
import pytest
29+
30+
from livekit import api, rtc
31+
from livekit.rtc.audio_frame import AudioFrame
32+
33+
34+
# --- Audio test parameters ---------------------------------------------------
# Sample rate (Hz) shared by the published source, the subscriber stream and
# every sample-count calculation in the audio test.
SAMPLE_RATE = 48000
# Channel count used for the generated tones and the subscriber stream.
NUM_CHANNELS = 1
# Length, in seconds, of each individual tone in the sweep.
TONE_DURATION_SEC = 1.0
# Tone frequencies (Hz), published one after another in this order.
FREQUENCIES_HZ = [100, 300, 500, 700, 1000]
# Peak amplitude of the generated tones as a fraction of int16 full scale.
AMPLITUDE = 0.5

# --- Video test parameters ---------------------------------------------------
# NOTE(review): none of the VIDEO_* constants are referenced in the visible part
# of this file; presumably they belong to a video test elsewhere — confirm.
VIDEO_WIDTH = 640
VIDEO_HEIGHT = 480
VIDEO_FPS = 15
VIDEO_COLOR_DURATION_SEC = 1.0
# (name, RGB tuple) — order matters; the subscriber must see them in this sequence.
VIDEO_COLOR_SEQUENCE: list[tuple[str, tuple[int, int, int]]] = [
    ("red", (255, 0, 0)),
    ("green", (0, 255, 0)),
    ("blue", (0, 0, 255)),
    ("white", (255, 255, 255)),
    ("black", (0, 0, 0)),
]
52+
53+
54+
def skip_if_no_credentials():
    """Build a ``skipif`` marker that fires when LiveKit credentials are absent.

    The three LiveKit environment variables are checked once, at the time this
    function is called. A variable counts as missing when it is unset or set
    to an empty string. The names of the missing variables are included in the
    skip reason.
    """
    env_names = ("LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET")
    unset = [name for name in env_names if not os.getenv(name)]
    reason = f"Missing environment variables: {', '.join(unset)}"
    return pytest.mark.skipif(bool(unset), reason=reason)
60+
61+
62+
def create_token(identity: str, room_name: str) -> str:
    """Return a JWT that lets `identity` join the room named `room_name`.

    The participant's display name is set to the same value as its identity,
    and the only grant requested is permission to join the given room.
    """
    token = api.AccessToken()
    token = token.with_identity(identity)
    token = token.with_name(identity)
    join_grant = api.VideoGrants(room_join=True, room=room_name)
    token = token.with_grants(join_grant)
    return token.to_jwt()
75+
76+
77+
def unique_room_name(base: str) -> str:
    """Return `base` followed by a dash and 8 random hex characters.

    The suffix is the first 8 hex digits of a freshly generated UUID4.
    """
    suffix = uuid.uuid4().hex[:8]
    return f"{base}-{suffix}"
79+
80+
81+
def _generate_sine_wave(
    frequency: int,
    sample_rate: int,
    num_channels: int,
    duration_sec: float,
    amplitude: float = 0.5,
) -> AudioFrame:
    """Build an int16 AudioFrame holding a sine tone.

    Args:
        frequency: Tone frequency in Hz.
        sample_rate: Samples per second.
        num_channels: Number of channels; every channel carries the same tone.
        duration_sec: Tone length in seconds (truncated to whole samples).
        amplitude: Peak level as a fraction of int16 full scale.

    Returns:
        An AudioFrame whose data is interleaved int16 PCM.
    """
    frame_count = int(sample_rate * duration_sec)
    seconds = np.arange(frame_count, dtype=np.float64) / sample_rate
    tone = amplitude * np.sin(2.0 * math.pi * frequency * seconds)
    full_scale = np.iinfo(np.int16).max
    samples = (tone * full_scale).astype(np.int16)

    if num_channels > 1:
        # Repeating each sample back-to-back yields channel-interleaved PCM
        # in which all channels are identical.
        samples = np.repeat(samples, num_channels)

    return AudioFrame(
        data=samples.tobytes(),
        sample_rate=sample_rate,
        num_channels=num_channels,
        samples_per_channel=frame_count,
    )
103+
104+
105+
def _frame_to_mono_float(frame: AudioFrame) -> np.ndarray:
    """Convert an int16 AudioFrame to a float64 mono signal scaled by int16 max.

    Multi-channel frames are treated as interleaved and down-mixed by
    averaging the channels of each sample.
    """
    raw = bytes(frame.data.cast("B"))
    pcm = np.frombuffer(raw, dtype=np.int16).astype(np.float64)

    channels = frame.num_channels
    if channels > 1:
        # One row per sample, one column per channel; average across channels.
        pcm = pcm.reshape(-1, channels).mean(axis=1)

    full_scale = float(np.iinfo(np.int16).max)
    return pcm / full_scale
111+
112+
113+
def _fft_spectrum(frame: AudioFrame) -> tuple[np.ndarray, np.ndarray]:
    """Compute the one-sided, Hann-windowed magnitude spectrum of `frame`.

    Returns:
        A ``(freqs, magnitudes)`` pair: bin center frequencies in Hz and the
        absolute value of the normalized rfft at each bin.
    """
    samples = _frame_to_mono_float(frame)
    n = len(samples)
    hann = np.hanning(n)
    # Dividing by half the window sum undoes the Hann window's coherent gain,
    # keeping magnitudes comparable across segments.
    gain = np.sum(hann) / 2.0
    bins = np.fft.rfft(samples * hann) / gain
    magnitudes = np.abs(bins)
    freqs = np.fft.rfftfreq(n, d=1.0 / frame.sample_rate)
    return freqs, magnitudes
122+
123+
124+
def _detect_peak_frequency(frame: AudioFrame) -> float:
    """Return the frequency (Hz) of the strongest spectral bin in `frame`."""
    freqs, magnitudes = _fft_spectrum(frame)
    peak_bin = int(np.argmax(magnitudes))
    return float(freqs[peak_bin])
128+
129+
130+
def _band_energies(
    frame: AudioFrame,
    centers: list[int],
    bandwidth_hz: float = 20.0,
) -> dict[int, float]:
    """Measure spectral energy around each requested center frequency.

    Args:
        frame: Audio to analyze.
        centers: Center frequencies in Hz.
        bandwidth_hz: Half-width of each band; a band spans
            ``center - bandwidth_hz`` to ``center + bandwidth_hz`` inclusive.

    Returns:
        Mapping of each center to the sum of squared magnitudes in its band.
    """
    freqs, magnitudes = _fft_spectrum(frame)
    power = magnitudes * magnitudes

    energies: dict[int, float] = {}
    for center in centers:
        lower = center - bandwidth_hz
        upper = center + bandwidth_hz
        in_band = (freqs >= lower) & (freqs <= upper)
        energies[center] = float(np.sum(power[in_band]))
    return energies
142+
143+
144+
@skip_if_no_credentials()
class TestAudioStreamPublishSubscribe:
    """End-to-end: publish a sine sweep into a room and verify spectrum on the subscriber."""

    async def test_audio_stream_publish_subscribe(self):
        """Publish 5 seconds of 100/300/500/700/1000 Hz tones and FFT-verify received audio.

        Two participants join a uniquely named room. The publisher sends one
        tone per entry in ``FREQUENCIES_HZ``; the subscriber records the
        received PCM, locates the signal onset, and checks that the middle of
        each one-second window peaks at the expected frequency and that the
        expected band dominates the other sweep bands.

        All media resources (publish task, audio stream, audio source) and
        both room connections are released even when the test fails part-way.
        """
        url = os.environ["LIVEKIT_URL"]
        room_name = unique_room_name("test-audio-sweep")

        publisher_room = rtc.Room()
        subscriber_room = rtc.Room()

        publisher_token = create_token("audio-sweep-publisher", room_name)
        subscriber_token = create_token("audio-sweep-subscriber", room_name)

        track_subscribed_event = asyncio.Event()
        subscribed_track: rtc.Track | None = None

        @subscriber_room.on("track_subscribed")
        def on_track_subscribed(
            track: rtc.Track,
            publication: rtc.RemoteTrackPublication,
            participant: rtc.RemoteParticipant,
        ):
            nonlocal subscribed_track
            # Only the audio track is of interest; ignore any other kind.
            if track.kind == rtc.TrackKind.KIND_AUDIO:
                subscribed_track = track
                track_subscribed_event.set()

        # Tracked up front so the cleanup below can release whichever of these
        # were actually created before a failure occurred.
        source: rtc.AudioSource | None = None
        audio_stream: rtc.AudioStream | None = None
        publish_task: asyncio.Task[None] | None = None

        try:
            await subscriber_room.connect(url, subscriber_token)
            await publisher_room.connect(url, publisher_token)

            try:
                source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
                track = rtc.LocalAudioTrack.create_audio_track("sine-sweep", source)
                options = rtc.TrackPublishOptions()
                options.source = rtc.TrackSource.SOURCE_MICROPHONE
                await publisher_room.local_participant.publish_track(track, options)

                await asyncio.wait_for(track_subscribed_event.wait(), timeout=10.0)
                assert subscribed_track is not None

                audio_stream = rtc.AudioStream(
                    subscribed_track,
                    sample_rate=SAMPLE_RATE,
                    num_channels=NUM_CHANNELS,
                )

                total_duration = TONE_DURATION_SEC * len(FREQUENCIES_HZ)
                target_samples = int(SAMPLE_RATE * total_duration)
                # Collect a little extra to tolerate codec startup latency.
                collect_samples_target = target_samples + int(SAMPLE_RATE * 1.0)

                async def publish_tones() -> None:
                    """Push one frame per sweep frequency, then wait for playout."""
                    for freq in FREQUENCIES_HZ:
                        frame = _generate_sine_wave(
                            freq,
                            SAMPLE_RATE,
                            NUM_CHANNELS,
                            TONE_DURATION_SEC,
                            AMPLITUDE,
                        )
                        await source.capture_frame(frame)
                    await source.wait_for_playout()

                async def collect_samples() -> np.ndarray:
                    """Read int16 samples from the stream until the target count is reached."""
                    buffers: list[np.ndarray] = []
                    total = 0
                    async for event in audio_stream:
                        chunk = np.frombuffer(
                            bytes(event.frame.data.cast("B")), dtype=np.int16
                        )
                        buffers.append(chunk)
                        total += len(chunk)
                        if total >= collect_samples_target:
                            break
                    return (
                        np.concatenate(buffers)
                        if buffers
                        else np.array([], dtype=np.int16)
                    )

                publish_task = asyncio.create_task(publish_tones())
                received = await asyncio.wait_for(collect_samples(), timeout=20.0)
                await publish_task
            finally:
                # Release media resources on every path. Previously a timeout
                # or failed assertion left the publish task running and the
                # stream/source open.
                if publish_task is not None:
                    # cancel() is a no-op on a finished task; gather() with
                    # return_exceptions=True waits for it without re-raising,
                    # so the original failure (if any) is what propagates.
                    publish_task.cancel()
                    await asyncio.gather(publish_task, return_exceptions=True)
                if audio_stream is not None:
                    await audio_stream.aclose()
                if source is not None:
                    await source.aclose()

            assert len(received) >= target_samples, (
                f"Expected at least {target_samples} samples, got {len(received)}"
            )

            # Dump what the subscriber heard next to this test file so a
            # failing run can be inspected by ear or in an audio editor.
            recv_wav_path = Path(__file__).parent / "subscriber_recv_freqs.wav"
            with wave.open(str(recv_wav_path), "wb") as wav_out:
                wav_out.setnchannels(NUM_CHANNELS)
                wav_out.setsampwidth(ctypes.sizeof(ctypes.c_int16))
                wav_out.setframerate(SAMPLE_RATE)
                wav_out.writeframes(received.tobytes())

            # Find signal onset to skip codec startup silence.
            envelope = np.abs(received.astype(np.float32))
            threshold = float(envelope.max()) * 0.2
            onset_candidates = np.where(envelope > threshold)[0]
            assert onset_candidates.size > 0, "Received audio contains only silence"
            onset = int(onset_candidates[0])

            samples_per_tone = int(SAMPLE_RATE * TONE_DURATION_SEC)
            # Analyze the middle slice of each tone window to avoid boundary transitions.
            analysis_margin = int(SAMPLE_RATE * 0.2)
            analysis_length = samples_per_tone - 2 * analysis_margin

            per_tone_peaks: list[tuple[int, float]] = []
            for idx, expected_freq in enumerate(FREQUENCIES_HZ):
                start = onset + idx * samples_per_tone + analysis_margin
                end = start + analysis_length
                assert end <= len(received), (
                    f"Not enough samples for tone {idx} (expected {expected_freq} Hz): "
                    f"need {end}, have {len(received)}"
                )
                segment = received[start:end]
                segment_frame = AudioFrame(
                    data=segment.tobytes(),
                    sample_rate=SAMPLE_RATE,
                    num_channels=NUM_CHANNELS,
                    samples_per_channel=len(segment),
                )
                peak_hz = _detect_peak_frequency(segment_frame)
                per_tone_peaks.append((expected_freq, peak_hz))

                # Opus transcoding adds spectral jitter; allow a 15 Hz tolerance.
                assert peak_hz == pytest.approx(expected_freq, abs=15.0), (
                    f"Tone {idx}: expected {expected_freq} Hz, got peak at {peak_hz:.1f} Hz. "
                    f"All peaks: {per_tone_peaks}"
                )

                # The target band should also dominate the other sweep bands.
                energies = _band_energies(segment_frame, FREQUENCIES_HZ, bandwidth_hz=30.0)
                target_energy = energies[expected_freq]
                other_energy = sum(v for k, v in energies.items() if k != expected_freq)
                assert target_energy > 5.0 * max(other_energy, 1e-12), (
                    f"Tone {idx} ({expected_freq} Hz) did not dominate other bands: "
                    f"target={target_energy:.3e}, other={other_energy:.3e}"
                )
        finally:
            # Disconnect the subscriber even if the publisher's disconnect raises.
            try:
                await publisher_room.disconnect()
            finally:
                await subscriber_room.disconnect()

0 commit comments

Comments
 (0)