292 changes: 264 additions & 28 deletions docs/API_TOOL_CALLING.md
@@ -5,15 +5,17 @@
## Overview

API Tool Calling enables the LLM module to discover and invoke external API endpoints
in response to user queries. Endpoints are registered, semantically indexed in Qdrant,
and retrieved at query time using hybrid search. Once matched, a multi-turn agentic
loop collects all required parameters from the user before the API call is made.


| Component | What it does | Status |
|---|---|---|
| **Indexing pipeline** | Takes an endpoint definition → enriches it with LLM context → stores hybrid vectors in Qdrant | ✅ Complete |
| **Tool classifier** | At query time, routes to the best matching endpoint via hybrid search + LLM disambiguation | ✅ Complete |
| **Agentic loop** | Multi-turn parameter collection with session persistence, language-aware clarifying questions, param correction, continuation prompt, and intent-switch detection | ✅ Complete |
| **API caller** | Execute the collected params against the real API endpoint and format the response | 🔧 Planned (next task) |

---

@@ -35,6 +37,10 @@ APISemanticSearcher (src/tool_classifier/api_semantic_searcher.py)
ToolClassifier._try_api_tool_classification()
↓ ClassificationResult(workflow=API_TOOL_CALLING)
APIToolWorkflowExecutor (src/tool_classifier/workflows/api_tool_workflow.py)
↓ multi-turn param collection
AgenticLoop (src/tool_classifier/agentic_loop.py)
↓ session state
APIToolSessionStore (Redis, keyed by chat_id, 30-min TTL)
```

---
@@ -380,51 +386,281 @@ Defined in [src/tool_classifier/workflows/api_tool_workflow.py](../src/tool_clas
Handles `WorkflowType.API_TOOL_CALLING` after `ToolClassifier.classify()` has set
`matched_endpoint` in the context dict.

**Responsibilities:**

- **Turn 1 (new session):** reads `context["matched_endpoint"]`, creates a new
`APIToolSession` in Redis, runs the first agentic loop turn.
- **Turn 2-N (resume):** loads the existing session from Redis, runs the next turn.
- **Fast path:** if the endpoint has no required params, immediately returns the
completed JSON without starting a session.
- **Completion:** when all params are collected, deletes the session and returns a
JSON response with `status=params_collected`.
- **Max turns:** when `max_turns` is reached, deletes the session and returns `None` to trigger the RAG fallback.
- **Streaming:** wraps the short clarifying-question response in a single SSE frame
+ `END` marker.

**Completed response format:**

```json
{
  "status": "params_collected",
  "endpoint": { "name": "get_public_holidays" },
  "collected_params": {
    "countryIsoCode": "EE",
    "validFrom": "2026-01-01",
    "validTo": "2026-12-31"
  }
}
```

The actual API call and response formatting are handled by the next planned task.

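A minimal sketch of how the completion check and the `params_collected` payload could fit together. The helper names `missing_required` and `build_completed_response`, and the schema shape (parameter name mapped to a dict with a `required` flag), are assumptions for illustration; the source specifies only the output format. An endpoint with no required params yields an empty missing list, which corresponds to the fast path.

```python
import json
from typing import Any


def missing_required(
    params_schema: dict[str, dict[str, Any]], collected: dict[str, Any]
) -> list[str]:
    """Names of required params that still have no usable value."""
    return [
        name
        for name, spec in params_schema.items()
        if spec.get("required", False) and collected.get(name) in (None, "")
    ]


def build_completed_response(endpoint_name: str, collected: dict[str, Any]) -> str:
    """Serialize the completed-response format documented above."""
    return json.dumps(
        {
            "status": "params_collected",
            "endpoint": {"name": endpoint_name},
            "collected_params": collected,
        }
    )


# Hypothetical schema for the get_public_holidays example.
schema = {
    "countryIsoCode": {"required": True},
    "validFrom": {"required": True},
    "validTo": {"required": True},
}
collected = {"countryIsoCode": "EE"}
print(missing_required(schema, collected))  # ['validFrom', 'validTo']
```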
---

## Part 3 — Agentic Loop (Multi-Turn Parameter Collection)

### Overview

Defined in [src/tool_classifier/agentic_loop.py](../src/tool_classifier/agentic_loop.py).

`AgenticLoop` is **stateless** — it carries no internal state between HTTP requests.
All state is passed in as arguments (loaded from Redis by the workflow executor before
calling `run_turn`) and saved back to Redis inside `run_turn` before returning.

### Session Model: `APIToolSession`

Defined in [src/models/session_models.py](../src/models/session_models.py).

Stored in Redis keyed by `chat_id` with a **30-minute sliding TTL**.

| Field | Type | Description |
|---|---|---|
| `chat_id` | str | Unique conversation identifier |
| `state` | str | Current state (`collecting_params`, etc.) |
| `selected_endpoint` | dict | Full endpoint payload from Qdrant |
| `collected_params` | dict | Parameters collected so far |
| `turn_count` | int | Number of turns elapsed |
| `max_turns` | int | Max turns before fallback (default: 5) |
| `awaiting_continuation` | bool | True when continuation prompt has been shown |
| `detected_language` | str | Language from first message (`en`, `et`, `ru`) — persisted so all clarifying questions use the same language |

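The fields in the table can be pictured with the stand-in below. The real model is a Pydantic `BaseModel`; this sketch uses a stdlib dataclass only to stay self-contained. The class name `APIToolSessionSketch` and the JSON helpers are hypothetical. The defaults for `max_turns` and `detected_language` come from the source, while the defaults for `state`, `turn_count`, and `awaiting_continuation` are assumptions.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class APIToolSessionSketch:
    """Dataclass stand-in for the Pydantic APIToolSession model."""

    chat_id: str
    selected_endpoint: dict[str, Any]
    state: str = "collecting_params"      # assumed default
    collected_params: dict[str, Any] = field(default_factory=dict)
    turn_count: int = 0                   # assumed default
    max_turns: int = 5                    # default per the table
    awaiting_continuation: bool = False   # assumed default
    detected_language: str = "en"         # default per the model definition

    def to_json(self) -> str:
        """Serialize for storage as a single string value."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "APIToolSessionSketch":
        """Rebuild a session from its stored JSON form."""
        return cls(**json.loads(raw))
```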
### Turn Flow

```
APIToolWorkflowExecutor._run()
  ├─ Load session from Redis (or create new)
  └─ AgenticLoop.run_turn(
       user_message, conversation_history,
       params_schema, collected_params,
       turn_count, max_turns, awaiting_continuation,
       session_language
     )
       ├─ AWAITING_CONTINUATION_DECISION?
       │    yes → parse yes/no from user_message
       │      yes → clear flag, continue collecting
       │      no  → return MAX_TURNS_REACHED (RAG fallback)
       ├─ ParamExtractionModule.forward()
       │    → DSPy extracts params from user_message + conversation_history
       │    → uses session_language for all questions
       │    → new values OVERWRITE old (allows corrections)
       ├─ All required params present? → COMPLETED
       ├─ turn_count reached CONTINUATION_TURN (default: 3)?
       │    → set awaiting_continuation=True
       │    → return AWAITING_CONTINUATION_DECISION
       │    → question = localized CONTINUATION_QUESTION (EN/ET/RU)
       └─ else → generate clarifying question for next missing param
                 → return NEEDS_INPUT
  └─ Save updated session to Redis
```
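The branching in the flow above can be sketched as a pure function. This is an illustration under stated assumptions, not the actual `AgenticLoop.run_turn`: the names `TurnStatus`, `decide_turn`, and `YES_WORDS` are hypothetical, the yes-list contents are invented, extraction is assumed to have already been merged into `collected`, and the `max_turns` check is omitted. The sketch also assumes that a "yes" reply suppresses the continuation prompt for that same turn.

```python
from enum import Enum
from typing import Any

CONTINUATION_TURN = 3  # value documented in constants.py
YES_WORDS = {"yes", "y", "jah", "да"}  # hypothetical yes-list


class TurnStatus(str, Enum):
    COMPLETED = "COMPLETED"
    NEEDS_INPUT = "NEEDS_INPUT"
    AWAITING_CONTINUATION_DECISION = "AWAITING_CONTINUATION_DECISION"
    MAX_TURNS_REACHED = "MAX_TURNS_REACHED"


def decide_turn(
    user_message: str,
    required: list[str],
    collected: dict[str, Any],
    turn_count: int,
    awaiting_continuation: bool,
) -> tuple[TurnStatus, bool]:
    """Return (status, new awaiting_continuation flag) for one turn."""
    just_confirmed = False
    if awaiting_continuation:
        # Anything outside the yes-list counts as "no" and triggers fallback.
        if user_message.strip().lower() not in YES_WORDS:
            return TurnStatus.MAX_TURNS_REACHED, False
        just_confirmed = True  # flag is cleared, collection continues

    missing = [name for name in required if name not in collected]
    if not missing:
        return TurnStatus.COMPLETED, False
    if turn_count == CONTINUATION_TURN and not just_confirmed:
        return TurnStatus.AWAITING_CONTINUATION_DECISION, True
    return TurnStatus.NEEDS_INPUT, False
```

Keeping the decision separate from extraction and persistence makes each branch easy to unit-test without Redis or an LLM call.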

### Key Behaviours

**Language persistence:**
The language is detected once from the user's first message and stored in
`APIToolSession.detected_language`. All subsequent clarifying questions and the
continuation prompt are generated in that language, even when follow-up replies
like "yes" or "2026-01-01" are too short to re-detect reliably.

Supported: `en` (default), `et` (Estonian), `ru` (Russian).

**Parameter correction:**
If the user says "No, use Russia instead of Estonia", the extractor overwrites the
previously collected `countryIsoCode` value. There is no guard preventing
re-extraction of already-collected params — new values always win.

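The overwrite rule can be illustrated with a small merge helper. `merge_params` is a hypothetical name, and treating `None` as "not mentioned in this turn" is an assumption about the extractor's output rather than documented behaviour.

```python
from typing import Any


def merge_params(
    collected: dict[str, Any], extracted: dict[str, Any]
) -> dict[str, Any]:
    """Merge newly extracted params into the collected ones; new values win."""
    merged = dict(collected)  # copy so the caller's dict is not mutated
    for name, value in extracted.items():
        if value is not None:  # assumption: None means "not mentioned"
            merged[name] = value
    return merged


before = {"countryIsoCode": "EE", "validFrom": "2026-01-01"}
after = merge_params(before, {"countryIsoCode": "RU", "validTo": None})
print(after)  # {'countryIsoCode': 'RU', 'validFrom': '2026-01-01'}
```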
**Continuation prompt:**
After `CONTINUATION_TURN` turns without completing, the loop asks the user whether
to continue. If the user says no (or anything not in the yes-list), the session is
abandoned and the request falls back to the RAG workflow.

Localized continuation questions are defined in
[src/tool_classifier/constants.py](../src/tool_classifier/constants.py):
`CONTINUATION_QUESTION`, `CONTINUATION_QUESTION_ET`, `CONTINUATION_QUESTION_RU`.

**History isolation:**
On turn 0 (first turn of a new session), `conversation_history=[]` is passed to the
extractor regardless of what the API sends. This prevents parameter values from a
previous completed session from being re-used for the new request.

**Constants** (in `src/tool_classifier/constants.py`):

| Constant | Value | Description |
|---|---|---|
| `CONTINUATION_TURN` | `3` | Turn at which the continuation prompt is shown |

---

## Part 4 — Session Management & Intent Switch Detection

### `APIToolSessionStore`

Defined in [src/utils/api_tool_session_store.py](../src/utils/api_tool_session_store.py).

Redis-backed store. Key format: `session:{chat_id}`. TTL resets on every `update()`.

Operations: `save()`, `get()`, `update()`, `delete()`.

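To make the key format and TTL behaviour concrete, here is an in-memory stand-in for the store. It is a sketch only: the real store is Redis-backed and awaited asynchronously, whereas `InMemorySessionStore` is a hypothetical synchronous class with an injectable clock so expiry can be exercised without waiting. It refreshes the TTL on `save()` and `update()`; whether the real `get()` also refreshes it is not stated in the source.

```python
import time
from typing import Any, Callable, Optional

SESSION_TTL_SECONDS = 30 * 60  # 30-minute TTL from the session model docs


class InMemorySessionStore:
    """Synchronous, dict-backed illustration of the session store contract."""

    def __init__(
        self,
        ttl: float = SESSION_TTL_SECONDS,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl
        self._clock = clock
        self._data: dict[str, tuple[float, dict[str, Any]]] = {}

    @staticmethod
    def _key(chat_id: str) -> str:
        return f"session:{chat_id}"

    def save(self, chat_id: str, session: dict[str, Any]) -> None:
        expires_at = self._clock() + self._ttl
        self._data[self._key(chat_id)] = (expires_at, dict(session))

    def get(self, chat_id: str) -> Optional[dict[str, Any]]:
        key = self._key(chat_id)
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, session = entry
        if self._clock() >= expires_at:
            del self._data[key]  # lazily evict expired sessions
            return None
        return dict(session)

    def update(self, chat_id: str, **changes: Any) -> bool:
        session = self.get(chat_id)
        if session is None:
            return False
        session.update(changes)
        self.save(chat_id, session)  # re-saving resets the TTL
        return True

    def delete(self, chat_id: str) -> None:
        self._data.pop(self._key(chat_id), None)
```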
### Session Lifecycle

```
Turn 1: new query matches API tool endpoint
→ session CREATED (state=collecting_params)
→ clarifying question returned

Turn 2-N: user replies
→ session LOADED → loop runs → session UPDATED

Final turn: all params collected
→ session DELETED
→ completed JSON returned

OR: max turns reached / user says "no" to continuation
→ session DELETED
→ None returned → RAG fallback
```
### Intent Switch Detection

Defined in `ToolClassifier.classify()` — the session-resume short-circuit block.

Before resuming an active session, the classifier runs `_try_api_tool_classification()`
on the new message. If it matches a **different** endpoint with sufficient confidence,
the old session is abandoned and the new query starts fresh:

```python
new_api_match = await self._try_api_tool_classification(query, request)
if (
new_api_match is not None
and new_api_match.metadata["matched_endpoint"]["name"] != endpoint_name
):
await session_store.delete(request.chatId)
return new_api_match # start new session for different endpoint
```

### Test Endpoint Behaviour (`/orchestrate/test`)

The test endpoint hardcodes `chatId="test-session"` for all requests. Because every
test user shares this ID, any incomplete session would be resumed by the next
unrelated test query.

**Fix:** the test endpoint deletes `"test-session"` from Redis at the **start** of
every request, before classification runs. This makes each test query a fresh
single-turn request.

**Trade-off:** multi-turn API tool flows cannot be tested via the test-LLM page.
The session is wiped before turn 2 can use it. To test multi-turn flows, use the
production `/orchestrate/stream` endpoint (which uses a unique `chatId` per tab) or
the integration test script.

---

### End-to-End Flow (Query Time)

```
Turn 1 — User: "What are the public holidays in Estonia?"
ToolClassifier.classify()
├─ No active session in Redis for this chat_id
├─ Dense search (intent_collections) → low cosine → below threshold
└─ _try_api_tool_classification()
└─ APISemanticSearcher.search()
├─ Dense: get_public_holidays cosine=0.87 → high confidence
└─ return [APIToolSearchResult(name="get_public_holidays", ...)]
└─ ClassificationResult(workflow=API_TOOL_CALLING, metadata={matched_endpoint: {...}})
ToolClassifier._execute_with_fallback_async()
APIToolWorkflowExecutor._run()
├─ No existing session → create new APIToolSession (turn_count=0, language=en)
└─ AgenticLoop.run_turn(turn_count=0, history=[])
├─ ParamExtractionModule: the query gives no explicit parameter values,
│ but countryIsoCode=EE is inferred from "Estonia" → extracted
├─ Missing: validFrom, validTo
└─ NEEDS_INPUT → "Which date range would you like? (validFrom, validTo)"
Session saved to Redis
Bot: "Which date range would you like? Please provide validFrom and validTo (YYYY-MM-DD)."

---

Turn 2 — User: "This year, 2026-01-01 to 2026-12-31"
ToolClassifier.classify()
├─ Active session found for chat_id → run intent-switch check
├─ _try_api_tool_classification("This year, 2026-01-01 to 2026-12-31")
│ → cosine=0.12 < threshold → no new API tool match
└─ Same endpoint → resume session → ClassificationResult(reason=active_session_resume)
APIToolWorkflowExecutor._run()
└─ AgenticLoop.run_turn(turn_count=1, collected_params={countryIsoCode: "EE"})
├─ ParamExtractionModule: extracts validFrom=2026-01-01, validTo=2026-12-31
├─ All required params present
└─ COMPLETED
Session DELETED from Redis
Bot: {"status": "params_collected", "endpoint": {"name": "get_public_holidays"}, "collected_params": {"countryIsoCode": "EE", "validFrom": "2026-01-01", "validTo": "2026-12-31"}}
```

---

## Part 5 — Integration Testing

### Test Script

Defined in [tests/api_tool_eval/integration_test_agentic_loop.py](../tests/api_tool_eval/integration_test_agentic_loop.py).

Runs end-to-end against the live service at `http://localhost:8100` via `/orchestrate`.
Each scenario uses a unique `chatId` (UUID) so sessions are fully isolated.

```bash
uv run --no-project --with requests python tests/api_tool_eval/integration_test_agentic_loop.py \
--no-fail-fast \
--output tests/api_tool_eval/integration-results.json
```

### Covered Scenarios

| # | Scenario | Turns | What it validates |
|---|---|---|---|
| 1 | Single-turn complete | 1 | Vehicle tax with plate number in first message → immediate completion |
| 2 | Multi-turn EN | 2 | Public holidays, country extracted turn 1, dates provided turn 2 |
| 3 | Multi-turn ET | 2 | School holidays in Estonian → language-aware classification |
| 4 | No-params fast path | 1 | Parliament votings endpoint has no required params → instant completion |
| 5 | Address search | 2 | Two-turn address lookup |
| 6 | Electricity prices | 2 | Datetime params across two turns |
| 7 | Session isolation | 2 | Two different chat IDs — no param leak between sessions |
| 8 | AWAITING_CONTINUATION → yes | 4+ | User says "yes" at continuation prompt → loop resumes |
| 9 | MAX_TURNS_REACHED | 5+ | User never provides params → falls back to RAG |
6 changes: 6 additions & 0 deletions src/llm_orchestration_service.py
@@ -136,6 +136,12 @@ def __init__(self) -> None:
# This allows components to be initialized per-request with proper context
self.tool_classifier = None

# Redis-backed session store for API Tool Calling agentic loop.
# Set to None here; the FastAPI lifespan injects the live store after
# Redis initialises (app.state.orchestration_service.session_store = ...).
# Workflow executors access it via self.orchestration_service.session_store.
self.session_store: Any = None

# Shared BM25 search index pre-warmed at startup.
# Populated by _prewarm_shared_bm25() which is called from the FastAPI
# lifespan so it runs inside the async event loop. Until then it is None
15 changes: 15 additions & 0 deletions src/llm_orchestration_service_api.py
@@ -96,6 +96,14 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
logger.warning(f"Redis session store unavailable, continuing without it: {e}")
app.state.session_store = None

# Expose session_store on the orchestration service so workflow executors
# (e.g. APIToolWorkflowExecutor) can reach it via self.orchestration_service.
if (
hasattr(app.state, "orchestration_service")
and app.state.orchestration_service is not None
):
app.state.orchestration_service.session_store = app.state.session_store

yield

# Shutdown
@@ -383,6 +391,13 @@ async def test_orchestrate_llm_request(
else None,
)

# test-LLM is single-turn only (no conversationHistory, no multi-turn loops).
# Clear any stale API tool session so each request starts fresh and never
# accidentally resumes a parameter-collection loop from a previous test query.
session_store = getattr(http_request.app.state, "session_store", None)
if session_store is not None:
await session_store.delete("test-session")

logger.info(f"This is full request constructed for testing: {full_request}")

# Process the request using the same logic
8 changes: 8 additions & 0 deletions src/models/session_models.py
@@ -41,3 +41,11 @@ class APIToolSession(BaseModel):
"to the RAG workflow."
),
)
detected_language: str = Field(
default="en",
description=(
"Language detected from the user's first message ('en', 'et', 'ru'). "
"Persisted so all subsequent clarifying questions use the same language, "
"even when follow-up messages are too short to reliably re-detect."
),
)