-
Notifications
You must be signed in to change notification settings - Fork 0
feat(tts): integrate Voice.ai WebSocket streaming and refactor pipeli… #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,78 @@ | ||||||||||||||||||||||||||||||||||||||||||||||
| """Lightweight asynchronous circuit breaker pattern implementation. | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| Protects the application from cascading failures when calling external APIs. | ||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||
| from collections.abc import Callable | ||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| class CircuitBreakerOpenException(Exception): | ||||||||||||||||||||||||||||||||||||||||||||||
| """Raised when an execution is attempted while the circuit breaker is open.""" | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| class AsyncCircuitBreaker: | ||||||||||||||||||||||||||||||||||||||||||||||
| """Lightweight async circuit breaker. | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| States: | ||||||||||||||||||||||||||||||||||||||||||||||
| CLOSED: Normal operation. All calls go through. | ||||||||||||||||||||||||||||||||||||||||||||||
| OPEN: Failure threshold reached. Calls are blocked immediately. | ||||||||||||||||||||||||||||||||||||||||||||||
| HALF_OPEN: Cooldown period expired. A probe call is allowed. | ||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||||||||||||||||||
| self, failure_threshold: int = 5, recovery_timeout: float = 30.0 | ||||||||||||||||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||
| """Initialize the circuit breaker. | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||||||||
| failure_threshold: Number of consecutive failures to open the circuit. | ||||||||||||||||||||||||||||||||||||||||||||||
| recovery_timeout: Cooldown duration in seconds before attempting recovery. | ||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||
| self.failure_threshold = failure_threshold | ||||||||||||||||||||||||||||||||||||||||||||||
| self.recovery_timeout = recovery_timeout | ||||||||||||||||||||||||||||||||||||||||||||||
| self.failure_count = 0 | ||||||||||||||||||||||||||||||||||||||||||||||
| self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN | ||||||||||||||||||||||||||||||||||||||||||||||
| self.last_state_change = time.monotonic() | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| async def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: | ||||||||||||||||||||||||||||||||||||||||||||||
| """Execute the function, wrapped in circuit breaker logic.""" | ||||||||||||||||||||||||||||||||||||||||||||||
| now = time.monotonic() | ||||||||||||||||||||||||||||||||||||||||||||||
| if self.state == "OPEN": | ||||||||||||||||||||||||||||||||||||||||||||||
| if now - self.last_state_change > self.recovery_timeout: | ||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Circuit breaker entering HALF_OPEN state") | ||||||||||||||||||||||||||||||||||||||||||||||
| self.state = "HALF_OPEN" | ||||||||||||||||||||||||||||||||||||||||||||||
| self.last_state_change = now | ||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||
| raise CircuitBreakerOpenException("Circuit breaker is OPEN") | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||
| res = await func(*args, **kwargs) | ||||||||||||||||||||||||||||||||||||||||||||||
| if self.state == "HALF_OPEN": | ||||||||||||||||||||||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||||||||||||||||||||||
| "Circuit breaker entering CLOSED state after successful probe" | ||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
| self.state = "CLOSED" | ||||||||||||||||||||||||||||||||||||||||||||||
| self.failure_count = 0 | ||||||||||||||||||||||||||||||||||||||||||||||
| self.last_state_change = now | ||||||||||||||||||||||||||||||||||||||||||||||
| return res | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+55
to
+64
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Failure counter not reset on success in CLOSED state breaks "consecutive failures" semantics. The docstring states the circuit trips after "consecutive failures," but 🐛 Proposed fix to reset failure count on any success try:
res = await func(*args, **kwargs)
+ # Reset on any success to track consecutive failures
+ if self.failure_count > 0:
+ self.failure_count = 0
if self.state == "HALF_OPEN":
logger.info(
"Circuit breaker entering CLOSED state after successful probe"
)
self.state = "CLOSED"
- self.failure_count = 0
self.last_state_change = now
return res📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||
| self.failure_count += 1 | ||||||||||||||||||||||||||||||||||||||||||||||
| if ( | ||||||||||||||||||||||||||||||||||||||||||||||
| self.state in ("CLOSED", "HALF_OPEN") | ||||||||||||||||||||||||||||||||||||||||||||||
| and self.failure_count >= self.failure_threshold | ||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||
| "Circuit breaker entering OPEN state " | ||||||||||||||||||||||||||||||||||||||||||||||
| "due to %d consecutive failures", | ||||||||||||||||||||||||||||||||||||||||||||||
| self.failure_count, | ||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
| self.state = "OPEN" | ||||||||||||||||||||||||||||||||||||||||||||||
| self.last_state_change = now | ||||||||||||||||||||||||||||||||||||||||||||||
| raise e | ||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| from app.external_services.deepgram.service import DeepgramSTTService | ||
| from app.external_services.deepgram.streaming import DeepgramStreamingSTT | ||
|
|
||
| __all__ = ["DeepgramSTTService"] | ||
| __all__ = ["DeepgramSTTService", "DeepgramStreamingSTT"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert the coverage gate drop to preserve CI quality.
Line 39 lowers
--cov-fail-underto60, which significantly weakens regression protection. Keep the previous threshold (or reduce minimally with a committed follow-up plan).🤖 Prompt for AI Agents