
Data tracks - incoming manager #1819

Merged

1egoman merged 53 commits into main from data-track-incoming-manager on Mar 17, 2026

Conversation

1egoman (Contributor) commented Feb 20, 2026

This pull request builds on the outgoing manager here and introduces a new "incoming manager" which is used for ingesting events from remote participants which are publishing data tracks.
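
For orientation, a rough sketch of how a consumer might drive such a manager - subscribe to a remote track, then read frames off the resulting ReadableStream. The type and method shapes below are assumptions for illustration, not the merged public API:

// Hypothetical usage sketch; the names and signatures here are assumptions,
// not the merged public API.
type DataTrackFrame = { payload: Uint8Array };

interface IncomingDataTrackManager {
  subscribe(sid: string): Promise<ReadableStream<DataTrackFrame>>;
}

async function consumeTrack(manager: IncomingDataTrackManager, sid: string) {
  // Subscribing resolves once the SFU confirms it will forward frames.
  const stream = await manager.subscribe(sid);
  const reader = stream.getReader();
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break; // stream closed: unsubscribed or publisher disconnected
    console.log(`received frame of ${value.payload.byteLength} bytes`);
  }
}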

changeset-bot commented Feb 20, 2026

⚠️ No Changeset found

Latest commit: 9ded763

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

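For reference, a changeset is just a small markdown file committed under .changeset/ in the repo; a minimal sketch, assuming this change would warrant a patch bump for the livekit-client package:

---
'livekit-client': patch
---

Add an incoming data track manager for ingesting frames published by remote participants.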

github-actions bot commented Feb 20, 2026

size-limit report 📦

Path                          Size
dist/livekit-client.esm.mjs   86.76 KB (-0.03% 🔽)
dist/livekit-client.umd.js    97.33 KB (+0.08% 🔺)

Comment on lines +286 to +291
// FIXME: this might be wrong? Shouldn't this only occur if it is the last subscription to
// terminate?
const previousDescriptorSubscription = descriptor.subscription;
descriptor.subscription = { type: 'none' };
this.subscriptionHandles.delete(previousDescriptorSubscription.subcriptionHandle);

1egoman (Contributor, Author):

This matches the behavior of the Rust implementation, but by eye I think something might be off here: I would expect this code to run only on the final subscription termination, not on every subscription termination.

1egoman (Contributor, Author):

@ladvoc FYI, could you take a look at this and see if it looks right to you?

Contributor:

That's correct, this should only be sent on the final subscription termination (i.e., the client no longer wishes to be forwarded frames for the track). In the Rust SDK, UnsubscribeEvent is emitted from the track task here when the frame channel is closed, which occurs when there are no more receivers.
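
A minimal sketch of the guard being described, assuming a per-track receiver count is kept on the descriptor; the field and callback names here are hypothetical, not the actual implementation:

// Hypothetical sketch: cleanup gated on the *final* receiver going away.
// `receiverCount` and `sendUnsubscribe` are assumed names, not real fields.
interface SubscriptionDescriptor {
  receiverCount: number;
  subscription: { type: 'active' | 'none'; subscriptionHandle?: number };
}

function onReceiverClosed(
  descriptor: SubscriptionDescriptor,
  sendUnsubscribe: (handle: number) => void,
) {
  descriptor.receiverCount -= 1;
  if (descriptor.receiverCount > 0) {
    return; // other receivers still want frames; keep the subscription alive
  }
  // Last receiver is gone: tear down the subscription and notify the SFU.
  const previous = descriptor.subscription;
  descriptor.subscription = { type: 'none' };
  if (previous.subscriptionHandle !== undefined) {
    sendUnsubscribe(previous.subscriptionHandle);
  }
}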

1egoman marked this pull request as ready for review on February 20, 2026 20:27
1egoman requested review from ladvoc and lukasIO on February 20, 2026 20:27
1egoman mentioned this pull request on Feb 20, 2026
1egoman added a commit that referenced this pull request Feb 23, 2026
Leave these internal and export them later once these interfaces get
used more widely across all tracks. More context here:
#1819 (comment)
}
};

abortSignal?.addEventListener('abort', onAbort);
Contributor:

just for safety it would make sense to check for abortSignal.aborted here before adding the event listener

1egoman (Contributor, Author):

Good point. I also fixed this in the OutgoingDataTrackManager at the same time and added tests exercising both.

Also, when digging into this I found another fairly nuanced / gnarly subscription management edge case where cancelling an AbortSignal on one subscription would propagate to others. I think I've addressed it, though, and have a test for this case as well.

Just generally, I'd love a second set of eyes going through the test cases; if you can think of any other ones for subscriptions specifically (that's probably the most nuanced part of this change), feel free to suggest additional ones. I still have a bit of work to add more of these.
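
The guard being discussed, roughly: if the signal is already aborted, the 'abort' event has come and gone and a newly added listener would never fire, so the handler has to run synchronously instead. A sketch only; onAbort stands in for whatever cleanup the subscription performs:

function watchAbort(abortSignal: AbortSignal | undefined, onAbort: () => void) {
  if (abortSignal?.aborted) {
    // Already aborted: the 'abort' event fired before we got here, so the
    // listener below would never run. Invoke the handler immediately instead.
    onAbort();
    return;
  }
  abortSignal?.addEventListener('abort', onAbort, { once: true });
}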

const combinedSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal;

// Wait for the subscription to complete, or time out if it takes too long
const reader = await waitForCompletionFuture(descriptor, combinedSignal);
Contributor:

Do we have any means to also react to a participant's disconnection here? Both the local participant disconnecting and the remote participant disconnecting should probably error out early.

1egoman (Contributor, Author) commented Feb 25, 2026:

Good point - I added an event in 6cbf73e which the room can send when a remote participant disconnects; it rejects with a new DataTrackSubscribeError.disconnected() error. I also added associated test cases for both a pending and an active subscription.

Also cc @ladvoc, because the Rust implementation probably has this problem as well.

* SFU. */
private createReadableStream(sid: DataTrackSid) {
let streamController: ReadableStreamDefaultController<DataTrackFrame> | null = null;
return new ReadableStream<DataTrackFrame>({
Contributor:

We should set an appropriate queuing strategy on the readable stream, and potentially also start to drop frames if the backpressure gets too high 🤔

1egoman (Contributor, Author) commented Feb 25, 2026:

Yeah, good point - I added something in 6bdd819. FYI though, other readable streams, like the ones that power data streams, don't have this, so it sounds like a pass may need to be done through everything to add it.

Also cc @ladvoc, because this is similar to a discussion we had regarding the Rust implementation - the default channel buffer length is, I think, 4, and it should probably be configurable externally somehow.
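
(For illustration, a bounded frame stream along these lines might look like the sketch below. The high water mark of 16 matches the value this thread later settles on; the drop-newest policy is an assumption, not the committed code.)

// Sketch of a frame stream with a bounded queue; the drop policy and high
// water mark are illustrative, not the committed implementation.
type DataTrackFrame = { payload: Uint8Array };

function createBoundedFrameStream() {
  let controller: ReadableStreamDefaultController<DataTrackFrame> | null = null;
  const stream = new ReadableStream<DataTrackFrame>(
    {
      start(c) {
        controller = c;
      },
    },
    // Count frames rather than bytes; signal backpressure once 16 are queued.
    new CountQueuingStrategy({ highWaterMark: 16 }),
  );
  const enqueueFrame = (frame: DataTrackFrame) => {
    // desiredSize <= 0 means the consumer has fallen behind; drop the newest
    // frame rather than letting the queue grow without bound.
    if (controller && (controller.desiredSize ?? 0) > 0) {
      controller.enqueue(frame);
    }
  };
  return { stream, enqueueFrame };
}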

Contributor:

Yeah, that's true. We should arguably add something for data streams as well. What makes it less significant - in theory - for data streams is that they are supposed to have a defined end.

Thanks for adding that queuing strategy. Should we maybe use a default that's a bit higher? 4 seems very low, but I'm not sure either about the exact use cases. Ideally we'd want a sane default that doesn't require adjustments from 90% of users.

1egoman (Contributor, Author) commented Mar 4, 2026:

I went with 4 because that is what the Rust implementation used for its internal mpsc channel, which acts in a similar fashion - but it looks like that was updated to 16 and I missed it (link), so I made that update!

If you think it should be higher, I'm open to it, but it's hard to pick a default that isn't super high. The optimal value is heavily dependent on the sample rate of the data being received, the size of each sample, and the speed at which samples are consumed. So either we pick a very high default and users assume it is unlimited until they hit an edge case and realize it isn't, or we pick a lower threshold and users run into the barrier sooner but have to be more strategic about configuring it for their specific use case. Also cc @ladvoc in case you have any thoughts.

I think my preference would be to either keep it unlimited by default, pick some fairly conservative threshold, or (ideally, but most complex) expose some function which can be fed derived metrics and compute an optimal high water mark value.

No matter what we choose, both implementations should be updated in lock step so they have similar behavior.

1egoman (Contributor, Author):

I think I will keep this as is for now, and we can discuss it further in the integration pull request if need be.

1egoman (Contributor, Author) commented Mar 5, 2026

OK, I think this is completely done now in my eyes. @ladvoc @lukasIO - other than the queuing strategy comment and coming to a decision on the default value, is there anything else either of you would like to see done before merging?

1egoman force-pushed the data-track-incoming-manager branch from c5a8d77 to 2c2a1bd on March 16, 2026 16:05
1egoman (Contributor, Author) commented Mar 16, 2026

(rebased on top of latest main)

descriptor.subscription.completionFuture.reject?.(DataTrackSubscribeError.disconnected());
break;
case 'active':
this.unSubscribeRequest(descriptor.info.sid);
Contributor:

Question: handling remote participant disconnected events is not part of the Rust implementation yet, so I'm curious about the logic here. In this case, do we need/want to explicitly unsubscribe?

1egoman (Contributor, Author):

See the discussion here in a comment above: #1819 (comment)

TL;DR - this was something @lukasIO surfaced as being important, so I added it; it should probably make its way into the Rust implementation as well.

ladvoc (Contributor) left a comment:

LGTM ✅ - the logic looks consistent with the Rust implementation, aside from a few places I commented on.

1egoman merged commit 3bea46a into main on Mar 17, 2026 - 6 checks passed
1egoman deleted the data-track-incoming-manager branch on March 17, 2026 17:34
