Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/hotplug.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
//! Support for hotplug events.
//!
//! This module is currently Linux-specific. It uses the udev netlink socket to listen for events
//! from a udev implementation.
//! The recommended way to support device hotplug in applications is to use the
//! [`hotplug::enumerate`] function, which returns an iterator over all devices that are or will be
//! plugged into the system.
//!
//! The recommended way to support device hotplug is to use the [`hotplug::enumerate`] function,
//! which returns an iterator over all devices that are or will be plugged into the system.
//! # Platform Support
//!
//! Hotplug functionality is supported on Linux and FreeBSD, as follows:
//!
//! | OS | Details |
//! |----|---------|
//! | Linux | Uses the `NETLINK_KOBJECT_UEVENT` socket. Requires `udev`. |
//! | FreeBSD | Uses `devd`'s seqpacket socket at `/var/run/devd.seqpacket.pipe`. |
//!
//! [`hotplug::enumerate`]: crate::hotplug::enumerate

Expand Down Expand Up @@ -114,6 +121,8 @@ impl HotplugMonitor {
///
/// The [`HotplugMonitor`] will be put in non-blocking mode while the [`AsyncIter`] is alive
/// (if it isn't already).
///
/// When using the `"tokio"` Cargo feature, this must be called while inside a tokio context.
#[cfg_attr(docsrs, doc(cfg(any(doc, feature = "tokio", feature = "async-io"))))]
#[cfg(any(doc, feature = "tokio", feature = "async-io"))]
pub fn async_iter(&self) -> io::Result<AsyncIter<'_>> {
Expand Down
42 changes: 27 additions & 15 deletions src/hotplug/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,41 @@ impl<'a> AsyncIter<'a> {

#[cfg(test)]
mod tests {
use std::{io, pin::pin};
use std::io;

use crate::{
hotplug::HotplugMonitor, test::AssertPending, uinput::UinputDevice,
util::r#async::with_runtime,
};
use crate::{hotplug::HotplugMonitor, uinput::UinputDevice, util::r#async::test::AsyncTest};

#[test]
fn smoke() -> io::Result<()> {
with_runtime(|rt| {
const DEVNAME: &str = "-@-rust-async-hotplug-test-@-";
env_logger::try_init().ok();

let mon = HotplugMonitor::new()?;
const DEVNAME: &str = "-@-rust-async-hotplug-test-@-";

let events = mon.async_iter()?;
let mut fut = pin!(events.next_event());
rt.block_on(AssertPending(fut.as_mut()));

let _uinput = UinputDevice::builder()?.build(DEVNAME)?;

rt.block_on(fut)?;
let mon = HotplugMonitor::new()?;

let mut uinput = None;
let fut = async {
// Wait for our test device to arrive:
loop {
if let Ok(evdev) = mon.async_iter()?.next_event().await {
if let Ok(name) = evdev.name() {
if name == DEVNAME {
return Ok(evdev);
}
}
}
}
};
AsyncTest::new(fut, || {
uinput = Some(UinputDevice::builder()?.build(DEVNAME)?);
println!("unblocked");
Ok(())
})
// This test might take a few tries since unrelated events need to be filtered out, and
// unrelated messages may arrive at the socket, causing a wakeup that results in `Pending`.
.allowed_polls(1024)
.run()?;

Ok(())
}
}
4 changes: 4 additions & 0 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,8 @@ impl EventReader {
///
/// The underlying device will be put in non-blocking mode while the returned [`AsyncEvents`]
/// is alive (if it isn't already).
///
/// When using the `"tokio"` Cargo feature, this must be called while inside a tokio context.
#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-io"))))]
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub fn async_events(&mut self) -> io::Result<AsyncEvents<'_>> {
Expand All @@ -952,6 +954,8 @@ impl EventReader {
///
/// The underlying device will be put in non-blocking mode while the returned [`AsyncReports`]
/// is alive (if it isn't already).
///
/// When using the `"tokio"` Cargo feature, this must be called while inside a tokio context.
#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-io"))))]
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub fn async_reports(&mut self) -> io::Result<AsyncReports<'_>> {
Expand Down
56 changes: 25 additions & 31 deletions src/reader/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,49 +77,43 @@ impl<'a> AsyncEvents<'a> {

#[cfg(test)]
mod tests {
use std::{io, pin::pin};
use std::io;

use crate::{
event::{Rel, RelEvent, Syn},
test::{AssertPending, check_events, pair},
util::r#async::with_runtime,
test::{check_events, pair},
util::r#async::test::AsyncTest,
};

#[test]
fn smoke() -> io::Result<()> {
with_runtime(|rt| {
let (uinput, evdev) = pair(|b| b.with_rel_axes([Rel::DIAL]))?;
let mut reader = evdev.into_reader()?;
let mut events = reader.async_events()?;
let (uinput, evdev) = pair(|b| b.with_rel_axes([Rel::DIAL]))?;
let mut reader = evdev.into_reader()?;

{
let mut fut = pin!(events.next_event());
rt.block_on(AssertPending(fut.as_mut()));

uinput.write(&[RelEvent::new(Rel::DIAL, 1).into()])?;

let event = rt.block_on(fut)?;
check_events([event], [RelEvent::new(Rel::DIAL, 1).into()]);
}
{
let event = AsyncTest::new(async { reader.async_events()?.next_event().await }, || {
uinput.write(&[RelEvent::new(Rel::DIAL, 1).into()])
})
.run()?;

drop(events);
let ev = reader.events().next().unwrap()?;
check_events([ev], [Syn::REPORT.into()]);
check_events([event], [RelEvent::new(Rel::DIAL, 1).into()]);
}

let mut reports = reader.async_reports()?;
let mut fut = pin!(reports.next_report());
rt.block_on(AssertPending(fut.as_mut()));
let ev = reader.events().next().unwrap()?;
check_events([ev], [Syn::REPORT.into()]);

uinput.write(&[RelEvent::new(Rel::DIAL, 2).into()])?;
let report = AsyncTest::new(
async { reader.async_reports()?.next_report().await },
|| uinput.write(&[RelEvent::new(Rel::DIAL, 2).into()]),
)
.run()?;

let report = rt.block_on(fut)?;
assert_eq!(report.len(), 2);
check_events(
report,
[RelEvent::new(Rel::DIAL, 2).into(), Syn::REPORT.into()],
);
assert_eq!(report.len(), 2);
check_events(
report,
[RelEvent::new(Rel::DIAL, 2).into(), Syn::REPORT.into()],
);

Ok(())
})
Ok(())
}
}
31 changes: 5 additions & 26 deletions src/test.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
#![allow(dead_code)]

use std::{
fmt,
hash::{BuildHasher, Hasher, RandomState},
io,
iter::zip,
pin::Pin,
task::{Context, Poll},
};

use crate::{
Expand All @@ -16,12 +11,13 @@ use crate::{
uinput::{Builder, UinputDevice},
};

fn hash() -> u64 {
RandomState::new().build_hasher().finish()
}

/// Creates a [`UinputDevice`] and [`Evdev`] that are connected to each other.
#[allow(dead_code)]
pub fn pair(b: impl FnOnce(Builder) -> io::Result<Builder>) -> io::Result<(UinputDevice, Evdev)> {
fn hash() -> u64 {
RandomState::new().build_hasher().finish()
}

let hash = hash();
let name = format!("-@-rust-evdevil-device-{hash}-@-");

Expand Down Expand Up @@ -73,20 +69,3 @@ pub fn check_events(
panic!("expected {expected:?}, got {actual:?}");
}
}

/// A `Future` that polls its argument once and panics unless the inner poll results in `Pending`.
pub struct AssertPending<'a, F>(pub Pin<&'a mut F>);

impl<'a, F: Future> Future for AssertPending<'a, F>
where
F::Output: fmt::Debug,
{
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.0.as_mut().poll(cx) {
Poll::Ready(val) => panic!("expected `Pending`, got `Ready`: {val:?}"),
Poll::Pending => Poll::Ready(()),
}
}
}
126 changes: 114 additions & 12 deletions src/util/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ use asyncio_impl::*;
#[cfg(feature = "async-io")]
mod asyncio_impl {
use std::{
io,
future, io,
os::fd::{BorrowedFd, RawFd},
pin::pin,
task::Poll,
};

Expand All @@ -129,7 +130,7 @@ mod asyncio_impl {
impl Impl {
pub fn new(fd: RawFd) -> io::Result<Self> {
let fd = unsafe { BorrowedFd::borrow_raw(fd) };
Async::new(fd).map(Self)
Async::new_nonblocking(fd).map(Self)
}

pub async fn asyncify<T>(
Expand All @@ -138,13 +139,33 @@ mod asyncio_impl {
) -> io::Result<T> {
loop {
match op() {
Poll::Pending => self.0.readable().await?,
Poll::Pending => optimistic(self.0.readable()).await?,
Poll::Ready(res) => return res,
}
}
}
}

// This "optimization" is copied from async-io.
// async-io is apparently very buggy (see smol-rs/async-io#78), so it ends up being required for
// things to work right.
// Specifically, the `.readable()` future is permanently `Pending`, even after the reactor
// schedules the future again, so `asyncify` would just never complete.
async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
let mut polled = false;
let mut fut = pin!(fut);

future::poll_fn(|cx| {
if !polled {
polled = true;
fut.as_mut().poll(cx)
} else {
Poll::Ready(Ok(()))
}
})
.await
}

#[cfg(test)]
pub struct Runtime;

Expand All @@ -169,14 +190,95 @@ pub struct Impl;
#[cfg(doc)]
pub struct Runtime;

/// Calls `f` with an instance of the selected async runtime.
///
/// Allows writing async-runtime-agnostic tests.
///
/// The only supported API is `runtime.block_on(future)`.
#[cfg(test)]
pub fn with_runtime<R>(f: impl FnOnce(&Runtime) -> io::Result<R>) -> io::Result<R> {
let rt = Runtime::new()?;
let _guard = rt.enter();
f(&rt)
pub mod test {
use std::{fmt, future, panic::resume_unwind, pin::pin, sync::mpsc, thread};

use super::*;

pub struct AsyncTest<F, U> {
future: F,
unblocker: U,
allowed_polls: usize,
}

impl<F, U> AsyncTest<F, U> {
pub fn new(future: F, unblocker: U) -> Self {
Self {
future,
unblocker,
allowed_polls: 1,
}
}

/// Sets the number of allowed future polls after the `unblocker` has been run.
///
/// By default, this is 1, expecting the future to complete immediately after the waker has
/// been notified.
/// Higher values may be needed if the API-under-test is system-global and may have to
/// process some irrelevant events until it becomes `Ready`.
pub fn allowed_polls(mut self, allowed_polls: usize) -> Self {
self.allowed_polls = allowed_polls;
self
}

/// Polls `future`, expecting `Poll::Pending`. Then runs `unblocker`, and expects the waker to
/// be invoked and the `future` to be `Poll::Ready`.
pub fn run<T>(self) -> io::Result<T>
where
F: Future<Output = io::Result<T>> + Send,
F::Output: Send,
U: FnOnce() -> io::Result<()>,
T: fmt::Debug,
{
let (sender, recv) = mpsc::sync_channel(0);
thread::scope(|s| {
let h = s.spawn(move || -> io::Result<_> {
let rt = Runtime::new()?;
let _guard = rt.enter();
let mut fut = pin!(self.future);
let mut poll_count = 0;

rt.block_on(future::poll_fn(|cx| {
if poll_count == 0 {
match fut.as_mut().poll(cx) {
Poll::Ready(val) => {
panic!("expected future to be `Pending`, but it is `Ready({val:?})`")
}
Poll::Pending => {
// Waker is now scheduled to be woken when the event of interest occurs.
println!("future is pending; scheduling wakeup");
poll_count += 1;
sender.send(()).unwrap();
return Poll::Pending;
}
}
} else {
// This is called when the `Waker` has been woken up.
match fut.as_mut().poll(cx) {
Poll::Ready(out) => Poll::Ready(out),
Poll::Pending => {
if poll_count >= self.allowed_polls {
panic!("future still `Pending` after {poll_count} polls");
}
poll_count += 1;
Poll::Pending
}
}
}
}))
});

recv.recv().unwrap();

// We've been signaled to invoke `unblocker`.
(self.unblocker)()?;

match h.join() {
Ok(res) => res,
Err(payload) => resume_unwind(payload),
}
})
}
}
}