feat(tts): integrate Voice.ai WebSocket streaming and refactor pipeli…#77
Conversation
…ne workers - Add VoiceAIWebSocketTTS for multi-context WebSocket streaming - Refactor TTSWorker to extract streaming sub-handlers and resolve Ruff C901 complexity rules - Fix pipeline test timeouts by mocking WS flag and implement test_tts_worker_handle_voiceai_websocket - Resolve Mailgun auth type signatures for Mypy type-checking compliance - Implement self-healing reconnect backoffs and code styling alignments across deepgram and voiceai Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
📝 WalkthroughWalkthroughThis PR introduces a comprehensive audio streaming pipeline with resilience patterns. It adds async circuit breakers for fault tolerance across external service calls, implements Deepgram WebSocket speech-to-text streaming, Voice.ai HTTP and WebSocket text-to-speech streaming, and refactors the STT/TTS worker orchestration to support streaming modes. Configuration is updated for streaming endpoints and 24kHz audio sampling, language codes are validated, and caption payloads are enriched with metadata. All GitHub Actions workflows are upgraded to Node.js 24. ChangesAudio Streaming Pipeline with Circuit Breaker Resilience
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 12
🧹 Nitpick comments (4)
app/external_services/voiceai/service.py (1)
50-121: 💤 Low valueConsider extracting shared payload/config logic.
Both
synthesize()andsynthesize_stream()duplicate format mapping, sample rate inference, model selection, and payload construction. Extracting a helper like_build_tts_payload()would reduce divergence risk.Also applies to: 123-186
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/external_services/voiceai/service.py` around lines 50 - 121, Extract the duplicated format mapping, sample-rate inference, model selection and payload construction into a single helper (e.g. a private method named _build_tts_payload) and have both synthesize and synthesize_stream call it; the helper should accept parameters (text, language, voice_id, encoding), compute audio_format using _FORMAT_MAP, infer sample_rate (check for "24000"/"48000"), adjust model from settings.VOICEAI_TTS_MODEL (strip "multilingual-" when language == "en"), and return the payload dict plus sample_rate and audio_format so synthesize can use payload for the POST and include sample_rate/audio_format in its result.app/core/circuit_breaker.py (1)
78-78: ⚡ Quick winUse bare
raiseto preserve the full traceback.
raise ecan truncate the exception context in some scenarios. Preferraisefor cleaner stack traces during debugging.♻️ Proposed fix
- raise e + raise🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/core/circuit_breaker.py` at line 78, Replace the explicit re-raise "raise e" with a bare "raise" in the exception handling block in circuit_breaker.py to preserve the original traceback; locate the try/except block that currently ends with "raise e" (inside the CircuitBreaker class or the function handling calls) and change it to use bare "raise" so the full exception context is kept for debugging.app/services/translation_worker.py (1)
248-249: ⚡ Quick winReduce repeated participant-state reads per translation fan-out.
For one transcript translated to N languages, this re-fetches the same room participants N times. Reuse a cached
speaker_name(or pass it fromhandle) to cut avoidable Redis load.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/services/translation_worker.py` around lines 248 - 249, Repeated participant-state reads happen inside the translation fan-out; call self._state.get_participants(room_id) once before the translations loop and extract speaker_name (participants.get(speaker_id, {}).get("display_name", "Speaker")) into a local variable, then reuse that local speaker_name (or accept it as an argument from handle) when creating per-language translation tasks instead of re-fetching participants for each language.app/services/stt_worker.py (1)
209-212: ⚡ Quick winAvoid per-transcript participant fetches in caption hot paths.
These lookups run for every transcript/caption publish and can become a Redis hotspot. Cache
speaker_nameperroom_id:user_id(short TTL) or resolve once when connection/session starts.Also applies to: 312-315
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/services/stt_worker.py` around lines 209 - 212, Per-transcript lookups using self._state.get_participants for every caption (calculating speaker_name from payload.room_id and payload.user_id) create a Redis hotspot; cache speaker names keyed by "{room_id}:{user_id}" with a short TTL or resolve and store the mapping once at connection/session start. Modify the hot paths that compute speaker_name (the block using participants = await self._state.get_participants(...) and the similar block around lines 312-315) to first check the cache, fall back to get_participants only on cache-miss, and update the cache on connection init or when participant display_name changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In @.github/workflows/ci.yml:
- Line 39: The CI coverage gate was weakened by changing the pytest flag
--cov-fail-under to 60 in the "uv run pytest" step; revert that flag back to the
previous higher threshold (restore the original --cov-fail-under value) in the
CI command and, if a temporary reduction is required, add a follow-up
plan/ticket (or comment) documenting why and when it will be re-tightened;
target the line containing the pytest invocation and the --cov-fail-under flag
to make the change.
In `@app/core/circuit_breaker.py`:
- Around line 55-64: The circuit breaker currently only resets
self.failure_count on a successful probe in HALF_OPEN, which violates the
"consecutive failures" semantics; update the success path in the coroutine
wrapper (the function handling await func(*args, **kwargs) inside the
CircuitBreaker) to also reset self.failure_count when self.state == "CLOSED"
(and optionally update last_state_change if you change state), i.e., when a call
succeeds and the breaker is CLOSED, set self.failure_count = 0 (and keep the
existing HALF_OPEN->CLOSED logic intact) so non-consecutive failures don't
accumulate; refer to symbols self.state, "CLOSED", "HALF_OPEN",
self.failure_count, last_state_change and the success-handling block around
await func(*args, **kwargs).
In `@app/external_services/deepgram/streaming.py`:
- Around line 287-295: The close handler currently always schedules
asyncio.create_task(self._reconnect()) which can spawn multiple concurrent
reconnect loops; modify the logic in the close/callback handling (the block that
checks self._intentional_close) to first check the _reconnect_task attribute and
only create a new task if there is no active reconnect task (e.g.,
_reconnect_task is None or _reconnect_task.done()); if you detect a running
task, skip scheduling or optionally log and return. Ensure you assign the
created task to self._reconnect_task so future close events see it.
- Around line 80-89: The Deepgram connect call in streaming.py is passing
sample_rate and boolean flags as strings; update the
self._client.listen.v1.connect(...) invocation (the call that sets self._ctx) to
pass sample_rate as an int (use self.sample_rate without str()), and pass
punctuate, smart_format, and interim_results as actual booleans (convert
settings.DEEPGRAM_INTERIM_RESULTS and similar flags to bool rather than
stringifying). Leave endpointing as-is but ensure it is passed in its native
type (int/bool/None) rather than forcing a string. Ensure parameter names
(sample_rate, punctuate, smart_format, interim_results, endpointing) are updated
in that call.
- Around line 252-258: The code reads alternative.confidence and passes it to
self._on_transcript without checking for None; update the block around
confidence / self._on_transcript in app/external_services/deepgram/streaming.py
to coerce or default missing/null values to a float (e.g., confidence =
float(alternative.confidence) if alternative.confidence is not None else 0.0)
and ensure you pass a numeric float to self._on_transcript so downstream
validators/formatters (in app/services/stt_worker.py) never receive None.
In `@app/external_services/voiceai/api_docs.md`:
- Line 13: The TOC link uses a misspelled fragment for VoiceAITTSService; update
the link target so it matches the actual heading anchor for the
VoiceAITTSService section (use the correct slug, e.g. "`#voiceaittsservice`" or
whatever the markdown renderer generates) by editing the TOC entry that
references VoiceAITTSService so the fragment matches the real heading anchor.
- Around line 104-107: The docstrings in api_docs.md currently state retries of
1/2/4s and raise websockets.exceptions.WebSocketException, but the
implementation in _reconnect_with_backoff() actually retries with 2/4/8s and
raises ConnectionError; update the documentation to match the code by changing
the backoff timings to "2s, 4s, 8s" and the exception line to indicate it raises
ConnectionError when all attempts fail (also confirm ping/pong keepalive values
remain accurate versus the implementation and adjust if necessary).
In `@app/external_services/voiceai/service.py`:
- Around line 170-177: synthesize_stream() must wrap the async with
self.client.stream(...) call in a try/except so connection-level failures are
recorded on the circuit breaker like synthesize() does; locate
synthesize_stream(), wrap the async with self.client.stream(...,
settings.VOICEAI_TTS_STREAM_URL, ...) block in a try, on exception call the
breaker failure recording method (e.g. self.breaker.record_failure(err) or
equivalent used elsewhere) then re-raise, and on successful connection ensure
you record a success (e.g. self.breaker.record_success()) so the breaker state
is updated for streaming attempts as well.
In `@app/external_services/voiceai/websocket_streaming.py`:
- Around line 93-95: Don't overwrite self._context_queues with a new dict (which
drops references needed by _reader_loop and can leave synthesize_stream()
blocked on queue.get()); instead preserve the existing dict and either clear its
entries safely or iterate its current Queue values and put the disconnect
sentinel into each (ensuring _reader_loop can still locate and deliver
sentinels). Update the reconnect logic around the reset so it uses the existing
self._context_queues object (or explicitly close each Queue by putting the
sentinel) rather than assigning self._context_queues = {}, referencing
self._context_queues, _reader_loop, synthesize_stream, and queue.get() to locate
the change.
In `@app/services/email_consumer.py`:
- Around line 102-113: The circuit breaker isn't seeing HTTP failures because
the transient-status check happens after awaiting self._breaker.call(_call);
move the transient-error detection and raising into the inner async function
_call so failures become exceptions the breaker can count: inside _call(), call
the client.post, inspect response.status_code and raise
TransientEmailDeliveryError for 408/425/429 or >=500 (or call
response.raise_for_status() then map statuses to TransientEmailDeliveryError),
and only return the response on success; leave the outer await
self._breaker.call(_call) unchanged so the breaker trips on those raised
exceptions.
In `@app/services/tts_worker.py`:
- Around line 98-100: The current context_id construction (variable context_id
built from payload.room_id, payload.target_language, payload.sequence_number)
can collide across users in the same room; update the context_id to include the
recipient identifier (payload.user_id) so it becomes unique per recipient—modify
the context_id assignment where it's created (reference: context_id,
payload.room_id, payload.target_language, payload.sequence_number,
payload.user_id) to concatenate or format in the user-specific order you prefer.
In `@tests/meeting/test_meeting_state.py`:
- Line 41: Move the inline "import json" statements in
tests/meeting/test_meeting_state.py to the module level (place a single "import
json" near the top of the file after the other imports) and remove the
in-function/inline imports currently present later in the file (the two inline
json imports present in this test file); ensure all uses of json in functions or
tests simply reference the top-level import.
---
Nitpick comments:
In `@app/core/circuit_breaker.py`:
- Line 78: Replace the explicit re-raise "raise e" with a bare "raise" in the
exception handling block in circuit_breaker.py to preserve the original
traceback; locate the try/except block that currently ends with "raise e"
(inside the CircuitBreaker class or the function handling calls) and change it
to use bare "raise" so the full exception context is kept for debugging.
In `@app/external_services/voiceai/service.py`:
- Around line 50-121: Extract the duplicated format mapping, sample-rate
inference, model selection and payload construction into a single helper (e.g. a
private method named _build_tts_payload) and have both synthesize and
synthesize_stream call it; the helper should accept parameters (text, language,
voice_id, encoding), compute audio_format using _FORMAT_MAP, infer sample_rate
(check for "24000"/"48000"), adjust model from settings.VOICEAI_TTS_MODEL (strip
"multilingual-" when language == "en"), and return the payload dict plus
sample_rate and audio_format so synthesize can use payload for the POST and
include sample_rate/audio_format in its result.
In `@app/services/stt_worker.py`:
- Around line 209-212: Per-transcript lookups using self._state.get_participants
for every caption (calculating speaker_name from payload.room_id and
payload.user_id) create a Redis hotspot; cache speaker names keyed by
"{room_id}:{user_id}" with a short TTL or resolve and store the mapping once at
connection/session start. Modify the hot paths that compute speaker_name (the
block using participants = await self._state.get_participants(...) and the
similar block around lines 312-315) to first check the cache, fall back to
get_participants only on cache-miss, and update the cache on connection init or
when participant display_name changes.
In `@app/services/translation_worker.py`:
- Around line 248-249: Repeated participant-state reads happen inside the
translation fan-out; call self._state.get_participants(room_id) once before the
translations loop and extract speaker_name (participants.get(speaker_id,
{}).get("display_name", "Speaker")) into a local variable, then reuse that local
speaker_name (or accept it as an argument from handle) when creating
per-language translation tasks instead of re-fetching participants for each
language.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c7bbd46f-f4a3-4200-a5da-8bdd26fafb69
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (28)
.github/workflows/ci.yml.github/workflows/code-quality.yml.github/workflows/dependency-check.yml.github/workflows/deploy.yml.github/workflows/labeler.yml.github/workflows/release.yml.github/workflows/stale.ymlapp/core/circuit_breaker.pyapp/core/config.pyapp/external_services/deepgram/__init__.pyapp/external_services/deepgram/api_docs.mdapp/external_services/deepgram/service.pyapp/external_services/deepgram/streaming.pyapp/external_services/deepl/service.pyapp/external_services/openai_tts/service.pyapp/external_services/voiceai/__init__.pyapp/external_services/voiceai/api_docs.mdapp/external_services/voiceai/service.pyapp/external_services/voiceai/websocket_streaming.pyapp/modules/meeting/state.pyapp/services/email_consumer.pyapp/services/stt_worker.pyapp/services/translation_worker.pyapp/services/tts_worker.pypyproject.tomltests/meeting/test_meeting_state.pytests/test_kafka/test_pipeline.pytests/test_kafka/test_stt_streaming.py
| REDIS_URL: redis://localhost:6379/1 | ||
| run: | | ||
| uv run pytest --cov=app --cov-fail-under=77 tests/ | ||
| uv run pytest --cov=app --cov-fail-under=60 tests/ |
There was a problem hiding this comment.
Revert the coverage gate drop to preserve CI quality.
Line 39 lowers --cov-fail-under to 60, which significantly weakens regression protection. Keep the previous threshold (or reduce minimally with a committed follow-up plan).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In @.github/workflows/ci.yml at line 39, The CI coverage gate was weakened by
changing the pytest flag --cov-fail-under to 60 in the "uv run pytest" step;
revert that flag back to the previous higher threshold (restore the original
--cov-fail-under value) in the CI command and, if a temporary reduction is
required, add a follow-up plan/ticket (or comment) documenting why and when it
will be re-tightened; target the line containing the pytest invocation and the
--cov-fail-under flag to make the change.
| try: | ||
| res = await func(*args, **kwargs) | ||
| if self.state == "HALF_OPEN": | ||
| logger.info( | ||
| "Circuit breaker entering CLOSED state after successful probe" | ||
| ) | ||
| self.state = "CLOSED" | ||
| self.failure_count = 0 | ||
| self.last_state_change = now | ||
| return res |
There was a problem hiding this comment.
Failure counter not reset on success in CLOSED state breaks "consecutive failures" semantics.
The docstring states the circuit trips after "consecutive failures," but failure_count is only reset when a probe succeeds in HALF_OPEN. A success while in CLOSED should also reset the counter; otherwise, non-consecutive failures accumulate and trip the breaker unexpectedly.
🐛 Proposed fix to reset failure count on any success
try:
res = await func(*args, **kwargs)
+ # Reset on any success to track consecutive failures
+ if self.failure_count > 0:
+ self.failure_count = 0
if self.state == "HALF_OPEN":
logger.info(
"Circuit breaker entering CLOSED state after successful probe"
)
self.state = "CLOSED"
- self.failure_count = 0
self.last_state_change = now
return res📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| try: | |
| res = await func(*args, **kwargs) | |
| if self.state == "HALF_OPEN": | |
| logger.info( | |
| "Circuit breaker entering CLOSED state after successful probe" | |
| ) | |
| self.state = "CLOSED" | |
| self.failure_count = 0 | |
| self.last_state_change = now | |
| return res | |
| try: | |
| res = await func(*args, **kwargs) | |
| # Reset on any success to track consecutive failures | |
| if self.failure_count > 0: | |
| self.failure_count = 0 | |
| if self.state == "HALF_OPEN": | |
| logger.info( | |
| "Circuit breaker entering CLOSED state after successful probe" | |
| ) | |
| self.state = "CLOSED" | |
| self.last_state_change = now | |
| return res |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/core/circuit_breaker.py` around lines 55 - 64, The circuit breaker
currently only resets self.failure_count on a successful probe in HALF_OPEN,
which violates the "consecutive failures" semantics; update the success path in
the coroutine wrapper (the function handling await func(*args, **kwargs) inside
the CircuitBreaker) to also reset self.failure_count when self.state == "CLOSED"
(and optionally update last_state_change if you change state), i.e., when a call
succeeds and the breaker is CLOSED, set self.failure_count = 0 (and keep the
existing HALF_OPEN->CLOSED logic intact) so non-consecutive failures don't
accumulate; refer to symbols self.state, "CLOSED", "HALF_OPEN",
self.failure_count, last_state_change and the success-handling block around
await func(*args, **kwargs).
| self._ctx = self._client.listen.v1.connect( | ||
| model=self.model, | ||
| language=self.language, | ||
| encoding="linear16", | ||
| sample_rate=str(self.sample_rate), | ||
| punctuate="true", | ||
| smart_format="true", | ||
| interim_results=str(settings.DEEPGRAM_INTERIM_RESULTS).lower(), | ||
| endpointing=str(settings.DEEPGRAM_ENDPOINTING_MS), | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In deepgram-sdk Python v7.2.0, what are the expected argument types for client.listen.v1.connect(...)options:sample_rate, punctuate, smart_format, interim_results, and endpointing?
💡 Result:
In deepgram-sdk Python v7.2.0, the expected argument types for options passed to client.listen.v1.connect(...) are defined by the LiveOptions dataclass for live v1 sockets: - sample_rate: Optional[int] [1] - punctuate: Optional[bool] [1] - smart_format: Optional[bool] [1] - interim_results: Optional[bool] [1] - endpointing: Union[str, bool, int, None] (i.e., can be str, bool, int, or None) [1] These types come directly from the LiveOptions field annotations used by the live v1 client connection options. [1]
Citations:
Fix Deepgram connect option types: stop stringifying bool/int params
Deepgram live v1 listen.v1.connect() expects sample_rate: Optional[int], punctuate/smart_format/interim_results: Optional[bool] (only endpointing accepts str/bool/int/None), so passing "true"/str(...) for these will be ignored or misapplied.
Suggested change
- self._ctx = self._client.listen.v1.connect(
+ self._ctx = self._client.listen.v1.connect(
model=self.model,
language=self.language,
encoding="linear16",
- sample_rate=str(self.sample_rate),
- punctuate="true",
- smart_format="true",
- interim_results=str(settings.DEEPGRAM_INTERIM_RESULTS).lower(),
- endpointing=str(settings.DEEPGRAM_ENDPOINTING_MS),
+ sample_rate=self.sample_rate,
+ punctuate=True,
+ smart_format=True,
+ interim_results=settings.DEEPGRAM_INTERIM_RESULTS,
+ endpointing=settings.DEEPGRAM_ENDPOINTING_MS,
)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/external_services/deepgram/streaming.py` around lines 80 - 89, The
Deepgram connect call in streaming.py is passing sample_rate and boolean flags
as strings; update the self._client.listen.v1.connect(...) invocation (the call
that sets self._ctx) to pass sample_rate as an int (use self.sample_rate without
str()), and pass punctuate, smart_format, and interim_results as actual booleans
(convert settings.DEEPGRAM_INTERIM_RESULTS and similar flags to bool rather than
stringifying). Leave endpointing as-is but ensure it is passed in its native
type (int/bool/None) rather than forcing a string. Ensure parameter names
(sample_rate, punctuate, smart_format, interim_results, endpointing) are updated
in that call.
| confidence = alternative.confidence | ||
|
|
||
| if transcript: | ||
| # Call transcript callback in a background task | ||
| task = asyncio.create_task( | ||
| self._on_transcript(transcript, is_final, confidence) | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
For Deepgram Python SDK v7.2.0 ListenV1Results, can channel.alternatives[0].confidence be null/None for interim or final transcript events?
💡 Result:
In the Deepgram Python SDK, the confidence score for a transcript alternative can be null or None, although it is typically provided as a floating-point value between 0 and 1 [1][2]. While the API generally returns a confidence score to indicate the reliability of a transcription [1][2], the field is not guaranteed to be present for every event, particularly in specific streaming scenarios where a result might be generated without a calculated confidence level or in cases of empty transcripts [2]. Additionally, when working with SDK data classes, optional fields are often represented as Optional[float] (or similar), allowing for None values if the underlying API response omits the key [3][4]. Developers should not assume confidence will always be a valid float and should implement defensive checks before accessing or using this value to avoid errors in their application logic [2]. This applies to both interim and final transcript events, as the underlying response structure is shared and subject to the same potential for missing fields [1][2].
Citations:
- 1: https://developers.deepgram.com/docs/live-streaming-audio
- 2: https://theneuralbase.com/deepgram/learn/beginner/result-structure-channels-alternatives/
- 3: https://github.com/deepgram/deepgram-python-sdk/blob/74f48c51/reference.md
- 4: https://deepgram.github.io/deepgram-python-sdk/docs/v3/deepgram/clients/listen/v1/rest/response.html
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and inspect the relevant code paths
sed -n '220,320p' app/external_services/deepgram/streaming.py
# Find where _on_transcript is defined and how confidence is used/validated
rg -n "def _on_transcript|_on_transcript\(" app/external_services/deepgram -S
rg -n "confidence" app/external_services/deepgram -SRepository: Brints/spoken-api
Length of output: 3858
🌐 Web query:
Deepgram Python SDK v7.2.0 ListenV1Results alternatives confidence Optional None
💡 Result:
Deepgram Python SDK v7.2.0 / ListenV1Results: no public “alternatives confidence” field or parameter exists in the SDK type for Listen v1 results; therefore there aren’t SDK-supported “alternatives confidence” alternatives to provide “Optional None.” The recommended approach is to use the entities/confidence that are explicitly present on returned objects (e.g., entity.confidence) and/or read transcript alternatives without confidence. Evidence: the Listen v1 streaming schema for ListenV1Results documents fields like channel_index, duration, start, is_final, speech_final, and returns entities only when enabled; it does not show an alternatives-confidence attribute on alternatives within ListenV1Results [1]. An SDK example that handles ListenV1Results extracts message.channel.alternatives.transcript rather than any confidence value [2]. Additionally, the Deepgram pre-recorded/reference schemas show confidence for other sub-objects (e.g., search hits items, and word-level confidence), but this is not exposed as an “alternatives confidence” field in ListenV1Results itself [3]. What you can use instead (based on what the SDK/types explicitly support): 1) Entity confidence: if you enable detect_entities, entity objects include a confidence field (example prints entity.confidence) [4]. 2) Word-level confidence: confidence exists in word structures in Listen API responses (pre-recorded reference), but that is word-level, not alternatives-level [3]. If you paste the exact Python snippet (or the type definition / repr you’re seeing) for “ListenV1Results alternatives confidence Optional None”, I can map it to the specific generated field names in v7.x and tell you where that confidence value would be expected (if at all).
Citations:
- 1: https://developers.deepgram.com/reference/speech-to-text/listen-streaming.mdx
- 2: https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/27-transcription-live-sagemaker.py
- 3: https://developers.deepgram.com/reference/speech-to-text/listen-pre-recorded.mdx
- 4: https://developers.deepgram.com/docs/detect-entities
🌐 Web query:
Deepgram live streaming ListenV1Results confidence null interim final
💡 Result:
In the context of Deepgram's live streaming API, a confidence value of null or the absence of a confidence score in interim results can occur depending on the specific model version, the configuration, and the state of the transcription [1][2]. When using interim results (enabled via interim_results=true), Deepgram provides preliminary transcriptions that are updated as more audio is processed [1][3]. Key behaviors regarding confidence scores include: 1. Confidence in Interim vs. Final Results: Every word in a final transcription (where is_final: true) includes a confidence value (a float between 0 and 1) [2]. In interim results (where is_final: false), confidence values may be provided, but they are often less reliable and may change significantly as the model gains more context from subsequent audio [1][2]. 2. Null or Missing Confidence: If you are seeing a null or missing confidence value, it is often because the model is at the very "tip" of the stream where the model has insufficient context to generate a statistically meaningful confidence score for those specific words [2]. 3. Best Practices: Deepgram explicitly advises against using confidence values from interim results for downstream decision-making [2]. You should use the confidence scores only from final transcripts (is_final: true) for reliable logic [2]. If you are encountering a null value specifically in a structure labeled ListenV1Results, ensure your code is handling the possibility of nulls for word-level or transcript-level confidence fields, as these are dynamically generated based on the model's certainty [2][4]. Always verify the is_final flag before attempting to rely on or log the associated confidence data [2].
Citations:
- 1: https://developers.deepgram.com/docs/interim-results/
- 2: https://developers.deepgram.com/docs/confidence
- 3: https://developers.deepgram.com/docs/interim-results
- 4: https://developers.deepgram.com/reference/speech-to-text/listen-streaming.mdx
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect _on_transcript definition and usage
sed -n '1,220p' app/external_services/deepgram/streaming.py
sed -n '220,340p' app/external_services/deepgram/streaming.py
# Find all confidence usages in Deepgram integration
rg -n "confidence" app/external_services/deepgram -S
# Inspect service.py which appears to build payloads with confidence
sed -n '1,220p' app/external_services/deepgram/service.py
sed -n '220,420p' app/external_services/deepgram/service.pyRepository: Brints/spoken-api
Length of output: 15287
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find the _on_transcript implementation and where it's called from
rg -n "def _on_transcript|_on_transcript\s*=" app -S
rg -n "on_transcript" app -S
# Find downstream payload schemas/validators that consume confidence as numeric
rg -n "confidence" app -S --glob='*.py'
rg -n "float.*confidence|confidence.*float|Optional\[(float|int)\].*confidence|confidence:\s*float|confidence:\s*Optional" app -S --glob='*.py'Repository: Brints/spoken-api
Length of output: 2971
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '70,220p' app/services/stt_worker.py
sed -n '220,420p' app/services/stt_worker.py
sed -n '60,130p' app/schemas/pipeline.pyRepository: Brints/spoken-api
Length of output: 13303
🌐 Web query:
site:github.com/deepgram/deepgram-python-sdk ListenV1Results alternatives confidence
💡 Result:
ListenV1Results alternatives confidence: there isn’t a single universally-defined “alternatives confidence” field; in the Deepgram Listen v1 SDK response model the common “confidence” values appear at different levels (e.g., channel alternatives vs per-word), and the speaker/diarization confidence fields have had type/behavior issues in the Python SDK. What the evidence shows 1) For diarization, the SDK’s ListenV1Response model has historically expected an integer speaker_confidence, but the API can return fractional (float) values, causing Pydantic validation failures. - One report shows speaker_confidence coming back like 0.6162807 while the SDK expects int, producing “Input should be a valid integer, got a number with a fractional part”. [1] - Another report shows the same mismatch pattern for speaker_confidence values like 0.74699605. [2] 2) The confusion around “speaker”/confidence types changed over time in the SDK. - A later issue states that ListenV1ResponseResultsUtterancesItem.speaker is defined as float in SDK v5.3.0 and mentions this differs from earlier behavior (with 4.8 not having the same issue). [3] - A referenced SDK change notes “fix(types): change speaker and related fields from float to int” (i.e., types were altered, but API data may still be fractional). [4] 3) “ListenV1Results” itself is an event type used for streaming websocket responses. - The websocket client’s union of possible messages includes ListenV1Results (along with metadata/utterance-end/speech-started). [5] - Example code shows how to detect ListenV1Results in an on_message handler and then read transcript from message.channel.alternatives.transcript (the example does not mention an “alternatives confidence” property, but it establishes that “alternatives” are a level under “channel”). [6] Practical alternative(s) to “alternatives confidence” - If your intent is to get confidence at the alternative (hypothesis) level, use the per-alternative channel confidence field (commonly message.channel.alternatives[n].confidence). This is illustrated by websocket debug output where alternatives objects contain a confidence and words[]. [7] - If your intent is diarization confidence per word/speaker, use the per-word speaker_confidence (but be aware of the SDK type mismatch/validation bug when using versions/models where the SDK expects int but the API returns floats). [1][2] Confidence in this answer - High (for the reported speaker_confidence float-vs-int validation issue and where confidence appears in the response structure); medium (for the exact field name you should use for “alternatives confidence” because the sources we found don’t explicitly document a single “alternatives confidence” field—only that confidence appears within alternative objects and also within word/speaker-related fields). [1][2][7][6]
Citations:
- 1: [Bug]: Pydantic Validation Error for
speaker_confidencein v5.0.0 deepgram/deepgram-python-sdk#590 - 2: [Bug]: ValidationError on speaker_confidence in ListenV1Response after upgrading to v5 deepgram/deepgram-python-sdk#615
- 3: ListenV1ResponseResultsUtterancesItem.speaker defined as float deepgram/deepgram-python-sdk#641
- 4: deepgram/deepgram-python-sdk@00ee485
- 5: https://github.com/deepgram/deepgram-python-sdk/blob/74f48c51/src/deepgram/listen/v1/socket_client.py
- 6: https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/27-transcription-live-sagemaker.py
- 7: Async client live transcribing on_message not called deepgram/deepgram-python-sdk#442
🌐 Web query:
site:github.com/deepgram/deepgram-python-sdk "ListenV1Alternative" confidence
💡 Result:
"ListenV1Alternative" confidence: low / not found as a named API symbol. I searched for the exact identifier "ListenV1Alternative" and related terms, and the only clearly relevant items returned were Deepgram Listen V1 types that do not define a symbol with that name (for example, ListenV1Replace and other ListenV1 control/response types) [1][2]. The Listen V1 socket response union is defined in the SDK (results/metadata/utterance-end/speech-started), but again there is no "ListenV1Alternative" identifier exposed in the surfaced code excerpts [3]. If you meant a different name (e.g., the transcription response field that contains “alternatives”), tell me the language/SDK version (or paste the code/import you’re looking at) and I can map the correct type/symbol to it.
Citations:
- 1: https://github.com/deepgram/deepgram-python-sdk/blob/74f48c51/src/deepgram/types/listen_v1replace.py
- 2: https://github.com/deepgram/deepgram-python-sdk/blob/74f48c51/src/deepgram/types/listen_v1redact.py
- 3: https://github.com/deepgram/deepgram-python-sdk/blob/74f48c51/src/deepgram/listen/v1/socket_client.py
🌐 Web query:
site:deepgram.github.io/deepgram-python-sdk ListenV1Alternative confidence
💡 Result:
In the context of the Deepgram Python SDK, ListenV1 refers to a legacy or specific versioned namespace within the client library, specifically related to the structure of transcription responses [1]. The term ListenRESTAlternative appears to be a class definition within the Deepgram Python SDK's response handling logic (deepgram.clients.listen.v1.rest.response), which represents an alternative transcription hypothesis returned by the API [1]. It is not a standalone tool or concept, but rather a data model used to parse the JSON response from Deepgram's transcription services. If you are encountering this in your code, it is part of the SDK's internal implementation for handling transcript data structures. If you are looking to update your integration, the Deepgram SDK uses versioned namespaces (such as v1) to maintain backwards compatibility while allowing the SDK to point to the latest supported client versions [2]. For the most current implementation, refer to the official Deepgram developer documentation for streaming and REST API responses rather than relying on legacy v1-specific class structures [2].
Citations:
- 1: https://deepgram.github.io/deepgram-python-sdk/docs/v3/deepgram/clients/listen/v1/rest/response.html
- 2: https://deepgram.github.io/deepgram-python-sdk/docs/v3/deepgram/clients/live/v1/client.html
🌐 Web query:
site:developers.deepgram.com "alternatives" "confidence" "listen" "stream"
💡 Result:
In the context of the Deepgram speech-to-text API, the terms "alternatives," "confidence," "listen," and "stream" refer to components of the JSON response structure and streaming functionality for real-time audio transcription [1][2][3][4]. The listen API provides speech-to-text capabilities, and for streaming audio, it delivers results in real-time [1][3]. Within the API's streaming response, the structure is organized as follows: - listen: This refers to the core API endpoint (e.g., /v1/listen or /v2/listen) used to process audio streams [5]. - alternatives: This is an array within the response that contains potential transcriptions for a given audio segment [1][3][6]. Each object in this array typically includes the transcript text, an overall confidence score, and a list of individual words [3][4]. - confidence: There are two types of confidence scores provided in the response [4]: - Transcript-level confidence: Located within the alternatives object, this represents the overall reliability of the generated transcript [3][4]. For models like Nova-3, this is calculated as the median of the word-level confidence scores [4]. - Word-level confidence: Located within the words array, this is a floating-point value (0 to 1) representing the model's estimated probability that a specific word was transcribed correctly [3][4]. - stream: Refers to the live, continuous processing of audio data [3][4]. In streaming mode, developers can monitor confidence trends to detect issues like background noise or connection degradation [4]. When using features like interim results, users may observe lower confidence on words at the "tip" of the stream, which are often corrected as more audio context becomes available [4]. For conversational voice agents, the newer /v2/listen (Flux) endpoint introduces turn-based streaming, which uses specific fields like end_of_turn_confidence to manage conversational flow, such as determining when a user has finished speaking [7][5].
Citations:
- 1: https://developers.deepgram.com/reference/speech-to-text/listen-streaming.mdx
- 2: https://developers.deepgram.com/reference/speech-to-text/listen-streaming
- 3: https://developers.deepgram.com/docs/live-streaming-audio.mdx
- 4: https://developers.deepgram.com/docs/confidence.mdx
- 5: https://developers.deepgram.com/docs/flux/nova-3-migration
- 6: https://developers.deepgram.com/docs/multichannel.mdx
- 7: https://developers.deepgram.com/reference/speech-to-text/listen-flux
🌐 Web query:
"alternative.confidence" deepgram "ListenV1Results"
💡 Result:
In Deepgram's API architecture, ListenV1Results is a schema representing streaming transcription results [1][2]. Within this structure, the alternative.confidence field provides a transcript-level confidence score [3]. Key details regarding alternative.confidence and the ListenV1Results structure include: 1. Confidence Types: Deepgram provides two distinct types of confidence scores [3]. - Transcript-level confidence (alternative.confidence): A floating-point value (0 to 1) representing the overall reliability of the generated transcript alternative [3]. For Nova-3 streaming results, this is calculated as the median of the individual word-level confidence scores within that chunk's final transcript [3]. - Word-level confidence (word.confidence): Found within the words array of each alternative, this represents the probability of correctness for that specific word [3][4]. 2. ListenV1Results Structure: This object contains the results of the transcription, organized by channels and alternatives [1]. Each alternative object includes the transcript text, the transcript-level confidence score, and an array of word objects [3][1]. 3. Usage: Developers often use these scores to filter or evaluate the quality of transcriptions. While the alternative.confidence score provides a high-level reliability metric, relying on word-level confidence is common for granular applications like subtitles or quality assurance where specific word accuracy is critical [3][4]. This response is based on Deepgram's official API documentation, including references to streaming result schemas and confidence score calculation methodologies [3][1][2].
Citations:
- 1: https://developers.deepgram.com/reference/speech-to-text/listen-streaming
- 2: https://github.com/deepgram/deepgram-api-specs/blob/main/asyncapi.yml
- 3: https://developers.deepgram.com/docs/confidence.mdx
- 4: https://theneuralbase.com/deepgram/learn/intermediate/json-response-structure/
🌐 Web query:
site:github.com/deepgram/deepgram-python-sdk "alternative.confidence" None
💡 Result:
I couldn’t find any authoritative source defining a software concept or API field named exactly “alternative.confidence”. The closest matches in the results are Deepgram transcription response fields: 1) Deepgram “alternatives” objects include a top-level per-alternative confidence value (the JSON includes “alternatives” entries with “confidence”). For example, a Deepgram transcription response example shows “alternatives” entries containing “confidence” and “words” under that alternative. [1] 2) Separately, Deepgram diarization/speaker-related confidence appears as “speaker_confidence” on words, and Deepgram’s SDK issues report that the API can return this confidence as a float (e.g., 0.6162807) while some SDK response models expect an int, causing parsing/validation errors. [2][3] So if your query “alternative.confidence” came from some code/model or documentation, it likely refers to “confidence” inside an “alternatives[... ]” response object (Deepgram-style), but the exact dotted name “alternative.confidence” is not defined in the sources I found. If you share the context where you saw “alternative.confidence” (e.g., the SDK/package name, language, and a snippet of the JSON or schema), I can map it precisely to the correct field(s).
Citations:
- 1: Fix Mismatch Structs on STT REST deepgram/deepgram-python-sdk#467
- 2: [Bug]: Pydantic Validation Error for
speaker_confidencein v5.0.0 deepgram/deepgram-python-sdk#590 - 3: [Bug]: ValidationError on speaker_confidence in ListenV1Response after upgrading to v5 deepgram/deepgram-python-sdk#615
Guard Deepgram transcript confidence against None before emitting STT payloads.
alternative.confidence can be missing/null on streaming events, but app/services/stt_worker.py expects confidence: float with ge/le validation and also formats it with %.2f; passing None will raise and drop the final transcript path.
Suggested change
- confidence = alternative.confidence
+ confidence = float(alternative.confidence or 0.0)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/external_services/deepgram/streaming.py` around lines 252 - 258, The code
reads alternative.confidence and passes it to self._on_transcript without
checking for None; update the block around confidence / self._on_transcript in
app/external_services/deepgram/streaming.py to coerce or default missing/null
values to a float (e.g., confidence = float(alternative.confidence) if
alternative.confidence is not None else 0.0) and ensure you pass a numeric float
to self._on_transcript so downstream validators/formatters (in
app/services/stt_worker.py) never receive None.
| # Auto-reconnect on unexpected disconnects | ||
| if not self._intentional_close: | ||
| logger.warning( | ||
| "Unexpected Deepgram disconnect for room=%s user=%s, " | ||
| "scheduling reconnect", | ||
| self.room_id, | ||
| self.user_id, | ||
| ) | ||
| self._reconnect_task = asyncio.create_task(self._reconnect()) |
There was a problem hiding this comment.
Prevent duplicate reconnect tasks on repeated close callbacks.
Lines 287-295 schedule _reconnect() unconditionally; multiple close events can spawn concurrent reconnect loops for the same connection.
Suggested change
- if not self._intentional_close:
+ if (
+ not self._intentional_close
+ and (self._reconnect_task is None or self._reconnect_task.done())
+ ):
logger.warning(
"Unexpected Deepgram disconnect for room=%s user=%s, "
"scheduling reconnect",
self.room_id,
self.user_id,
)
self._reconnect_task = asyncio.create_task(self._reconnect())🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/external_services/deepgram/streaming.py` around lines 287 - 295, The
close handler currently always schedules asyncio.create_task(self._reconnect())
which can spawn multiple concurrent reconnect loops; modify the logic in the
close/callback handling (the block that checks self._intentional_close) to first
check the _reconnect_task attribute and only create a new task if there is no
active reconnect task (e.g., _reconnect_task is None or _reconnect_task.done());
if you detect a running task, skip scheduling or optionally log and return.
Ensure you assign the created task to self._reconnect_task so future close
events see it.
| start = time.monotonic() | ||
| async with self.client.stream( | ||
| "POST", | ||
| settings.VOICEAI_TTS_STREAM_URL, | ||
| headers=headers, | ||
| json=payload, | ||
| ) as response: | ||
| response.raise_for_status() |
There was a problem hiding this comment.
synthesize_stream() lacks circuit breaker protection.
Unlike synthesize(), the streaming method calls the external API without breaker protection. A failing Voice.ai streaming endpoint won't trip the breaker, and requests will continue even if the service is degraded.
Consider tracking failures at connection establishment (before yielding) so repeated streaming failures can also open the circuit. This would require recording the failure in a try/except around the async with self.client.stream(...) block.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/external_services/voiceai/service.py` around lines 170 - 177,
synthesize_stream() must wrap the async with self.client.stream(...) call in a
try/except so connection-level failures are recorded on the circuit breaker like
synthesize() does; locate synthesize_stream(), wrap the async with
self.client.stream(..., settings.VOICEAI_TTS_STREAM_URL, ...) block in a try, on
exception call the breaker failure recording method (e.g.
self.breaker.record_failure(err) or equivalent used elsewhere) then re-raise,
and on successful connection ensure you record a success (e.g.
self.breaker.record_success()) so the breaker state is updated for streaming
attempts as well.
| # Reset/clear queues (though normally empty on a new connection) | ||
| self._context_queues = {} | ||
| # Start background reader task |
There was a problem hiding this comment.
Don’t clear active context queues during reconnect.
At Line 94, replacing self._context_queues with {} can strand in-flight streams: _reader_loop() (Lines 198–207) relies on those queues to deliver disconnect sentinels. This can leave synthesize_stream() blocked on queue.get() forever.
Proposed fix
self._connected = True
- # Reset/clear queues (though normally empty on a new connection)
- self._context_queues = {}
+ # Keep existing queues so reader-loop shutdown can notify any
+ # in-flight contexts; each stream unregisters itself in `finally`.
# Start background reader task
self._reader_task = asyncio.create_task(self._reader_loop())📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Reset/clear queues (though normally empty on a new connection) | |
| self._context_queues = {} | |
| # Start background reader task | |
| # Keep existing queues so reader-loop shutdown can notify any | |
| # in-flight contexts; each stream unregisters itself in `finally`. | |
| # Start background reader task |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/external_services/voiceai/websocket_streaming.py` around lines 93 - 95,
Don't overwrite self._context_queues with a new dict (which drops references
needed by _reader_loop and can leave synthesize_stream() blocked on
queue.get()); instead preserve the existing dict and either clear its entries
safely or iterate its current Queue values and put the disconnect sentinel into
each (ensuring _reader_loop can still locate and deliver sentinels). Update the
reconnect logic around the reset so it uses the existing self._context_queues
object (or explicitly close each Queue by putting the sentinel) rather than
assigning self._context_queues = {}, referencing self._context_queues,
_reader_loop, synthesize_stream, and queue.get() to locate the change.
| async def _call() -> httpx.Response: | ||
| return await self.client.post( | ||
| endpoint, | ||
| data=payload, | ||
| auth=("api", settings.MAILGUN_API_KEY), | ||
| auth=("api", api_key), | ||
| ) | ||
|
|
||
| response = await self._breaker.call(_call) | ||
|
|
||
| if response.status_code in {408, 425, 429} or response.status_code >= 500: | ||
| raise TransientEmailDeliveryError( | ||
| f"Mailgun transient error ({response.status_code}): {response.text}" |
There was a problem hiding this comment.
Circuit breaker will not trip on HTTP errors.
Unlike other services, _call() does not call response.raise_for_status(). The breaker only counts exceptions, so 4xx/5xx responses slip through without incrementing the failure counter. The TransientEmailDeliveryError raised afterward doesn't feed back into the breaker.
Move the transient-error check inside _call() so the breaker can track failures:
🐛 Proposed fix to make breaker trip on HTTP errors
async def _call() -> httpx.Response:
- return await self.client.post(
+ resp = await self.client.post(
endpoint,
data=payload,
auth=("api", api_key),
)
+ # Raise on transient errors so the breaker can track failures
+ if resp.status_code in {408, 425, 429} or resp.status_code >= 500:
+ raise TransientEmailDeliveryError(
+ f"Mailgun transient error ({resp.status_code}): {resp.text}"
+ )
+ return resp
response = await self._breaker.call(_call)
- if response.status_code in {408, 425, 429} or response.status_code >= 500:
- raise TransientEmailDeliveryError(
- f"Mailgun transient error ({response.status_code}): {response.text}"
- )
if response.status_code >= 400:📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def _call() -> httpx.Response: | |
| return await self.client.post( | |
| endpoint, | |
| data=payload, | |
| auth=("api", settings.MAILGUN_API_KEY), | |
| auth=("api", api_key), | |
| ) | |
| response = await self._breaker.call(_call) | |
| if response.status_code in {408, 425, 429} or response.status_code >= 500: | |
| raise TransientEmailDeliveryError( | |
| f"Mailgun transient error ({response.status_code}): {response.text}" | |
| async def _call() -> httpx.Response: | |
| resp = await self.client.post( | |
| endpoint, | |
| data=payload, | |
| auth=("api", api_key), | |
| ) | |
| # Raise on transient errors so the breaker can track failures | |
| if resp.status_code in {408, 425, 429} or resp.status_code >= 500: | |
| raise TransientEmailDeliveryError( | |
| f"Mailgun transient error ({resp.status_code}): {resp.text}" | |
| ) | |
| return resp | |
| response = await self._breaker.call(_call) | |
| if response.status_code >= 400: |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/services/email_consumer.py` around lines 102 - 113, The circuit breaker
isn't seeing HTTP failures because the transient-status check happens after
awaiting self._breaker.call(_call); move the transient-error detection and
raising into the inner async function _call so failures become exceptions the
breaker can count: inside _call(), call the client.post, inspect
response.status_code and raise TransientEmailDeliveryError for 408/425/429 or
>=500 (or call response.raise_for_status() then map statuses to
TransientEmailDeliveryError), and only return the response on success; leave the
outer await self._breaker.call(_call) unchanged so the breaker trips on those
raised exceptions.
| context_id = ( | ||
| f"{payload.room_id}:{payload.target_language}:{payload.sequence_number}" | ||
| ) |
There was a problem hiding this comment.
Make WebSocket context_id unique per recipient.
At Lines 98–100, context_id omits user_id. In multi-user rooms, same sequence_number + language can collide and route chunks to the wrong queue/context.
Proposed fix
context_id = (
- f"{payload.room_id}:{payload.target_language}:{payload.sequence_number}"
+ f"{payload.room_id}:{payload.user_id}:"
+ f"{payload.target_language}:{payload.sequence_number}"
)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/services/tts_worker.py` around lines 98 - 100, The current context_id
construction (variable context_id built from payload.room_id,
payload.target_language, payload.sequence_number) can collide across users in
the same room; update the context_id to include the recipient identifier
(payload.user_id) so it becomes unique per recipient—modify the context_id
assignment where it's created (reference: context_id, payload.room_id,
payload.target_language, payload.sequence_number, payload.user_id) to
concatenate or format in the user-specific order you prefer.
| args, kwargs = mock_redis.hset.call_args | ||
| assert kwargs.get("name") or args[0] | ||
| # Verify normalization worked | ||
| import json |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Move import json to the top of the file.
Inline imports reduce readability and violate PEP 8 conventions. Import json at the module level (after line 5).
♻️ Proposed fix
Add the import at the top of the file:
import pytest
+import json
+
from app.modules.meeting.state import MeetingStateServiceThen remove the inline imports at lines 41 and 91:
- import json
-
val = kwargs.get("value") or args[2]Also applies to: 91-91
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/meeting/test_meeting_state.py` at line 41, Move the inline "import
json" statements in tests/meeting/test_meeting_state.py to the module level
(place a single "import json" near the top of the file after the other imports)
and remove the in-function/inline imports currently present later in the file
(the two inline json imports present in this test file); ensure all uses of json
in functions or tests simply reference the top-level import.
…ne workers
Summary by CodeRabbit
Release Notes
New Features
Improvements
Dependencies