Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ lru = { workspace = true, optional = true }

[target.'cfg(unix)'.dependencies]
daemonize = "0.5.0"
nix = "~0.24.3"
nix = { version = "~0.31.1", features = ["socket", "net", "fs", "uio"] }

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.59.0", features = ["Win32_Networking_WinSock"] }
Expand Down
2 changes: 1 addition & 1 deletion pingora-core/src/protocols/l4/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl SocketAddr {
fn from_sockaddr_storage(sock: &SockaddrStorage) -> Option<SocketAddr> {
if let Some(v4) = sock.as_sockaddr_in() {
return Some(SocketAddr::Inet(StdSockAddr::V4(
std::net::SocketAddrV4::new(v4.ip().into(), v4.port()),
std::net::SocketAddrV4::new(v4.ip(), v4.port()),
)));
} else if let Some(v6) = sock.as_sockaddr_in6() {
return Some(SocketAddr::Inet(StdSockAddr::V6(
Expand Down
9 changes: 5 additions & 4 deletions pingora-core/src/protocols/l4/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl RawStreamWrapper {
#[cfg(target_os = "linux")]
enable_rx_ts: false,
#[cfg(target_os = "linux")]
reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec),
reusable_cmsg_space: nix::cmsg_space!(nix::sys::socket::Timestamps),
}
}

Expand Down Expand Up @@ -240,7 +240,8 @@ impl AsyncRead for RawStreamWrapper {
as *mut [u8])
};
let mut iov = [IoSliceMut::new(b)];
rs_wrapper.reusable_cmsg_space.clear();

rs_wrapper.reusable_cmsg_space.fill(0);

match s.try_io(Interest::READABLE, || {
recvmsg::<SockaddrStorage>(
Expand All @@ -253,7 +254,7 @@ impl AsyncRead for RawStreamWrapper {
}) {
Ok(r) => {
if let Some(ControlMessageOwned::ScmTimestampsns(rtime)) = r
.cmsgs()
.cmsgs()?
.find(|i| matches!(i, ControlMessageOwned::ScmTimestampsns(_)))
{
// The returned timestamp is a real (i.e. not monotonic) timestamp
Expand Down Expand Up @@ -432,7 +433,7 @@ impl Stream {
if let RawStream::Tcp(s) = &self.stream_mut().get_mut().stream {
let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE
| TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE;
setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
setsockopt(&s, sockopt::Timestamping, &timestamp_options)
.or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE")?;
self.stream_mut().get_mut().enable_rx_ts(true);
}
Expand Down
28 changes: 16 additions & 12 deletions pingora-core/src/server/transfer_fd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
use log::{debug, error, warn};
use nix::errno::Errno;
#[cfg(target_os = "linux")]
use nix::sys::socket::{self, AddressFamily, RecvMsg, SockFlag, SockType, UnixAddr};
use nix::sys::socket::{self, AddressFamily, Backlog, RecvMsg, SockFlag, SockType, UnixAddr};
#[cfg(target_os = "linux")]
use nix::sys::stat;
use nix::{Error, NixPath};
use std::collections::HashMap;
use std::io::Write;
#[cfg(target_os = "linux")]
use std::io::{IoSlice, IoSliceMut};
#[cfg(target_os = "linux")]
use std::os::fd::{AsRawFd, BorrowedFd};
use std::os::unix::io::RawFd;
#[cfg(target_os = "linux")]
use std::{thread, time};
Expand Down Expand Up @@ -127,20 +129,20 @@ where
// TODO: warn if exist but not able to unlink
}
};
socket::bind(listen_fd, &unix_addr).unwrap();
socket::bind(listen_fd.as_raw_fd(), &unix_addr).unwrap();

/* sock is created before we change user, need to give permission to all */
stat::fchmodat(
None,
unsafe { BorrowedFd::borrow_raw(libc::AT_FDCWD) },
path,
stat::Mode::all(),
stat::FchmodatFlags::FollowSymlink,
)
.unwrap();

socket::listen(listen_fd, 8).unwrap();
socket::listen(&listen_fd, Backlog::new(8).unwrap()).unwrap();

let fd = match accept_with_retry_timeout(listen_fd, max_retry) {
let fd = match accept_with_retry_timeout(listen_fd.as_raw_fd(), max_retry) {
Ok(fd) => fd,
Err(e) => {
error!("Giving up reading socket from: {path}, error: {e:?}");
Expand All @@ -163,7 +165,7 @@ where
.unwrap();

let mut fds: Vec<RawFd> = Vec::new();
for cmsg in msg.cmsgs() {
for cmsg in msg.cmsgs()? {
if let socket::ControlMessageOwned::ScmRights(mut vec_fds) = cmsg {
fds.append(&mut vec_fds)
} else {
Expand Down Expand Up @@ -250,7 +252,7 @@ where
let mut nonblocking_polls = 0;

let conn_result: Result<usize, Error> = loop {
match socket::connect(send_fd, &unix_addr) {
match socket::connect(send_fd.as_raw_fd(), &unix_addr) {
Ok(_) => break Ok(0),
Err(e) => match e {
/* If the new process hasn't created the upgrade sock we'll get an ENOENT.
Expand Down Expand Up @@ -295,7 +297,7 @@ where
let cmsg = [scm; 1];
loop {
match socket::sendmsg(
send_fd,
send_fd.as_raw_fd(),
&io_vec,
&cmsg,
socket::MsgFlags::empty(),
Expand Down Expand Up @@ -347,6 +349,8 @@ where
#[cfg(test)]
#[cfg(target_os = "linux")]
mod tests {
use std::os::fd::AsRawFd;

use super::*;
use log::{debug, error};

Expand Down Expand Up @@ -415,7 +419,7 @@ mod tests {
assert_eq!(1, buf[31]);
});

let fds = vec![dumb_fd];
let fds = vec![dumb_fd.as_raw_fd()];
let buf: [u8; 128] = [1; 128];
match send_fds_to(fds, &buf, "/tmp/pingora_fds_receive.sock", None) {
Ok(sent) => {
Expand All @@ -442,7 +446,7 @@ mod tests {
None,
)
.unwrap();
fds.add(key1.clone(), dumb_fd1);
fds.add(key1.clone(), dumb_fd1.as_raw_fd());
let key2 = "1.1.1.1:443".to_string();
let dumb_fd2 = socket::socket(
AddressFamily::Unix,
Expand All @@ -451,7 +455,7 @@ mod tests {
None,
)
.unwrap();
fds.add(key2.clone(), dumb_fd2);
fds.add(key2.clone(), dumb_fd2.as_raw_fd());

let child = thread::spawn(move || {
let mut fds2 = Fds::new();
Expand All @@ -478,7 +482,7 @@ mod tests {
)
.unwrap();

let fds = vec![dumb_fd];
let fds = vec![dumb_fd.as_raw_fd()];
let buf: [u8; 32] = [1; 32];

// Try to send with a custom max_retries of 2
Expand Down