Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,23 @@ impl Prioritize {

let frame = match stream.pending_send.pop_front(buffer) {
Some(Frame::Data(mut frame)) => {
if let Some(reason) = stream.state.get_scheduled_reset() {
// If a reset is scheduled due to cancellation or
// an error, discard buffered DATA and let the `None`
// arm emit the RST_STREAM on the next iteration.
//
// NO_ERROR is excluded. Per RFC 9113 §8.1, a NO_ERROR
// stream reset may only be sent after a complete
// response, which requires sending all queued DATA.
if reason != Reason::NO_ERROR {
stream.pending_send.push_front(buffer, frame.into());
self.clear_queue(buffer, &mut stream);
self.reclaim_all_capacity(&mut stream, counts);
self.pending_send.push(&mut stream);
continue;
}
}

// Get the amount of capacity remaining for stream's
// window.
let stream_capacity = stream.send_flow.available();
Expand Down
89 changes: 89 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,95 @@ async fn reset_stream_waiting_for_capacity() {
join(srv, client).await;
}

#[tokio::test]
async fn scheduled_reset_with_buffered_data_sends_rst() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().initial_window_size(0))
.await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200)).await;

tokio::time::timeout(
Duration::from_secs(5),
srv.recv_frame(frames::reset(1).cancel()),
)
.await
.expect("RST_STREAM not received within 5s");
};

let client = async move {
let (mut client, mut conn) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://example.com/")
.body(())
.unwrap();
let (response, mut stream) = client.send_request(request, false).unwrap();
let resp = conn.drive(response).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
// Buffer data that can never be sent (zero stream window).
stream.send_data(vec![0u8; 10].into(), false).unwrap();
// Drop both handles to schedule a CANCEL reset.
drop(stream);
drop(resp);
conn.await.unwrap();
};

join(srv, client).await;
}

#[tokio::test]
async fn scheduled_reset_with_excess_buffered_data_is_cleaned_up() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
let _ = srv.assert_client_handshake().await;
srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
.await;
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
srv.recv_frame(frames::data(1, vec![0; 16383])).await;
srv.send_frame(frames::window_update(0, 65535)).await;
srv.send_frame(frames::headers(1).response(200)).await;
srv.recv_frame(frames::reset(1).cancel()).await;
};

let client = async move {
let (mut client, mut conn) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://example.com/")
.body(())
.unwrap();
let (response, mut stream) = client.send_request(request, false).unwrap();
// Buffer the full window plus excess. The first 65535 bytes
// are sent, and the remaining 1000 are stuck.
stream
.send_data(vec![0u8; 65535 + 1000].into(), false)
.unwrap();
let resp = conn.drive(response).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
drop(stream);
drop(resp);
drop(client);
tokio::time::timeout(Duration::from_secs(5), conn)
.await
.expect("connection did not shut down within 5s")
.unwrap();
};

join(srv, client).await;
}

#[tokio::test]
async fn data_padding() {
h2_support::trace_init!();
Expand Down
52 changes: 51 additions & 1 deletion tests/h2-tests/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,55 @@ async fn sends_reset_no_error_when_req_body_is_dropped() {
join(client, srv).await;
}

#[tokio::test]
async fn no_error_response_body_delivered_before_rst() {
// When a server sends a large response body and drops the request
// body without reading it, NO_ERROR is scheduled. The response DATA
// must still be delivered.
h2_support::trace_init!();
let (io, mut client) = mock::new();

let client = async move {
let settings = client.assert_server_handshake().await;
assert_default_settings!(settings);
client
.send_frame(frames::headers(1).request("POST", "https://example.com/"))
.await;
client.recv_frame(frames::headers(1).response(200)).await;
client.recv_frame(frames::data(1, vec![0; 16384])).await;
client.recv_frame(frames::data(1, vec![0; 16384])).await;
client.recv_frame(frames::data(1, vec![0; 16384])).await;
client.recv_frame(frames::data(1, vec![0; 16383])).await;
// These window updates allow the full response to be delivered.
client.send_frame(frames::window_update(0, 65535)).await;
client.send_frame(frames::window_update(1, 65535)).await;
client.recv_frame(frames::data(1, vec![0; 16384])).await;
client.recv_frame(frames::data(1, vec![0; 16384])).await;
client.recv_frame(frames::data(1, vec![0; 1]).eos()).await;
client
.recv_frame(frames::reset(1).reason(Reason::NO_ERROR))
.await;
};

let srv = async move {
let mut srv = server::handshake(io).await.expect("handshake");
{
let (req, mut stream) = srv.next().await.unwrap().unwrap();
assert_eq!(req.method(), &http::Method::POST);

let rsp = http::Response::builder().status(200).body(()).unwrap();
let mut tx = stream.send_response(rsp, false).unwrap();
// Response body larger than the stream window. The first 65535 bytes
// are sent immediately, and the remaining bytes wait for the client's
// WINDOW_UPDATE.
tx.send_data(vec![0; 16384 * 6].into(), true).unwrap();
}
assert!(srv.next().await.is_none());
};

join(client, srv).await;
}

#[tokio::test]
async fn abrupt_shutdown() {
h2_support::trace_init!();
Expand Down Expand Up @@ -890,7 +939,8 @@ async fn sends_reset_cancel_when_res_body_is_dropped() {
)
.await;
client.recv_frame(frames::headers(3).response(200)).await;
client.recv_frame(frames::data(3, vec![0; 10])).await;
// CANCEL means "stream is no longer needed" (RFC 9113 §7). Buffered DATA
// is discarded and RST_STREAM is sent immediately.
client.recv_frame(frames::reset(3).cancel()).await;
};

Expand Down
Loading