Skip to content

Commit 9bed8e8

Browse files
madeyeclaude
andcommitted
Optimize throughput: 128K tunnel buffers, streaming bodies, connection pooling
- Increase CONNECT tunnel buffer from 8 KiB to 128 KiB (copy_bidirectional_with_sizes) - Stream HTTP forward response bodies via Either<Full, Incoming> instead of buffering - Add pooled hyper_util legacy Client for HTTP forward (OnceLock, 90s idle, 32/host) - Add throughput tests: 64 MB CONNECT, 32 MB forward, 500-req rate test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 47fb46c commit 9bed8e8

File tree

6 files changed

+317
-68
lines changed

6 files changed

+317
-68
lines changed

src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ pub mod tls;
2222

2323
use std::sync::Arc;
2424

25-
use http_body_util::Full;
25+
use http_body_util::{Either, Full};
2626
use hyper::body::{Bytes, Incoming};
2727
use hyper::service::service_fn;
2828
use hyper::{Method, Request, Response};
2929
use hyper_util::rt::TokioIo;
30+
31+
/// Response body type: either a buffered `Full<Bytes>` or a streaming `Incoming`.
32+
pub type ProxyBody = Either<Full<Bytes>, Incoming>;
3033
use tokio_rustls::TlsAcceptor;
3134
use tokio_util::sync::CancellationToken;
3235
use tracing::error;
@@ -37,7 +40,7 @@ use crate::config::Config;
3740
pub async fn handle_request(
3841
req: Request<Incoming>,
3942
config: &Config,
40-
) -> Result<Response<Full<Bytes>>, anyhow::Error> {
43+
) -> Result<Response<ProxyBody>, anyhow::Error> {
4144
if !stealth::is_proxy_request(&req) {
4245
return Ok(stealth::fake_404(&config.stealth.server_name));
4346
}

src/proxy.rs

Lines changed: 87 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,35 @@
22
//!
33
//! - [`handle_connect`]: Upgrades the client connection and tunnels bytes
44
//! bidirectionally to the target via [`tokio::io::copy_bidirectional`].
5-
//! - [`handle_forward`]: Rewrites the absolute URI to path-only form, strips
6-
//! proxy headers, and forwards the request via hyper's HTTP/1.1 client.
5+
//! - [`handle_forward`]: Strips proxy headers and forwards the request via
6+
//! a pooled HTTP client (or manual connection with TCP Fast Open).
77
88
use anyhow::Context;
9-
use http_body_util::{BodyExt, Full};
9+
use http_body_util::{Either, Full};
1010
use hyper::body::{Bytes, Incoming};
1111
use hyper::{Request, Response, StatusCode};
12-
use hyper_util::rt::TokioIo;
12+
use hyper_util::client::legacy::Client;
13+
use hyper_util::rt::{TokioExecutor, TokioIo};
1314
use tracing::{error, info};
1415

1516
use crate::net;
17+
use crate::ProxyBody;
18+
19+
/// Buffer size per direction for CONNECT tunnels (128 KiB).
20+
/// 16x the default 8 KiB — matches TLS record size and reduces syscall count.
21+
const TUNNEL_BUF_SIZE: usize = 128 * 1024;
22+
23+
/// Global pooled HTTP/1.1 client for forward proxying (non-TFO path).
24+
static POOLED_CLIENT: std::sync::OnceLock<Client<hyper_util::client::legacy::connect::HttpConnector, Incoming>> = std::sync::OnceLock::new();
25+
26+
fn get_pooled_client() -> &'static Client<hyper_util::client::legacy::connect::HttpConnector, Incoming> {
27+
POOLED_CLIENT.get_or_init(|| {
28+
Client::builder(TokioExecutor::new())
29+
.pool_idle_timeout(std::time::Duration::from_secs(90))
30+
.pool_max_idle_per_host(32)
31+
.build_http()
32+
})
33+
}
1634

1735
/// Handle an HTTP `CONNECT` request by establishing a TCP tunnel.
1836
///
@@ -23,7 +41,7 @@ use crate::net;
2341
pub async fn handle_connect(
2442
req: Request<Incoming>,
2543
fast_open: bool,
26-
) -> anyhow::Result<Response<Full<Bytes>>> {
44+
) -> anyhow::Result<Response<ProxyBody>> {
2745
let authority = req
2846
.uri()
2947
.authority()
@@ -49,7 +67,7 @@ pub async fn handle_connect(
4967
match net::connect(&addr, fast_open).await {
5068
Ok(mut target) => {
5169
if let Err(e) =
52-
tokio::io::copy_bidirectional(&mut client, &mut target).await
70+
tokio::io::copy_bidirectional_with_sizes(&mut client, &mut target, TUNNEL_BUF_SIZE, TUNNEL_BUF_SIZE).await
5371
{
5472
error!("tunnel {addr} io error: {e}");
5573
}
@@ -68,79 +86,87 @@ pub async fn handle_connect(
6886
// Return 200 to signal the client that the tunnel is established.
6987
Ok(Response::builder()
7088
.status(StatusCode::OK)
71-
.body(Full::new(Bytes::new()))
89+
.body(Either::Left(Full::new(Bytes::new())))
7290
.unwrap())
7391
}
7492

7593
/// Handle a plain HTTP forward proxy request with an absolute URI.
7694
///
77-
/// Rewrites the request URI from absolute form (`http://host/path`) to
78-
/// path-only (`/path`), removes `Proxy-Authorization` and `Proxy-Connection`
79-
/// headers, connects to the upstream server, and relays the response back
80-
/// to the client.
95+
/// For the common case (`fast_open = false`), uses a pooled HTTP client that
96+
/// reuses connections across requests. For `fast_open = true`, uses manual
97+
/// connection setup with TCP Fast Open.
98+
///
99+
/// The response body is streamed directly from the upstream server without
100+
/// buffering the entire body in memory.
81101
pub async fn handle_forward(
82102
mut req: Request<Incoming>,
83103
fast_open: bool,
84-
) -> anyhow::Result<Response<Full<Bytes>>> {
104+
) -> anyhow::Result<Response<ProxyBody>> {
85105
let uri = req.uri().clone();
86-
let host = uri
87-
.authority()
88-
.context("missing authority in forward request")?
89-
.to_string();
90-
91-
let port = uri.port_u16().unwrap_or(match uri.scheme_str() {
92-
Some("https") => 443,
93-
_ => 80,
94-
});
95-
96-
let addr = if host.contains(':') {
97-
host.clone()
98-
} else {
99-
format!("{host}:{port}")
100-
};
101-
102-
info!("forward {} {} -> {addr}", req.method(), uri);
103106

104-
// Rewrite the URI to path-only form for the upstream request.
105-
let path_and_query = uri
106-
.path_and_query()
107-
.map(|pq| pq.to_string())
108-
.unwrap_or_else(|| "/".to_string());
109-
*req.uri_mut() = path_and_query.parse()?;
107+
info!("forward {} {}", req.method(), uri);
110108

111109
// Strip hop-by-hop / proxy headers.
112110
let headers = req.headers_mut();
113111
headers.remove("proxy-authorization");
114112
headers.remove("proxy-connection");
115113

116-
// Connect to the upstream server.
117-
let stream = net::connect(&addr, fast_open)
118-
.await
119-
.with_context(|| format!("connect to {addr}"))?;
120-
let io = TokioIo::new(stream);
114+
if fast_open {
115+
// TFO path: manual connection (pooled client doesn't support custom connectors yet)
116+
let host = uri
117+
.authority()
118+
.context("missing authority in forward request")?
119+
.to_string();
121120

122-
let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
123-
.await
124-
.context("upstream handshake")?;
125-
126-
tokio::spawn(async move {
127-
if let Err(e) = conn.await {
128-
error!("upstream connection error: {e}");
129-
}
130-
});
121+
let port = uri.port_u16().unwrap_or(match uri.scheme_str() {
122+
Some("https") => 443,
123+
_ => 80,
124+
});
131125

132-
let resp = sender
133-
.send_request(req)
134-
.await
135-
.context("upstream send_request")?;
126+
let addr = if host.contains(':') {
127+
host.clone()
128+
} else {
129+
format!("{host}:{port}")
130+
};
131+
132+
// Rewrite the URI to path-only form for the upstream request.
133+
let path_and_query = uri
134+
.path_and_query()
135+
.map(|pq| pq.to_string())
136+
.unwrap_or_else(|| "/".to_string());
137+
*req.uri_mut() = path_and_query.parse()?;
138+
139+
let stream = net::connect(&addr, true)
140+
.await
141+
.with_context(|| format!("connect to {addr}"))?;
142+
let io = TokioIo::new(stream);
143+
144+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
145+
.await
146+
.context("upstream handshake")?;
147+
148+
tokio::spawn(async move {
149+
if let Err(e) = conn.await {
150+
error!("upstream connection error: {e}");
151+
}
152+
});
136153

137-
// Collect the upstream response body.
138-
let (parts, body) = resp.into_parts();
139-
let body_bytes = body
140-
.collect()
141-
.await
142-
.context("read upstream body")?
143-
.to_bytes();
154+
let resp = sender
155+
.send_request(req)
156+
.await
157+
.context("upstream send_request")?;
144158

145-
Ok(Response::from_parts(parts, Full::new(body_bytes)))
159+
let (parts, body) = resp.into_parts();
160+
Ok(Response::from_parts(parts, Either::Right(body)))
161+
} else {
162+
// Pooled client path: connection reuse, automatic URI handling
163+
let client = get_pooled_client();
164+
let resp = client
165+
.request(req)
166+
.await
167+
.context("pooled client request")?;
168+
169+
let (parts, body) = resp.into_parts();
170+
Ok(Response::from_parts(parts, Either::Right(body)))
171+
}
146172
}

src/stealth.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
//! server. Proxy requests with missing/invalid auth get a `407` so that
66
//! real clients (e.g. Chrome) can send credentials.
77
8-
use http_body_util::Full;
8+
use http_body_util::{Either, Full};
99
use hyper::body::{Bytes, Incoming};
1010
use hyper::{Method, Request, Response, StatusCode, Version};
1111

12+
use crate::ProxyBody;
13+
1214
/// Returns `true` if the request is a proxy request.
1315
///
1416
/// A request is considered a proxy request if it uses the `CONNECT` method
@@ -35,7 +37,7 @@ pub fn is_proxy_request(req: &Request<Incoming>) -> bool {
3537
///
3638
/// The response includes the configured `Server` header and an HTML body
3739
/// identical to what nginx produces for a missing page.
38-
pub fn fake_404(server_name: &str) -> Response<Full<Bytes>> {
40+
pub fn fake_404(server_name: &str) -> Response<ProxyBody> {
3941
let body = concat!(
4042
"<html>\r\n",
4143
"<head><title>404 Not Found</title></head>\r\n",
@@ -51,7 +53,7 @@ pub fn fake_404(server_name: &str) -> Response<Full<Bytes>> {
5153
.header("Server", server_name)
5254
.header("Content-Type", "text/html")
5355
.header("Content-Length", body.len().to_string())
54-
.body(Full::new(Bytes::from(body)))
56+
.body(Either::Left(Full::new(Bytes::from(body))))
5557
.unwrap()
5658
}
5759

@@ -60,12 +62,12 @@ pub fn fake_404(server_name: &str) -> Response<Full<Bytes>> {
6062
/// Sent when a proxy request (CONNECT or absolute URI) arrives without
6163
/// valid credentials. The `Proxy-Authenticate` header tells clients like
6264
/// Chrome to prompt for or resend credentials.
63-
pub fn proxy_auth_required(server_name: &str) -> Response<Full<Bytes>> {
65+
pub fn proxy_auth_required(server_name: &str) -> Response<ProxyBody> {
6466
Response::builder()
6567
.status(StatusCode::PROXY_AUTHENTICATION_REQUIRED)
6668
.header("Server", server_name)
6769
.header("Proxy-Authenticate", "Basic realm=\"Restricted\"")
6870
.header("Content-Length", "0")
69-
.body(Full::new(Bytes::new()))
71+
.body(Either::Left(Full::new(Bytes::new())))
7072
.unwrap()
7173
}

tests/common/data_server.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#![allow(dead_code)]
2+
3+
use std::net::SocketAddr;
4+
5+
use http_body_util::Full;
6+
use hyper::body::{Bytes, Incoming};
7+
use hyper::server::conn::http1;
8+
use hyper::service::service_fn;
9+
use hyper::{Request, Response};
10+
use hyper_util::rt::TokioIo;
11+
use tokio::net::TcpListener;
12+
use tokio_util::sync::CancellationToken;
13+
14+
/// A simple HTTP server that returns a configurable number of zero-filled bytes.
15+
/// Used for throughput testing.
16+
pub struct DataServer {
17+
pub addr: SocketAddr,
18+
shutdown: CancellationToken,
19+
}
20+
21+
impl DataServer {
22+
/// Start a data server that returns `size` bytes of zeros for any request.
23+
pub async fn start(size: usize) -> Self {
24+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
25+
let addr = listener.local_addr().unwrap();
26+
let shutdown = CancellationToken::new();
27+
let token = shutdown.clone();
28+
29+
tokio::spawn(async move {
30+
loop {
31+
tokio::select! {
32+
_ = token.cancelled() => break,
33+
result = listener.accept() => {
34+
let (stream, _) = match result {
35+
Ok(v) => v,
36+
Err(_) => continue,
37+
};
38+
let token = token.clone();
39+
tokio::spawn(async move {
40+
let io = TokioIo::new(stream);
41+
let service = service_fn(move |_req: Request<Incoming>| {
42+
let data = vec![0u8; size];
43+
async move {
44+
Ok::<_, hyper::Error>(
45+
Response::builder()
46+
.status(200)
47+
.header("Content-Type", "application/octet-stream")
48+
.header("Content-Length", size.to_string())
49+
.body(Full::new(Bytes::from(data)))
50+
.unwrap(),
51+
)
52+
}
53+
});
54+
let conn = http1::Builder::new().serve_connection(io, service);
55+
tokio::select! {
56+
_ = token.cancelled() => {}
57+
result = conn => {
58+
if let Err(e) = result {
59+
eprintln!("data server error: {e}");
60+
}
61+
}
62+
}
63+
});
64+
}
65+
}
66+
}
67+
});
68+
69+
DataServer { addr, shutdown }
70+
}
71+
}
72+
73+
impl Drop for DataServer {
74+
fn drop(&mut self) {
75+
self.shutdown.cancel();
76+
}
77+
}

tests/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod data_server;
12
pub mod echo_server;
23
pub mod test_server;
34
pub mod tls_fixture;

0 commit comments

Comments
 (0)