Conversation

fredfp (Contributor) commented Oct 3, 2025

Leverage typelevel/cats-effect#3975 (released with CE 3.6.0) to remove some dispatcher calls.

It seems to work, but I was hoping to measure the actual impact with benchmarks before submitting a PR.

fredfp (Contributor, Author) commented Oct 3, 2025

@seigert I'd love to run the benchmarks you mentioned here; could you run them or share the setup? I think instructions for benchmarking the client side of fs2-grpc are sorely missing.

seigert (Contributor) commented Oct 6, 2025

@fredfp, I cannot push my test client/server right now, as it uses our internal libraries for configuration, startup and other things. But I'll try to cut those out in a few days and upload it to GitHub.

Basically, it just collects simple metrics for request/response times over server and client implementations of this gRPC API:

syntax = "proto3";

package fs2.grpc.bench;

service TestService {
  rpc Identity (Message) returns (Message);
  rpc IdentityStream (stream Message) returns (stream Message);

  rpc Unary (UnaryRequest) returns (Message);

  rpc ClientStreaming (stream UnaryRequest) returns (Message);
  rpc ServerStreaming (StreamingRequest) returns (stream Message);
  rpc BothStreaming (stream StreamingRequest) returns (stream Message);
}

message Message {
  bytes payload = 1;
}

message RequestParams {
  int32 length = 1;
  bool  random_length = 2;
  optional double random_factor_min = 3;
  optional double random_factor_max = 4;
}

message UnaryRequest {
  RequestParams params = 1;
}

message StreamingRequest {
  RequestParams stream_params = 1;
  RequestParams chunk_params = 2;
  RequestParams message_params = 3;
}

The server implementation receives a request and either

  1. sends it back in the case of Message;
  2. generates a single message defined by UnaryRequest.RequestParams
    (length is the payload length; the payload is random bytes);
  3. generates a number of messages defined by StreamingRequest
    (the RequestParams define the total number of messages in the stream, the number of messages in a single chunk, and the length of a single payload).
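
For illustration, here is a hypothetical Scala sketch of those three behaviours (not the actual fs2-grpc-bench sources): it assumes the usual fs2-grpc generated trait TestServiceFs2Grpc[F, Metadata] and ScalaPB case classes, and it omits the chunking driven by chunk_params.

import cats.effect.Async
import cats.effect.std.Random
import cats.syntax.all._
import com.google.protobuf.ByteString
import fs2.Stream
import io.grpc.Metadata

// Hypothetical sketch only: class and helper names are illustrative.
class BenchServer[F[_]: Async](random: Random[F]) extends TestServiceFs2Grpc[F, Metadata] {

  // Build a Message with a random payload of the requested length.
  private def generate(params: RequestParams): F[Message] =
    random.nextBytes(params.length).map(bytes => Message(ByteString.copyFrom(bytes)))

  // 1. echo the request back unchanged
  def identity(request: Message, ctx: Metadata): F[Message] = request.pure[F]
  def identityStream(request: Stream[F, Message], ctx: Metadata): Stream[F, Message] = request

  // 2. a single generated message
  def unary(request: UnaryRequest, ctx: Metadata): F[Message] =
    generate(request.params.getOrElse(RequestParams()))

  // 3. a stream of generated messages; stream_params drives the total count,
  //    message_params the payload length (chunk_params omitted for brevity)
  def serverStreaming(request: StreamingRequest, ctx: Metadata): Stream[F, Message] = {
    val total   = request.streamParams.getOrElse(RequestParams()).length
    val message = request.messageParams.getOrElse(RequestParams())
    Stream.range(0, total).covary[F].evalMap(_ => generate(message))
  }

  def clientStreaming(request: Stream[F, UnaryRequest], ctx: Metadata): F[Message] =
    request.compile.last.flatMap(last => generate(last.flatMap(_.params).getOrElse(RequestParams())))

  def bothStreaming(request: Stream[F, StreamingRequest], ctx: Metadata): Stream[F, Message] =
    request.flatMap(serverStreaming(_, ctx))
}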

seigert (Contributor) commented Oct 13, 2025

@fredfp, I've published our benchmark setup here: https://github.com/seigert/fs2-grpc-bench

fredfp (Contributor, Author) commented Dec 6, 2025

@seigert thank you for publishing your benchmark setup!

I ran benchmarks locally and the current changes seem to bring a substantial improvement for streaming calls.

Method

Here's what I did:

  1. compared fs2-grpc v3.0.0 with a locally built version that had the current MR applied on top of v3.0.0
  2. ran the fs2-grpc-bench server and client at the same version (i.e., not testing a v3.0.0 server against a client with the MR above)
  3. server using default options, same instance for 5 consecutive benchmark runs: fs2-grpc-bench server
  4. client using default options, running fs2-grpc-bench client ids, as my changes only affect streaming calls

Summary of the results

Sample run of fs2-grpc v3.0.0 (baseline)

Test Results (22.426 s elapsed)
===============================

+---------------+-----------+-------------+
|       Counter |     Value |  Per second |
+---------------+-----------+-------------+
|         bytes | 209715200 | 9532509.091 |
|        chunks |    208173 |    9462.409 |
|      messages |    819200 |   37236.364 |
| response (ok) |       200 |       9.091 |
+---------------+-----------+-------------+

Sample run of fs2-grpc v3.0.0 + the current MR

Test Results (5.633 s elapsed)
==============================

+---------------+-----------+--------------+
|       Counter |     Value |   Per second |
+---------------+-----------+--------------+
|         bytes | 209715200 | 41943040.000 |
|        chunks |     54675 |    10935.000 |
|      messages |    819200 |   163840.000 |
| response (ok) |       200 |       40.000 |
+---------------+-----------+--------------+

  case Some(Left(err)) =>
    if (acc.isEmpty) loop(err.asLeft)
-   else F.pure((acc.toIndexedChunk, err.asLeft).some)
+   else F.pure((acc, err.asLeft).some)
fredfp (Contributor, Author)

@seigert notice the removal of .toIndexedChunk here and below. This seems to slightly improve the performance. Was there a specific reason for using .toIndexedChunk?

seigert (Contributor) commented Dec 7, 2025

@fredfp, as I remember, it was an attempt to increase the "locality" of buffered received messages and to avoid passing them along as the Chunk.Queue produced by acc ++ Chunk.singleton(value) in the worst case.

It was used as an alternative to .compact so as not to copy a Chunk.Singleton unnecessarily when only one element was buffered.

Even better would be, of course, to drain the whole incoming buffer into an array slice, but Queue still cannot do this.

I think removing this call is indeed beneficial for the throughput benchmark, but maybe not so much for actual access to gRPC stream elements if processing is done in chunks. 🤔
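
To illustrate the locality concern, a minimal sketch (not the fs2-grpc code): appending received messages one at a time builds a chained Chunk.Queue, and .toIndexedChunk flattens it into a single indexed chunk before it is handed downstream.

import fs2.Chunk

// Appending singletons produces a Chunk.Queue made of many small chunks:
val acc: Chunk[Int] =
  (1 to 4).foldLeft(Chunk.empty[Int])((c, i) => c ++ Chunk.singleton(i))

// Element access on the queue walks the underlying chunks, whereas
// toIndexedChunk copies everything into one indexed chunk up front:
val flat: Chunk[Int] = acc.toIndexedChunk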

fredfp (Contributor, Author)

Thank you, I see three possibilities:

  1. leave the decision to the user (.chunks.map(_.toIndexedChunk).unchunks), as sketched below;
  2. use .toIndexedChunk as you did;
  3. make the loop accumulate elements into an Array of some sort, instead of a Chunk. This avoids the extra traversal and copy that was certainly slowing down the benchmark, but it is not space-efficient, as we'd allocate arrays of size prefetchN (and use Chunk.ArraySlice when partially filled).

(3) is certainly better throughput-wise, but I don't think it's worth the added complexity. I'm fine with keeping (2), .toIndexedChunk; what do you think?
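
For reference, option (1) at the call site could look like this sketch, assuming responses is the client-side Stream[F, Message] (the name is illustrative):

// A user who wants indexed chunks can opt in downstream of the client call:
val indexedResponses = responses.chunks.map(_.toIndexedChunk).unchunks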

seigert (Contributor) commented Dec 9, 2025

I agree with keeping (2) if the benchmarks show, say, less than a 10% difference with and without .toIndexedChunk.

Also, for (3) we could use the same strategy as the fs2.io.readInputStream implementation: allocate and reuse an Array-backed buffer of size prefetchN > 1 and pass it along as a Chunk.ArraySlice with a manually set offset/length when it is time to emit.

seigert (Contributor) commented Dec 9, 2025

Also, I remembered why I did it with a Chunk instead of an Array in the first place: we have no ClassTag[T] evidence, so additional machinery would be required to cast Array[AnyRef] to Array[T], even though this is safe (T is a protobuf message and thus an object, not a primitive).

P.S.: And Chunk.ArraySlice[T] also requires ClassTag[T] for construction. :(
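
For what it's worth, a hypothetical sketch of that machinery (names are illustrative): since T is always a protobuf message, and therefore a reference type, the AnyRef tag can be reused at the cost of an unchecked cast.

import scala.reflect.ClassTag

import fs2.Chunk

// Reuse the AnyRef tag: safe at runtime for message types, unchecked at compile time.
def messageClassTag[T]: ClassTag[T] =
  ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

// Wrap the filled prefix of an AnyRef-backed buffer as a Chunk.ArraySlice.
def emit[T](buffer: Array[AnyRef], filled: Int): Chunk[T] =
  Chunk.ArraySlice(buffer.asInstanceOf[Array[T]], 0, filled)(messageClassTag[T])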

seigert (Contributor)

@fredfp, I've created a PR against your fork with an array-backed buffer implementation: fredfp#1 -- but I haven't had a chance to run any benchmarks against your version as a baseline.

fredfp (Contributor, Author)

Could you open a separate PR to discuss this? I think it's complex enough to deserve its own discussion thread.

seigert (Contributor)

I've created a separate PR: #819 -- I still haven't had time to run the benchmarks. :(

fredfp (Contributor, Author) commented Dec 12, 2025

I've added the .toIndexedChunk call back; let's deal with it in another MR.

Below are the updated benchmark results.

Sample run of fs2-grpc v3.0.0 (baseline)

Test Results (22.426 s elapsed)
===============================

+---------------+-----------+-------------+
|       Counter |     Value |  Per second |
+---------------+-----------+-------------+
|         bytes | 209715200 | 9532509.091 |
|        chunks |    208173 |    9462.409 |
|      messages |    819200 |   37236.364 |
| response (ok) |       200 |       9.091 |
+---------------+-----------+-------------+

Sample run of fs2-grpc v3.0.0 + the current MR (calls .toIndexedChunk)

Test Results (6.871 s elapsed)
==============================

+---------------+-----------+--------------+
|       Counter |     Value |   Per second |
+---------------+-----------+--------------+
|         bytes | 209715200 | 34952533.333 |
|        chunks |     57837 |     9639.500 |
|      messages |    819200 |   136533.333 |
| response (ok) |       200 |       33.333 |
+---------------+-----------+--------------+

seigert (Contributor) commented Dec 12, 2025

|        chunks |    208173 |    9462.409 |   (baseline)

|        chunks |     57837 |    9639.500 |   (with this MR)

3.5-4x throughput is very good, but I wonder why the chunk size changes between runs?

fredfp (Contributor, Author) commented Dec 12, 2025

My understanding is that, without dispatcher calls, we waste much less time adding received messages to the CE queue, i.e., we are much more efficient at it, so StreamIngest manages to extract many more messages before emitting a single chunk, resulting in fewer chunks (but the same number of messages). Does that make sense?
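
As an illustration of that effect, here is a sketch of the general pattern (not fs2-grpc's actual StreamIngest): if the consumer blocks for one element and then drains whatever else is already enqueued, then the cheaper enqueueing is, the more elements pile up between reads and the larger (and fewer) the emitted chunks become.

import cats.effect.IO
import cats.effect.std.Queue
import fs2.{Chunk, Stream}

// Block for the first element, then drain everything already waiting in the queue.
def drainAsChunks[A](q: Queue[IO, A]): Stream[IO, Chunk[A]] =
  Stream.repeatEval(
    q.take.flatMap(head => q.tryTakeN(None).map(rest => Chunk.seq(head :: rest)))
  )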
