impl(pubsub): message ordering in Publisher #4124
base: main
Conversation
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##             main    #4124      +/-  ##
==========================================
- Coverage   95.25%   95.24%   -0.02%
==========================================
  Files         178      178
  Lines        6811     6807       -4
==========================================
- Hits         6488     6483       -5
- Misses        323      324       +1
```
**suzmue** left a comment:
As far as ordering, this looks pretty good. However, I think this removed support for non-ordered messages where we can have multiple inflight batches.
Please let me know if I'm mistaken.
```diff
-for (_, outstanding) in pending_batches.iter_mut() {
-    outstanding.flush(&mut inflight);
+for (_, batch_worker) in batch_workers.iter_mut() {
+    let (tx, _) = oneshot::channel();
```
I'm confused, why do we need this channel?
Oh I see, it's because you don't want to wait on the result.
If this doesn't use a lot of resources, this seems fine.
```rust
let (tx, rx) = oneshot::channel();
batch_worker
    .send(ToBatchWorker::Flush(tx))
    .expect("Batch worker should not close the channel");
```
nit: Consider using a shared message constant or a helper function to avoid duplicating this error message all over.
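The duplication could be factored out along these lines (a sketch of the reviewer's suggestion; the constant name, `send_to_worker`, and this one-variant `ToBatchWorker` stand-in are illustrative, not from the PR):

```rust
use std::sync::mpsc;

// Keep the expect-message in one place instead of repeating the string
// literal at every send site.
const BATCH_WORKER_CLOSED: &str = "Batch worker should not close the channel";

// Hypothetical stand-in for the PR's `ToBatchWorker` command enum.
enum ToBatchWorker {
    Flush,
}

// Helper wrapping send + expect so each call site stays one line.
fn send_to_worker(tx: &mpsc::Sender<ToBatchWorker>, cmd: ToBatchWorker) {
    tx.send(cmd).expect(BATCH_WORKER_CLOSED);
}

fn main() {
    let (tx, rx) = mpsc::channel();
    send_to_worker(&tx, ToBatchWorker::Flush);
    assert!(matches!(rx.recv().unwrap(), ToBatchWorker::Flush));
}
```

Either form keeps the message text in a single spot, so changing it later touches one line.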
```rust
    }
}

/// A background worker that continuously handle Publisher commands for a specific ordering key.
```
```diff
-/// A background worker that continuously handle Publisher commands for a specific ordering key.
+/// A background worker that continuously handles Publisher commands for a specific ordering key.
```
```rust
pending_batch: Batch,
// TODO(#4012): Track pending messages as within key message ordering is
// not currently respected during a failure.
pending_msges: VecDeque<BundledMessage>,
```
```diff
-pending_msges: VecDeque<BundledMessage>,
+pending_msgs: VecDeque<BundledMessage>,
```
```rust
pending_batch: Batch,
// TODO(#4012): Track pending messages as within key message ordering is
// not currently respected during a failure.
pending_msges: VecDeque<BundledMessage>,
```
TIL: VecDeque
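For anyone else meeting it for the first time: `VecDeque` is the standard library's growable ring buffer, with O(1) amortized push/pop at both ends, which is exactly what a FIFO of pending messages needs. A minimal standalone example (unrelated to the PR's code):

```rust
use std::collections::VecDeque;

fn main() {
    let mut pending: VecDeque<&str> = VecDeque::new();

    // Enqueue messages in arrival order at the back...
    pending.push_back("msg-1");
    pending.push_back("msg-2");
    pending.push_back("msg-3");

    // ...and drain from the front, preserving FIFO (i.e. ordering) semantics.
    assert_eq!(pending.pop_front(), Some("msg-1"));
    assert_eq!(pending.pop_front(), Some("msg-2"));

    // On a retry, a message can also be put back at the front of the queue.
    pending.push_front("msg-2");
    assert_eq!(pending.pop_front(), Some("msg-2"));
}
```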
```rust
/// `Worker` drops the Sender.
pub(crate) async fn run(mut self) {
    // While it is possible to use Some(JoinHandle) here as there is at max
    // a single inflight task at any given time, the use of JoinSet
```
This isn't true when the ordering key is `""`.
```rust
loop {
    if self.pending_batch.is_empty() && self.pending_msges.is_empty() {
        break;
    }
```
This can be a while loop.
```diff
-loop {
-    if self.pending_batch.is_empty() && self.pending_msges.is_empty() {
-        break;
-    }
+while !self.pending_batch.is_empty() || !self.pending_msges.is_empty() {
```
```rust
        || self.pending_batch.size() >= self.batching_options.byte_threshold
}

// Move pending publishes to the pending batch respecting batch thresholds.
```
```diff
-// Move pending publishes to the pending batch respecting batch thresholds.
+// Move pending messages to the pending batch respecting batch thresholds.
```
```rust
        break;
    }
    self.move_to_batch();
    self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
```
Using flush for the timer seems slightly inefficient: if we have a max batch size of 1000, a batch already out, and 1001 messages in the queue, then we will send that single leftover message as a batch of 1 even if more messages come in. I guess that is a possibility with the timeout anyway, so maybe it's not a real problem and it doesn't necessarily need to change.
This also makes me wonder: if we have an outstanding batch, would it be better to just leave all messages in the channel, to avoid copying things around as much? I'm just curious about this. Something to maybe try out later.
Support message ordering in the Publisher by creating a new BatchWorker which handles the batching and message ordering logic.
When the Worker receives a Publish operation, it demultiplexes it into the corresponding BatchWorker, which ensures that there is at most one inflight batch at any given time.
When the Worker receives a Flush operation, it flushes all BatchWorkers' pending messages and waits until the operation is complete.
As of now, the set of BatchWorkers grows indefinitely. This will be addressed in the next PR.
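A minimal standalone sketch of the demultiplexing shape described above, using std threads and channels instead of the PR's async machinery (all names here are illustrative, not the PR's API): the Worker routes each publish to a per-ordering-key BatchWorker, so messages with the same key stay serialized, and a flush asks each worker for its batch and waits for the reply.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Illustrative command type: publish a message, or flush and report the
// accumulated batch back to the caller.
enum Cmd {
    Publish(String),
    Flush(mpsc::Sender<Vec<String>>),
}

// One BatchWorker per ordering key: a single worker draining a single FIFO
// channel keeps same-key messages serialized.
fn batch_worker(rx: mpsc::Receiver<Cmd>) {
    let mut pending = Vec::new();
    for cmd in rx {
        match cmd {
            Cmd::Publish(msg) => pending.push(msg),
            // Hand back the drained batch; ignore a dropped receiver.
            Cmd::Flush(done) => {
                let _ = done.send(std::mem::take(&mut pending));
            }
        }
    }
}

// The Worker side: lazily create one channel + thread per ordering key and
// route each publish to that key's BatchWorker.
fn route(workers: &mut HashMap<String, mpsc::Sender<Cmd>>, key: &str, msg: &str) {
    let tx = workers.entry(key.to_string()).or_insert_with(|| {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || batch_worker(rx));
        tx
    });
    tx.send(Cmd::Publish(msg.to_string()))
        .expect("Batch worker should not close the channel");
}

fn main() {
    let mut workers: HashMap<String, mpsc::Sender<Cmd>> = HashMap::new();
    route(&mut workers, "key-a", "a1");
    route(&mut workers, "key-b", "b1");
    route(&mut workers, "key-a", "a2");

    // Flush: ask every BatchWorker for its batch and wait for each reply.
    for (key, tx) in &workers {
        let (done_tx, done_rx) = mpsc::channel();
        tx.send(Cmd::Flush(done_tx))
            .expect("Batch worker should not close the channel");
        let batch = done_rx.recv().expect("flush reply");
        if key.as_str() == "key-a" {
            // Same-key messages come back in publish order.
            assert_eq!(batch, vec!["a1".to_string(), "a2".to_string()]);
        }
    }
}
```

Note this toy version shares the PR's noted limitation: the `workers` map only grows, one entry per ordering key ever seen.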