Conversation
|
size-limit report 📦
|
| // FIXME: this might be wrong? Shouldn't this only occur if it is the last subscription to | ||
| // terminate? | ||
| const previousDescriptorSubscription = descriptor.subscription; | ||
| descriptor.subscription = { type: 'none' }; | ||
| this.subscriptionHandles.delete(previousDescriptorSubscription.subcriptionHandle); | ||
|
|
There was a problem hiding this comment.
This matches the behavior of the rust implementation but by eye I think something might be off here, I would think that this code should only run on the final subscription termination, not on any subscription termination.
There was a problem hiding this comment.
@ladvoc FYI, could you take a look at this and see if it looks right to you?
There was a problem hiding this comment.
That's correct, this should only be sent on the final subscription termination (i.e., the client no longer wishes to be forwarded frames for the track). In the Rust SDK, UnsubscribeEvent is emitted from the track task here when the frame channel is closed, which occurs when there are no more receivers.
Leave these internal and export them later once these interfaces get used more widely across all tracks. More context here: #1819 (comment)
| } | ||
| }; | ||
|
|
||
| abortSignal?.addEventListener('abort', onAbort); |
There was a problem hiding this comment.
just for safety it would make sense to check for abortSignal.aborted here before adding the event listener
There was a problem hiding this comment.
Good point. I also fixed this in the OutgoingDataTrackManager at the same time and added tests exercising both.
Also when digging into this I found another fairly nuanced / gnarly subscription management edge case where cancelling an abortsignal on one subscription would propagate to others. I think I've addressed it though and have a test for this case as well.
Just generally, if you can I'd love a second set of eyes going through the test cases and if you can think of any other ones for subscriptions specifically (that's probably the most nuanced part of this change) feel free to suggest additional ones. I still have a bit of work to add more of these.
| const combinedSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; | ||
|
|
||
| // Wait for the subscription to complete, or time out if it takes too long | ||
| const reader = await waitForCompletionFuture(descriptor, combinedSignal); |
There was a problem hiding this comment.
do we have any means to also react to participant's disconnection here? both the local participant disconnecting and the remote participant disconnecting should probably error out early
There was a problem hiding this comment.
Good point - I added an event in 6cbf73e which the room can send in when a remote participant disconnects and throw a new DataTrackSubscribeError.disconnected() error, along with an associated test case for both this when a subscription is pending and active.
also cc @ladvoc because the rust implementation probably also has this problem.
| * SFU. */ | ||
| private createReadableStream(sid: DataTrackSid) { | ||
| let streamController: ReadableStreamDefaultController<DataTrackFrame> | null = null; | ||
| return new ReadableStream<DataTrackFrame>({ |
There was a problem hiding this comment.
we should set an appropriate queuing strategy on the readable stream and
also potentially start to drop frames if the backpressure gets too high 🤔
There was a problem hiding this comment.
Yea good point, I added something in 6bdd819. But FYI that other readable streams like the ones that power data streams don't have this, so it sounds like maybe there needs to be a pass done through everything to add it.
also cc @ladvoc, because this is similar to a discussion we had in regards to the rust implementation - the default channel buffer length is I think 4 and this should probably be configurable externally somehow.
There was a problem hiding this comment.
Yeah, that's true. We should arguably add something for data streams as well. What makes it less significant - in theory - for data streams is that they are supposed to have a defined end.
Thanks for adding that queuing strategy. Should we maybe use a default that's a bit higher? 4 seems very low, but not sure either about the exact use cases.
ideally we'd want something that is a sane default that doesn't require adjustments from 90% of users
There was a problem hiding this comment.
I went with 4 because that is what the rust implementation had used for its internal mpsc channel which acts in a similar fashion, but it looks like that might have been updated to 16 and I missed that (link), so I made that update!
If you think it should be higher, I'm open to it, but It's a hard thing to pick a non super high default for. The optimal value is heavily dependent on the sample rate of data being received, the size of each sample, and the speed at which samples are being consumed. So either we pick a super high default and users just assume it is unlimited until they hit an edge case and realize it isn't, or we pick a lower threshold and users may run into the barrier faster but it means they have to be more strategic about what it is configured to for their specific use case. Also cc @ladvoc in case you have any thoughts.
I think my preference would be to either keep it unlimited by default, pick some fairly conservative threshold, or (ideally but most complex) to somehow expose some function which can be fed in derived metrics and which could compute an optimal high water mark value.
No matter what we choose, both implementations should be updated in lock step so they have similar behavior.
There was a problem hiding this comment.
I think I will keep this as is for now, and we can discuss it further in the integration pull request if need be.
…in Throws wherever possible This is a LOT cleaner!
…r boundary in RemoteDataTrack too
…er handle subscription cancellation
… data track sfu subscriptions
…bleStreams This lets subscribers pick their optimal tradeoff between dropping events versus caching them all in memory.
c5a8d77 to
2c2a1bd
Compare
|
(rebased on top of latest |
| descriptor.subscription.completionFuture.reject?.(DataTrackSubscribeError.disconnected()); | ||
| break; | ||
| case 'active': | ||
| this.unSubscribeRequest(descriptor.info.sid); |
There was a problem hiding this comment.
question: Handling remote participant disconnected events is not apart of the Rust implementation yet, so I'm curious about the logic here. In this case, do we need/want to explicitly unsubscribe?
There was a problem hiding this comment.
See the discussion here in a comment above: #1819 (comment)
TL;DR - This was something @lukasIO surfaced as being important so I added it and it probably should also make its way into the rust implementation as well.
ladvoc
left a comment
There was a problem hiding this comment.
LGTM ✅, logic looks consistent with the Rust implementation besides a few places I comment on.
…bare error panic throw Context here: #1819 (comment)
Also rename the events to not have "Output" in them.
This pull request builds on the outgoing manager here and introduces a new "incoming manager" which is used for ingesting events from remote participants which are publishing data tracks.