Skip to content

Conversation

@toubatbrian
Copy link
Contributor

@toubatbrian toubatbrian commented Dec 8, 2025

Problem

Previously we have #849 which adds close() call to STT / TTS / VAD on agent activity close and subsequently #883. Turned out these are not correct way of handling resource cleanup, because there could be another AgentActivity starts after the closed activity, i.e. agent transfer. However, the close() method on STT / TTS are "global" w.r.t the STT / TTS class, meaning aborting the class object not only cancels any active stt / tts stream, but also make STT / TTS unusable for any future streams created under this STT / TTS object, making the speeches after agent transfer failed.

Fix

  • Refactor default sttNode llmNode and ttsNode to always call close() on respective stream objects created from global STT TTS LLM component, allowing plugins to implement proper resource cleanup inside SpeechStream SynthsizeStream and LLMStream instead of on the root class.
  • Some minor bug fixes to improve stability

Future TODOs

  • Ensure proper abort handling for external request in TTS / LLM streams across all plugins

@changeset-bot
Copy link

changeset-bot bot commented Dec 8, 2025

🦋 Changeset detected

Latest commit: cecac00

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 15 packages
Name Type
@livekit/agents Patch
@livekit/agents-plugin-anam Patch
@livekit/agents-plugin-bey Patch
@livekit/agents-plugin-cartesia Patch
@livekit/agents-plugin-deepgram Patch
@livekit/agents-plugin-elevenlabs Patch
@livekit/agents-plugin-google Patch
@livekit/agents-plugin-inworld Patch
@livekit/agents-plugin-livekit Patch
@livekit/agents-plugin-neuphonic Patch
@livekit/agents-plugin-openai Patch
@livekit/agents-plugin-resemble Patch
@livekit/agents-plugin-rime Patch
@livekit/agents-plugin-silero Patch
@livekit/agents-plugins-test Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@toubatbrian
Copy link
Contributor Author

cc @simllll @KrishnaShuk

@simllll
Copy link
Contributor

simllll commented Dec 8, 2025

We have multiple agents with handoffs, the close handler is only called if the instance is draining / closing. So as long as the session is not closed, it should not close anything? Or what I'm missing here? Can you show a test or a demo where this happens?

Looking at the code it also stops the main queue during cleanup, so do you start another session and the new session is then broken?

@toubatbrian
Copy link
Contributor Author

toubatbrian commented Dec 8, 2025

We have multiple agents with handoffs, the close handler is only called if the instance is draining / closing. So as long as the session is not closed, it should not close anything? Or what I'm missing here?

The close() is on AgentActivity not on AgentSession. During an agent handoff, one activity is drained and closed and the other activity starts. The PR you had close the stt / stt / vad inside AgentActivity instead of AgentSession. Depending on the logics inside close() this could work properly, but the combination of PRs actually fails agent transfer. Maybe I shouldn't remove the close() inside agent activity. Let me think of a better approach.

Can you show a test or a demo where this happens?

You can try running pnpm build && node ./examples/src/basic_tool_call_agent.ts dev and setting the stt to openai.STT. Then when after doing an agent transfer, notice that STT request could no longer be invoked since the class variable abortController would be closed already.

@toubatbrian
Copy link
Contributor Author

I'm working on a fix right now.

@toubatbrian toubatbrian changed the title Revert Improper Resource Cleanup [WIP] Revert Improper Resource Cleanup Dec 8, 2025
_recognize(frame: AudioFrame): Promise<SpeechEvent> {
return this.#stt.recognize(frame);
_recognize(frame: AudioFrame, abortSignal?: AbortSignal): Promise<SpeechEvent> {
return this.#stt.recognize(frame, abortSignal);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plugins can now attach this abortSignal for any external request cancellation

private logger = log();
private _connOptions: APIConnectOptions;

protected abortController = new AbortController();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an abortController in root STT class, which mainly handles any external request cancellation such as fetch. abortSignal will be aborted whenever the underling STT stream is closed.

);
}
wrapped_stt = new STTStreamAdapter(wrapped_stt, agent.vad);
wrapped_stt = new STTStreamAdapter(wrapped_stt, activity.vad);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix a minor bug on non-streaming STT, allowing vad on AgentSession to forward to Agent if not set explicitly

const cleanup = () => {
if (cleaned) return;
cleaned = true;
stream.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ stream.close() in STT / TTS / LLM stream should be the method that plugin uses to do proper resource cancellation. @simllll @KrishnaShuk

samples100Ms,
);

const abortPromise = waitForAbort(this.abortSignal);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor bug fix: prevent overflowing max #listeners

@toubatbrian toubatbrian changed the title [WIP] Revert Improper Resource Cleanup Improve AgentActivity resource cleanup Dec 8, 2025
@toubatbrian toubatbrian requested a review from a team December 8, 2025 12:02
@KrishnaShuk
Copy link
Contributor

KrishnaShuk commented Dec 8, 2025

@toubatbrian just to confirm my understanding: the new pattern will be that the base SpeechStream (and SynthesizeStream, etc.) will have its own AbortController, and plugins should use this.abortSignal for any internal async operations, correct?

@toubatbrian
Copy link
Contributor Author

@KrishnaShuk yes, the abortController is only exposed in base stream class. Any child class only have access to abortSignal (for cancelling external http / SSE / WS requests), and the only way to abort a stream is via stream.close() method, which is basically the same as previous approach except we move close() logic to steam level not STT / TTS class level.

btw there's the PR for TTS resource cleanup #893, let me know your thoughts!

@KrishnaShuk
Copy link
Contributor

So after merging of both PRs (this one and #893) the resource cleanup issue is fully resolved right?

@toubatbrian
Copy link
Contributor Author

So after merging of both PRs (this one and #893) the resource cleanup issue is fully resolved right?

After those 2 PRs I still need to handle cleanup on LLM nodes, but after all those been done the issue should be fully resolved

@toubatbrian toubatbrian merged commit e590012 into main Dec 9, 2025
8 checks passed
@toubatbrian toubatbrian deleted the brian/fix-cleanup branch December 9, 2025 08:19
@github-actions github-actions bot mentioned this pull request Dec 9, 2025
@toubatbrian toubatbrian mentioned this pull request Dec 10, 2025
8 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants