Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions audioipc/src/ipccore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ enum Request {
Shutdown,
// See EventLoop::wake_connection
WakeConnection(Token),
// Run an arbitrary closure on the EventLoop's thread. Used to perform
// work that must execute on a specific thread (e.g. thread priority
// changes that can only be made by the target thread).
RunTask(Box<dyn FnOnce() + Send>),
}

// EventLoopHandle is a cloneable external reference
Expand Down Expand Up @@ -144,6 +148,20 @@ impl EventLoopHandle {
self.waker.wake()
}

// Queue a closure to run on the EventLoop's thread. The closure is
// executed after any currently-pending requests and before the next
// poll iteration. Returns immediately without waiting for the closure
// to complete.
pub fn run_task<F: FnOnce() + Send + 'static>(&self, f: F) -> Result<()> {
self.requests
.push(Request::RunTask(Box::new(f)))
.map_err(|_| {
debug!("EventLoopHandle::run_task send failed");
io::ErrorKind::ConnectionAborted
})?;
self.waker.wake()
}

// Signal EventLoop to wake connection specified by `token` for processing.
pub(crate) fn wake_connection(&self, token: Token) {
if self.requests.push(Request::WakeConnection(token)).is_ok() {
Expand Down Expand Up @@ -284,6 +302,10 @@ impl EventLoop {
debug!("{}: EventLoop: handling shutdown", self.name);
return Ok(false);
}
Request::RunTask(f) => {
trace!("{}: EventLoop: handling run_task", self.name);
f();
}
Request::WakeConnection(token) => {
debug!(
"{}: EventLoop: handling wake_connection {:?}",
Expand Down
55 changes: 15 additions & 40 deletions client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
use crate::stream;
use crate::{assert_not_in_callback, run_in_callback};
use crate::{ClientStream, AUDIOIPC_INIT_PARAMS};
#[cfg(target_os = "linux")]
use audio_thread_priority::get_current_thread_info;
#[cfg(not(target_os = "linux"))]
use audio_thread_priority::promote_current_thread_to_real_time;
use audioipc::ipccore::EventLoopHandle;
use audioipc::{ipccore, rpccore, sys, PlatformHandle};
use audioipc::{
Expand All @@ -22,6 +18,7 @@ use cubeb_backend::{
};
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};
use std::thread;
use std::{fmt, ptr};
Expand All @@ -46,6 +43,10 @@ pub struct ClientContext {
backend_id: CString,
device_collection_rpc: bool,
device_collection_callbacks: Arc<Mutex<DeviceCollectionCallbacks>>,
// Number of ClientStreams on this context that are currently started.
// Used to gate callback-thread promotion: the thread is promoted on the
// 0->1 transition and demoted on 1->0.
pub(crate) active_streams: Arc<AtomicUsize>,
}

impl ClientContext {
Expand All @@ -65,31 +66,6 @@ impl ClientContext {
}
}

#[cfg(target_os = "linux")]
fn promote_thread(rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
match get_current_thread_info() {
Ok(info) => {
let bytes = info.serialize();
let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes));
}
Err(_) => {
warn!("Could not remotely promote thread to RT.");
}
}
}

#[cfg(not(target_os = "linux"))]
fn promote_thread(_rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
match promote_current_thread_to_real_time(0, 48000) {
Ok(_) => {
info!("Audio thread promoted to real-time.");
}
Err(_) => {
warn!("Could not promote thread to real-time.");
}
}
}

fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) {
if let Some(func) = callback {
let thr = thread::current();
Expand All @@ -104,14 +80,6 @@ fn unregister_thread(callback: Option<extern "C" fn()>) {
}
}

fn promote_and_register_thread(
rpc: &rpccore::Proxy<ServerMessage, ClientMessage>,
callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
) {
promote_thread(rpc);
register_thread(callback);
}

#[derive(Default)]
struct DeviceCollectionCallbacks {
input_cb: ffi::cubeb_device_collection_changed_callback,
Expand Down Expand Up @@ -181,7 +149,6 @@ impl ContextOps for ClientContext {
.handle()
.bind_client::<CubebClient>(server_connection)
.map_err(|_| Error::Error)?;
let rpc2 = rpc.clone();

// Don't let errors bubble from here. Later calls against this context
// will return errors the caller expects to handle.
Expand All @@ -192,11 +159,18 @@ impl ContextOps for ClientContext {
let backend_id = CString::new(backend_id).expect("backend_id query failed");

// TODO: remove params.pool_size from init params.
// The callback thread starts at normal priority; it is promoted on demand
// when the first stream on this context is started, and demoted when the
// last stream stops.
let callback_thread = ipccore::EventLoopThread::new(
"AudioIPC Client Callback".to_string(),
Some(params.stack_size),
move || promote_and_register_thread(&rpc2, thread_create_callback),
move || unregister_thread(thread_destroy_callback),
move || register_thread(thread_create_callback),
move || {
// Best-effort: if still promoted at shutdown, demote first.
crate::thread_priority::demote();
unregister_thread(thread_destroy_callback);
},
)
.map_err(|_| Error::Error)?;

Expand All @@ -208,6 +182,7 @@ impl ContextOps for ClientContext {
backend_id,
device_collection_rpc: false,
device_collection_callbacks: Arc::new(Mutex::new(Default::default())),
active_streams: Arc::new(AtomicUsize::new(0)),
});
Ok(ctx)
}
Expand Down
1 change: 1 addition & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern crate log;
mod send_recv;
mod context;
mod stream;
mod thread_priority;

use crate::context::ClientContext;
use crate::stream::ClientStream;
Expand Down
61 changes: 59 additions & 2 deletions client/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details

use crate::thread_priority;
use crate::ClientContext;
use crate::{assert_not_in_callback, run_in_callback};
use audioipc::messages::StreamCreateParams;
Expand All @@ -14,6 +15,7 @@ use std::convert::TryFrom;
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -45,6 +47,10 @@ pub struct ClientStream<'ctx> {
device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
// Signals ClientStream that CallbackServer has dropped.
shutdown_rx: mpsc::Receiver<()>,
// Whether this stream is currently contributing to the context's
// active-stream count. Transitioned atomically on successful start/stop
// so concurrent calls can't over- or under-count.
active: AtomicBool,
}

struct CallbackServer {
Expand Down Expand Up @@ -257,14 +263,61 @@ impl<'ctx> ClientStream<'ctx> {
token: data.token,
device_change_cb,
shutdown_rx,
active: AtomicBool::new(false),
}));
Ok(unsafe { Stream::from_ptr(stream as *mut _) })
}

// Flip `active` false->true and bump the context's active-stream count,
// dispatching `enter_active` to the callback thread on the 0->1 transition.
fn mark_active(&self) {
if self
.active
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let prev = self.context.active_streams.fetch_add(1, Ordering::AcqRel);
if prev == 0 {
let rpc = self.context.rpc();
if let Err(e) = self
.context
.callback_handle()
.run_task(move || thread_priority::promote(&rpc))
{
warn!("failed to dispatch thread promotion: {e:?}");
}
}
}
}

// Flip `active` true->false and decrement the context's active-stream count,
// dispatching `leave_active` to the callback thread on the 1->0 transition.
fn mark_inactive(&self) {
if self
.active
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let prev = self.context.active_streams.fetch_sub(1, Ordering::AcqRel);
if prev == 1 {
if let Err(e) = self
.context
.callback_handle()
.run_task(thread_priority::demote)
{
warn!("failed to dispatch thread demotion: {e:?}");
}
}
}
}
}

impl Drop for ClientStream<'_> {
fn drop(&mut self) {
debug!("ClientStream drop");
// Release any contribution this stream was making to the context's
// active-stream count before tearing down the remote stream.
self.mark_inactive();
let _ = send_recv!(self.context.rpc(), StreamDestroy(self.token) => StreamDestroyed);
debug!("ClientStream drop - stream destroyed");
// Wait for CallbackServer to shutdown. The remote server drops the RPC
Expand All @@ -281,13 +334,17 @@ impl StreamOps for ClientStream<'_> {
fn start(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamStart(self.token) => StreamStarted)
send_recv!(rpc, StreamStart(self.token) => StreamStarted)?;
self.mark_active();
Ok(())
}

fn stop(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamStop(self.token) => StreamStopped)
send_recv!(rpc, StreamStop(self.token) => StreamStopped)?;
self.mark_inactive();
Ok(())
}

fn position(&mut self) -> Result<u64> {
Expand Down
109 changes: 109 additions & 0 deletions client/src/thread_priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright © 2026 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details

//! Per-thread real-time priority management for the client's callback thread.
//!
//! On Linux the promote path goes through the server via RPC (since sandboxed
//! content processes cannot call rtkit directly); demote is performed locally
//! via `pthread_setschedparam`, which does not require elevated privilege.
//! On macOS/Windows/Linux-no-dbus both promote and demote are performed
//! directly on the calling thread.
//!
//! All functions here are intended to run on the callback thread itself,
//! dispatched via `ipccore::EventLoopHandle::run_task`.

#[cfg(not(target_os = "linux"))]
use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time, RtPriorityHandle,
};
#[cfg(target_os = "linux")]
use audio_thread_priority::{
demote_thread_from_real_time, get_current_thread_info, RtPriorityThreadInfo,
};

use audioipc::rpccore::Proxy;
use audioipc::{ClientMessage, ServerMessage};
use std::cell::RefCell;

#[cfg(target_os = "linux")]
thread_local! {
// Thread info captured at promote time. Kept so that `demote`
// can demote locally without another round-trip to the server.
static THREAD_INFO: RefCell<Option<RtPriorityThreadInfo>> = const { RefCell::new(None) };
}

#[cfg(not(target_os = "linux"))]
thread_local! {
static RT_HANDLE: RefCell<Option<RtPriorityHandle>> = const { RefCell::new(None) };
}

#[cfg(target_os = "linux")]
pub(crate) fn promote(rpc: &Proxy<ServerMessage, ClientMessage>) {
THREAD_INFO.with(|slot| {
let mut slot = slot.borrow_mut();
if slot.is_some() {
return;
}
match get_current_thread_info() {
Ok(info) => {
let bytes = info.serialize();
if rpc
.call(ServerMessage::PromoteThreadToRealTime(bytes))
.is_ok()
{
*slot = Some(info);
debug!("callback thread promoted to real-time via server");
} else {
warn!("callback thread promotion RPC failed");
}
}
Err(e) => warn!("get_current_thread_info failed: {e:?}"),
}
});
}

#[cfg(not(target_os = "linux"))]
pub(crate) fn promote(_rpc: &Proxy<ServerMessage, ClientMessage>) {
RT_HANDLE.with(|slot| {
let mut slot = slot.borrow_mut();
if slot.is_some() {
return;
}
match promote_current_thread_to_real_time(0, 48000) {
Ok(handle) => {
*slot = Some(handle);
debug!("callback thread promoted to real-time");
}
Err(e) => warn!("failed to promote callback thread: {e:?}"),
}
});
}

#[cfg(target_os = "linux")]
pub(crate) fn demote() {
THREAD_INFO.with(|slot| {
if let Some(info) = slot.borrow_mut().take() {
// Demotion to SCHED_OTHER is always permitted; no RPC needed.
if let Err(e) = demote_thread_from_real_time(info) {
warn!("failed to demote callback thread: {e:?}");
} else {
debug!("callback thread demoted from real-time");
}
}
});
}

#[cfg(not(target_os = "linux"))]
pub(crate) fn demote() {
RT_HANDLE.with(|slot| {
if let Some(handle) = slot.borrow_mut().take() {
if let Err(e) = demote_current_thread_from_real_time(handle) {
warn!("failed to demote callback thread: {e:?}");
} else {
debug!("callback thread demoted from real-time");
}
}
});
}
Loading
Loading