Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions lib/wreq_ruby/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,17 @@ def text(default_encoding: "UTF-8")
def json
end

# Get a streaming iterator for the response body, yielding each chunk.
# Stream the response body, yielding each chunk to the given block.
#
# This method allows you to process large HTTP responses efficiently,
# by yielding each chunk of the body as it arrives, without loading
# the entire response into memory.
#
# @return An iterator over response body chunks (binary String)
# @return [nil]
# @yield [chunk] Each chunk of the response body as a binary String
# @raise [LocalJumpError] if called without a block
# @raise [Wreq::TimeoutError, Wreq::BodyError, Wreq::ConnectionResetError, Wreq::RequestError]
# if streaming fails while reading the response body
# @example Save response to file
# File.open("output.bin", "wb") do |f|
# response.chunks { |chunk| f.write(chunk) }
Expand All @@ -127,7 +130,7 @@ def json
# response.chunks { |chunk| total += chunk.bytesize }
# puts "Downloaded #{total} bytes"
#
# Note: The returned Receiver is only for reading response bodies, not for uploads.
# Exceptions raised inside the block are propagated to the caller.
def chunks
end

Expand Down
23 changes: 9 additions & 14 deletions src/client/body/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::sync::{
};

use crate::{
error::{memory_error, mpsc_send_error_to_magnus},
error::{memory_error, mpsc_send_error_to_magnus, wreq_error_to_magnus},
rt,
};

Expand All @@ -37,20 +37,15 @@ impl BodyReceiver {
pub fn new(stream: impl Stream<Item = wreq::Result<Bytes>> + Send + 'static) -> BodyReceiver {
BodyReceiver(Mutex::new(Box::pin(stream)))
}
}

impl Iterator for BodyReceiver {
type Item = Bytes;

fn next(&mut self) -> Option<Self::Item> {
rt::maybe_block_on(async {
self.0
.lock()
.await
.as_mut()
.next()
.await
.and_then(|r| r.ok())
/// Read the next body chunk, converting stream errors into Ruby errors.
pub fn next(&self) -> Result<Option<Bytes>, Error> {
rt::try_block_on(async {
match self.0.lock().await.as_mut().next().await {
Some(Ok(data)) => Ok(Some(data)),
Some(Err(err)) => Err(wreq_error_to_magnus(err)),
None => Ok(None),
}
})
}
}
Expand Down
60 changes: 36 additions & 24 deletions src/client/resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use bytes::Bytes;
use futures_util::TryFutureExt;
use http::{Extensions, HeaderMap, response::Response as HttpResponse};
use http_body_util::BodyExt;
use magnus::{Error, Module, RArray, RModule, Ruby, Value, block::Yield, scan_args::scan_args};
use magnus::{Error, Module, RArray, RModule, Ruby, Value, scan_args::scan_args};
use wreq::Uri;

use crate::{
client::body::{BodyReceiver, Json},
cookie::Cookie,
error::{memory_error, wreq_error_to_magnus},
gvl,
error::{memory_error, no_block_given_error, wreq_error_to_magnus},
gvl::{self, nogvl},
header::Headers,
http::{StatusCode, Version},
rt,
Expand Down Expand Up @@ -199,12 +199,24 @@ impl Response {
})
}

/// Get a chunk iterator for the response body.
pub fn chunks(&self) -> Result<Yield<BodyReceiver>, Error> {
self.response(true)
.map(wreq::Response::bytes_stream)
.map(BodyReceiver::new)
.map(Yield::Iter)
/// Yield response body chunks to the given Ruby block.
pub fn chunks(ruby: &Ruby, rb_self: &Self) -> Result<(), Error> {
if !ruby.block_given() {
return Err(no_block_given_error());
}

let receiver = nogvl(|| {
rb_self
.response(true)
.map(wreq::Response::bytes_stream)
.map(BodyReceiver::new)
})?;

while let Some(chunk) = receiver.next()? {
let _: Value = ruby.yield_value(chunk)?;
}

Ok(())
}

/// Close the response body, dropping any resources.
Expand All @@ -222,23 +234,23 @@ impl Drop for Response {
}

pub fn include(ruby: &Ruby, gem_module: &RModule) -> Result<(), Error> {
let response_class = gem_module.define_class("Response", ruby.class_object())?;
response_class.define_method("code", magnus::method!(Response::code, 0))?;
response_class.define_method("status", magnus::method!(Response::status, 0))?;
response_class.define_method("version", magnus::method!(Response::version, 0))?;
response_class.define_method("url", magnus::method!(Response::url, 0))?;
response_class.define_method(
let response = gem_module.define_class("Response", ruby.class_object())?;
response.define_method("code", magnus::method!(Response::code, 0))?;
response.define_method("status", magnus::method!(Response::status, 0))?;
response.define_method("version", magnus::method!(Response::version, 0))?;
response.define_method("url", magnus::method!(Response::url, 0))?;
response.define_method(
"content_length",
magnus::method!(Response::content_length, 0),
)?;
response_class.define_method("cookies", magnus::method!(Response::cookies, 0))?;
response_class.define_method("headers", magnus::method!(Response::headers, 0))?;
response_class.define_method("local_addr", magnus::method!(Response::local_addr, 0))?;
response_class.define_method("remote_addr", magnus::method!(Response::remote_addr, 0))?;
response_class.define_method("bytes", magnus::method!(Response::bytes, 0))?;
response_class.define_method("text", magnus::method!(Response::text, -1))?;
response_class.define_method("json", magnus::method!(Response::json, 0))?;
response_class.define_method("chunks", magnus::method!(Response::chunks, 0))?;
response_class.define_method("close", magnus::method!(Response::close, 0))?;
response.define_method("cookies", magnus::method!(Response::cookies, 0))?;
response.define_method("headers", magnus::method!(Response::headers, 0))?;
response.define_method("local_addr", magnus::method!(Response::local_addr, 0))?;
response.define_method("remote_addr", magnus::method!(Response::remote_addr, 0))?;
response.define_method("bytes", magnus::method!(Response::bytes, 0))?;
response.define_method("text", magnus::method!(Response::text, -1))?;
response.define_method("json", magnus::method!(Response::json, 0))?;
response.define_method("chunks", magnus::method!(Response::chunks, 0))?;
response.define_method("close", magnus::method!(Response::close, 0))?;
Ok(())
}
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ pub fn interrupt_error() -> MagnusError {
MagnusError::new(ruby!().get_inner(&INTERRUPT_ERROR), "request interrupted")
}

/// LocalJumpError for methods that require a Ruby block.
pub fn no_block_given_error() -> MagnusError {
MagnusError::new(
ruby!().exception_local_jump_error(),
"no block given (yield)",
)
}

/// Map [`tokio::sync::mpsc::error::SendError`] to corresponding [`magnus::Error`]
pub fn mpsc_send_error_to_magnus<T>(err: SendError<T>) -> MagnusError {
MagnusError::new(
Expand Down
18 changes: 0 additions & 18 deletions src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,3 @@ where
})
})
}

/// Block on a future to completion on the global Tokio runtime,
/// returning `None` if cancelled via the provided `CancelFlag`.
#[inline]
pub fn maybe_block_on<F, T>(future: F) -> F::Output
where
F: Future<Output = Option<T>>,
{
gvl::nogvl_cancellable(|flag| {
RUNTIME.block_on(async move {
tokio::select! {
biased;
_ = flag.cancelled() => None,
result = future => result,
}
})
})
}
Loading