Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 12 additions & 29 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

//! Thin zero-copy-related wrappers around functions.

#[cfg(any(target_os = "linux", target_os = "android"))]
use crate::io::{RawReader, RawWriter};
#[cfg(any(target_os = "linux", target_os = "android"))]
use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -106,11 +108,7 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Re
/// This should not be used if one of them are pipe to save resources
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
where
R: Read + AsFd,
S: AsFd,
{
pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<bool> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
Expand All @@ -134,10 +132,11 @@ where
// we can recover by copying the data that we have from the
// intermediate pipe to stdout using unbuffered read/write. Then
// we tell the caller to fall back.
// use read_to_end to drain pipe for the case write failed
debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
let mut drain = Vec::with_capacity(n);
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
crate::io::RawWriter(&dest).write_all(&drain)?;
RawWriter(&dest).write_all(&drain)?;
return Ok(true);
}
}
Expand All @@ -152,11 +151,7 @@ where
/// (the fallback will be embedded to this function in the future)
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded_auto<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
where
R: Read + AsFd,
S: AsFd,
{
pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<bool> {
// use splice to check that input or output is pipe which is efficient
let fallback = match splice(&source, dest, MAX_ROOTLESS_PIPE_SIZE) {
Ok(_) => splice_unbounded(source, dest)?,
Expand All @@ -169,11 +164,7 @@ where
/// return actually sent bytes
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn send_n_bytes(
input: impl Read + AsFd,
mut target: impl Write + AsFd,
n: u64,
) -> std::io::Result<u64> {
pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Result<u64> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
let mut n = n;
Expand Down Expand Up @@ -222,10 +213,10 @@ pub fn send_n_bytes(
}
} else {
debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
// drain pipe before fallback to raw write
// use read_to_end to drain pipe at this fallback for the case write failed
let mut drain = Vec::with_capacity(s);
broker_r.take(s as u64).read_to_end(&mut drain)?;
crate::io::RawWriter(&target).write_all(&drain)?;
RawWriter(&target).write_all(&drain)?;
break true;
}
}
Expand All @@ -239,17 +230,9 @@ pub fn send_n_bytes(
if !fallback {
return Ok(bytes_written);
}
let mut reader = input.take(n);
let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation
loop {
match reader.read(&mut buf)? {
0 => return Ok(bytes_written),
n => {
target.write_all(&buf[..n])?;
bytes_written += n as u64;
}
}
}
// do not buffer at this fallback, or order of output would be wrong with multiple input
bytes_written += std::io::copy(&mut RawReader(input).take(n), &mut RawWriter(target))?;
Ok(bytes_written)
}

/// Return verified /dev/null
Expand Down
Loading