Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `DeviceBusy` error variant to `SupportedStreamConfigsError`, `DefaultStreamConfigError`, and
`BuildStreamError` for retryable device access errors (EBUSY, EAGAIN).
- `StreamConfig` now implements `Copy`.
- `StreamTrait::buffer_size()` to query the stream's current buffer size in frames per callback.
- **PipeWire**: New host for Linux and some BSDs using the PipeWire API.
- **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API.

Expand All @@ -25,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **AAudio**: `supported_input_configs` and `supported_output_configs` now return an error for
direction-mismatched devices (e.g. querying input configs on an output-only device) instead of
silently returning an empty list.
- **AAudio**: Buffers with default sizes are now dynamically tuned.
- **ALSA**: Device disconnection now stops the stream with `StreamError::DeviceNotAvailable` instead of looping.
- **ALSA**: Polling errors trigger underrun recovery instead of looping.
- **ALSA**: Try to resume from hardware after a system suspend.
Expand Down
22 changes: 21 additions & 1 deletion src/host/aaudio/java_interface/audio_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{
utils::{
get_context, get_property, get_system_service, with_attached, JNIEnv, JObject, JResult,
get_context, get_property, get_system_property, get_system_service, with_attached, JNIEnv,
JObject, JResult,
},
AudioManager, Context,
};
Expand All @@ -13,6 +14,14 @@ impl AudioManager {
with_attached(context, |env, context| get_frames_per_buffer(env, &context))
.map_err(|error| error.to_string())
}

/// Get the AAudio mixer burst count from system property
pub fn get_mixer_bursts() -> Result<i32, String> {
let context = get_context();

with_attached(context, |env, _context| get_mixer_bursts(env))
.map_err(|error| error.to_string())
}
}

fn get_frames_per_buffer<'j>(env: &mut JNIEnv<'j>, context: &JObject<'j>) -> JResult<i32> {
Expand All @@ -31,3 +40,14 @@ fn get_frames_per_buffer<'j>(env: &mut JNIEnv<'j>, context: &JObject<'j>) -> JRe
.parse::<i32>()
.map_err(|_| jni::errors::Error::JniCall(jni::errors::JniError::Unknown))
}

fn get_mixer_bursts<'j>(env: &mut JNIEnv<'j>) -> JResult<i32> {
let mixer_bursts = get_system_property(env, "aaudio.mixer_bursts", "2")?;

let mixer_bursts_string = String::from(env.get_string(&mixer_bursts)?);

// TODO: Use jni::errors::Error::ParseFailed instead of jni::errors::Error::JniCall once jni > v0.21.1 is released
mixer_bursts_string
.parse::<i32>()
.map_err(|_| jni::errors::Error::JniCall(jni::errors::JniError::Unknown))
}
20 changes: 20 additions & 0 deletions src/host/aaudio/java_interface/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ pub fn get_property<'j>(
call_method_string_arg_ret_string(env, subject, "getProperty", name)
}

/// Read an Android system property
pub fn get_system_property<'j>(
env: &mut JNIEnv<'j>,
name: &str,
default_value: &str,
) -> JResult<JString<'j>> {
Ok(env
.call_static_method(
"android/os/SystemProperties",
"get",
"(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;",
&[
(&env.new_string(name)?).into(),
(&env.new_string(default_value)?).into(),
],
)?
.l()?
.into())
}

pub fn get_devices<'j>(
env: &mut JNIEnv<'j>,
subject: &JObject<'j>,
Expand Down
155 changes: 125 additions & 30 deletions src/host/aaudio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::cmp;
use std::convert::TryInto;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIntoIter;
Expand Down Expand Up @@ -131,9 +132,9 @@ pub struct Device(Option<AudioDeviceInfo>);
/// - The pointer in AudioStream (NonNull<AAudioStreamStruct>) is valid for the lifetime
/// of the stream and AAudio C API functions are thread-safe at the C level
#[derive(Clone)]
pub enum Stream {
Input(Arc<Mutex<AudioStream>>),
Output(Arc<Mutex<AudioStream>>),
pub struct Stream {
inner: Arc<Mutex<AudioStream>>,
direction: DeviceDirection,
}

// SAFETY: AudioStream can be safely sent between threads. The AAudio C API is thread-safe
Expand All @@ -148,6 +149,14 @@ unsafe impl Sync for Stream {}
crate::assert_stream_send!(Stream);
crate::assert_stream_sync!(Stream);

/// State for dynamic buffer tuning on output streams.
#[derive(Default)]
struct BufferTuningState {
previous_underrun_count: AtomicI32,
capacity: AtomicI32,
mixer_bursts: AtomicI32,
}

pub use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs};
pub type Devices = std::vec::IntoIter<Device>;

Expand Down Expand Up @@ -277,14 +286,18 @@ fn configure_for_device(
};
builder = builder.sample_rate(config.sample_rate.try_into().unwrap());

// Note: Buffer size validation is not needed - the native AAudio API validates buffer sizes
// when `open_stream()` is called.
match &config.buffer_size {
BufferSize::Default => builder,
BufferSize::Fixed(size) => builder
.frames_per_data_callback(*size as i32)
.buffer_capacity_in_frames((*size * 2) as i32), // Double-buffering
// Following the pattern from Oboe and Google's AAudio, we let AAudio choose the optimal
// callback size dynamically by default. See
// - https://developer.android.com/ndk/reference/group/audio#aaudiostreambuilder_setframesperdatacallback
// - https://developer.android.com/ndk/guides/audio/audio-latency#buffer-size
if let BufferSize::Fixed(size) = config.buffer_size {
// For fixed sizes, the user explicitly wants control over the callback size.
builder = builder
.frames_per_data_callback(size as i32)
.buffer_capacity_in_frames(2 * size as i32);
}

builder
}

fn build_input_stream<D, E>(
Expand Down Expand Up @@ -326,11 +339,15 @@ where
(error_callback)(StreamError::from(error))
}))
.open_stream()?;

// SAFETY: Stream implements Send + Sync (see unsafe impl below). Arc<Mutex<AudioStream>>
// is safe because the Mutex provides exclusive access and AudioStream's thread safety
// is documented in the AAudio C API.
#[allow(clippy::arc_with_non_send_sync)]
Ok(Stream::Input(Arc::new(Mutex::new(stream))))
Ok(Stream {
inner: Arc::new(Mutex::new(stream)),
direction: DeviceDirection::Input,
})
}

fn build_output_stream<D, E>(
Expand All @@ -348,8 +365,14 @@ where
let builder = configure_for_device(builder, device, config);
let created = Instant::now();
let channel_count = config.channels as i32;
let tune_dynamically = config.buffer_size == BufferSize::Default;

let tuning = Arc::new(BufferTuningState::default());
let tuning_for_callback = tuning.clone();

let stream = builder
.data_callback(Box::new(move |stream, data, num_frames| {
// Deliver audio data to user callback
let cb_info = OutputCallbackInfo {
timestamp: OutputStreamTimestamp {
callback: to_stream_instant(created.elapsed()),
Expand All @@ -366,17 +389,79 @@ where
},
&cb_info,
);

// Dynamic buffer tuning for output streams
// See: https://developer.android.com/ndk/guides/audio/aaudio/aaudio#tuning-buffers
if tune_dynamically {
let underrun_count = stream.x_run_count();
let previous = tuning_for_callback
.previous_underrun_count
.load(Ordering::Relaxed);

if underrun_count > previous {
// The number of frames per burst can vary dynamically
let mut burst_size = stream.frames_per_burst();
if burst_size <= 0 {
burst_size = 256; // fallback from AAudio documentation
} else if burst_size < 16 {
burst_size = 16; // floor from Oboe
}

let new_mixer_bursts = tuning_for_callback
.mixer_bursts
.load(Ordering::Relaxed)
.saturating_add(1);
let mut buffer_size = burst_size * new_mixer_bursts;

let buffer_capacity = tuning_for_callback.capacity.load(Ordering::Relaxed);
if buffer_size > buffer_capacity {
buffer_size = buffer_capacity;
}

if stream.set_buffer_size_in_frames(buffer_size).is_ok() {
tuning_for_callback
.mixer_bursts
.store(new_mixer_bursts, Ordering::Relaxed);
}

tuning_for_callback
.previous_underrun_count
.store(underrun_count, Ordering::Relaxed);
}
}

ndk::audio::AudioCallbackResult::Continue
}))
.error_callback(Box::new(move |_stream, error| {
(error_callback)(StreamError::from(error))
}))
.open_stream()?;

// After stream opens, query and cache the values
let capacity = stream.buffer_capacity_in_frames();
tuning.capacity.store(capacity, Ordering::Relaxed);

let mixer_bursts = match AudioManager::get_mixer_bursts() {
Ok(bursts) => bursts.max(0),
Err(_) => {
let burst_size = stream.frames_per_burst();
if burst_size > 0 {
stream.buffer_size_in_frames() / burst_size
} else {
0 // defer to dynamic tuning
}
}
};
tuning.mixer_bursts.store(mixer_bursts, Ordering::Relaxed);

// SAFETY: Stream implements Send + Sync (see unsafe impl below). Arc<Mutex<AudioStream>>
// is safe because the Mutex provides exclusive access and AudioStream's thread safety
// is documented in the AAudio C API.
#[allow(clippy::arc_with_non_send_sync)]
Ok(Stream::Output(Arc::new(Mutex::new(stream))))
Ok(Stream {
inner: Arc::new(Mutex::new(stream)),
direction: DeviceDirection::Output,
})
}

impl DeviceTrait for Device {
Expand Down Expand Up @@ -598,31 +683,41 @@ impl DeviceTrait for Device {

impl StreamTrait for Stream {
fn play(&self) -> Result<(), PlayStreamError> {
match self {
Self::Input(stream) => stream
.lock()
.unwrap()
.request_start()
.map_err(PlayStreamError::from),
Self::Output(stream) => stream
.lock()
.unwrap()
.request_start()
.map_err(PlayStreamError::from),
}
self.inner
.lock()
.unwrap()
.request_start()
.map_err(PlayStreamError::from)
}

fn pause(&self) -> Result<(), PauseStreamError> {
match self {
Self::Input(_) => Err(BackendSpecificError {
description: "Pause called on the input stream.".to_owned(),
}
.into()),
Self::Output(stream) => stream
match self.direction {
DeviceDirection::Output => self
.inner
.lock()
.unwrap()
.request_pause()
.map_err(PauseStreamError::from),
_ => Err(BackendSpecificError {
description: "Pause only supported on output streams.".to_owned(),
}
.into()),
}
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
let stream = self.inner.lock().ok()?;

// frames_per_data_callback is only set for BufferSize::Fixed; for Default AAudio
// schedules callbacks at the burst size, so that is the best available estimate.
let frames = match stream.frames_per_data_callback() {
Some(size) if size > 0 => size,
_ => stream.frames_per_burst(),
};
if frames > 0 {
Some(frames as crate::FrameCount)
} else {
None
}
}
}
3 changes: 3 additions & 0 deletions src/host/alsa/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,9 @@ impl StreamTrait for Stream {
self.inner.channel.pause(true).ok();
Ok(())
}
fn buffer_size(&self) -> Option<FrameCount> {
Some(self.inner.period_frames as FrameCount)
}
}

// Convert ALSA frames to FrameCount, clamping to valid range.
Expand Down
4 changes: 4 additions & 0 deletions src/host/asio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,8 @@ impl StreamTrait for Stream {
fn pause(&self) -> Result<(), PauseStreamError> {
Stream::pause(self)
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
Stream::buffer_size(self)
}
}
9 changes: 9 additions & 0 deletions src/host/asio/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ impl Stream {
self.playing.store(false, Ordering::SeqCst);
Ok(())
}

pub fn buffer_size(&self) -> Option<crate::FrameCount> {
let streams = self.asio_streams.lock().ok()?;
streams
.output
.as_ref()
.or(streams.input.as_ref())
.map(|s| s.buffer_size as crate::FrameCount)
}
}

impl Device {
Expand Down
5 changes: 4 additions & 1 deletion src/host/coreaudio/ios/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,14 @@ impl StreamTrait for Stream {
let err = BackendSpecificError { description };
return Err(err.into());
}

stream.playing = false;
}
Ok(())
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
Some(get_device_buffer_frames() as crate::FrameCount)
}
}

struct StreamInner {
Expand Down
4 changes: 3 additions & 1 deletion src/host/coreaudio/macos/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,9 @@ fn setup_callback_vars(
///
/// Buffer frame size is a device-level property that always uses Scope::Global + Element::Output,
/// regardless of whether the audio unit is configured for input or output streams.
fn get_device_buffer_frame_size(audio_unit: &AudioUnit) -> Result<usize, coreaudio::Error> {
pub(crate) fn get_device_buffer_frame_size(
audio_unit: &AudioUnit,
) -> Result<usize, coreaudio::Error> {
// Device-level property: always use Scope::Global + Element::Output
// This is consistent with how we set the buffer size and query the buffer size range
let frames: u32 = audio_unit.get_property(
Expand Down
8 changes: 8 additions & 0 deletions src/host/coreaudio/macos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ impl StreamTrait for Stream {

stream.pause()
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
let stream = self.inner.lock().ok()?;

device::get_device_buffer_frame_size(&stream.audio_unit)
.ok()
.map(|size| size as crate::FrameCount)
}
}

#[cfg(test)]
Expand Down
Loading
Loading