Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ENV VCON_SERVER_BUILD_TIME=${VCON_SERVER_BUILD_TIME}
RUN sed -i 's|http://deb.debian.org|https://deb.debian.org|g' /etc/apt/sources.list.d/debian.sources

RUN apt-get update && \
apt-get install -y libavdevice-dev ffmpeg
apt-get install -y libavdevice-dev ffmpeg libchromaprint1

# Install SoX dependency
# https://pysox.readthedocs.io/en/latest/#installation
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ecdsa= "^0.18.0"
python-dotenv = "^1.0.1"
starlette = ">=0.40.0"
transformers = "^4.48.0"
scipy = "^1.13.0"
trio = "^0.28.0"
pytest = "^8.3.4"
anyio = "^4.8.0"
Expand Down
130 changes: 73 additions & 57 deletions server/links/wtf_transcribe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
timeout: Request timeout in seconds (default: 300)
min-duration: Minimum dialog duration to transcribe in seconds (default: 5)
api-key: Optional API key for vfun server authentication
cacheish: Enable audio caching for repetitive content (default: true)

Example configuration in config.yml:
wtf_transcribe:
Expand Down Expand Up @@ -44,10 +45,14 @@
from lib.error_tracking import init_error_tracker
from lib.metrics import increment_counter
from redis_mgr import redis
from links.wtf_transcribe.fingerprint_cache import compute_hashes, FingerprintCache

init_error_tracker()
logger = init_logger(__name__)

# Per-worker in-memory fingerprint cache (populated after fork)
_fingerprint_cache = FingerprintCache(max_entries=1000)


# ---------------------------------------------------------------------------
# Health-aware vfun URL selector with self-healing
Expand Down Expand Up @@ -120,6 +125,7 @@ def get_ordered_urls(self, urls: List[str]) -> List[str]:
"min-duration": 5,
"api-key": None,
"cache-ttl": 604800, # 7 days in seconds
"fingerprint-cache-size": 1000, # max entries in fingerprint cache
}

# Redis cache key prefixes for transcription results
Expand Down Expand Up @@ -256,12 +262,38 @@ def get_audio_content(dialog: Dict[str, Any]) -> Optional[bytes]:
url = dialog["url"]
if url.startswith("file://"):
filepath = url[7:]
try:
with open(filepath, "rb") as f:
return f.read()
except Exception as e:
logger.error(f"Failed to read file {filepath}: {e}")
return None
# Read via the brick-direct bypass mount (BRICK_BASE) when available.
# The gluster mount at NAS_BASE (default /mnt/nas) is currently
# performing very poorly (5s readdir, 14s for a 17KB read); the NFSv3
# bypass mount goes straight to the XFS brick and is orders of
# magnitude faster. Falls back to the original path if the bypass
# copy is missing.
import os as _os, errno, time as _time
_BRICK_BASE = _os.environ.get("BRICK_BASE", "/mnt/slave_recording_bypass")
_NAS_BASE = _os.environ.get("NAS_BASE", "/mnt/nas")
read_path = filepath
if filepath.startswith(_NAS_BASE + "/") and _os.path.isdir(_BRICK_BASE):
candidate = _BRICK_BASE + filepath[len(_NAS_BASE):]
if _os.path.lexists(candidate):
read_path = candidate
for attempt in range(5):
try:
with open(read_path, "rb") as f:
return f.read()
except OSError as e:
last_exc = e
if e.errno in (errno.ESTALE, errno.ENOENT):
try:
_os.stat(_os.path.dirname(read_path) or ".")
except Exception:
pass
_time.sleep(0.05 * (1 << attempt))
continue
logger.error(f"Failed to read file {read_path}: {e}")
return None
logger.error(
f"Failed to read file {read_path} after retries: {last_exc}"
)
return None
else:
try:
resp = requests.get(url, timeout=60)
Expand Down Expand Up @@ -410,6 +442,10 @@ def run(

logger.info(f"Starting wtf_transcribe link for vCon: {vcon_uuid}")

# Update fingerprint cache size from config
fp_max = opts.get("fingerprint-cache-size", 1000)
_fingerprint_cache._max_entries = fp_max

# Check if any vfun URL is configured
if not opts.get("vfun-server-url") and not opts.get("vfun-server-urls"):
logger.error("wtf_transcribe: vfun-server-url or vfun-server-urls is required")
Expand Down Expand Up @@ -440,54 +476,34 @@ def run(
dialogs_skipped += 1
continue

# Check Redis cache before calling vfun
cache_key = get_cache_key(dialog)
cached_body = get_cached_transcription(cache_key, dialog) if cache_key else None

if cached_body:
# Cache hit - use cached transcription
cache_hits += 1
increment_counter("conserver.wtf_transcribe.cache", attributes={"result": "hit"})
logger.info(f"Cache HIT for dialog {i} (key={cache_key})")

wtf_analysis = {
"type": "wtf_transcription",
"dialog": i,
"mediatype": "application/json",
"vendor": "vfun",
"product": "parakeet-tdt-110m",
"schema": "wtf-1.0",
"body": cached_body,
}

vcon.add_analysis(
type=wtf_analysis["type"],
dialog=wtf_analysis["dialog"],
vendor=wtf_analysis.get("vendor"),
body=wtf_analysis["body"],
extra={
"mediatype": wtf_analysis.get("mediatype"),
"product": wtf_analysis.get("product"),
"schema": wtf_analysis.get("schema"),
},
)

dialogs_processed += 1
logger.info(f"Added cached WTF transcription for dialog {i}")
continue

# Cache miss - need to call vfun
cache_misses += 1
increment_counter("conserver.wtf_transcribe.cache", attributes={"result": "miss"})

# Get audio content
audio_content = get_audio_content(dialog)
if not audio_content:
logger.warning(f"Could not extract audio from dialog {i}")
dialogs_skipped += 1
continue

logger.info(f"Cache MISS for dialog {i} - calling vfun (key={cache_key})")
# Check fingerprint cache before calling vfun
fp_hashes = compute_hashes(audio_content)
if fp_hashes:
cached_body = _fingerprint_cache.lookup(fp_hashes)
if cached_body:
vcon.add_analysis(
type="wtf_transcription",
dialog=i,
vendor="vfun",
body=cached_body,
extra={
"mediatype": "application/json",
"product": "parakeet-tdt-110m",
"schema": "wtf-1.0",
},
)
dialogs_processed += 1
logger.info(f"Fingerprint HIT for dialog {i} (matches={cached_body.get('metadata',{}).get('fingerprint_matches',0)})")
continue

logger.info(f"Transcribing dialog {i} via vfun")

# Try each vfun instance in health-priority order until one succeeds
vfun_urls = get_vfun_urls(opts)
Expand All @@ -507,7 +523,8 @@ def run(
for attempt, vfun_server_url in enumerate(vfun_urls):
try:
files = {
"file-binary": ("audio", audio_content, mimetype)
"file-binary": ("audio", audio_content, mimetype),
"cachish": (None, "true")
}
response = requests.post(
vfun_server_url,
Expand All @@ -525,10 +542,6 @@ def run(
duration = dialog.get("duration", 30.0)
wtf_analysis = create_wtf_analysis(i, vfun_response, float(duration))

if cache_key:
store_cached_transcription(cache_key, wtf_analysis["body"], cache_ttl)
logger.info(f"Cached transcription for dialog {i} (key={cache_key})")

vcon.add_analysis(
type=wtf_analysis["type"],
dialog=wtf_analysis["dialog"],
Expand All @@ -541,6 +554,12 @@ def run(
},
)

# Track fingerprint candidate; only store it once it has been seen multiple times
if fp_hashes:
fp_id = _fingerprint_cache.note_candidate(fp_hashes)
if _fingerprint_cache.should_store(fp_id):
_fingerprint_cache.store(fp_id, fp_hashes, wtf_analysis["body"])

dialogs_processed += 1
if attempt > 0:
logger.info(f"Added WTF transcription for dialog {i} (succeeded on attempt {attempt + 1})")
Expand Down Expand Up @@ -578,12 +597,9 @@ def run(
vcon_redis.store_vcon(vcon)
logger.info(
f"Updated vCon {vcon_uuid}: processed={dialogs_processed}, "
f"skipped={dialogs_skipped}, cache_hits={cache_hits}, cache_misses={cache_misses}"
f"skipped={dialogs_skipped}"
)
else:
logger.info(
f"No dialogs transcribed for vCon {vcon_uuid} "
f"(cache_hits={cache_hits}, cache_misses={cache_misses})"
)
logger.info(f"No dialogs transcribed for vCon {vcon_uuid}")

return vcon_uuid
Loading
Loading