Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions dana/common/llm/providers/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,22 @@ async def chat(self, messages: list[LLMMessage], tools: list[dict] | None = None

except anthropic.APITimeoutError as e:
raise LLMTimeoutError(f"Anthropic API timeout: {e}") from e
except anthropic.BadRequestError as e:
# Map Anthropic prompt-too-long to typed PromptTooLongError so the
# caller-layer (llm_caller.py) can trigger reactive_compact + retry.
from dana.common.llm.types import PromptTooLongError

err_body: dict = {}
try:
err_body = (e.response.json() or {}).get("error", {}) if hasattr(e, "response") else {}
except Exception:
err_body = {}
err_type = err_body.get("type")
err_msg = err_body.get("message", "") or str(e)
if err_type == "invalid_request_error" and "prompt is too long" in err_msg.lower():
raise PromptTooLongError(f"Anthropic: {err_msg}") from e
logger.error("Anthropic API error", error=str(e))
raise
except Exception as e:
logger.error("Anthropic API error", error=str(e))
raise
Expand Down
11 changes: 11 additions & 0 deletions dana/common/llm/providers/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,17 @@ async def chat(self, messages: list[LLMMessage], tools: list | None = None, **kw
if fr:
finish_reason = str(fr)

# Gemini silently truncates over-budget prompts and reports it only via a
# MAX_TOKENS finish reason — the SDK raises no prompt-too-long (PTL) error.
# We can therefore only emit a best-effort WARNING log; reactive_compact
# cannot run here because this case is indistinguishable from ordinary
# output-limit truncation. Ops must tune DANA_COMPACT_TRIGGER_TOKENS conservatively.
if finish_reason and "MAX_TOKENS" in finish_reason:
logger.warning(
"gemini_max_tokens_finish",
model=self.model,
note="may indicate context-window overflow; tune DANA_COMPACT_TRIGGER_TOKENS",
)

return LLMResponse(
content=content,
model=self.model,
Expand Down
21 changes: 21 additions & 0 deletions dana/common/llm/providers/openai_compatible_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,27 @@ async def chat(self, messages: list[LLMMessage], tools: list[dict] | None = None
)
raise
except Exception as e:
# Map OpenAI-compat context_length_exceeded to typed PromptTooLongError.
# Covers OpenAI, Azure, Moonshot uniformly.
try:
import openai as _openai

if isinstance(e, _openai.APIStatusError):
from dana.common.llm.types import PromptTooLongError

err_body: dict = {}
try:
resp = getattr(e, "response", None)
if resp is not None and hasattr(resp, "json"):
err_body = (resp.json() or {}).get("error", {}) or {}
except Exception:
err_body = {}
err_code = err_body.get("code") or getattr(e, "code", "") or ""
err_msg = err_body.get("message") or str(e)
if err_code == "context_length_exceeded":
raise PromptTooLongError(f"OpenAI-compat: {err_msg}") from e
except ImportError:
pass
logger.error("OpenAI-compatible API error", error=str(e), error_type=type(e).__name__, exc_info=True)
raise

Expand Down
23 changes: 23 additions & 0 deletions dana/common/llm/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,29 @@ class ProviderError(LLMError):
pass


class PromptTooLongError(ProviderError):
    """Signal that a provider rejected a request because the prompt overflowed its context window.

    Provider adapters translate their native token-limit failures into this
    type:

    - Anthropic: `BadRequestError` whose message contains "prompt is too long".
    - OpenAI-compatible: `APIStatusError` with
      `error.code='context_length_exceeded'`.

    This error deliberately stays OUT of LLMCaller `_TRANSIENT_KEYWORDS`, so
    failover never retries it unchanged. The caller layer is expected to
    catch it, call `timeline.reactive_compact`, and only then retry.
    """


class CompactCircuitOpenError(LLMError):
    """Signal that `reactive_compact` has used up its retry budget.

    Raising this marks the session's compaction subsystem as temporarily
    disabled. Recovery is via a time-based cooldown followed by a half-open
    probe.
    """


class LLMTimeoutError(ProviderError):
"""Exception raised when an LLM API call times out.

Expand Down
109 changes: 101 additions & 8 deletions dana/core/agent/star_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from dana.common.config import config_manager
from dana.common.llm import LLM
from dana.common.llm.types import LLMMessage
from dana.common.observable import observable
from dana.common.protocols import AgentProtocol, DictParams, Notifiable, ResourceProtocol, WorkflowProtocol
from dana.common.protocols.types import LearningPhase
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(
enable_assistant: bool = True,
identity_override: str | None = None,
compress_timeline: bool = True,
compress_trigger_tokens: int | None = None,
**kwargs,
):
"""
Expand Down Expand Up @@ -160,13 +162,25 @@ def __init__(

# Determine storage_config for timeline and event_log

# Initialize timeline: use CompressedTimeline by default unless explicitly injected
# compress_timeline=False disables LLM-based compression (behaves like plain Timeline)
# Initialize timeline: use CompressedTimeline by default unless explicitly injected.
# compress_timeline=False disables LLM-based compression (behaves like plain Timeline).
# system/tools callbacks fold system-prompt + tools-schema size into needs_compression()
# estimate. Both use the existing len(str)//4 heuristic.
#
# Two independent knobs are threaded here:
# - max_context_tokens → LLM context-window BUDGET for to_llm_messages()
# - compress_trigger_tokens → compression TRIGGER (None → DANA_COMPACT_TRIGGER_TOKENS
# env var wins, so ops can retune without code changes).
# Historically these were aliased to the same value; the split lets ops set
# the trigger via env while agent authors still pick an appropriate context budget.
self._timeline = CompressedTimeline(
max_tokens_until_compression=max_context_tokens,
max_context_tokens=max_context_tokens,
max_tokens_until_compression=compress_trigger_tokens,
agent=self,
repository_factory=self._repository_factory,
compression_enabled=compress_timeline,
system_tokens_fn=self._estimate_system_prompt_tokens,
tools_tokens_fn=self._estimate_tools_tokens,
)

# Initialize EventLog API (only if observer AND codec provided)
Expand All @@ -183,6 +197,19 @@ def __init__(
# No observer or codec = no EventLog (events only come from Observer)
self._event_log = None

# CRITICAL-2 companion — auto-wire a resource that lets the LLM read
# back tool_results that were dumped to disk at ingest time. Opt out
# via env (used in tests that don't need a filesystem repository).
import os as _os

if _os.getenv("DANA_DISABLE_TOOL_RESULT_DUMP_RESOURCE") != "1":
try:
from dana.core.resource.tool_result_dump_resource import ToolResultDumpResource

self.with_resources(ToolResultDumpResource(agent=self, auto_register=False))
except Exception as _e: # pragma: no cover — don't block agent boot on resource wiring
logger.warning("tool_result_dump_resource_wire_failed", error=str(_e))

if enable_web_search:
try:
from dana.core.resource.simple_search import SimpleWebSearch
Expand Down Expand Up @@ -474,6 +501,31 @@ def magic_method(*args, **kwargs):
# TIMELINE COMPRESSION
# ============================================================================

def _estimate_system_prompt_tokens(self) -> int:
"""Return char/4 estimate of system prompt size for compression trigger."""
try:
prompt = self.system_prompt
except Exception:
return 0
if not prompt:
return 0
return len(str(prompt)) // 4

def _estimate_tools_tokens(self) -> int:
"""Return char/4 estimate of tools-schema size for compression trigger."""
try:
tools = None
runtime = getattr(self, "_runtime", None)
if runtime is not None and hasattr(runtime, "get_tools"):
tools = runtime.get_tools(self)
if not tools:
return 0
import json as _json

return len(_json.dumps(tools, default=str)) // 4
except Exception:
return 0

def _maybe_compress_timeline(self, timeline: Timeline) -> None:
"""
Compress timeline if it exceeds the configured threshold.
Expand Down Expand Up @@ -513,6 +565,12 @@ def _maybe_compress_timeline(self, timeline: Timeline) -> None:
summary_length=len(summary),
)
except Exception as e:
# PTL must propagate so the caller-layer (llm_caller.py) can trigger
# reactive_compact + retry — don't let this summary path swallow it.
from dana.common.llm.types import PromptTooLongError

if isinstance(e, PromptTooLongError):
raise
# Don't fail the main operation if compression fails
logger.warning("Timeline compression failed", error=str(e))

Expand Down Expand Up @@ -549,6 +607,10 @@ async def _maybe_compress_timeline_async(self, timeline: Timeline) -> None:
summary_length=len(summary),
)
except Exception as e:
from dana.common.llm.types import PromptTooLongError

if isinstance(e, PromptTooLongError):
raise
logger.warning("Timeline compression failed", error=str(e))

def _extract_compression_summary(self, content: str) -> str:
Expand Down Expand Up @@ -813,6 +875,19 @@ def _record_tool_results(self, tool_results: list) -> None:

result_content = json.dumps(result_content)

# CRITICAL-2: dump oversized content to a session-scoped file
# and leave a compact marker in the timeline. Prevents the
# "huge recent tool_result is unreclaimable" wedge where
# reactive_compact can't shed the offending entry because it's
# within the keep-recent window.
from dana.core.agent.tool_result_dump import maybe_dump_oversized_content, resolve_session_folder_for_agent

result_content = maybe_dump_oversized_content(
result_content,
tool_result.get("tool_call_id"),
resolve_session_folder_for_agent(self),
)

self._timeline.add_entry(
TimelineEntry(
entry_type=entry_type,
Expand Down Expand Up @@ -880,12 +955,19 @@ def _think(self, trace_percepts: DictParams) -> DictParams:
trace_percepts.pop("timeline", None)

self._maybe_compress_timeline(timeline)
llm_messages = self._runtime.build_prompt(self, timeline)

# Factory used by LLMCaller's PTL retry loop to rebuild messages after
# each reactive_compact so the retry observes the compacted timeline
# (CRITICAL-1 fix). Closes over self + timeline, not the messages list.
def _rebuild_llm_messages() -> list[LLMMessage]:
return self._runtime.build_prompt(self, timeline)

llm_messages = _rebuild_llm_messages()

response, reasoning, tool_calls, done, todo_list = None, None, [], None, None
output_state = "retry"
for attempt in range(self.MAX_THINK_RETRIES):
raw = self._runtime.call_llm(llm_messages)
raw = self._runtime.call_llm(llm_messages, messages_fn=_rebuild_llm_messages)
parsed = self._runtime.parse_response(raw)
response, reasoning, tool_calls, done, todo_list = (
parsed.response,
Expand Down Expand Up @@ -1022,17 +1104,28 @@ async def _think_async(self, trace_percepts: DictParams) -> DictParams:
trace_percepts.pop("timeline", None)

await self._maybe_compress_timeline_async(timeline)
llm_messages = self._runtime.build_prompt(self, timeline)

# Factory used by LLMCaller's PTL retry loop to rebuild messages after
# each reactive_compact so the retry observes the compacted timeline
# (CRITICAL-1 fix).
def _rebuild_llm_messages_async() -> list[LLMMessage]:
return self._runtime.build_prompt(self, timeline)

llm_messages = _rebuild_llm_messages_async()

response, reasoning, tool_calls, done, todo_list = None, None, [], None, None
output_state = "retry"
for attempt in range(self.MAX_THINK_RETRIES):
if hasattr(self._runtime, "call_llm_async"):
raw = await self._runtime.call_llm_async(llm_messages)
raw = await self._runtime.call_llm_async(llm_messages, messages_fn=_rebuild_llm_messages_async)
else:
import asyncio

raw = await asyncio.to_thread(self._runtime.call_llm, llm_messages)
raw = await asyncio.to_thread(
self._runtime.call_llm,
llm_messages,
messages_fn=_rebuild_llm_messages_async,
)
parsed = self._runtime.parse_response(raw)
response, reasoning, tool_calls, done, todo_list = (
parsed.response,
Expand Down
Loading
Loading