Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,11 @@ impl Inner {

let stream = self.store.resolve(key);

if stream.is_pending_open {
proto_err!(conn: "recv_headers: received frame on idle stream {:?}", id);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}

if stream.state.is_local_error() {
// Locally reset streams must ignore frames "for some time".
// This is because the remote may have sent trailers before
Expand Down Expand Up @@ -638,6 +643,11 @@ impl Inner {
}
};

if stream.is_pending_open {
proto_err!(conn: "recv_reset: received frame on idle stream {:?}", id);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}

let mut send_buffer = send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;

Expand Down Expand Up @@ -670,6 +680,11 @@ impl Inner {
// The remote may send window updates for streams that the local now
// considers closed. It's ok...
if let Some(mut stream) = self.store.find_mut(&id) {
if stream.is_pending_open {
proto_err!(conn: "recv_window_update: received frame on idle stream {:?}", id);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}

let res = self
.actions
.send
Expand Down
81 changes: 81 additions & 0 deletions tests/h2-tests/tests/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2075,3 +2075,84 @@ impl MockH2 for mock_io::Builder {
.read(SETTINGS_ACK)
}
}

/// RFC 9113 S5.1: "Receiving any frame other than HEADERS or PRIORITY on a
/// stream in [idle] state MUST be treated as a connection error of type
/// PROTOCOL_ERROR."
#[tokio::test]
async fn frame_on_pending_open_stream_is_conn_error() {
h2_support::trace_init!();

for scenario in 0..5u8 {
let (io, mut srv) = mock::new();

let srv = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
.await;
assert_default_settings!(settings);

// 3. Receive stream 1 HEADERS.
srv.recv_frame(
frames::headers(1)
.request("POST", "https://example.com/")
.eos(),
)
.await;

idle_ms(50).await;

// 4. Send a frame targeting stream 3, whose HEADERS haven't
// been transmitted since it's pending. This is illegal.
match scenario {
0 => {
srv.send_frame(frames::reset(3).reason(h2::Reason::NO_ERROR))
.await
}
1 => {
srv.send_frame(frames::reset(3).reason(h2::Reason::CANCEL))
.await
}
2 => srv.send_frame(frames::window_update(3, 1024)).await,
3 => srv.send_frame(frames::headers(3).response(200).eos()).await,
4 => srv.send_frame(frames::data(3, &b"hello"[..])).await,
_ => unreachable!(),
}

// 5. Client responds with GOAWAY(PROTOCOL_ERROR).
srv.recv_frame(frames::go_away(0).protocol_error()).await;
};

let client = async move {
let (mut client, mut conn) = client::Builder::new()
.initial_max_send_streams(1)
.handshake::<_, Bytes>(io)
.await
.unwrap();

// 1. Stream 1 fills the concurrent slot
let request = Request::builder()
.method(Method::POST)
.uri("https://example.com/")
.body(())
.unwrap();
let (_resp1, _) = client.send_request(request, true).unwrap();
client = conn.drive(client.ready()).await.unwrap();

// 2. Stream 3 is queued
let request = Request::builder()
.method(Method::POST)
.uri("https://example.com/")
.body(())
.unwrap();
let (_resp3, _) = client.send_request(request, true).unwrap();

// 6. Connection error propagates to poll_ready.
conn.drive(client.ready())
.await
.expect_err("connection error");
};

join(srv, client).await;
}
}
Loading