Use UnsafeBoundedQueue and avoid dispatcher #803
Conversation
@fredfp, I cannot push my test client/server right now as it uses our internal libraries for configuration, startup and stuff. But I'll try to cut them out in a few days and upload it to GitHub. Basically, it is just simple metrics for request/response times over server and client implementations of this gRPC API:

```proto
syntax = "proto3";

package fs2.grpc.bench;

service TestService {
  rpc Identity (Message) returns (Message);
  rpc IdentityStream (stream Message) returns (stream Message);
  rpc Unary (UnaryRequest) returns (Message);
  rpc ClientStreaming (stream UnaryRequest) returns (Message);
  rpc ServerStreaming (StreamingRequest) returns (stream Message);
  rpc BothStreaming (stream StreamingRequest) returns (stream Message);
}

message Message {
  bytes payload = 1;
}

message RequestParams {
  int32 length = 1;
  bool random_length = 2;
  optional double random_factor_min = 3;
  optional double random_factor_max = 4;
}

message UnaryRequest {
  RequestParams params = 1;
}

message StreamingRequest {
  RequestParams stream_params = 1;
  RequestParams chunk_params = 2;
  RequestParams message_params = 3;
}
```

The server implementation receives the request and either …
@fredfp, I've published our benchmark setup here: https://github.com/seigert/fs2-grpc-bench
@seigert thank you for publishing your benchmark setup! I ran benchmarks locally and the current changes seem to bring a substantial improvement for streaming calls.

Method:
Here's what I did: …

Summary of the results:
- Sample run of fs2-grpc v3.0.0 (baseline)
- Sample run of fs2-grpc v3.0.0 + the current MR
```diff
  case Some(Left(err)) =>
    if (acc.isEmpty) loop(err.asLeft)
-   else F.pure((acc.toIndexedChunk, err.asLeft).some)
+   else F.pure((acc, err.asLeft).some)
```
@seigert notice the removal of `.toIndexedChunk` here and below. This seems to improve performance slightly. Was there a specific reason for using `.toIndexedChunk`?
@fredfp, as I remember, it was an attempt to increase the "locality" of buffered received messages, and to avoid passing them along as the `Chunk.Queue` produced by `acc ++ Chunk.singleton(value)` in the worst case.

It was used as an alternative to `.compact`, to avoid copying a `Chunk.Singleton` unnecessarily when only one element was buffered.

Even better would be, of course, to drain the whole incoming buffer into an array slice, but `Queue` still cannot do this.

I think the removal of this call is indeed beneficial for the throughput benchmark, but maybe not so much for actual access to gRPC stream elements if processing is done in chunks. 🤔
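For illustration, here is a minimal sketch of the accumulation trade-off being described; `drain` and `received` are hypothetical names, not fs2-grpc code:

```scala
import fs2.Chunk

// Sketch of the pattern discussed above: each `++` of a singleton
// leaves the buffered messages behind a Chunk.Queue-style
// concatenation rather than a flat, indexed structure.
def drain[A](received: List[A]): Chunk[A] = {
  val acc = received.foldLeft(Chunk.empty[A])((q, a) => q ++ Chunk.singleton(a))
  // .toIndexedChunk flattens the accumulated queue for fast indexed
  // access; unlike .compact it needs no ClassTag, and it avoids
  // copying when the chunk is already indexed (e.g. a lone
  // Chunk.Singleton).
  acc.toIndexedChunk
}
```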
Thank you, I see 3 possibilities:

1. leave the decision to the user (`.chunks.map(_.toIndexedChunk).unchunks`; see the sketch after this list)
2. use `.toIndexedChunk` as you did
3. make the `loop` accumulate elements into an `Array` of some sort, instead of `Chunk`. This allows avoiding the extra traversal and copy which was certainly slowing down the benchmark, but it is not space efficient as we'd allocate arrays of size `prefetchN` (and leverage `Chunk.ArraySlice` when partially filled).
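To make option (1) concrete, the user-side opt-in could look like this (a sketch; `responses` stands for any fs2 stream of received messages):

```scala
import fs2.Stream

// Option (1): keep the library's chunks as-is and let the caller
// convert to indexed chunks at the use site.
def indexedResponses[F[_], A](responses: Stream[F, A]): Stream[F, A] =
  responses.chunks.map(_.toIndexedChunk).unchunks
```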
(3) is certainly better throughput-wise, but I don't think it's worth the added complexity. I'm fine keeping (2) `.toIndexedChunk`; what do you think?
I agree with keeping (2) if the benchmarks show, say, less than a 10% difference with `.toIndexedChunk` and without it.

Also, for (3) we could use the same strategy as the `fs2.io.readInputStream` implementation: allocate and reuse an `Array`-backed buffer of size `prefetchN > 1` and pass it along as a `Chunk.ArraySlice` with manually set offset/length when it is time to emit.
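Roughly sketched, and assuming for the moment that a `ClassTag[A]` is available (`MessageBuffer` and its methods are illustrative, not fs2-grpc API):

```scala
import fs2.Chunk
import scala.reflect.ClassTag

// Illustrative sketch: accumulate received messages into a
// preallocated array and emit the written prefix as an array slice.
final class MessageBuffer[A: ClassTag](prefetchN: Int) {
  private var buf = new Array[A](prefetchN)
  private var pos = 0

  def put(a: A): Unit = { buf(pos) = a; pos += 1 }
  def isFull: Boolean = pos == prefetchN

  // Emit the filled prefix as a partially filled Chunk.ArraySlice.
  // Swapping in a fresh array (rather than truly reusing the buffer)
  // keeps the emitted chunk from being mutated behind the consumer's
  // back, at the cost of one allocation per emit.
  def emit(): Chunk[A] = {
    val out = Chunk.ArraySlice(buf, 0, pos)
    buf = new Array[A](prefetchN)
    pos = 0
    out
  }
}
```

True in-place reuse of a single buffer is only safe if the previously emitted chunk is fully consumed before the buffer is refilled, which is why the sketch swaps buffers instead.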
Also, I remembered why I did it with `Chunk` instead of `Array` in the first place: we have no `ClassTag[T]` evidence, so additional machinery would be required to cast an `Array[AnyRef]` to `Array[T]`, even though this is safe (`T` is a protobuf message and thus an object, not a primitive).

P.S.: And `Chunk.ArraySlice[T]` also requires a `ClassTag[T]` for construction. :(
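For what it's worth, the cast machinery can stay small; a sketch (`unsafeChunk` is a made-up helper, not an existing API):

```scala
import fs2.Chunk

// Made-up helper: buffer messages as AnyRef (for which a ClassTag is
// always available) and cast the resulting chunk. The cast is
// unchecked but safe at runtime, since any protobuf message type T
// erases to AnyRef anyway.
def unsafeChunk[T](buf: Array[AnyRef], length: Int): Chunk[T] =
  Chunk.ArraySlice(buf, 0, length).asInstanceOf[Chunk[T]]
```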
Could you open a separate PR to discuss this? I think it's complex enough to deserve its own discussion thread.
I've created a separate PR: #819 -- I still haven't had time to run the benchmarks. :(
(force-pushed from 8fc9915 to d63ff35)
I've added the … Below are the updated benchmark results:
- Sample run of fs2-grpc v3.0.0 (baseline)
- Sample run of fs2-grpc v3.0.0 + the current MR (calls …)
3.5-4x throughput is very good, but I wonder why the chunk size changes between runs?
My understanding was that, without dispatcher calls, we waste much less time adding received messages to the CE queue, i.e., we are much more efficient at it, so …
Leverage typelevel/cats-effect#3975 (released with CE 3.6.0) to remove some dispatcher calls.
It seems to work, but I was hoping to measure the actual impact with benchmarks before submitting a PR.
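For context, the dispatcher round-trip being removed looks roughly like this (a sketch; `MessageListener` and `onMessage` are illustrative names, not the actual grpc-java listener API):

```scala
import cats.effect.IO
import cats.effect.std.{Dispatcher, Queue}

// Before this PR: gRPC delivers messages on its own threads, so each
// callback had to cross into the cats-effect runtime via a Dispatcher
// just to enqueue one message.
final class MessageListener[A](dispatcher: Dispatcher[IO], queue: Queue[IO, A]) {
  def onMessage(msg: A): Unit =
    dispatcher.unsafeRunAndForget(queue.offer(msg))
}

// After: the unsafe queue access added in cats-effect 3.6.0
// (typelevel/cats-effect#3975) lets the callback enqueue directly from
// a foreign thread, skipping the per-message dispatcher submission.
```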