Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/early-toys-beg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"partytracks": patch
---

Update track cleanup/close logic to be more stable
1 change: 1 addition & 0 deletions packages/partytracks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"check:test": "(cd tests; vitest --no-watch)",
"prebuild": "rm -rf dist",
"build": "tsup src/client/index.ts src/react/index.ts src/server/index.ts --format esm --dts --external react",
"dev": "npm run build -- --watch",
"postbuild": "prettier --write ./dist/*/**.d.ts"
},
"exports": {
Expand Down
67 changes: 34 additions & 33 deletions packages/partytracks/src/client/PartyTracks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class PartyTracks {
};
this.#params = new URLSearchParams(config.apiExtraParams);
this.history = new History<ApiHistoryEntry>(config.maxApiHistory);
this.peerConnection$ = new Observable<RTCPeerConnection>((subscribe) => {
this.peerConnection$ = new Observable<RTCPeerConnection>((subscriber) => {
let peerConnection: RTCPeerConnection;
const setup = () => {
peerConnection?.close();
Expand All @@ -96,7 +96,7 @@ export class PartyTracks {
logger.debug(
`💥 Peer connectionState is ${peerConnection.connectionState}`
);
subscribe.next(setup());
subscriber.next(setup());
}
});

Expand All @@ -110,7 +110,7 @@ export class PartyTracks {
logger.debug(
`💥 Peer iceConnectionState is ${peerConnection.iceConnectionState}`
);
subscribe.next(setup());
subscriber.next(setup());
} else if (peerConnection.iceConnectionState === "disconnected") {
// TODO: we should start to inspect the connection stats from here on for
// any other signs of trouble to guide what to do next (instead of just hoping
Expand All @@ -121,15 +121,15 @@ export class PartyTracks {
logger.debug(
`💥 Peer iceConnectionState was ${peerConnection.iceConnectionState} for more than ${timeoutSeconds} seconds`
);
subscribe.next(setup());
subscriber.next(setup());
}, timeoutSeconds * 1000);
}
});

return peerConnection;
};

subscribe.next(setup());
subscriber.next(setup());

return () => {
peerConnection.close();
Expand Down Expand Up @@ -257,7 +257,7 @@ export class PartyTracks {
sessionId: string,
trackName: string
): Observable<TrackMetadata> {
return new Observable<TrackMetadata>((subscribe) => {
return new Observable<TrackMetadata>((subscriber) => {
let pushedTrackPromise: Promise<unknown>;
// we're doing this in a timeout so that we can bail if the observable
// is unsubscribed from immediately after subscribing. This will prevent
Expand Down Expand Up @@ -306,32 +306,30 @@ export class PartyTracks {
.then(({ tracks }) => {
const trackData = tracks.find((t) => t.mid === transceiver.mid);
if (trackData) {
subscribe.next({
subscriber.next({
...trackData,
sessionId,
location: "remote"
});
subscriber.add(() => {
if (transceiver.mid) {
logger.debug("🔚 Closing pushed track ", trackName);
this.#closeTrackInBulk(
peerConnection,
transceiver.mid,
sessionId
);
}
});
} else {
subscribe.error(new Error("Missing TrackData"));
subscriber.error(new Error("Missing TrackData"));
}
})
.catch((err) => subscribe.error(err));
.catch((err) => subscriber.error(err));
});

return () => {
clearTimeout(timeout);
pushedTrackPromise?.then(() => {
this.taskScheduler.schedule(async () => {
if (transceiver.mid) {
logger.debug("🔚 Closing pushed track ", trackName);
this.#closeTrackInBulk(
peerConnection,
transceiver.mid,
sessionId
);
}
});
});
};
}).pipe(retryWithBackoff());
}
Expand Down Expand Up @@ -413,11 +411,10 @@ export class PartyTracks {
track: MediaStreamTrack;
trackMetadata: TrackMetadata;
}> {
let mid = "";
return new Observable<{
track: MediaStreamTrack;
trackMetadata: TrackMetadata;
}>((subscribe) => {
}>((subscriber) => {
let pulledTrackPromise: Promise<unknown>;
// we're doing this in a timeout so that we can bail if the observable
// is unsubscribed from immediately after subscribing. This will prevent
Expand Down Expand Up @@ -500,25 +497,29 @@ export class PartyTracks {
if (trackInfo) {
trackInfo.resolvedTrack
.then((track) => {
mid = trackInfo.mid;
subscribe.next({ track, trackMetadata });
subscriber.next({ track, trackMetadata });
subscriber.add(() => {
logger.debug(
"🔚 Closing pulled track ",
trackMetadata.trackName
);
this.#closeTrackInBulk(
peerConnection,
trackInfo.mid,
sessionId
);
});
})
.catch((err) => subscribe.error(err));
.catch((err) => subscriber.error(err));
} else {
subscribe.error(new Error("Missing Track Info"));
subscriber.error(new Error("Missing Track Info"));
}
return trackMetadata.trackName;
});
});

return () => {
clearTimeout(timeout);
pulledTrackPromise?.then((trackName) => {
if (mid) {
logger.debug("🔚 Closing pulled track ", trackName);
this.#closeTrackInBulk(peerConnection, mid, sessionId);
}
});
};
}).pipe(retryWithBackoff());
}
Expand Down