Conversation

@PhongChuong
Collaborator

Support message ordering in Publisher by creating a new BatchWorker which handles the batching and message ordering logic.

When the Worker receives a Publish operation, it demultiplexes it to the corresponding BatchWorker, which ensures that there is at most one inflight batch at any given time.
When the Worker receives a Flush operation, it flushes every BatchWorker's pending messages and waits until the operation is complete.

As of now, the set of BatchWorkers grows indefinitely. This will be addressed in the next PR.
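
A minimal sketch of the demultiplexing described above, using hypothetical simplified types (a per-key HashMap of senders and a stripped-down ToBatchWorker command) rather than the PR's actual ones; batching and inflight tracking are elided:

use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

// Hypothetical command type; the real PR uses richer Publish/Flush variants.
enum ToBatchWorker {
    Publish(String),
    Flush(oneshot::Sender<()>),
}

// One worker per ordering key; it would batch messages and keep at most
// one inflight batch at a time (batching elided here).
async fn batch_worker(mut rx: mpsc::UnboundedReceiver<ToBatchWorker>) {
    while let Some(cmd) = rx.recv().await {
        match cmd {
            ToBatchWorker::Publish(msg) => {
                // Append to the pending batch; send when thresholds are met.
                println!("publishing: {msg}");
            }
            ToBatchWorker::Flush(done) => {
                // Drain the pending batch, then acknowledge the flush.
                let _ = done.send(());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut workers: HashMap<String, mpsc::UnboundedSender<ToBatchWorker>> = HashMap::new();

    // Demultiplex each publish to the worker for its ordering key,
    // spawning the worker lazily on first use.
    for (key, msg) in [("a", "m1"), ("b", "m2"), ("a", "m3")] {
        let tx = workers.entry(key.to_string()).or_insert_with(|| {
            let (tx, rx) = mpsc::unbounded_channel();
            tokio::spawn(batch_worker(rx));
            tx
        });
        tx.send(ToBatchWorker::Publish(msg.to_string())).unwrap();
    }

    // Flush fans out to every worker and waits for each acknowledgement.
    for tx in workers.values() {
        let (done_tx, done_rx) = oneshot::channel();
        tx.send(ToBatchWorker::Flush(done_tx)).unwrap();
        done_rx.await.unwrap();
    }
}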

@product-auto-label bot added the api: pubsub (Issues related to the Pub/Sub API) label on Dec 23, 2025
@codecov

codecov bot commented Dec 23, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 95.24%. Comparing base (317b8d2) to head (ae5eccb).
⚠️ Report is 25 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4124      +/-   ##
==========================================
- Coverage   95.25%   95.24%   -0.02%     
==========================================
  Files         178      178              
  Lines        6811     6807       -4     
==========================================
- Hits         6488     6483       -5     
- Misses        323      324       +1     


@PhongChuong PhongChuong marked this pull request as ready for review December 23, 2025 22:27
@PhongChuong PhongChuong requested a review from a team as a code owner December 23, 2025 22:27
@PhongChuong PhongChuong requested a review from suzmue December 23, 2025 22:27
Collaborator

@suzmue suzmue left a comment

As far as ordering goes, this looks pretty good. However, I think this removes support for non-ordered messages, where we can have multiple inflight batches.

Please let me know if I'm mistaken.

for (_, outstanding) in pending_batches.iter_mut() {
outstanding.flush(&mut inflight);
for (_, batch_worker) in batch_workers.iter_mut() {
let (tx, _) = oneshot::channel();
Collaborator

I'm confused, why do we need this channel?

Collaborator

Oh I see, it's because you don't want to wait on the result.

If this doesn't use a lot of resources, this seems fine.
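
For reference, a tiny sketch of this fire-and-forget pattern: when the oneshot receiver is dropped, send just returns an Err carrying the value, which the worker can ignore. A oneshot channel is a single small allocation, so the resource cost should be low.

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Binding the receiver to `_` drops it immediately, so a later `send`
    // returns Err(value) rather than blocking or panicking.
    let (tx, _) = oneshot::channel::<()>();
    assert!(tx.send(()).is_err()); // caller never intended to wait
}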

let (tx, rx) = oneshot::channel();
batch_worker
.send(ToBatchWorker::Flush(tx))
.expect("Batch worker should not close the channel");
Collaborator

nit: Consider using a shared message constant or a helper function to avoid duplicating this error message all over.

}
}
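
One hypothetical shape for such a helper (the name and generic signature are illustrative, not from the PR):

use tokio::sync::mpsc;

// Hypothetical helper; centralizes the expect message at one call site.
fn send_or_panic<T>(tx: &mpsc::UnboundedSender<T>, cmd: T) {
    tx.send(cmd)
        .expect("Batch worker should not close the channel");
}

fn main() {
    let (tx, _rx) = mpsc::unbounded_channel::<u32>();
    send_or_panic(&tx, 42); // call sites no longer repeat the message
}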

/// A background worker that continuously handle Publisher commands for a specific ordering key.
Collaborator

Suggested change
/// A background worker that continuously handle Publisher commands for a specific ordering key.
/// A background worker that continuously handles Publisher commands for a specific ordering key.

pending_batch: Batch,
// TODO(#4012): Track pending messages as within key message ordering is
// not currently respected during a failure.
pending_msges: VecDeque<BundledMessage>,
Collaborator

Suggested change
pending_msges: VecDeque<BundledMessage>,
pending_msgs: VecDeque<BundledMessage>,

pending_batch: Batch,
// TODO(#4012): Track pending messages as within key message ordering is
// not currently respected during a failure.
pending_msges: VecDeque<BundledMessage>,
Collaborator

TIL: VecDeque

/// `Worker` drops the Sender.
pub(crate) async fn run(mut self) {
// While it is possible to use Some(JoinHandle) here as there is at max
// a single inflight task at any given time, the use of JoinSet
Collaborator

This isn't true when ordering key is "".
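
For context, JoinSet handles both cases uniformly; a minimal sketch (simplified, not the PR's code):

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // A JoinSet tracks any number of inflight tasks, so it covers both the
    // ordered case (at most one) and the "" ordering key (many in parallel).
    let mut inflight = JoinSet::new();
    for batch in 0..3 {
        inflight.spawn(async move { batch });
    }
    while let Some(done) = inflight.join_next().await {
        println!("batch {} finished", done.unwrap());
    }
}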

Comment on lines +263 to +266
loop {
if self.pending_batch.is_empty() && self.pending_msges.is_empty() {
break;
}
Collaborator

This can be a while loop.

Suggested change
loop {
if self.pending_batch.is_empty() && self.pending_msges.is_empty() {
break;
}
while !self.pending_batch.is_empty() || !self.pending_msges.is_empty() {

|| self.pending_batch.size() >= self.batching_options.byte_threshold
}

// Move pending publishes to the pending batch respecting batch thresholds.
Collaborator

Suggested change
// Move pending publishes to the pending batch respecting batch thresholds.
// Move pending messages to the pending batch respecting batch thresholds.
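
A minimal sketch of threshold-respecting batching, with illustrative constants and byte-vector messages standing in for the PR's Batch/BundledMessage types:

use std::collections::VecDeque;

// Illustrative thresholds; the PR reads these from batching_options.
const COUNT_THRESHOLD: usize = 100;
const BYTE_THRESHOLD: usize = 1024;

fn move_to_batch(pending_msgs: &mut VecDeque<Vec<u8>>, batch: &mut Vec<Vec<u8>>) {
    let mut bytes: usize = batch.iter().map(|m| m.len()).sum();
    while let Some(msg) = pending_msgs.front() {
        // Stop once adding the next message would cross either threshold.
        if batch.len() >= COUNT_THRESHOLD || bytes + msg.len() > BYTE_THRESHOLD {
            break;
        }
        bytes += msg.len();
        batch.push(pending_msgs.pop_front().unwrap());
    }
}

fn main() {
    let mut pending: VecDeque<Vec<u8>> = (0..5).map(|_| vec![0u8; 300]).collect();
    let mut batch = Vec::new();
    move_to_batch(&mut pending, &mut batch);
    assert_eq!(batch.len(), 3); // 3 x 300 = 900 bytes fits; a 4th would exceed 1024
    assert_eq!(pending.len(), 2);
}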


break;
}
self.move_to_batch();
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
Collaborator

Using flush for the timer seems slightly inefficient: if we have a max batch size of 1000, a batch already out, and 1001 messages in the queue, then we will send that single leftover message as a batch of 1 even if more come in. I guess that is a possibility with the timeout anyway, so maybe it's not a real problem and it doesn't necessarily need to change.

This also makes me wonder: if we have an outstanding batch, would it be better to just leave all messages in the channel, to avoid copying things around as much? I'm just curious about this. Something to maybe try out later.
