Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,5 @@ keylog.txt
*.jpg
*.csv
*.png
docs/site
docs/site
*.pdb
107 changes: 44 additions & 63 deletions src/client/resp/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ use std::{fmt::Display, sync::Arc};

use arc_swap::ArcSwapOption;
use bytes::Bytes;
use futures_util::TryFutureExt;
use futures_util::{
TryFutureExt,
future::{self, BoxFuture},
};
use http::response::{Parts, Response as HttpResponse};
use http_body_util::BodyExt;
use http_body_util::{BodyExt, Collected};
use pyo3::{coroutine::CancelHandle, prelude::*, pybacked::PyBackedStr};
use wreq::{self, Uri};

Expand All @@ -25,7 +28,6 @@ use crate::{
};

/// A response from a request.
#[derive(Clone)]
#[pyclass(subclass, frozen, str, skip_from_py_object)]
pub struct Response {
uri: Uri,
Expand All @@ -51,69 +53,57 @@ impl Response {
/// Create a new [`Response`] instance.
pub fn new(response: wreq::Response) -> Self {
let uri = response.uri().clone();
let response = HttpResponse::from(response);
let response = HttpResponse::from(response)
.map(Body::Streamable)
.map(ArcSwapOption::from_pointee)
.map(Arc::new);
let (parts, body) = response.into_parts();
Response {
uri,
parts,
body: Arc::new(ArcSwapOption::from_pointee(Body::Streamable(body))),
}
Response { uri, parts, body }
}

/// Builds a `wreq::Response` from the current response metadata and the given body.
///
/// This creates a new HTTP response with the same version, status, headers, and extensions
/// as the current response, but with the provided body.
fn build_response(&self, body: wreq::Body) -> wreq::Response {
let mut response = HttpResponse::new(body);
*response.version_mut() = self.parts.version;
*response.status_mut() = self.parts.status;
*response.headers_mut() = self.parts.headers.clone();
*response.extensions_mut() = self.parts.extensions.clone();
/// Builds a [`wreq::Response`] from the current response metadata and the given body.
fn build_response<T: Into<wreq::Body>>(&self, body: T) -> wreq::Response {
let response = HttpResponse::from_parts(self.parts.clone(), body);
wreq::Response::from(response)
}

/// Creates an empty response with the same metadata but no body content.
///
/// Useful for operations that only need response headers/metadata without consuming the body.
/// Creates an empty [`wreq::Response`] with the same metadata but no body content.
fn empty_response(&self) -> wreq::Response {
self.build_response(wreq::Body::from(Bytes::new()))
self.build_response(Bytes::new())
}

/// Consumes the response body and caches it in memory for reuse.
///
/// If the body is streamable, it will be fully read into memory and cached.
/// If the body is already cached, it will be cloned and reused.
/// Returns an error if the body has already been consumed or if reading fails.
async fn cache_response(self) -> Result<wreq::Response, Error> {
/// Consumes the response [`Body`] and caches it in memory for reuse.
fn cache_response(&self) -> BoxFuture<'static, Result<wreq::Response, Error>> {
if let Some(arc) = self.body.swap(None) {
match Arc::try_unwrap(arc) {
Ok(Body::Streamable(body)) => {
let bytes = BodyExt::collect(body)
.await
.map(|buf| buf.to_bytes())
.map_err(Error::Library)?;

self.body
.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
Ok(self.build_response(wreq::Body::from(bytes)))
let parts = self.parts.clone();
let body = self.body.clone();
match Arc::into_inner(arc) {
Some(Body::Streamable(stream)) => {
return Box::pin(async move {
let bytes = stream
.collect()
.await
.map(Collected::to_bytes)
.map_err(Error::Library)?;

body.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
let response = HttpResponse::from_parts(parts, bytes);
Ok(wreq::Response::from(response))
});
}
Ok(Body::Reusable(bytes)) => {
self.body
.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
Ok(self.build_response(wreq::Body::from(bytes)))
Some(Body::Reusable(bytes)) => {
body.store(Some(Arc::new(Body::Reusable(bytes.clone()))));
let response = HttpResponse::from_parts(parts, bytes);
return Box::pin(future::ok(wreq::Response::from(response)));
}
_ => Err(Error::Memory),
None => unreachable!("Arc should never be empty here"),
}
} else {
Err(Error::Memory)
}

Box::pin(future::err(Error::Memory))
}

/// Consumes the response body for streaming without caching.
///
/// This method transfers ownership of the streamable body for one-time use.
/// Returns an error if the body has already been consumed or is not streamable.
/// Consumes the response [`Body`] for streaming without caching.
fn stream_response(&self) -> Result<wreq::Response, Error> {
if let Some(arc) = self.body.swap(None) {
if let Ok(Body::Streamable(body)) = Arc::try_unwrap(arc) {
Expand All @@ -123,7 +113,7 @@ impl Response {
Err(Error::Memory)
}

/// Forcefully destroys the response body, preventing any further reads.
/// Forcefully destroys the response [`Body`], preventing any further reads.
fn destroy(&self) {
if let Some(body) = self.body.swap(None) {
if let Ok(body) = Arc::try_unwrap(body) {
Expand Down Expand Up @@ -193,8 +183,7 @@ impl Response {
#[getter]
pub fn history(&self, py: Python) -> Vec<History> {
py.detach(|| {
self.clone()
.empty_response()
self.empty_response()
.extensions()
.get::<wreq::redirect::History>()
.map_or_else(Vec::new, |history| {
Expand All @@ -207,8 +196,7 @@ impl Response {
#[getter]
pub fn tls_info(&self, py: Python) -> Option<TlsInfo> {
py.detach(|| {
self.clone()
.empty_response()
self.empty_response()
.extensions()
.get::<wreq::tls::TlsInfo>()
.cloned()
Expand All @@ -218,8 +206,7 @@ impl Response {

/// Turn a response into an error if the server returned an error.
pub fn raise_for_status(&self) -> PyResult<()> {
self.clone()
.empty_response()
self.empty_response()
.error_for_status()
.map(|_| ())
.map_err(Error::Library)
Expand All @@ -241,7 +228,6 @@ impl Response {
encoding: Option<PyBackedStr>,
) -> PyResult<String> {
let fut = self
.clone()
.cache_response()
.and_then(|resp| ResponseExt::text(resp, encoding))
.map_err(Into::into);
Expand All @@ -251,7 +237,6 @@ impl Response {
/// Get the JSON content of the response.
pub async fn json(&self, #[pyo3(cancel_handle)] cancel: CancelHandle) -> PyResult<Json> {
let fut = self
.clone()
.cache_response()
.and_then(ResponseExt::json::<Json>)
.map_err(Into::into);
Expand All @@ -261,7 +246,6 @@ impl Response {
/// Get the bytes content of the response.
pub async fn bytes(&self, #[pyo3(cancel_handle)] cancel: CancelHandle) -> PyResult<PyBuffer> {
let fut = self
.clone()
.cache_response()
.and_then(ResponseExt::bytes)
.map_ok(PyBuffer::from)
Expand Down Expand Up @@ -404,7 +388,6 @@ impl BlockingResponse {
py.detach(|| {
let fut = self
.0
.clone()
.cache_response()
.and_then(|resp| ResponseExt::text(resp, encoding))
.map_err(Into::into);
Expand All @@ -417,7 +400,6 @@ impl BlockingResponse {
py.detach(|| {
let fut = self
.0
.clone()
.cache_response()
.and_then(ResponseExt::json::<Json>)
.map_err(Into::into);
Expand All @@ -430,7 +412,6 @@ impl BlockingResponse {
py.detach(|| {
let fut = self
.0
.clone()
.cache_response()
.and_then(ResponseExt::bytes)
.map_ok(PyBuffer::from)
Expand Down
Loading