Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 50 additions & 34 deletions src/client/resp/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ impl Response {
///
/// This creates a new HTTP response with the same version, status, headers, and extensions
/// as the current response, but with the provided body.
fn build_response(self, body: wreq::Body) -> wreq::Response {
fn build_response(&self, body: wreq::Body) -> wreq::Response {
let mut response = HttpResponse::new(body);
*response.version_mut() = self.parts.version;
*response.status_mut() = self.parts.status;
*response.headers_mut() = self.parts.headers;
*response.extensions_mut() = self.parts.extensions;
*response.headers_mut() = self.parts.headers.clone();
*response.extensions_mut() = self.parts.extensions.clone();
wreq::Response::from(response)
}

/// Creates an empty response with the same metadata but no body content.
///
/// Useful for operations that only need response headers/metadata without consuming the body.
fn empty_response(self) -> wreq::Response {
fn empty_response(&self) -> wreq::Response {
self.build_response(wreq::Body::from(Bytes::new()))
}

Expand Down Expand Up @@ -114,14 +114,29 @@ impl Response {
///
/// This method transfers ownership of the streamable body for one-time use.
/// Returns an error if the body has already been consumed or is not streamable.
fn stream_response(self) -> Result<wreq::Response, Error> {
fn stream_response(&self) -> Result<wreq::Response, Error> {
if let Some(arc) = self.body.swap(None) {
if let Ok(Body::Streamable(body)) = Arc::try_unwrap(arc) {
return Ok(self.build_response(body));
}
}
Err(Error::Memory)
}

/// Forcefully destroys the response body, preventing any further reads.
fn destroy(&self) {
if let Some(body) = self.body.swap(None) {
if let Ok(body) = Arc::try_unwrap(body) {
::std::mem::drop(body);
}
}
}
}

impl Drop for Response {
fn drop(&mut self) {
self.destroy();
}
}

#[pymethods]
Expand Down Expand Up @@ -159,19 +174,19 @@ impl Response {
/// Get the content length of the response.
#[getter]
pub fn content_length(&self, py: Python) -> Option<u64> {
py.detach(|| self.clone().empty_response().content_length())
py.detach(|| self.empty_response().content_length())
}

/// Get the remote address of the response.
#[getter]
pub fn remote_addr(&self, py: Python) -> Option<SocketAddr> {
py.detach(|| self.clone().empty_response().remote_addr().map(SocketAddr))
py.detach(|| self.empty_response().remote_addr().map(SocketAddr))
}

/// Get the local address of the response.
#[getter]
pub fn local_addr(&self, py: Python) -> Option<SocketAddr> {
py.detach(|| self.clone().empty_response().local_addr().map(SocketAddr))
py.detach(|| self.empty_response().local_addr().map(SocketAddr))
}

/// Get the redirect history of the Response.
Expand Down Expand Up @@ -213,8 +228,7 @@ impl Response {

/// Get the response into a `Stream` of `Bytes` from the body.
pub fn stream(&self) -> PyResult<Streamer> {
self.clone()
.stream_response()
self.stream_response()
.map(Streamer::new)
.map_err(Into::into)
}
Expand Down Expand Up @@ -259,20 +273,21 @@ impl Response {
///
/// **Current behavior:**
/// - When connection pooling is **disabled**: This method closes the network connection.
/// - When connection pooling is **enabled**: This method closes the response, prevents further body reads,
/// and returns the connection to the pool for reuse.
/// - When connection pooling is **enabled**: This method closes the response, prevents further
/// body reads, and returns the connection to the pool for reuse.
///
/// **Future changes:**
/// In future versions, this method will be changed to always close the network connection regardless of
/// whether connection pooling is enabled or not.
/// In future versions, this method will be changed to always close the network connection
/// regardless of whether connection pooling is enabled or not.
///
/// **Recommendation:**
/// It is **not recommended** to manually call this method at present. Instead, use context managers
/// (async with statement) to properly manage response lifecycle. Wait for the improved implementation
/// in future versions.
pub async fn close(&self) -> PyResult<()> {
self.body.swap(None);
Ok(())
/// It is **not recommended** to manually call this method at present. Instead, use context
/// managers (async with statement) to properly manage response lifecycle. Wait for the
/// improved implementation in future versions.
pub async fn close(&self) {
Python::attach(|py| {
py.detach(|| self.destroy());
});
}
}

Expand All @@ -284,12 +299,7 @@ impl Response {
}

#[inline]
async fn __aexit__(
&self,
_exc_type: Py<PyAny>,
_exc_val: Py<PyAny>,
_traceback: Py<PyAny>,
) -> PyResult<()> {
async fn __aexit__(&self, _exc_type: Py<PyAny>, _exc_val: Py<PyAny>, _traceback: Py<PyAny>) {
self.close().await
}
}
Expand All @@ -308,6 +318,12 @@ impl Display for Response {

// ===== impl BlockingResponse =====

impl Drop for BlockingResponse {
fn drop(&mut self) {
self.0.destroy();
}
}

#[pymethods]
impl BlockingResponse {
/// Get the URL of the response.
Expand Down Expand Up @@ -427,20 +443,20 @@ impl BlockingResponse {
///
/// **Current behavior:**
/// - When connection pooling is **disabled**: This method closes the network connection.
/// - When connection pooling is **enabled**: This method closes the response, prevents further body reads,
/// and returns the connection to the pool for reuse.
/// - When connection pooling is **enabled**: This method closes the response, prevents further
/// body reads, and returns the connection to the pool for reuse.
///
/// **Future changes:**
/// In future versions, this method will be changed to always close the network connection regardless of
/// whether connection pooling is enabled or not.
/// In future versions, this method will be changed to always close the network connection
/// regardless of whether connection pooling is enabled or not.
///
/// **Recommendation:**
/// It is **not recommended** to manually call this method at present. Instead, use context managers
/// (with statement) to properly manage response lifecycle. Wait for the improved implementation
/// in future versions.
/// It is **not recommended** to manually call this method at present. Instead, use context
/// managers (with statement) to properly manage response lifecycle. Wait for the improved
/// implementation in future versions.
#[inline]
pub fn close(&self, py: Python) {
py.detach(|| self.0.body.swap(None));
py.detach(|| self.0.destroy());
}
}

Expand Down
32 changes: 18 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

mod r#async {
use crate::client::{
Client,
req::Request,
req::WebSocketRequest,
resp::{Response, WebSocket},
};
use crate::http::Method;
use pyo3::{coroutine::CancelHandle, prelude::*, pybacked::PyBackedStr};

use crate::{
client::{
Client,
req::{Request, WebSocketRequest},
resp::{Response, WebSocket},
},
http::Method,
};

/// Make a GET request with the given parameters.
#[inline]
#[pyfunction]
Expand Down Expand Up @@ -198,15 +200,17 @@ mod r#async {
}

mod blocking {
use crate::client::{
BlockingClient,
req::Request,
req::WebSocketRequest,
resp::{BlockingResponse, BlockingWebSocket},
};
use crate::http::Method;
use pyo3::{prelude::*, pybacked::PyBackedStr};

use crate::{
client::{
BlockingClient,
req::{Request, WebSocketRequest},
resp::{BlockingResponse, BlockingWebSocket},
},
http::Method,
};

/// Make a GET request with the given parameters (blocking).
#[inline]
#[pyfunction]
Expand Down
Loading