Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .env.test

This file was deleted.

20 changes: 20 additions & 0 deletions .env.test.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Test-environment credentials. Copy to .env.test and fill in real values
# to unlock integration tests gated on these variables. pytest.ini loads
# .env.test pre-collection via pytest-dotenv so module-level skipif gates
# (deepgram_link, groq_whisper, etc.) see the variables.
#
# .env.test is gitignored — keep it that way. Never commit real keys.

CONSERVER_API_TOKEN=fake-token

# Deepgram ASR integration tests
DEEPGRAM_KEY=

# OpenAI: unlocks detect_engagement tests, and (with the RUN_* flags
# below) the analyze / analyze_and_label real-API tests.
OPENAI_API_KEY=
RUN_OPENAI_ANALYZE_TESTS=
RUN_OPENAI_ANALYZE_LABEL_TESTS=

# Groq Whisper integration tests
GROQ_API_KEY=
11 changes: 10 additions & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,18 @@ jobs:
- name: Run tests inside Docker container
run: |
docker-compose run --rm conserver bash -c "
pytest --maxfail=5 --disable-warnings
pytest --maxfail=5 --disable-warnings --cov --cov-report=term-missing:skip-covered --cov-report=xml --cov-report=html
"

- name: Upload coverage report
if: always()
uses: actions/upload-artifact@v4
with:
name: coverage-report
path: |
coverage.xml
htmlcov/

- name: Clean up
if: always()
run: docker-compose down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
*.env
# Test env file: tracked historically, but holds real API keys when
# developers run integration tests locally. Untrack and ignore so
# credentials can't be committed by accident. A sanitized template
# lives at .env.test.example.
.env.test
venv
**/__pycache__
.DS_Store
Expand Down
90 changes: 90 additions & 0 deletions common/lib/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Thin abstraction over Redis queue operations used by the conserver.

Goal: give links and the main loop a single chokepoint for queue work so
no module has to import ``redis_mgr.redis`` directly. Redis remains the
implementation; this is *not* a pluggable-backend interface.

Operations exposed map 1:1 to the patterns already in use:

- ``enqueue(list_name, vcon_id)`` — ``LPUSH`` (head-insert, FIFO with BLPOP)
- ``route_to(list_name, vcon_id)`` — ``RPUSH`` (tail-insert, used by routers)
- ``dequeue(list_names, timeout)`` — ``BLPOP`` (blocking pop, worker loop)
- ``enqueue_dlq(ingress_list, vcon_id)`` — LPUSH onto the DLQ derived from
an ingress list name (uses the existing ``dlq_utils`` naming convention).
- ``set_vcon_ttl(vcon_id, seconds)`` — ``EXPIRE`` on the vCon JSON key.
- ``queue_length(list_name)`` — ``LLEN``.

The class is intentionally stateless and accepts an injectable Redis client
to make unit testing easy. By default it resolves the shared client from
``redis_mgr`` so existing test fixtures that patch ``redis_mgr.redis``
continue to work unchanged.
"""

from typing import List, Optional, Tuple

from lib.logging_utils import init_logger

logger = init_logger(__name__)


def _vcon_key(vcon_id: str) -> str:
return f"vcon:{vcon_id}"


class VconQueue:
    """Queue + TTL operations for vCons in Redis.

    Stateless. Holds a reference to a Redis client; defaults to the
    shared client managed by ``redis_mgr``.
    """

    def __init__(self, client=None) -> None:
        """Create a queue wrapper.

        Args:
            client: Redis client to use. When ``None``, resolves the
                shared client from ``redis_mgr`` at construction time.
        """
        if client is None:
            # Lazy import so test fixtures that monkey-patch
            # ``redis_mgr.redis`` see the patched value.
            import redis_mgr

            client = redis_mgr.redis
        self._client = client

    # ---- Queue ops -------------------------------------------------

    def enqueue(self, list_name: str, vcon_id: str) -> int:
        """LPUSH a vCon id onto a queue. Pairs with ``dequeue`` (BLPOP).

        Returns:
            Length of the queue after the push.
        """
        # str() coercion for consistency with route_to(): callers may
        # hand us a UUID object rather than a string.
        return self._client.lpush(list_name, str(vcon_id))

    def route_to(self, list_name: str, vcon_id: str) -> int:
        """RPUSH a vCon id onto a queue. Used by tag_router / fan-out.

        Returns:
            Length of the queue after the push.
        """
        return self._client.rpush(list_name, str(vcon_id))

    def dequeue(
        self,
        list_names: List[str],
        timeout: int = 15,
    ) -> Optional[Tuple[str, str]]:
        """Block-pop from the first non-empty queue.

        Args:
            list_names: Queues to poll, in priority order.
            timeout: Seconds to block before giving up.

        Returns:
            ``(list_name, vcon_id)`` or ``None`` on timeout.
        """
        return self._client.blpop(list_names, timeout=timeout)

    def enqueue_dlq(self, ingress_list: str, vcon_id: str) -> int:
        """Push a vCon onto the DLQ derived from its ingress list.

        Uses ``dlq_utils.get_ingress_list_dlq_name`` so the naming
        convention stays in one place.

        Returns:
            Length of the DLQ after the push.
        """
        # Lazy import: dlq_utils lives under conserver/, not common/.
        from dlq_utils import get_ingress_list_dlq_name

        dlq_name = get_ingress_list_dlq_name(ingress_list)
        # Same str() coercion as enqueue()/route_to() for consistency.
        return self._client.lpush(dlq_name, str(vcon_id))

    def queue_length(self, list_name: str) -> int:
        """LLEN — number of ids currently waiting on ``list_name``."""
        return self._client.llen(list_name)

    # ---- vCon-key TTL ops -----------------------------------------

    def set_vcon_ttl(self, vcon_id: str, seconds: int) -> bool:
        """Set TTL on the vCon JSON key. Returns True if key existed."""
        return bool(self._client.expire(_vcon_key(vcon_id), seconds))
94 changes: 94 additions & 0 deletions common/lib/vcon_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Legacy-field tolerance for vCons read from storage.

Older releases of vcon-mcp / vcon-lib / hand-rolled adapters wrote vCons
with field names that pre-date ``draft-ietf-vcon-vcon-core-02``. We want
links to keep processing those records, but we want every *write* to
land in spec-correct form.

This module exposes one function — :func:`normalize_legacy_fields` — that
maps legacy names to spec names on a vCon dict in place. It is intended
to be called at the read boundary (``VconRedis.get_vcon`` /
``get_vcon_dict``) so the rest of the codebase only ever sees
spec-compliant data.

Mappings applied:

- top-level ``appended`` → ``amended``
- attachment / analysis ``schema_version`` → ``schema``
- attachment ``type`` → ``purpose`` (per draft-02 §5.5, but **only** for
attachments whose ``purpose`` slot isn't already populated; the
``lawful_basis`` extension legitimately uses ``type``, so we don't
touch entries that already have a ``type`` *and* a ``purpose``)
- attachment / dialog ``mimetype`` → ``mediatype``
- extension/critical: ``must_support`` (top-level or in entries) →
``critical``

The function is conservative: if both the legacy and the spec field are
present, the spec field wins and the legacy field is dropped.
"""

from typing import Any, Dict, List


# Legacy top-level field name -> its draft-ietf-vcon-vcon-core-02 spec
# equivalent. Applied by normalize_legacy_fields() on the vCon root dict.
_TOP_LEVEL_RENAMES = {
    "appended": "amended",
    "must_support": "critical",
}


def _rename(d: Dict[str, Any], old: str, new: str) -> None:
"""Move ``d[old]`` to ``d[new]`` unless ``d[new]`` is already set."""
if old not in d:
return
if new in d:
# Spec field already there — drop the legacy one silently.
d.pop(old, None)
return
d[new] = d.pop(old)


def _normalize_entry(entry: Dict[str, Any]) -> None:
    """Apply the per-entry legacy renames shared by dialog/analysis/attachments."""
    if not isinstance(entry, dict):
        return
    # Each pair is (legacy field, spec field).
    for legacy_field, spec_field in (
        ("schema_version", "schema"),
        ("mimetype", "mediatype"),
        ("must_support", "critical"),
    ):
        _rename(entry, legacy_field, spec_field)


def _normalize_attachment(att: Dict[str, Any]) -> None:
    """Normalize a single attachment dict.

    Per the speckit, attachments use ``purpose``. The legacy field was
    ``type``. Some extensions (notably ``lawful_basis``) re-use ``type``
    *as* their purpose value, so we only migrate when ``purpose`` is
    absent.
    """
    if not isinstance(att, dict):
        return
    # Generic entry renames first (schema / mediatype / critical).
    _normalize_entry(att)
    # type -> purpose, but never clobber an existing purpose.
    if "purpose" not in att:
        try:
            att["purpose"] = att.pop("type")
        except KeyError:
            pass


def normalize_legacy_fields(vcon_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Mutate ``vcon_dict`` in place, mapping legacy names to spec names.

    Returns the same dict for convenience.
    """
    # Non-dict input is passed through untouched.
    if not isinstance(vcon_dict, dict):
        return vcon_dict

    # Top-level renames (appended -> amended, must_support -> critical).
    for legacy_name, spec_name in _TOP_LEVEL_RENAMES.items():
        _rename(vcon_dict, legacy_name, spec_name)

    # Dialog and analysis entries share the generic per-entry renames.
    for section in ("dialog", "analysis"):
        for entry in vcon_dict.get(section) or []:
            _normalize_entry(entry)

    # Attachments additionally get the type -> purpose migration.
    for attachment in vcon_dict.get("attachments") or []:
        _normalize_attachment(attachment)

    return vcon_dict
61 changes: 57 additions & 4 deletions common/lib/vcon_redis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Optional
import json
from typing import Any, Optional
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
from lib.vcon_compat import normalize_legacy_fields
from redis.commands.json.path import Path
from redis_mgr import redis
from settings import VCON_REDIS_EXPIRY
Expand All @@ -22,6 +24,48 @@ class VconRedis:

DEFAULT_TTL = VCON_REDIS_EXPIRY

@staticmethod
def _stringify_json_body(entry: Any) -> None:
    """Force ``body`` to be a string + correct ``encoding`` per speckit.

    The speckit non-negotiable: analysis/attachment bodies are
    strings. JSON content pairs ``body: json.dumps(...)`` with
    ``encoding: "json"``. vcon-lib's ``add_analysis`` currently
    emits dict/list bodies with ``encoding: "none"``, which we
    normalize on the way out so storage is spec-correct.

    Args:
        entry: A single analysis/attachment dict. Mutated in place;
            non-dict values are ignored.
    """
    # Only dict entries can carry a body to normalize.
    if not isinstance(entry, dict):
        return
    body = entry.get("body")
    # Serialize structured bodies; string/None bodies are left as-is
    # (their existing encoding is presumed correct — not verified here).
    if isinstance(body, (dict, list)):
        entry["body"] = json.dumps(body)
        entry["encoding"] = "json"

@classmethod
def _enforce_spec_on_write(cls, vcon_dict: dict) -> dict:
    """Ensure a vCon dict is spec-compliant before persistence.

    vcon-lib 0.9.2 produces spec-correct output from ``build_new()``
    but a ``Vcon(legacy_dict)`` round-trip can still surface empty
    ``group``/``redacted`` defaults or a missing top-level syntax
    param. Normalize defensively here so storage is always clean.

    Args:
        vcon_dict: The vCon dictionary to normalize. Mutated in place.

    Returns:
        The same (mutated) dict, so call sites can chain it.
    """
    # draft-ietf-vcon-vcon-core-02 §4.1.1 — syntax param.
    if not vcon_dict.get("vcon"):
        vcon_dict["vcon"] = "0.4.0"
    # speckit: ``group`` is reserved and must not be emitted empty.
    # ``==`` deliberately matches only an empty list, not None/missing.
    if vcon_dict.get("group") == []:
        vcon_dict.pop("group", None)
    # speckit: empty ``redacted: {}`` should be omitted.
    if vcon_dict.get("redacted") == {}:
        vcon_dict.pop("redacted", None)
    # speckit: analysis/attachment bodies are strings.
    # ``or []`` guards against an explicit null in either section.
    for entry in vcon_dict.get("analysis", []) or []:
        cls._stringify_json_body(entry)
    for entry in vcon_dict.get("attachments", []) or []:
        cls._stringify_json_body(entry)
    return vcon_dict

def store_vcon(self, vCon: vcon.Vcon, ttl: Optional[int] = None) -> None:
"""Stores the vcon into redis with optional TTL.

Expand All @@ -31,7 +75,7 @@ def store_vcon(self, vCon: vcon.Vcon, ttl: Optional[int] = None) -> None:
Use DEFAULT_TTL for the configured default expiry.
"""
key = f"vcon:{vCon.uuid}"
cleanvCon = vCon.to_dict()
cleanvCon = self._enforce_spec_on_write(vCon.to_dict())
redis.json().set(key, Path.root_path(), cleanvCon)
if ttl is not None:
redis.expire(key, ttl)
Expand All @@ -52,6 +96,10 @@ def get_vcon(self, vcon_id: str) -> Optional[vcon.Vcon]:
if not vcon_dict:
increment_counter("conserver.lib.vcon_redis.get_vcon_not_found")
return None
# Tolerate legacy field names from older writers so links always
# see spec-compliant data; spec-correct writes are produced via
# vcon-lib 0.9.2 and the wrapper helpers.
normalize_legacy_fields(vcon_dict)
_vcon = vcon.Vcon(vcon_dict)
return _vcon

Expand All @@ -64,6 +112,7 @@ def store_vcon_dict(self, vcon_dict: dict, ttl: Optional[int] = None) -> None:
Use DEFAULT_TTL for the configured default expiry.
"""
key = f"vcon:{vcon_dict['uuid']}"
self._enforce_spec_on_write(vcon_dict)
redis.json().set(key, Path.root_path(), vcon_dict)
if ttl is not None:
redis.expire(key, ttl)
Expand All @@ -78,9 +127,12 @@ def get_vcon_dict(self, vcon_id: str) -> Optional[dict]:
Returns:
Optional[dict]: The vCon as a dictionary, or None if not found.
"""
return redis.json().get(
vcon_dict = redis.json().get(
f"vcon:{vcon_id}", Path.root_path()
)
if vcon_dict:
normalize_legacy_fields(vcon_dict)
return vcon_dict

def set_expiry(self, vcon_id: str, ttl: int) -> bool:
"""Sets or updates the TTL on an existing vCon.
Expand Down Expand Up @@ -142,7 +194,7 @@ async def store_vcon_async(
Use DEFAULT_TTL for the configured default expiry.
"""
key = f"vcon:{vCon.uuid}"
cleanvCon = vCon.to_dict()
cleanvCon = self._enforce_spec_on_write(vCon.to_dict())
await redis_async.json().set(key, "$", cleanvCon)
if ttl is not None:
await redis_async.expire(key, ttl)
Expand All @@ -163,6 +215,7 @@ async def store_vcon_dict_async(
Use DEFAULT_TTL for the configured default expiry.
"""
key = f"vcon:{vcon_dict['uuid']}"
self._enforce_spec_on_write(vcon_dict)
await redis_async.json().set(key, "$", vcon_dict)
if ttl is not None:
await redis_async.expire(key, ttl)
Expand Down
Loading
Loading