Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from starlette.responses import Response

from version import get_version_info, get_version_string, get_version, get_git_commit
import vcon_hook

# OpenTelemetry trace context extraction is now in lib.context_utils
from settings import (
Expand Down Expand Up @@ -778,6 +779,11 @@ async def post_vcon(
await store_context_async(redis_async, ingress_list, vcon_uuid_str, context)
await redis_async.rpush(ingress_list, vcon_uuid_str)

try:
vcon_hook.on_vcon_created(str(inbound_vcon.uuid), dict_vcon, ingress_lists)
except Exception:
pass

return JSONResponse(content=dict_vcon, status_code=201)

except Exception as e:
Expand Down Expand Up @@ -875,6 +881,11 @@ async def external_ingress_vcon(
f"Successfully stored vCon {inbound_vcon.uuid} and added to ingress list {ingress_list}"
)

try:
vcon_hook.on_vcon_created(str(inbound_vcon.uuid), dict_vcon, [ingress_list])
except Exception:
pass

return None

except Exception as e:
Expand Down Expand Up @@ -936,6 +947,11 @@ async def delete_vcon(vcon_uuid: UUID) -> None:
else:
logger.info(f"vCon {vcon_uuid} deletion completed")

try:
vcon_hook.on_vcon_deleted(str(vcon_uuid))
except Exception:
pass


# Ingress and egress endpoints for vCon IDs
# Create an endpoint to push vcon IDs to one or more redis lists
Expand Down
20 changes: 20 additions & 0 deletions api/vcon_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""vCon lifecycle hook — no-op default implementation.

Called when vCons are created or deleted via the REST API.
Replace this file at Docker build time with any custom implementation
(e.g. audit logging, metrics, notifications).
"""

from typing import Dict, List, Optional


def on_vcon_created(
    vcon_id: str,
    vcon_dict: Dict,
    ingress_lists: Optional[List[str]],
) -> None:
    """No-op default hook, invoked after a vCon is stored via the REST API.

    Args:
        vcon_id: UUID of the newly created vCon, as a string.
        vcon_dict: The vCon serialized to a plain dict.
        ingress_lists: Redis ingress list names the vCon was pushed to,
            or None when not applicable.

    Replace this module at Docker build time to add real behavior
    (auditing, metrics, notifications). Callers wrap this in a broad
    try/except, so raising here never breaks vCon creation.
    """
    return None


def on_vcon_deleted(vcon_id: str) -> None:
    """No-op default hook, invoked after a vCon is deleted via the REST API.

    Args:
        vcon_id: UUID of the deleted vCon, as a string.

    Replace this module at Docker build time to add real behavior.
    """
    return None
21 changes: 21 additions & 0 deletions conserver/after_link_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""after_link hook — no-op default implementation.

Called after every link in the chain completes (success or error).
Replace this file at Docker build time with any custom implementation
(e.g. audit logging, metrics, notifications).
"""

from typing import Any, Dict, List, Optional


def after_link(
    vcon_id: str,
    link_name: str,
    link_module: Any,
    link_opts: Optional[Dict],
    link_hook_config: Optional[Dict],
    status: str,
    error: Optional[Exception],
    parties: Optional[List[str]],
) -> None:
    """No-op default after_link hook.

    Called by the conserver after every link in the chain finishes, both on
    success (``status == "success"``, ``error`` is None) and on failure
    (``status == "error"``, ``error`` holds the raised exception).

    Args:
        vcon_id: UUID of the vCon being processed.
        link_name: Configured name of the link that just ran.
        link_module: The imported link module object.
        link_opts: The link's configured options, if any.
        link_hook_config: The ``after_link`` section of the link options.
        status: "success" or "error".
        error: Exception raised by the link, or None.
        parties: tel/mailto identifiers extracted from the vCon's parties.

    Replace this module at Docker build time to add real behavior.
    """
    return None
9 changes: 9 additions & 0 deletions conserver/links/analyze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@

logger = init_logger(__name__)

# Audit metadata for this link: which third-party service sees the data,
# its data-processing policy, what kind of data is sent, which option key
# names the model, and which option keys are safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="OpenAI",
    policy_url="https://openai.com/policies/data-processing-addendum",
    data_type="transcript_text",
    model_key="model",
    transformation="Added AI analysis to transcript",
    safe_opts_keys=["analysis_type", "model"],
)

default_options = {
"prompt": "",
"analysis_type": "summary",
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/analyze_and_label/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@

logger = init_logger(__name__)

# Audit metadata for this link: third-party service, its data policy,
# the data type sent, the option key naming the model, and the option
# keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="OpenAI",
    policy_url="https://openai.com/policies/data-processing-addendum",
    data_type="transcript_text",
    model_key="model",
    transformation="Added AI labels to transcript",
    safe_opts_keys=["analysis_type", "model"],
)

default_options = {
"prompt": "Analyze this transcript and provide a list of relevant labels for categorization. Return your response as a JSON object with a single key 'labels' containing an array of strings.",
"analysis_type": "labeled_analysis",
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/analyze_vcon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@

logger = init_logger(__name__)

# Audit metadata for this link: third-party service, its data policy,
# the data type sent (full vCon JSON here), the option key naming the
# model, and the option keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="OpenAI",
    policy_url="https://openai.com/policies/data-processing-addendum",
    data_type="vcon_json",
    model_key="model",
    transformation="Analyzed full vCon with AI",
    safe_opts_keys=["analysis_type", "model"],
)

default_options = {
"prompt": "Analyze this vCon and return a JSON object with your analysis.",
"analysis_type": "json_analysis",
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/check_and_tag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@

logger = init_logger(__name__)

# Audit metadata for this link: third-party service, its data policy,
# the data type sent, the option key naming the model, and the option
# keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="OpenAI",
    policy_url="https://openai.com/policies/data-processing-addendum",
    data_type="transcript_text",
    model_key="model",
    transformation="Evaluated and tagged transcript with AI",
    safe_opts_keys=["analysis_type", "model"],
)

default_options = {
"analysis_type": "tag_evaluation",
"model": "gpt-5",
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/deepgram_link/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
# Set up a module-level logger
logger = init_logger(__name__)

# Audit metadata for this link: Deepgram receives the audio recording;
# `model_key` names the option holding the transcription model, and
# `safe_opts_keys` lists the option keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="Deepgram",
    policy_url="https://deepgram.com/privacy",
    data_type="audio_recording",
    model_key="model",
    transformation="Transcribed audio to text",
    safe_opts_keys=["model", "minimum_duration"],
)

# Default options for Deepgram transcription link
# - minimum_duration: minimum length (in seconds) for a dialog to be considered for transcription
# - DEEPGRAM_KEY: API key for Deepgram (when not using LiteLLM proxy)
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/detect_engagement/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
import os
logger = init_logger(__name__)

# Audit metadata for this link: third-party service, its data policy,
# the data type sent, the option key naming the model, and the option
# keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="OpenAI",
    policy_url="https://openai.com/policies/data-processing-addendum",
    data_type="transcript_text",
    model_key="model",
    transformation="Detected agent/customer engagement from transcript",
    safe_opts_keys=["model"],
)

default_options = {
"prompt": "Did both the customer and the agent speak? Respond with 'true' if yes, 'false' if not. Respond with only 'true' or 'false'.",
"analysis_type": "engagement_analysis",
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/expire_vcon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

logger = init_logger(__name__)

# Audit metadata for this link: purely internal (no third-party service,
# hence policy_url is None). `transformation_opts_key` names the option
# ("seconds") whose value parameterizes the transformation description.
AUDIT_META = dict(
    third_party_service="internal",
    policy_url=None,
    data_type="vcon_metadata",
    transformation="Scheduled vCon for expiry from cache",
    transformation_opts_key="seconds",
    safe_opts_keys=["seconds"],
)

default_options = {"seconds": 60 * 60 * 24}


Expand Down
9 changes: 9 additions & 0 deletions conserver/links/groq_whisper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ def __init__(self, *args, **kwargs):
init_error_tracker()
logger = init_logger(__name__)

# Audit metadata for this link: Groq receives the audio recording.
# NOTE(review): sibling links use a "model_key" entry naming an options
# key, but here the key is "model" with a literal model name — presumably
# because the model is hardcoded rather than configurable. Confirm the
# AUDIT_META consumer handles both shapes.
AUDIT_META = dict(
    third_party_service="Groq",
    policy_url="https://groq.com/privacy-policy",
    data_type="audio_recording",
    model="whisper-large-v3-turbo",
    transformation="Transcribed audio to text",
    safe_opts_keys=["minimum_duration"],
)

# Default configuration for the Whisper service
default_options = {
"minimum_duration": 30, # Minimum duration in seconds for audio to be transcribed
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/hugging_face_whisper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
init_error_tracker()
logger = init_logger(__name__)

# Audit metadata for this link: Hugging Face receives the audio recording.
# `model_key` points at the "api_url" option, which identifies the hosted
# model endpoint in place of a model name.
AUDIT_META = dict(
    third_party_service="Hugging Face",
    policy_url="https://huggingface.co/privacy",
    data_type="audio_recording",
    model_key="api_url",
    transformation="Transcribed audio to text",
    safe_opts_keys=["minimum_duration"],
)

# Default configuration for the Whisper service
default_options = {
"minimum_duration": 30, # Minimum duration in seconds for audio to be transcribed
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/hugging_llm_link/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from typing import Dict, Any, Optional
from transformers import pipeline

# Audit metadata for this link: third-party service, its data policy,
# the data type sent, the option key naming the model ("model_name"
# here), and the option keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="Hugging Face",
    policy_url="https://huggingface.co/privacy",
    data_type="transcript_text",
    model_key="model_name",
    transformation="Analyzed conversation with Hugging Face LLM",
    safe_opts_keys=["model_name"],
)


class HuggingLLMLink:
"""A vCon link for analyzing conversations using HuggingFace models."""
Expand Down
9 changes: 9 additions & 0 deletions conserver/links/openai_transcribe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
# Set up a module-level logger
logger = init_logger(__name__)

# Audit metadata for this link: OpenAI receives the audio recording;
# `model_key` names the option holding the transcription model, and
# `safe_opts_keys` lists the option keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="OpenAI",
    policy_url="https://openai.com/policies/data-processing-addendum",
    data_type="audio_recording",
    model_key="model",
    transformation="Transcribed audio to text",
    safe_opts_keys=["model"],
)

# Default options for the OpenAI transcription link
# - minimum_duration: minimum length (in seconds) for a dialog to be considered for transcription
# - DEEPGRAM_KEY: API key for Deepgram (NOTE(review): comment/option looks copy-pasted from the Deepgram link — confirm this link really reads DEEPGRAM_KEY)
Expand Down
8 changes: 8 additions & 0 deletions conserver/links/scitt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@

logger = init_logger(__name__)

# Audit metadata for this link: only a hash of the vCon is sent to the
# SCITT transparency log (policy_url None — no third-party data policy
# applies). safe_opts_keys lists option keys safe to expose in audit logs.
AUDIT_META = dict(
    third_party_service="SCITT",
    policy_url=None,
    data_type="vcon_hash",
    transformation="Anchored vCon hash to SCITT transparency log",
    safe_opts_keys=["scrapi_url", "issuer"],
)

# Increment for any API/attribute changes
link_version = "0.3.0"

Expand Down
26 changes: 26 additions & 0 deletions conserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from version import get_version_string, get_version_info
from dlq_utils import get_ingress_list_dlq_name
import hook
import after_link_hook
from lib.vcon_redis import VconRedis
from settings import VCON_DLQ_EXPIRY
from lib.context_utils import retrieve_context, store_context_sync, extract_otel_trace_context
from lib.tracing import init_tracing
Expand Down Expand Up @@ -510,11 +512,29 @@ def _process_link(self, links: list[str], link_index: int) -> bool:
module = imported_modules[module_name]
options = link.get("options")

# Extract parties for the after_link hook (tel + mailto from vCon parties array).
parties = []
try:
_vcon = VconRedis().get_vcon(self.vcon_id)
for party in (_vcon.parties or []) if _vcon else []:
tel = party.get("tel") if isinstance(party, dict) else getattr(party, "tel", None)
mailto = party.get("mailto") if isinstance(party, dict) else getattr(party, "mailto", None)
if tel:
parties.append(tel)
if mailto:
parties.append(mailto)
except Exception:
pass

link_hook_config = (options or {}).get("after_link", {})
try:
if link_index == 0:
self._process_tracers(self.vcon_id, self.vcon_id, links, -1)
started = time.time()
should_continue_chain = module.run(self.vcon_id, link_name, options)
after_link_hook.after_link(
self.vcon_id, link_name, module, options, link_hook_config, "success", None, parties
)
link_processing_time = round(time.time() - started, 3)
record_histogram(
"conserver.link.execution_time",
Expand Down Expand Up @@ -543,6 +563,12 @@ def _process_link(self, links: list[str], link_index: int) -> bool:
self._process_tracers(self.vcon_id, self.vcon_id, links, link_index)
return should_continue_chain
except Exception as e:
try:
after_link_hook.after_link(
self.vcon_id, link_name, module, options, link_hook_config, "error", e, parties
)
except Exception:
pass
# Record exception in the span
current_span = trace.get_current_span()
if current_span:
Expand Down
4 changes: 3 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ RUN uv venv --seed /opt/venv && \
ENV PATH="/opt/venv/bin:$PATH"

# Auto-install OTel instrumentation packages for the installed libraries.
RUN opentelemetry-bootstrap -a install
# || true: bootstrap raises on version conflicts between auto-detected packages;
# instrumentation still works for packages that installed cleanly.
RUN opentelemetry-bootstrap -a install || true

COPY . /app

Expand Down
4 changes: 3 additions & 1 deletion docker/Dockerfile.api
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ RUN uv venv --seed /opt/venv && UV_PROJECT_ENVIRONMENT=/opt/venv uv sync --froze
ENV PATH="/opt/venv/bin:$PATH"

# Auto-install OTel instrumentation packages for the installed libraries.
RUN opentelemetry-bootstrap -a install
# || true: bootstrap raises on version conflicts between auto-detected packages;
# instrumentation still works for packages that installed cleanly.
RUN opentelemetry-bootstrap -a install || true

COPY common/ /app/common/
COPY api/ /app/api/
Expand Down
4 changes: 3 additions & 1 deletion docker/Dockerfile.conserver
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ RUN uv venv --seed /opt/venv && UV_PROJECT_ENVIRONMENT=/opt/venv uv sync --froze
ENV PATH="/opt/venv/bin:$PATH"

# Auto-install OTel instrumentation packages for the installed libraries.
RUN opentelemetry-bootstrap -a install
# || true: bootstrap raises on version conflicts between auto-detected packages;
# instrumentation still works for packages that installed cleanly.
RUN opentelemetry-bootstrap -a install || true

COPY common/ /app/common/
COPY conserver/ /app/conserver/
Expand Down
Loading