Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Guard audio/text channel sends with ChanClosed suppression
- Wrap recv_task message handlers in try/except to prevent single malformed frame from breaking the receive loop
- Replace deprecated asyncio.get_event_loop() with get_running_loop()
- Add response_id to GenerationCreatedEvent emissions
- Add exponential backoff (1s -> 30s max) on reconnect instead of fixed 1s delay
- Add 32 unit tests covering init, URL building, audio conversion, retry backoff, options, and data structures
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
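For reference, the reconnect backoff described in this commit (1s growing to a 30s cap) could look roughly like the sketch below. The doubling factor and the name `backoff_delays` are assumptions for illustration; the commit message only states the 1s start and the 30s maximum.

```python
import itertools
from typing import Iterator


def backoff_delays(
    initial: float = 1.0, maximum: float = 30.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield reconnect delays that grow geometrically and stop at `maximum`.

    `factor` is an assumed value; only the 1s start and 30s cap come from
    the commit message.
    """
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


# First seven delays a reconnect loop would sleep for under these assumptions.
first_seven = list(itertools.islice(backoff_delays(), 7))
print(first_seven)
```

A reconnect loop would typically create a fresh generator after each successful connection so the delay starts again from the initial value.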
- Remove unused imports in tests (ruff F401)
- Fix import sorting (ruff I001)
- Apply ruff format to realtime_model.py
- Add __pdoc__ dict to realtime/__init__.py for docs generation
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
📝 Walkthrough
Adds a new PersonaPlex LiveKit plugin: packaging and README, plugin bootstrap (registration, logger, version), voice type definitions, a full RealtimeModel/RealtimeSession implementation (WebSocket + Opus audio I/O, generation lifecycle, retry/backoff, metrics), and tests.
Sequence Diagram
sequenceDiagram
participant Agent as LiveKit Agent
participant Session as RealtimeSession
participant Encoder as Opus Encoder
participant WS as WebSocket
participant Server as PersonaPlex Server
participant Decoder as Opus Decoder
Agent->>Session: push_audio(AudioFrame)
Session->>Session: resample to 24kHz mono
Session->>Encoder: encode(PCM)
Encoder-->>Session: Opus bytes
Session->>WS: send(MSG_AUDIO, Opus bytes)
WS->>Server: WebSocket message
Server->>WS: send(MSG_TEXT / MSG_AUDIO)
WS->>Session: receive(MSG_TEXT / MSG_AUDIO)
alt MSG_TEXT
Session->>Session: filter special tokens
Session->>Agent: emit(text token)
else MSG_AUDIO
Session->>Decoder: decode(Opus bytes)
Decoder-->>Session: PCM samples
Session->>Agent: emit(AudioFrame @24kHz)
end
Note over Session: silence timeout triggers finalization
Session->>Session: _finalize_generation()
Session->>Agent: emit(GenerationCreatedEvent + metrics)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
- Preserve wss:// / https:// scheme for TLS deployments
- Use logger.warning for unsupported dynamic instructions
- Remove redundant length check on opus_bytes
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
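A minimal sketch of the scheme handling this commit refers to, assuming the plugin maps an HTTP(S) base URL onto the matching WebSocket scheme. The helper name `to_ws_url` and the default path are hypothetical and not taken from the plugin source.

```python
from urllib.parse import urlsplit, urlunsplit

# Secure schemes stay secure; plain schemes stay plain.
_SCHEME_MAP = {"http": "ws", "https": "wss", "ws": "ws", "wss": "wss"}


def to_ws_url(base_url: str, path: str = "/api/chat") -> str:
    """Build a WebSocket URL from a base URL (hypothetical helper).

    The default `path` is a placeholder for illustration only.
    """
    if "://" not in base_url:
        # A bare host:port is treated as an unencrypted endpoint.
        base_url = "ws://" + base_url
    parts = urlsplit(base_url)
    scheme = _SCHEME_MAP.get(parts.scheme.lower())
    if scheme is None:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return urlunsplit((scheme, parts.netloc, path, "", ""))


print(to_ws_url("https://example.com:8998"))
print(to_ws_url("localhost:8998"))
```

The point of the mapping is that a TLS deployment given as https:// or wss:// is never silently downgraded to ws://.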
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Rename TestBuildWsUrl to TestSessionOptions with proper typed helper
- Rename TestHandleTextToken to TestSpecialTokens to reflect actual scope
- Wrap ResponseGeneration assertions in try/finally for channel cleanup
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Added by upstream in AGT-2474 (livekit#4622). PersonaPlex is full-duplex and does not support explicit user turn commits. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use ws:// scheme in all example URLs (README, docstrings)
- Sync uv.lock after upstream merge
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
Thanks for the PR! |
# Conflicts: # uv.lock
…r special tokens by raw byte
# Conflicts: # uv.lock
|
Unable to connect to the PersonaPlex server from a LiveKit agent. The server runs correctly standalone (port 8998) but throws an error when the LiveKit agent connects. |
|
This is a known issue in the moshi server (NVIDIA PersonaPlex fork), not in the LiveKit plugin. In moshi/server.py:212, opus_loop() reads pcm.shape without checking whether pcm is None. When the Opus decoder doesn't have enough data to produce a full frame, read_pcm() returns None and the attribute access crashes. To fix this, patch moshi/server.py around line 212:
before: if pcm.shape[-1] == 0:
after: if pcm is None or pcm.shape[-1] == 0:
On the LiveKit plugin side, we've already pushed defensive fixes (input validation, sphn 0.2+ API compatibility) in the feat/personaplex-plugin branch. |
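To illustrate why the extra None check matters, here is a small standalone sketch of the guard. It does not use the moshi server code: `has_samples` is a hypothetical helper, and `SimpleNamespace` stands in for the array that the decoder would return.

```python
from types import SimpleNamespace
from typing import Any, Optional


def has_samples(pcm: Optional[Any]) -> bool:
    """Return True only when the decoder produced a non-empty frame.

    Checking `pcm is not None` first avoids the AttributeError that an
    unguarded `pcm.shape` raises when no full frame is available yet.
    """
    return pcm is not None and pcm.shape[-1] > 0


# Stand-ins for decoder output (illustrative shapes, not real audio).
empty_frame = SimpleNamespace(shape=(1, 0))
full_frame = SimpleNamespace(shape=(1, 1920))

print(has_samples(None), has_samples(empty_frame), has_samples(full_frame))
```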
# Conflicts: # uv.lock
|
Hi @milanperovic, thank you so much for the PR!! I'll take a look and test this out, could you move the files to |
# Conflicts: # uv.lock
…hen pending future exists
|
Hi @tinalenguyen, done! I've moved everything into |
|
Could we restructure to I wasn't able to fully test the plugin after encountering some problems with my server, though I think it might be due to the SSL certificate handling, I'll report back on that. Here are a few notes in the meantime:
Changing that line to
Let me know your thoughts! |
…Error, move tests to root
…ve codecs to optional deps
try:
    await ws_conn.send_bytes(msg)
except Exception as e:
    logger.error(f"Error sending message: {e}", exc_info=True)
    break
🔴 _send_task swallows WebSocket send exceptions, bypassing error recovery in _main_task
In _send_task (lines 449-453), exceptions from ws_conn.send_bytes(msg) are caught and logged, and the loop then breaks normally. Because the task completes without raising, task.result() at realtime_model.py:390 does not re-raise, and the outer except Exception as e block (line 404) is never entered. This means that on a send failure: (1) no error event is emitted to listeners, (2) no retry delay is applied, so the loop immediately reconnects, potentially causing a tight reconnect-fail loop, and (3) _msg_ch is not reset (it is only reset in _mark_restart_needed at line 328 and in the error handler at line 415), so stale Opus packets encoded with the now-discarded _opus_writer will be sent on the new connection with a fresh server-side decoder, causing audio corruption. By contrast, _recv_task correctly raises APIConnectionError on failure (realtime_model.py:496-500), which propagates to _main_task's error recovery block.
-try:
-    await ws_conn.send_bytes(msg)
-except Exception as e:
-    logger.error(f"Error sending message: {e}", exc_info=True)
-    break
+try:
+    await ws_conn.send_bytes(msg)
+except Exception:
+    raise
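The difference between the two variants can be reproduced outside the plugin with plain asyncio. In the sketch below, `FakeConn`, `send_swallowing`, `send_propagating`, and `supervise` are hypothetical stand-ins and not the plugin's actual classes; the supervisor only plays the role that the review comment attributes to _main_task.

```python
import asyncio
from typing import Awaitable, Callable, List


class FakeConn:
    """Stand-in connection whose send always fails."""

    async def send_bytes(self, data: bytes) -> None:
        raise ConnectionError("socket closed")


async def send_swallowing(conn: FakeConn, msgs: List[bytes]) -> None:
    for msg in msgs:
        try:
            await conn.send_bytes(msg)
        except Exception:
            break  # error is dropped here; the task finishes without raising


async def send_propagating(conn: FakeConn, msgs: List[bytes]) -> None:
    for msg in msgs:
        await conn.send_bytes(msg)  # the exception escapes the task


async def supervise(
    send_fn: Callable[[FakeConn, List[bytes]], Awaitable[None]],
) -> str:
    task = asyncio.create_task(send_fn(FakeConn(), [b"\x01data"]))
    await asyncio.wait([task])
    try:
        task.result()  # re-raises only if the task itself raised
    except ConnectionError:
        return "recovered"  # the supervisor's error path runs
    return "unnoticed"  # the supervisor never sees the failure


swallowed = asyncio.run(supervise(send_swallowing))
propagated = asyncio.run(supervise(send_propagating))
print(swallowed, propagated)
```

Only the propagating variant reaches the supervisor's except branch, which is where retry delays and channel resets would normally live.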
|
Hi @tinalenguyen ! I've addressed all your feedback: Restructured to livekit.plugins.nvidia.experimental.realtime — matches the AWS plugin pattern exactly |
|
Hi @milanp-sh, thanks again for iterating! We made a PR with some edits on top of yours, it'd be great to combine the changes here so it works all around for everyone. I get spammed these logs when testing your PR: I believe some edits from Shayne's PR will resolve it. Could you take a look at the differences made? In the linked PR, I don't see those logs, but I don't receive any responses from the model either. With your PR, I see those logs, but the model responds after I resolved the audioframe issue (though sometimes I see the agent transcripts twice). Let me know your thoughts! |
- Use 1920 samples (80ms) instead of SAMPLE_RATE//10 (2400) for valid Opus frame sizes, fixing audio frame warning spam
- Wait for server handshake before sending audio in _send_task to prevent dropped frames during system prompt processing
- Reset _msg_ch on reconnect to discard stale Opus packets
- Handle WS close code 1000 gracefully instead of treating as error
…oByteStream The AudioByteStream in _main_task was created with SAMPLE_RATE // 10 = 2400 samples, which is not a valid Opus frame size. This caused warning spam: "Skipping invalid audio frame... got 2400". Changed to 1920 (80ms) to match the __init__ value and valid Opus frame sizes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ask AudioByteStream" This reverts commit f0ea6fc.
|
Hi @tinalenguyen ! I looked at both the warning spam and Shayne's PR. (@ShayneP) The "Skipping invalid audio frame... got 2400" warnings are caused by a frame size bug — _main_task recreates AudioByteStream with SAMPLE_RATE // 10 (2400 samples), which isn't a valid Opus frame size. The fix is changing it to 1920 (80ms). Devin flagged the same thing. I've also integrated Shayne's connection reliability fixes (handshake wait, graceful WS close, channel cleanup on reconnect) which should address the duplicate transcripts. I kept our Opus encoding API since it uses the newer sphn interface — that might be why Shayne's version wasn't getting model responses. |
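The frame-size arithmetic behind this fix can be checked with a short sketch: at 24 kHz, 1920 samples is 80 ms and SAMPLE_RATE // 10 is 2400 samples (100 ms). The `FrameChunker` class below is a hypothetical stand-in for the re-chunking role that AudioByteStream plays in the thread, and it assumes 16-bit mono PCM.

```python
from typing import List

SAMPLE_RATE = 24_000      # Hz, from the thread
FRAME_SAMPLES = 1920      # frame size the fix settles on
BYTES_PER_SAMPLE = 2      # assumption: 16-bit mono PCM


def frame_ms(samples: int, rate: int = SAMPLE_RATE) -> float:
    """Duration in milliseconds of `samples` at `rate`."""
    return samples * 1000 / rate


class FrameChunker:
    """Re-chunk arbitrary PCM byte runs into fixed-size frames."""

    def __init__(self, frame_samples: int = FRAME_SAMPLES) -> None:
        self._frame_bytes = frame_samples * BYTES_PER_SAMPLE
        self._buf = bytearray()

    def push(self, pcm: bytes) -> List[bytes]:
        """Buffer `pcm` and return every complete frame now available."""
        self._buf.extend(pcm)
        frames = []
        while len(self._buf) >= self._frame_bytes:
            frames.append(bytes(self._buf[: self._frame_bytes]))
            del self._buf[: self._frame_bytes]
        return frames


print(frame_ms(FRAME_SAMPLES), frame_ms(SAMPLE_RATE // 10))

chunker = FrameChunker()
# Feeding 2400-sample blocks still yields only 1920-sample frames.
frames = chunker.push(bytes(2400 * BYTES_PER_SAMPLE))
print(len(frames), len(frames[0]) // BYTES_PER_SAMPLE)
```

Leftover samples stay buffered until the next push, so input blocks of any size are emitted as frames of the expected length.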
|
@milanperovic I just tested it out and I was able to get it working with audio 🎉 During my testing, I noticed that the generation cycle would not update accordingly. After the model initiates the conversation first, another generation is immediately started and the conversation would continue under that one generation. Let me know if you are able to repro this! Also, it seems that |
Summary
Add new livekit-plugins-personaplex plugin for NVIDIA PersonaPlex full-duplex conversational AI
Implements RealtimeModel / RealtimeSession as a WebSocket client connecting to a separately-deployed PersonaPlex server
Binary WebSocket protocol with Opus audio encoding/decoding via sphn
Silence-based generation boundary detection, exponential backoff reconnection, and metrics emission
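As a rough illustration of silence-based boundary detection, the sketch below closes a generation once no model output has arrived for a fixed interval. The class name `SilenceBoundary`, its methods, and the 0.5 s timeout are all assumptions for illustration; the plugin's actual timeout and bookkeeping are not shown in this thread.

```python
from typing import Optional


class SilenceBoundary:
    """Track the last output time and report when a generation should end."""

    def __init__(self, timeout_s: float = 0.5) -> None:
        # timeout_s is an assumed value, not the plugin's setting.
        self._timeout_s = timeout_s
        self._last_output: Optional[float] = None

    def on_output(self, now: float) -> None:
        """Record that audio or text arrived at time `now` (seconds)."""
        self._last_output = now

    def should_finalize(self, now: float) -> bool:
        """True once per generation, after `timeout_s` without output."""
        if self._last_output is None:
            return False  # no generation in progress
        if now - self._last_output >= self._timeout_s:
            self._last_output = None  # reset so the next output starts a new one
            return True
        return False


boundary = SilenceBoundary(timeout_s=0.5)
boundary.on_output(0.0)
checks = [
    boundary.should_finalize(0.25),
    boundary.should_finalize(0.5),
    boundary.should_finalize(0.75),
]
print(checks)
```

Passing the clock value in explicitly keeps the logic testable; a real session would more likely feed it from a monotonic clock inside a periodic task.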