Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions temporalio/contrib/langsmith/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Any, ClassVar, NoReturn, Protocol

import langsmith
import langsmith.utils
import nexusrpc.handler
from langsmith import tracing_context
from langsmith.run_helpers import get_current_run_tree
Expand Down Expand Up @@ -43,6 +44,7 @@
}
)


# ---------------------------------------------------------------------------
# Context helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -457,14 +459,24 @@ def _maybe_run(
) -> Iterator[None]:
"""Create a LangSmith run, handling errors.

- If add_temporal_runs is False, yields None (no run created).
- If add_temporal_runs is False **or** ``langsmith.utils.tracing_is_enabled()``
returns False, yields None (no run created).
Context propagation is handled unconditionally by callers.
- When a run IS created, uses :class:`_ReplaySafeRunTree` for
replay and event loop safety, then sets it as ambient context via
``tracing_context(parent=run_tree)`` so ``get_current_run_tree()``
returns it and ``_inject_current_context()`` can inject it.
- On exception: marks run as errored (unless benign ApplicationError), re-raises.

Note on ``tracing_is_enabled()`` and cross-process traces:
``tracing_is_enabled()`` checks for an active run tree in context
*before* consulting the ``LANGSMITH_TRACING`` env var (langsmith
semantics). If a parent run is propagated into this worker via
headers from an upstream tracer, tracing continues regardless of
``LANGSMITH_TRACING=false``. This matches langsmith's "continue
mid-trace" model: the env var suppresses *new* local traces but
does not break an inbound parent trace.

Args:
client: LangSmith client instance.
name: Display name for the run.
Expand All @@ -477,7 +489,7 @@ def _maybe_run(
project_name: LangSmith project name override.
executor: ThreadPoolExecutor for background I/O.
"""
if not add_temporal_runs:
if not add_temporal_runs or not langsmith.utils.tracing_is_enabled():
yield None
return

Expand Down Expand Up @@ -717,12 +729,8 @@ async def execute_activity(
"temporalRunID": info.workflow_run_id or "",
"temporalActivityID": info.activity_id or "",
}
# Unconditionally set tracing context so @traceable functions inside
# activities inherit the plugin's client and parent, regardless of
# the add_temporal_runs toggle.
tracing_args: dict[str, Any] = {
"client": self._config._client,
"enabled": True,
"project_name": self._config._project_name,
"parent": parent,
}
Expand Down Expand Up @@ -786,7 +794,6 @@ def _workflow_maybe_run(
)
tracing_args: dict[str, Any] = {
"client": self._config._client,
"enabled": True,
"project_name": self._config._project_name,
"parent": tracing_parent,
}
Expand Down Expand Up @@ -947,7 +954,6 @@ async def execute_nexus_operation_start(
)
tracing_args: dict[str, Any] = {
"client": self._config._client,
"enabled": True,
"project_name": self._config._project_name,
"parent": parent,
}
Expand All @@ -967,7 +973,6 @@ async def execute_nexus_operation_cancel(
)
tracing_args: dict[str, Any] = {
"client": self._config._client,
"enabled": True,
"project_name": self._config._project_name,
"parent": parent,
}
Expand Down
31 changes: 31 additions & 0 deletions tests/contrib/langsmith/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,37 @@
from typing import Any
from unittest.mock import MagicMock

import pytest


@pytest.fixture(autouse=True)
def _clear_langsmith_env_cache() -> Any: # pyright: ignore[reportUnusedFunction]
"""Clear langsmith's lru_cache before and after each test.

Tests manipulate LANGSMITH_TRACING / LANGCHAIN_TRACING_V2 env vars.
langsmith.utils.get_env_var caches results, so stale values would
leak across tests (or into other test modules in the same session).
"""
import langsmith.utils

langsmith.utils.get_env_var.cache_clear() # type: ignore[attr-defined]
yield
langsmith.utils.get_env_var.cache_clear() # type: ignore[attr-defined]


@pytest.fixture(autouse=True)
def _enable_langsmith_tracing(monkeypatch: pytest.MonkeyPatch) -> None: # pyright: ignore[reportUnusedFunction]
"""Enable LangSmith tracing by default for all tests in this directory.

The plugin defers to ``langsmith.utils.tracing_is_enabled()``, which
requires ``LANGSMITH_TRACING=true`` (or equivalent). Without this
fixture, tests that expect runs would see zero.

Individual tests can override with ``monkeypatch.setenv("LANGSMITH_TRACING", "false")``
to verify disabled behavior.
"""
monkeypatch.setenv("LANGSMITH_TRACING", "true")


@dataclass
class _RunRecord:
Expand Down
5 changes: 2 additions & 3 deletions tests/contrib/langsmith/test_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,6 @@ async def test_false_still_propagates_context(
# (unconditionally, before _maybe_run)
mock_tracing_ctx.assert_called_once_with(
client=config._client,
enabled=True,
project_name=None,
parent=mock_extracted_parent,
)
Expand Down Expand Up @@ -1143,8 +1142,8 @@ async def test_false_activity_no_parent_no_context(
await act_interceptor.execute_activity(mock_act_input)

MockRunTree.assert_not_called()
# tracing_context called with client and enabled (no parent)
# tracing_context called with client (no parent)
mock_tracing_ctx.assert_called_once_with(
client=config._client, enabled=True, project_name=None, parent=None
client=config._client, project_name=None, parent=None
)
mock_act_next.execute_activity.assert_called_once()
243 changes: 243 additions & 0 deletions tests/contrib/langsmith/test_tracing_env_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
"""Tests that LangSmithPlugin defers to ``langsmith.utils.tracing_is_enabled()``.

Tracing requires the env to explicitly say so (``LANGSMITH_TRACING=true`` etc);
it is off when the env is unset or set to ``false``. Tests verify that
``LANGSMITH_TRACING=false`` produces zero runs and ``LANGSMITH_TRACING=true``
produces runs.
"""

from __future__ import annotations

import uuid
from datetime import timedelta

import pytest
from langsmith import traceable

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from tests.contrib.langsmith.test_integration import (
DirectTraceableNexusService,
NexusDirectTraceableWorkflow,
_make_client_and_collector,
)
from tests.helpers import new_worker
from tests.helpers.nexus import make_nexus_endpoint_name

# ---------------------------------------------------------------------------
# Sample workflow / activity
# ---------------------------------------------------------------------------


@traceable(name="inner_call")
async def _inner_call(prompt: str) -> str:
return f"response to: {prompt}"


@traceable
@activity.defn
async def env_override_activity() -> str:
result = await _inner_call("hello")
return result


@workflow.defn
class EnvOverrideWorkflow:
@workflow.run
async def run(self) -> str:
return await workflow.execute_activity(
env_override_activity,
start_to_close_timeout=timedelta(seconds=10),
)


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


class TestTracingEnvOverride:
"""LangSmithPlugin must respect LANGSMITH_TRACING=false."""

async def test_no_runs_when_tracing_disabled_with_temporal_runs(
self,
client: Client,
env: WorkflowEnvironment, # type:ignore[reportUnusedParameter]
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""With LANGSMITH_TRACING=false and add_temporal_runs=True, no runs."""
monkeypatch.setenv("LANGSMITH_TRACING", "false")
monkeypatch.delenv("LANGCHAIN_TRACING_V2", raising=False)

temporal_client, collector, _ = _make_client_and_collector(
client, add_temporal_runs=True
)

async with new_worker(
temporal_client,
EnvOverrideWorkflow,
activities=[env_override_activity],
max_cached_workflows=0,
) as worker:
handle = await temporal_client.start_workflow(
EnvOverrideWorkflow.run,
id=f"env-override-temporal-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
result = await handle.result()

assert result == "response to: hello"
assert len(collector.runs) == 0, (
f"Expected zero LangSmith runs when LANGSMITH_TRACING=false, "
f"but got {len(collector.runs)}: "
f"{[r.name for r in collector.runs]}"
)

async def test_no_runs_when_tracing_disabled_without_temporal_runs(
self,
client: Client,
env: WorkflowEnvironment, # type:ignore[reportUnusedParameter]
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""With LANGSMITH_TRACING=false and add_temporal_runs=False, no runs."""
monkeypatch.setenv("LANGSMITH_TRACING", "false")
monkeypatch.delenv("LANGCHAIN_TRACING_V2", raising=False)

temporal_client, collector, _ = _make_client_and_collector(
client, add_temporal_runs=False
)

async with new_worker(
temporal_client,
EnvOverrideWorkflow,
activities=[env_override_activity],
max_cached_workflows=0,
) as worker:
handle = await temporal_client.start_workflow(
EnvOverrideWorkflow.run,
id=f"env-override-no-temporal-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
result = await handle.result()

assert result == "response to: hello"
assert len(collector.runs) == 0, (
f"Expected zero LangSmith runs when LANGSMITH_TRACING=false, "
f"but got {len(collector.runs)}: "
f"{[r.name for r in collector.runs]}"
)

async def test_no_runs_when_langchain_tracing_v2_disabled(
self,
client: Client,
env: WorkflowEnvironment, # type:ignore[reportUnusedParameter]
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""LANGCHAIN_TRACING_V2=false also suppresses runs (legacy env var)."""
monkeypatch.setenv("LANGCHAIN_TRACING_V2", "false")
monkeypatch.delenv("LANGSMITH_TRACING", raising=False)

temporal_client, collector, _ = _make_client_and_collector(
client, add_temporal_runs=True
)

async with new_worker(
temporal_client,
EnvOverrideWorkflow,
activities=[env_override_activity],
max_cached_workflows=0,
) as worker:
handle = await temporal_client.start_workflow(
EnvOverrideWorkflow.run,
id=f"env-override-v2-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
result = await handle.result()

assert result == "response to: hello"
assert len(collector.runs) == 0, (
f"Expected zero LangSmith runs when LANGCHAIN_TRACING_V2=false, "
f"but got {len(collector.runs)}: "
f"{[r.name for r in collector.runs]}"
)

async def test_no_runs_when_tracing_disabled_for_nexus_start(
self,
client: Client,
env: WorkflowEnvironment,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""With LANGSMITH_TRACING=false, nexus start handler emits no runs."""
if env.supports_time_skipping:
pytest.skip("Time-skipping server doesn't persist headers.")

monkeypatch.setenv("LANGSMITH_TRACING", "false")
monkeypatch.delenv("LANGCHAIN_TRACING_V2", raising=False)

temporal_client, collector, _ = _make_client_and_collector(
client, add_temporal_runs=True
)

task_queue = f"env-override-nexus-{uuid.uuid4()}"
async with new_worker(
temporal_client,
NexusDirectTraceableWorkflow,
nexus_service_handlers=[DirectTraceableNexusService()],
task_queue=task_queue,
max_cached_workflows=0,
) as worker:
await env.create_nexus_endpoint(
make_nexus_endpoint_name(worker.task_queue),
worker.task_queue,
)
handle = await temporal_client.start_workflow(
NexusDirectTraceableWorkflow.run,
id=f"env-override-nexus-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
result = await handle.result()

assert result == "response to: nexus-input"
assert len(collector.runs) == 0, (
f"Expected zero LangSmith runs when LANGSMITH_TRACING=false "
f"(nexus start path), but got {len(collector.runs)}: "
f"{[r.name for r in collector.runs]}"
)

# NOTE: test_no_runs_when_tracing_disabled_for_nexus_cancel is not
# included — cancelling an in-flight nexus operation requires non-trivial
# orchestration (long-running handler + external cancel signal). Flagged
# for a follow-up ticket.

async def test_runs_emitted_when_tracing_enabled(
self,
client: Client,
env: WorkflowEnvironment, # type:ignore[reportUnusedParameter]
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Positive control: with LANGSMITH_TRACING=true, runs ARE emitted."""
monkeypatch.setenv("LANGSMITH_TRACING", "true")
monkeypatch.delenv("LANGCHAIN_TRACING_V2", raising=False)

temporal_client, collector, _ = _make_client_and_collector(
client, add_temporal_runs=True
)

async with new_worker(
temporal_client,
EnvOverrideWorkflow,
activities=[env_override_activity],
max_cached_workflows=0,
) as worker:
handle = await temporal_client.start_workflow(
EnvOverrideWorkflow.run,
id=f"env-enabled-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
result = await handle.result()

assert result == "response to: hello"
assert (
len(collector.runs) > 0
), "Expected LangSmith runs when LANGSMITH_TRACING=true, but got none"
Loading