Skip to content

Commit c07ba7a

Browse files
authored
refactor(s2n-quic-dc): Add trait for TCP WorkerState poll behavior (#2836)
1 parent dc37ed6 commit c07ba7a

File tree

7 files changed

+210
-63
lines changed

7 files changed

+210
-63
lines changed

dc/s2n-quic-dc/src/stream/endpoint.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
path::secret::{self, map, Map},
88
random::Random,
99
stream::{
10-
application,
10+
self, application,
1111
environment::{Environment, Peer},
1212
recv,
1313
send::{self, flow},
@@ -28,9 +28,8 @@ use super::environment::{ReadWorkerSocket as _, WriteWorkerSocket as _};
2828

2929
type Result<T = (), E = io::Error> = core::result::Result<T, E>;
3030

31-
pub struct AcceptError<Peer> {
31+
pub struct AcceptError {
3232
pub secret_control: Vec<u8>,
33-
pub peer: Option<Peer>,
3433
pub error: io::Error,
3534
}
3635

@@ -84,39 +83,47 @@ where
8483
}
8584

8685
#[inline]
87-
pub fn accept_stream<Env, P>(
88-
now: Timestamp,
89-
env: &Env,
90-
peer: P,
86+
pub fn derive_stream_credentials(
9187
packet: &server::InitialPacket,
9288
map: &Map,
93-
subscriber_ctx: <Env::Subscriber as event::Subscriber>::ConnectionContext,
94-
parameter_override: Option<&dyn Fn(dc::ApplicationParams) -> dc::ApplicationParams>,
95-
) -> Result<application::Builder<Env::Subscriber>, AcceptError<P>>
96-
where
97-
Env: Environment,
98-
P: Peer<Env>,
99-
{
89+
features: &stream::TransportFeatures,
90+
secret_control: &mut Vec<u8>,
91+
) -> Result<(secret::map::Bidirectional, dc::ApplicationParams), io::Error> {
10092
let credentials = &packet.credentials;
101-
let mut secret_control = vec![];
102-
let Some((crypto, mut parameters)) = map.pair_for_credentials(
93+
94+
let Some((crypto, parameters)) = map.pair_for_credentials(
10395
credentials,
10496
packet.source_queue_id,
105-
&peer.features(),
106-
&mut secret_control,
97+
features,
98+
secret_control,
10799
) else {
108100
let error = io::Error::new(
109101
io::ErrorKind::NotFound,
110102
format!("missing credentials for client: {credentials:?}"),
111103
);
112-
let error = AcceptError {
113-
secret_control,
114-
peer: Some(peer),
115-
error,
116-
};
117104
return Err(error);
118105
};
119106

107+
Ok((crypto, parameters))
108+
}
109+
110+
#[inline]
111+
pub fn accept_stream<Env, P>(
112+
now: Timestamp,
113+
env: &Env,
114+
peer: P,
115+
packet: &server::InitialPacket,
116+
map: &Map,
117+
subscriber_ctx: <Env::Subscriber as event::Subscriber>::ConnectionContext,
118+
parameter_override: Option<&dyn Fn(dc::ApplicationParams) -> dc::ApplicationParams>,
119+
crypto: secret::map::Bidirectional,
120+
mut parameters: dc::ApplicationParams,
121+
secret_control: Vec<u8>,
122+
) -> Result<application::Builder<Env::Subscriber>, AcceptError>
123+
where
124+
Env: Environment,
125+
P: Peer<Env>,
126+
{
120127
if let Some(o) = parameter_override {
121128
parameters = o(parameters);
122129
}
@@ -148,7 +155,6 @@ where
148155
Err(error) => {
149156
let error = AcceptError {
150157
secret_control,
151-
peer: None,
152158
error,
153159
};
154160
Err(error)

dc/s2n-quic-dc/src/stream/server/tokio.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
523523
self.backlog,
524524
self.accept_flavor,
525525
self.linger,
526+
tcp::worker::DefaultBehavior,
526527
)?
527528
.run();
528529

dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
path::secret,
77
stream::{
88
environment::{tokio::Environment, Environment as _},
9-
server::accept,
9+
server::{accept, tokio::tcp::worker::PollBehavior},
1010
},
1111
};
1212
use core::{future::poll_fn, task::Poll};
@@ -18,13 +18,14 @@ use tracing::debug;
1818
mod fresh;
1919
mod lazy;
2020
mod manager;
21-
mod worker;
21+
pub mod worker;
2222

2323
pub(crate) use lazy::LazyBoundStream;
2424

25-
pub struct Acceptor<Sub>
25+
pub struct Acceptor<Sub, B>
2626
where
2727
Sub: Subscriber + Clone,
28+
B: PollBehavior<Sub> + Clone,
2829
{
2930
sender: accept::Sender<Sub>,
3031
socket: AsyncFd<TcpListener>,
@@ -33,11 +34,13 @@ where
3334
backlog: usize,
3435
accept_flavor: accept::Flavor,
3536
linger: Option<Duration>,
37+
poll_behavior: B,
3638
}
3739

38-
impl<Sub> Acceptor<Sub>
40+
impl<Sub, B> Acceptor<Sub, B>
3941
where
4042
Sub: event::Subscriber + Clone,
43+
B: PollBehavior<Sub> + Clone,
4144
{
4245
#[inline]
4346
pub fn new(
@@ -49,6 +52,7 @@ where
4952
backlog: usize,
5053
accept_flavor: accept::Flavor,
5154
linger: Option<Duration>,
55+
poll_behavior: B,
5256
) -> std::io::Result<Self> {
5357
let acceptor = Self {
5458
sender: sender.clone(),
@@ -58,6 +62,7 @@ where
5862
backlog,
5963
accept_flavor,
6064
linger,
65+
poll_behavior,
6166
};
6267

6368
#[cfg(target_os = "linux")]
@@ -100,8 +105,12 @@ where
100105
let drop_guard = DropLog;
101106
let mut fresh = fresh::Queue::new(self.backlog);
102107
let mut workers = {
103-
let workers =
104-
(0..self.backlog).map(|_| worker::Worker::new(self.env.clock().get_time()));
108+
let workers = (0..self.backlog).map(|_| {
109+
worker::Worker::<Sub, B>::new(
110+
self.env.clock().get_time(),
111+
self.poll_behavior.clone(),
112+
)
113+
});
105114
manager::Manager::new(workers)
106115
};
107116
let mut context = worker::Context::new(&self);

0 commit comments

Comments
 (0)