feat(grpc): add server-side stream validators#2602
Conversation
507787e to
e2f4bd0
Compare
c5c0e67 to
038f365
Compare
e2f4bd0 to
90ae261
Compare
038f365 to
89b8525
Compare
89b8525 to
355c9cd
Compare
dfawley
left a comment
There was a problem hiding this comment.
I'm trying to figure out how exactly this is going to be used.
Most likely it will be a server-side interceptor? In which case, when an error occurs, I would think we'd want to terminate the RPC with a status that indicates the cause of the problem.
355c9cd to
de7ee29
Compare
Yes on the server side interceptor. RecvStreamVal.. is somewhat straight forward. This'll be used closer to the protobuf API to ensure that we transition our generic stream request receiver to the protobuf API stream object. SendStreamVal.. is closer to the transport to validate the outgoing data after all other interceptors. I somewhat agree with the need for status. We need to update the Now that we are returning trailers instead of sending them, we need more details from SendStream to propagate to trailers . Does it sound okay to add |
I was expecting we'd just need an interceptor that can terminate the RPC underneath the handler when either the SendStream or RecvStream has any problem. I.e. we can't do stream validation via something that simply wraps each stream independently. |
I mean we are definitely gonna need an interceptor for this. But I was planning to separate it into two responsibilities , one for lifecycle for stream and another for lifecycle of handler. Something along the lines of But we might need some |
Sure but they will need to be linked, so it would make more sense to me to make them all together.
So you're thinking two different interceptors might be needed?
Yes we should make sure the RPC error includes an explanation of the protocol violation. I expect the interceptor would basically select on the handler future and a receive from a oneshot channel that the validator writes to, and then terminates with whichever status comes first, dropping the other future. |
|
So, we discussed offline. It'd make sense for both stream and handler validation to be a part of the same PR. I initially thought we'd need different handlers, but based on my progress so far, I believe a single one suffices.
Since , we don't expose error status from sendstream or recvstream right now, we cannot propagate anything beyond an arbitrary "internal error: stream broken" . Would you want to tweak our stream APIs to be able to propagate errors Err(Status) or Err(String) instead of Err() |
de7ee29 to
a2a44a2
Compare
|
The validation implementation has been updated to be interceptor based and the stream validation utilities are now private. There's still a question about if we should introduce "status" or "error messages" into the sendstream and recvstream to be propagated to trailer or will an arbitrary "internal error" suffice. Also, this now is stacked on top of #2634 |
a2a44a2 to
e930953
Compare
56323f1 to
51903a9
Compare
|
|
||
| struct ChannelAwareSendStreamValidator<S> { | ||
| inner: ServerSendStreamValidator<S>, | ||
| error_tx: tokio::sync::mpsc::Sender<()>, |
There was a problem hiding this comment.
Would an Option<Oneshot> be preferable here? It might make the access simpler, but I'm not sure.
There was a problem hiding this comment.
So, this is mpsc because the sender is shared between send and recv streams. So, this is subject to the conversation below, if we can eliminate Chann...Recv...Valida...
| } | ||
| } | ||
|
|
||
| struct ChannelAwareRecvStreamValidator<R> { |
There was a problem hiding this comment.
Maybe we don't need this? It seems OK if the application keeps receiving after getting an error. We want our wrapper to avoid propagating that read down further, since it would be illegal-per-the-API to do so, but I don't think that needs to error the whole call?
There was a problem hiding this comment.
Can you help me understand the reason behind the asymmetry in how we treat recvstream errors vs sendstream errors?
Probably worth noting that this could happen on interceptor side , in which case maybe we want to send out an error.
There was a problem hiding this comment.
On the sending side, if the server application attempts to violate the gRPC protocol by sending the wrong data on the stream (e.g. transmit headers after messages), the stream is corrupted and we can't reasonably proceed. We should end the stream with an error.
On the receiving side, if the server application requests data after we told it there was no more data....well that's wrong, but it doesn't actually cause any problems. The overall call can still function just fine. We just keep telling it "no, you have no more data" over and over if they ask.
So I believe this situation is fine. But if you have reasons why it's more dangerous than that then I'm happy to discuss further.
There was a problem hiding this comment.
What about other types of error, beyond polling the empty stream again? What if an interceptor sends an error, which doesn't necessarily mean end of stream? I assume we want to terminate the call on this one with some form of internal error instead of letting the application continue.
This'd imply we don't need the validator, but would still the channel error signal which is able to distinguish between repolling and an interceptor error?
There was a problem hiding this comment.
Hmm, I think it's not clear to me what the role of this interceptor is. I was assuming it was validating the application's use of the stream. But maybe it's intended for something else? On the client side, it's mainly there to assert the generated code's assumptions in case an interceptor below it did something wrong.
There was a problem hiding this comment.
So, this is probably where the recvstream /sendstream implementation assymmetry kicks in.
IIUC recvstreams are usually supposed to be like. This is obviously to the discretion of the recvstream implementation , but doing stuff to x before calling inner.recv may not make a lot of sense.
recv(x){
inner.recv(x)?;
x.modify();
}
while sendstreams are usually opposite . The same applies i.e. applications can modify post inner call but likely won't do anything in particular.
send(x){
x.modify();
inner.send(x);
}
So, even if the validation sits closer to the transport even on recv side, it actually will run at the end of all interceptors and can catch those.
There was a problem hiding this comment.
The application's interceptors will wrap this interceptor's send & recv streams before calling the final handler.
That means we can't observe any errors they introduce between them and the application; we can only see their usage of this interceptor's streams. I.e. if they return an error from their recv back to the application after calling our recv, we have no visibility into that.
Aside: you're right those should be the most common patterns, but I could see this kind of thing happening, too:
fn recv(..) -> .. {
let my_message = wrap(x);
inner.recv(&my_message)
}What situation are you still worried about?
There was a problem hiding this comment.
Yes. We have zero visibility to what they do beyond streams. So, I want to understand that's the expected behavior when an interceptor does
fn recv(x) {
if num_messages> 5:
return Err()
inner.recv(x);
}
Are we supposed to treat it like a regular recvstream failure and propagate the error, or are we supposed to close the handler with internal stream error of some sort?
I am leaning towards the latter.
There was a problem hiding this comment.
So, I want to understand that's the expected behavior when an interceptor does
If this interceptor is inside our channel at the boundary, we don't ever see that error anyway. So we don't need to think about it for this.
If their interceptor is doing that, we'd normally expect that interceptor to also terminate the handler future ASAP and return an error from the intercept method.
The handler itself should also see that error and know the RPC was cancelled or errored, and return from their handler ASAP, but they can't return a very useful status based on this.
There was a problem hiding this comment.
Okay, you are right. Roughly putting this closer to the transport and expecting to prevent interceptor doing bad things from there is essentially lost cause.
So, I've actually been thinking of the behavior that we want for the last couple of days for corner cases.
- Should recvs and send side interceptors receive some guarantees on the state machine from the transport or the application? Likely no, since this seems like a lost cause since a buggy interceptor can create out of order messages for downstream interceptors?
- Should the transport be protected from bad sendstream from interceptors? Likely yes, because while hyper provides some safety, it doesn't guarantee the full grpc spec safety.
- Should the application handler be protected from bad recvstream from interceptors? Likely yes, this'll probably be somewhere in codegen?
- What's the behavior we want on some interceptor's individual
recvstreammessage failure(think throttling interceptor mid streaming request?) to the application, should this end up cancelling the stream or should the interceptor be responsible for handling that and grpc just propagates the error to the application handler? leaning towards, making interceptor's problem. - What's the behavior we want on some interceptor's individual
sendstreammessage failure(think throttling and not a protocol ordering violation ), does this drop handler and cancel stream?
Here's what I am currently leaning towards
- Some validation on the codegen side for recvstream. Mostly focusing on ensuring terminal states are terminal. That ensures that the API doesn't allow.
- Some validation on the codegen side for sendstream. This will be focussed on error handling from interceptors, all errors are fallible and terminate the handler with reset stream.
- Some validation closer to the transport side for sendstream (roughly this PR). This ensure that we send valid grpc to transport.
Does that sound good ?
| tokio::select! { | ||
| res = next.handle(headers, options, &mut wrapped_tx, wrapped_rx) => { | ||
| if error_rx.try_recv().is_ok() { | ||
| Trailers::new(Err(StatusError::new(StatusCodeError::Internal, "Stream validation error"))) |
There was a problem hiding this comment.
On the one hand, it's unfortunate that we aren't being any more specific than this. "Server attempted to send multiple headers / messages before headers / etc"
On the other hand, maybe it's for the best, since the client maybe shouldn't get details about server bugs?
There was a problem hiding this comment.
With the current API that's likely the best we can do. The issue may exaggerate when dealing with interceptors because there would be no way to propagate error from an interceptor's stream which may be a painful debugging experience.
But maybe the right answer is that server interceptor implementations can log errors when as needed, and we treat all of it as internal error with a generic error message to the client of some form.
Enforces strict gRPC stream state transitions and sequence validation on the server side to prevent malformed responses and improper polling. Key Changes: - **Send Stream Validation (`ServerSendStreamValidator`)**: Tracks state transitions (`Init` -> `HeadersSent` -> `MessagesSent` -> `Done`) to guarantee headers are sent exactly once and always precede messages, rejecting invalid operations. - **Receive Stream Validation (`ServerRecvStreamValidator`)**: Prevents erroneous polling by returning stable errors once the receive stream reaches EOF or a terminal error state. - **Preemptive Error Interception (`StreamValidationInterceptor`)**: Wraps streams with channel-aware validators to immediately preempt handler execution via `tokio::select!` upon detecting any protocol violation or underlying transport error, automatically returning an `Internal` status.
51903a9 to
e80434b
Compare
e930953 to
986af3c
Compare
e80434b to
a00b4b0
Compare
Introduces
ServerSendStreamValidatorandServerRecvStreamValidatorto enforce strict gRPC semantics on server streams. These are intended to wrap raw transport streams before passing them to application-level handlers, ensuring that user-provided service logic cannot violate protocol rules.ServerSendStreamValidatorensures proper response sequencing (Headers -> Messages -> Trailers) and prevents invalid state transitions, while fully supporting trailers-only responses.ServerRecvStreamValidatorsafely manages terminal states, ensuring that any subsequent polls after stream completion consistently return an error to prevent undefined behavior.This is recreation of #2595 to allow using a stacked PR workflow