Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ doc = false
name = "wreq_ruby"

[dependencies]
magnus = { version = "0.8.2", features = ["bytes"] }
magnus = { version = "0.8.2", features = ["bytes", "rb-sys"] }
rb-sys = { version = "0.9.128", default-features = false }
tokio = { version = "1.52.3", features = ["full"] }
wreq = { git = "https://github.com/0x676e67/wreq", features = [
Expand Down
2 changes: 1 addition & 1 deletion src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use magnus::{
pub use self::{
form::Form,
json::Json,
stream::{BodyReceiver, BodySender, ReceiverStream},
stream::{BodySender, ReceiverStream},
};

/// Represents the body of an HTTP request.
Expand Down
43 changes: 4 additions & 39 deletions src/client/body/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,11 @@ use std::{
};

use bytes::Bytes;
use futures_util::{Stream, StreamExt, TryFutureExt};
use futures_util::{Stream, TryFutureExt};
use magnus::{Error, RString, TryConvert, Value};
use tokio::sync::{
Mutex,
mpsc::{self},
};

use crate::{
error::{memory_error, mpsc_send_error_to_magnus},
rt,
};
use tokio::sync::mpsc;

/// A receiver for streaming HTTP response bodies.
///
/// Newtype around a heap-pinned, type-erased stream of `wreq::Result<Bytes>`
/// items. The stream sits behind a `Mutex` so it can be polled through the
/// wrapper's single field.
pub struct BodyReceiver(Mutex<Pin<Box<dyn Stream<Item = wreq::Result<Bytes>> + Send>>>);
use crate::error::{memory_error, mpsc_send_error_to_magnus};

/// A sender for streaming HTTP request bodies.
#[magnus::wrap(class = "Wreq::BodySender", free_immediately, size)]
Expand All @@ -29,32 +20,6 @@ struct InnerBodySender {
rx: Option<mpsc::Receiver<Bytes>>,
}

// ===== impl BodyReceiver =====

impl BodyReceiver {
/// Create a new [`BodyReceiver`] instance.
#[inline]
pub fn new(stream: impl Stream<Item = wreq::Result<Bytes>> + Send + 'static) -> BodyReceiver {
BodyReceiver(Mutex::new(Box::pin(stream)))
}
}

impl Iterator for BodyReceiver {
    type Item = Bytes;

    /// Block until the next body chunk is available and return it.
    ///
    /// Returns `None` when the stream is exhausted, when the stream yields an
    /// error (the error itself is discarded), or when `rt::maybe_block_on`
    /// itself resolves to `None`.
    fn next(&mut self) -> Option<Self::Item> {
        rt::maybe_block_on(async {
            // Hold the lock for the duration of a single poll-to-completion.
            let mut stream = self.0.lock().await;
            match stream.as_mut().next().await {
                Some(Ok(chunk)) => Some(chunk),
                // Stream errors are not surfaced; they end the iteration.
                Some(Err(_)) | None => None,
            }
        })
    }
}

// ===== impl BodySender =====

impl BodySender {
Expand All @@ -78,7 +43,7 @@ impl BodySender {
let bytes = data.to_bytes();
let inner = rb_self.0.read().unwrap();
if let Some(ref tx) = inner.tx {
rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?;
crate::rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?;
}
Ok(())
}
Expand Down
137 changes: 128 additions & 9 deletions src/client/resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use std::{net::SocketAddr, sync::Arc};

use arc_swap::ArcSwapOption;
use bytes::Bytes;
use futures_util::TryFutureExt;
use futures_util::{StreamExt, TryFutureExt};
use http::{Extensions, HeaderMap, response::Response as HttpResponse};
use http_body_util::BodyExt;
use magnus::{Error, Module, RArray, RModule, Ruby, Value, block::Yield, scan_args::scan_args};
use magnus::{
Error, IntoValue, Module, RArray, RModule, Ruby, Value, block::Proc, scan_args::scan_args,
value::ReprValue,
};
use wreq::Uri;

use crate::{
client::body::{BodyReceiver, Json},
client::body::Json,
cookie::Cookie,
error::{memory_error, wreq_error_to_magnus},
gvl,
Expand All @@ -18,6 +21,22 @@ use crate::{
rt,
};

/// Guard that calls `rb_gc_unregister_address` on the wrapped address when it
/// is dropped.
///
/// Because the call lives in `Drop`, it runs however the owning scope exits:
/// normal return, early error return via `?`, or unwinding.
struct GcGuard(*mut rb_sys::VALUE);

impl Drop for GcGuard {
    fn drop(&mut self) {
        let Self(registered) = self;
        // SAFETY: NOTE(review) — relies on the creator having passed this same
        // address to `rb_gc_register_address`, and on the pointee still being
        // alive at this point; confirm at each construction site.
        unsafe {
            rb_sys::rb_gc_unregister_address(*registered);
        }
    }
}

// SAFETY (as argued by the original author): a `GcGuard` is only created
// while the GVL is held, and its `Drop` runs either on the creating thread or
// inside `with_gvl` (also GVL-held). The wrapped pointer refers to a `Box`
// that outlives the guard.
unsafe impl Send for GcGuard {}

/// A response from a request.
#[magnus::wrap(class = "Wreq::Response", free_immediately, size)]
pub struct Response {
Expand Down Expand Up @@ -199,12 +218,112 @@ impl Response {
})
}

/// Get a chunk iterator for the response body.
pub fn chunks(&self) -> Result<Yield<BodyReceiver>, Error> {
self.response(true)
.map(wreq::Response::bytes_stream)
.map(BodyReceiver::new)
.map(Yield::Iter)
/// Stream the response body, yielding each chunk to the given block with
/// proper GVL management.
///
/// The iteration loop is driven from Rust:
/// 1. GVL is released while waiting for the next chunk (network I/O)
/// 2. GVL is re-acquired to yield the chunk to the Ruby block
/// 3. GVL is released again for the next I/O operation
///
/// This allows other Ruby threads to run during network I/O, and ensures
/// streaming errors are properly propagated instead of silently swallowed.
pub fn chunks(ruby: &Ruby, rb_self: &Self) -> Result<Value, Error> {
if unsafe { rb_sys::rb_block_given_p() == 0 } {
return Err(Error::new(
ruby.exception_local_jump_error(),
"no block given (yield)",
));
}

// FIX (issue 3): response() is called FIRST, before any GC registration.
// If it fails and returns Err, we exit here without ever registering
// anything — so there is nothing to unregister.
let response = rb_self.response(true)?;
let stream = response.bytes_stream();

// Heap-allocate the block VALUE so rb_gc_register_address has a stable
// address to track. GcGuard guarantees rb_gc_unregister_address is called
// on every exit path (FIX issues 3 and 5).
let mut block_raw = Box::new(unsafe { rb_sys::rb_block_proc() });
let block_ptr: *mut rb_sys::VALUE = block_raw.as_mut();
unsafe { rb_sys::rb_gc_register_address(block_ptr) };
let _gc_guard = GcGuard(block_ptr); // dropped at end of scope unconditionally

// FIX (issue 2): capture the heap address as `usize` (Copy + Send) rather
// than as `*mut VALUE` (!Send). The pointer is reconstructed inside the
// loop, only when needed, from within a with_gvl callback.
let block_addr: usize = block_ptr as usize;

// Drive the streaming loop without the GVL.
// FIX (issue 1): `ruby: &Ruby` is NOT moved into the async block.
// The Ruby handle is obtained fresh via Ruby::get() inside with_gvl,
// where we are guaranteed to hold the GVL. Capturing &Ruby across
// GVL releases is semantically wrong even though Ruby is a ZST.
let result = gvl::nogvl_cancellable(|flag| {
rt::runtime().block_on(async move {
let mut stream = Box::pin(stream);
loop {
let chunk = tokio::select! {
biased;
_ = flag.cancelled() => return Err(crate::error::interrupt_error()),
result = stream.next() => result,
};

match chunk {
Some(Ok(bytes)) => {
// FIX (issue 2): reconstruct the pointer from usize here,
// inside the closure, rather than at capture time.
// Read the current VALUE — GC compaction may have updated
// the referent via the registered address.
let current_block_raw =
unsafe { *(block_addr as *const rb_sys::VALUE) };

let yield_result: Result<(), Error> =
tokio::task::block_in_place(|| {
gvl::with_gvl(|| {
// FIX (issue 1): obtain Ruby handle fresh now
// that we hold the GVL. Never use a captured
// &Ruby across a GVL release.
let ruby = magnus::Ruby::get()
.expect("Ruby::get() failed inside with_gvl — GVL not held as expected");

let block_value = unsafe {
magnus::rb_sys::FromRawValue::from_raw(
current_block_raw,
)
};

// FIX (issue 6): accurate error message.
// This path means VALUE reconstruction failed,
// not that GC collected the block (the GcGuard
// prevents that).
let block =
Proc::from_value(block_value).ok_or_else(|| {
Error::new(
ruby.exception_runtime_error(),
"invalid block VALUE: reconstruction failed \
(this is a wreq-ruby bug, not GC collection)",
)
})?;

let chunk_value = bytes.into_value_with(&ruby);
block.call::<_, Value>((chunk_value,))?;
Ok(())
})
});
yield_result?;
}
Some(Err(e)) => return Err(wreq_error_to_magnus(e)),
None => return Ok(()),
}
}
})
});
// _gc_guard drops here — rb_gc_unregister_address called unconditionally.

result?;
Ok(ruby.qnil().as_value())
}

/// Close the response body, dropping any resources.
Expand Down
57 changes: 56 additions & 1 deletion src/gvl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{ffi::c_void, mem::MaybeUninit, ptr::null_mut};

use rb_sys::rb_thread_call_without_gvl;
use rb_sys::{rb_thread_call_with_gvl, rb_thread_call_without_gvl};
use tokio::sync::watch;

/// Container for safely passing closure and result through C callback.
Expand Down Expand Up @@ -77,6 +77,61 @@ unsafe extern "C" fn unblock_func(arg: *mut c_void) {
}
}

// ── Separate arg container for with_gvl ──────────────────────────────────────
//
// The result slot is `Option<std::thread::Result<R>>`:
//   * `None`          — the trampoline never ran the closure,
//   * `Some(Ok(v))`   — the closure returned `v`,
//   * `Some(Err(p))`  — the closure panicked with payload `p`.
//
// The panic is caught inside the `extern "C"` trampoline so that no unwind
// ever leaves it; the payload is re-raised by `with_gvl` once the FFI call has
// returned.

struct GvlArgs<F, R> {
    func: Option<F>,
    result: Option<std::thread::Result<R>>,
}

/// C-ABI trampoline handed to `rb_thread_call_with_gvl`.
///
/// Takes the closure out of the `GvlArgs` behind `arg`, runs it under
/// `catch_unwind`, and stores the outcome in `args.result`. Always returns a
/// null pointer; the real result travels through `GvlArgs`.
///
/// # Safety
///
/// `arg` must point to a live `GvlArgs<F, R>` with the same `F` and `R` this
/// function was instantiated with, and nothing else may access it while this
/// function runs.
unsafe extern "C" fn call_with_gvl<F, R>(arg: *mut c_void) -> *mut c_void
where
    F: FnOnce() -> R,
{
    // SAFETY: guaranteed by the caller contract documented above.
    let args = unsafe { &mut *(arg as *mut GvlArgs<F, R>) };
    // If the closure was already taken there is nothing to run; leave the
    // result slot untouched rather than panicking inside an `extern "C"` frame.
    if let Some(func) = args.func.take() {
        args.result = Some(std::panic::catch_unwind(std::panic::AssertUnwindSafe(func)));
    }
    null_mut()
}

/// Executes the given closure while holding the Ruby GVL.
///
/// Must be called from a context where the GVL has been released
/// (e.g., inside a [`nogvl`] or [`nogvl_cancellable`] callback).
/// Re-acquires the GVL, runs the closure, then releases it again.
///
/// # Panics
///
/// If the closure panics, the panic is caught before it can unwind through
/// the FFI boundary and is resumed from this function after
/// `rb_thread_call_with_gvl` has returned. Also panics if the callback was
/// never invoked and therefore produced no result.
pub fn with_gvl<F, R>(func: F) -> R
where
    F: FnOnce() -> R,
    R: Sized,
{
    let mut args: GvlArgs<F, R> = GvlArgs {
        func: Some(func),
        result: None,
    };

    let arg_ptr = &mut args as *mut GvlArgs<F, R> as *mut c_void;

    // SAFETY: `arg_ptr` points to `args`, which lives on this stack frame for
    // the whole call, and the trampoline is instantiated with the same `F`/`R`.
    unsafe {
        rb_thread_call_with_gvl(Some(call_with_gvl::<F, R>), arg_ptr);
    }

    match args.result {
        Some(Ok(value)) => value,
        // Re-raise the closure's panic now that we are back in Rust frames.
        Some(Err(payload)) => std::panic::resume_unwind(payload),
        None => panic!("with_gvl: closure did not produce a result (callback was never invoked)"),
    }
}

/// Executes the given closure without holding the Ruby GVL (Global VM Lock).
///
/// WARNING: Do NOT nest calls to [`nogvl`] or [`nogvl_cancellable`] inside each other.
Expand Down
24 changes: 6 additions & 18 deletions src/rt.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::future::Future;
use std::sync::LazyLock;

use tokio::runtime::{Builder, Runtime};
Expand All @@ -11,6 +12,11 @@ static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
.expect("Failed to initialize Tokio runtime")
});

/// Returns a reference to the global Tokio runtime.
pub(crate) fn runtime() -> &'static Runtime {
&RUNTIME
}

/// Block on a future to completion on the global Tokio runtime,
/// with support for cancellation via the provided `CancelFlag`.
pub fn try_block_on<F, T>(future: F) -> F::Output
Expand All @@ -27,21 +33,3 @@ where
})
})
}

/// Drive `future` to completion on the global Tokio runtime with the GVL
/// released, racing it against the cancel flag supplied by
/// `gvl::nogvl_cancellable`.
///
/// Yields the future's own output, or `None` if cancellation wins the race.
/// The cancel flag is polled first on every wake-up (`biased`).
#[inline]
pub fn maybe_block_on<F, T>(future: F) -> F::Output
where
    F: Future<Output = Option<T>>,
{
    gvl::nogvl_cancellable(|cancel| {
        let raced = async move {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => None,
                output = future => output,
            }
        };
        RUNTIME.block_on(raced)
    })
}
Loading