Commit b90ab52

Merge pull request #136 from rootcodelabs/llm-310

Get update from llm-310 into llm/service-integration

2 parents d67214e + d647f86 commit b90ab52

12 files changed
Lines changed: 1722 additions & 135 deletions

Lines changed: 323 additions & 0 deletions
@@ -0,0 +1,323 @@
# Context Workflow: Greeting Detection and Conversation History Analysis

## Overview

The **Context Workflow (Layer 2)** intercepts user queries that can be answered without searching the knowledge base. It handles two categories:

1. **Greetings** — Detects and responds to social exchanges (hello, goodbye, thanks) in multiple languages
2. **Conversation history references** — Answers follow-up questions that refer to information already discussed in the session

When the context workflow can answer, a response is returned immediately, bypassing the RAG pipeline entirely. When it cannot answer, the query falls through to the RAG workflow (Layer 3).

---

## Architecture

### Position in the Classifier Chain

```
User Query

Layer 1: SERVICE → External API calls
  ↓ (cannot handle)
Layer 2: CONTEXT → Greetings + conversation history ←── This document
  ↓ (cannot handle)
Layer 3: RAG → Knowledge base retrieval
  ↓ (cannot handle)
Layer 4: OOD → Out-of-domain fallback
```

### Key Components

| Component | File | Responsibility |
|-----------|------|----------------|
| `ContextAnalyzer` | `src/tool_classifier/context_analyzer.py` | LLM-based greeting detection and context analysis |
| `ContextWorkflowExecutor` | `src/tool_classifier/workflows/context_workflow.py` | Orchestrates the workflow, handles streaming/non-streaming |
| `ToolClassifier` | `src/tool_classifier/classifier.py` | Invokes `ContextAnalyzer` during classification and routes to `ContextWorkflowExecutor` |
| `greeting_constants.py` | `src/tool_classifier/greeting_constants.py` | Fallback greeting responses for Estonian and English |

---

## Full Request Flow

```
User Query + Conversation History

ToolClassifier.classify()
├─ Layer 1 (SERVICE): Embedding-based intent routing
│  └─ If no service tool matches → route to CONTEXT workflow

└─ ClassificationResult(workflow=CONTEXT)

ToolClassifier.route_to_workflow()
├─ Non-streaming → ContextWorkflowExecutor.execute_async()
│  ├─ Phase 1: _detect() → context_analyzer.detect_context() [classification only]
│  ├─ If greeting → return greeting OrchestrationResponse
│  ├─ If can_answer → _generate_response_async() → context_analyzer.generate_context_response()
│  └─ Otherwise → return None (RAG fallback)

└─ Streaming → ContextWorkflowExecutor.execute_streaming()
   ├─ Phase 1: _detect() → context_analyzer.detect_context() [classification only]
   ├─ If greeting → _stream_greeting() async generator
   ├─ If can_answer → _create_history_stream() → context_analyzer.stream_context_response()
   └─ Otherwise → return None (RAG fallback)
```

---

## Phase 1: Detection (Classify Only)

### LLM Task

Every query is checked against the **most recent 10 conversation turns** using a single LLM call (`detect_context()`). This phase **does not generate an answer** — it only classifies the query and extracts a relevant context snippet for Phase 2.

The `ContextDetectionSignature` DSPy signature instructs the LLM to:

1. Detect if the query is a greeting in any supported language
2. Check if the query references something discussed in the last 10 turns
3. If the query can be answered from history, extract the relevant snippet
4. Do **not** generate the final answer here — detection only
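
As a rough illustration of the 10-turn window, the conversation history could be truncated and serialised before the detection call. The helper below is a sketch, not the project's actual formatting code; the function name and history shape are assumptions.

```python
import json

# Sketch only: limit detection input to the most recent 10 turns and format
# them as JSON, matching the `conversation_history` input described below.
def format_history_for_detection(conversation_history: list[dict], max_turns: int = 10) -> str:
    recent_turns = conversation_history[-max_turns:]
    return json.dumps(recent_turns, ensure_ascii=False)
```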

### LLM Output Format

The LLM returns a JSON object parsed into `ContextDetectionResult`:

```json
{
  "is_greeting": false,
  "can_answer_from_context": true,
  "reasoning": "User is asking about tax rate discussed earlier",
  "context_snippet": "Bot confirmed the flat rate is 20%, applying equally to all income brackets."
}
```

| Field | Type | Description |
|-------|------|-------------|
| `is_greeting` | `bool` | Whether the query is a greeting |
| `can_answer_from_context` | `bool` | Whether the query can be answered from conversation history |
| `reasoning` | `str` | Brief explanation of the detection decision |
| `context_snippet` | `str \| null` | Relevant excerpt from history for use in Phase 2, or `null` |

> **Internal field**: `answered_from_summary` (bool, default `False`) is reserved for future summary-based detection paths.
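
A minimal sketch of how that JSON might be validated into `ContextDetectionResult` (defined under Data Models below), with the parse-failure behaviour from the Error Handling table, where invalid output defaults every flag to `False`. The function name and exact error handling are assumptions.

```python
import json

from pydantic import ValidationError

# Sketch: parse the raw LLM output into ContextDetectionResult (see Data Models),
# defaulting all flags to False when the JSON is invalid so the query falls
# through to RAG. Not the actual project code.
def parse_detection_output(raw: str) -> "ContextDetectionResult":
    try:
        return ContextDetectionResult(**json.loads(raw))
    except (json.JSONDecodeError, ValidationError):
        return ContextDetectionResult(
            is_greeting=False,
            can_answer_from_context=False,
            reasoning="Detection output could not be parsed",
            context_snippet=None,
        )
```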

### Decision After Phase 1

```
is_greeting=True → Phase 2: return greeting response (no LLM call)
can_answer_from_context=True AND snippet set → Phase 2: generate answer from snippet
Otherwise → Fall back to RAG
```

---

## Phase 2: Response Generation

### Non-Streaming (`_generate_response_async`)

Calls `generate_context_response(query, context_snippet)` which uses `ContextResponseGenerationSignature` to produce a complete answer in a single LLM call. Output guardrails are applied before returning the `OrchestrationResponse`.

### Streaming (`_create_history_stream` → `stream_context_response`)

Calls `stream_context_response(query, context_snippet)` which uses DSPy native streaming (`dspy.streamify`) with `ContextResponseGenerationSignature`. Tokens are yielded in real time and passed through NeMo Guardrails before being SSE-formatted.
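
To make the streaming path concrete, here is a minimal sketch of wrapping a `ContextResponseGenerationSignature` predictor with `dspy.streamify`. The signature fields come from the Data Models section; the function body, listener setup, and chunk handling are assumptions, not the project's actual `stream_context_response` implementation.

```python
import dspy

class ContextResponseGenerationSignature(dspy.Signature):
    """Answer the user query using only the supplied context snippet."""
    context_snippet: str = dspy.InputField()
    user_query: str = dspy.InputField()
    answer: str = dspy.OutputField()

# Sketch of Phase 2 streaming: dspy.streamify yields incremental chunks for the
# `answer` field, followed by a final Prediction. Guardrail and SSE wiring omitted.
async def stream_context_response(query: str, context_snippet: str):
    streaming_predict = dspy.streamify(
        dspy.Predict(ContextResponseGenerationSignature),
        stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
    )
    async for item in streaming_predict(user_query=query, context_snippet=context_snippet):
        if isinstance(item, dspy.streaming.StreamResponse):
            yield item.chunk  # token(s) as they arrive from the LLM
        # the final dspy.Prediction marks the end of the stream
```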

---

## Greeting Detection

### Supported Languages

| Language | Code |
|----------|------|
| Estonian | `et` |
| English | `en` |

### Supported Greeting Types

| Type | Estonian Examples | English Examples |
|------|-------------------|-----------------|
| `hello` | Tere, Hei, Tervist, Moi | Hello, Hi, Hey, Good morning |
| `goodbye` | Nägemist, Tšau | Bye, Goodbye, See you, Good night |
| `thanks` | Tänan, Aitäh, Tänud | Thank you, Thanks |
| `casual` | Tere, Tervist | Hey |

### Greeting Response Generation

Greeting detection is handled in **Phase 1 (`detect_context`)**, where the LLM classifies whether the query is a greeting and, if so, identifies the language and greeting type. This phase does **not** generate the final natural-language reply.

In **Phase 2**, `ContextWorkflowExecutor` calls `get_greeting_response(...)`, which returns a response based on predefined static templates in `greeting_constants.py`, ensuring the reply is in the detected language. If greeting detection fails or the greeting type is unsupported, the query falls through to the next workflow layer instead of attempting LLM-based greeting generation.

**Greeting response templates (`greeting_constants.py`):**

```python
GREETINGS_ET = {
    "hello": "Tere! Kuidas ma saan sind aidata?",
    "goodbye": "Nägemist! Head päeva!",
    "thanks": "Palun! Kui on veel küsimusi, küsi julgelt.",
    "casual": "Tere! Mida ma saan sinu jaoks teha?",
}

GREETINGS_EN = {
    "hello": "Hello! How can I help you?",
    "goodbye": "Goodbye! Have a great day!",
    "thanks": "You're welcome! Feel free to ask if you have more questions.",
    "casual": "Hey! What can I do for you?",
}
```

The fallback greeting type is determined by keyword matching in `_detect_greeting_type()` — checking for `thank/tänan/aitäh`, then `bye/goodbye/nägemist/tšau`, before defaulting to `hello`.
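
A compact sketch of that fallback lookup, combining the keyword matching just described with the templates above. The helper names mirror those mentioned in this document, but the bodies are illustrative rather than the actual implementation.

```python
GREETINGS = {"et": GREETINGS_ET, "en": GREETINGS_EN}  # assumes the dicts shown above

def _detect_greeting_type(query: str) -> str:
    q = query.lower()
    if any(word in q for word in ("thank", "tänan", "aitäh")):
        return "thanks"
    if any(word in q for word in ("bye", "goodbye", "nägemist", "tšau")):
        return "goodbye"
    return "hello"  # default greeting type

def get_greeting_response(query: str, language: str) -> str:
    templates = GREETINGS.get(language, GREETINGS["en"])  # assumed English fallback
    return templates[_detect_greeting_type(query)]
```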

---

## Streaming Support

The context workflow supports both response modes:

### Non-Streaming (`execute_async`)

Returns a complete `OrchestrationResponse` object with the answer as a single string. Output guardrails are applied before the response is returned.

### Streaming (`execute_streaming`)

Returns an `AsyncIterator[str]` that yields SSE (Server-Sent Events) chunks.

**Greeting responses** are yielded as a single SSE chunk followed by `END`.

**History responses** use DSPy native streaming (`dspy.streamify`) with `ContextResponseGenerationSignature`. Tokens are emitted in real time as they arrive from the LLM, then passed through NeMo Guardrails (`stream_with_guardrails`) before being SSE-formatted. If a guardrail violation is detected in a chunk, streaming stops and the violation message is sent instead.

**SSE Format:**

```
data: {"chatId": "abc123", "payload": {"content": "Tere! Kuidas ma"}, "timestamp": "...", "sentTo": []}

data: {"chatId": "abc123", "payload": {"content": " saan sind aidata?"}, "timestamp": "...", "sentTo": []}

data: {"chatId": "abc123", "payload": {"content": "END"}, "timestamp": "...", "sentTo": []}
```
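
For reference, each `data:` line above can be produced by a small formatter like the sketch below; the helper name and timestamp format are assumptions, not the executor's actual code.

```python
import json
from datetime import datetime, timezone

# Illustrative SSE formatter matching the chunk layout shown above.
def format_sse_chunk(chat_id: str, content: str) -> str:
    event = {
        "chatId": chat_id,
        "payload": {"content": content},
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "sentTo": [],
    }
    return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

# A greeting is streamed as one content chunk followed by the END marker:
# format_sse_chunk("abc123", "Tere! Kuidas ma saan sind aidata?")
# format_sse_chunk("abc123", "END")
```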

---

## Cost Tracking

LLM token usage and cost are tracked via `get_lm_usage_since()` and stored in `costs_metric` within the workflow executor. Costs are logged via `orchestration_service.log_costs()` at the end of each execution path.

Two cost keys are tracked separately:

```python
costs_metric = {
    "context_detection": {
        # Phase 1: detect_context() — single LLM call
        "total_cost": 0.0012,
        "total_tokens": 180,
        "total_prompt_tokens": 150,
        "total_completion_tokens": 30,
        "num_calls": 1,
    },
    "context_response": {
        # Phase 2: generate_context_response() or stream_context_response()
        "total_cost": 0.003,
        "total_tokens": 140,
        "total_prompt_tokens": 100,
        "total_completion_tokens": 40,
        "num_calls": 1,
    },
}
```

Greeting responses skip Phase 2, so only the `"context_detection"` cost is populated.
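
As a quick worked example using the figures above, per-request totals could be aggregated before logging; `log_costs()` is named in this document, but the aggregation below is only an illustration.

```python
# With the example values above: 0.0012 + 0.003 = 0.0042 USD and 180 + 140 = 320 tokens.
total_cost = sum(entry["total_cost"] for entry in costs_metric.values())
total_tokens = sum(entry["total_tokens"] for entry in costs_metric.values())
# For a greeting, only "context_detection" is present, so the totals cover Phase 1 alone.
```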

---

## Error Handling and Fallback

| Failure Point | Behaviour |
|---------------|-----------|
| Phase 1 LLM call raises exception | `can_answer_from_context=False` → falls back to RAG |
| Phase 1 returns invalid JSON | Logged as warning, all flags default to `False` → falls back to RAG |
| Phase 2 LLM call raises exception | Logged as error, `_generate_response_async` returns `None` → falls back to RAG |
| Phase 2 returns empty answer | Logged as warning → falls back to RAG |
| Output guardrails fail | Logged as warning, response returned without guardrail check |
| Guardrail violation in streaming | `OUTPUT_GUARDRAIL_VIOLATION_MESSAGE` sent, stream terminated |
| `orchestration_service` unavailable | History streaming skipped → `None` returned → RAG fallback |
| `guardrails_adapter` not a `NeMoRailsAdapter` | Logged as warning → cannot stream → RAG fallback |
| Any unhandled exception in executor | Error logged, `execute_async`/`execute_streaming` returns `None` → RAG fallback via classifier |

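All of these rows reduce to one contract: the executor returns `None` whenever it cannot safely answer, and the classifier then routes the query to RAG. The sketch below shows only that shape; the parameter types and helper calls are simplified stand-ins, not the real executor.

```python
from typing import Awaitable, Callable, Optional

# Simplified sketch of the fallback contract: any failure or "cannot answer"
# outcome becomes None, which the classifier treats as a RAG-fallback signal.
async def execute_async_sketch(
    detect: Callable[[], Awaitable[dict]],
    generate: Callable[[str], Awaitable[str]],
) -> Optional[str]:
    try:
        detection = await detect()              # Phase 1
    except Exception:
        return None                             # Phase 1 error → RAG
    if detection.get("is_greeting"):
        return "static greeting template"       # Phase 2 skipped for greetings
    snippet = detection.get("context_snippet")
    if detection.get("can_answer_from_context") and snippet:
        try:
            answer = await generate(snippet)    # Phase 2
        except Exception:
            return None                         # Phase 2 error → RAG
        return answer or None                   # empty answer → RAG
    return None                                 # cannot answer → RAG
```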

---

## Logging

Key log entries emitted during a request:

| Level | Message | When |
|-------|---------|------|
| `INFO` | `CONTEXT WORKFLOW (NON-STREAMING) \| Query: '...'` | `execute_async()` entry |
| `INFO` | `CONTEXT WORKFLOW (STREAMING) \| Query: '...'` | `execute_streaming()` entry |
| `INFO` | `CONTEXT DETECTOR: Phase 1 \| Query: '...' \| History: N turns` | `detect_context()` entry |
| `INFO` | `DETECTION RESULT \| Greeting: ... \| Can Answer: ... \| Has snippet: ...` | Phase 1 LLM response parsed |
| `INFO` | `Detection cost \| Total: $... \| Tokens: N` | After Phase 1 cost tracked |
| `INFO` | `Detection: greeting=... can_answer=...` | After `_detect()` returns in executor |
| `INFO` | `CONTEXT GENERATOR: Phase 2 non-streaming \| Query: '...'` | `generate_context_response()` entry |
| `INFO` | `CONTEXT GENERATOR: Phase 2 streaming \| Query: '...'` | `stream_context_response()` entry |
| `INFO` | `Context response streaming complete (final Prediction received)` | DSPy streaming finished |
| `WARNING` | `[chatId] Phase 2 empty answer — fallback to RAG` | Phase 2 returned no content |
| `WARNING` | `[chatId] Guardrails violation in context streaming` | Violation detected mid-stream |
| `WARNING` | `[chatId] Cannot answer from context — falling back to RAG` | Neither phase could answer |

---

## Data Models

### `ContextDetectionResult` (Phase 1 output)

```python
from typing import Optional

from pydantic import BaseModel


class ContextDetectionResult(BaseModel):
    is_greeting: bool                      # True if query is a greeting
    can_answer_from_context: bool          # True if query can be answered from last 10 turns
    reasoning: str                         # LLM's brief explanation
    answered_from_summary: bool = False    # Reserved; always False in current workflow
    context_snippet: Optional[str] = None  # Relevant excerpt for Phase 2 generation, or None
```

### `ContextDetectionSignature` (DSPy — Phase 1)

| Field | Type | Description |
|-------|------|-------------|
| `conversation_history` | Input | Last 10 turns formatted as JSON |
| `user_query` | Input | Current user query |
| `detection_result` | Output | JSON with `is_greeting`, `can_answer_from_context`, `reasoning`, `context_snippet` |

> Detection only — **no answer generated here**.

### `ContextResponseGenerationSignature` (DSPy — Phase 2)

| Field | Type | Description |
|-------|------|-------------|
| `context_snippet` | Input | Relevant excerpt from Phase 1 |
| `user_query` | Input | Current user query |
| `answer` | Output | Natural language response in the same language as the query |

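For readers unfamiliar with DSPy signatures, the Phase 1 table above maps onto a class roughly like the one below; the Phase 2 signature follows the same pattern, as sketched in the streaming section. Only the field names and roles come from this document; the docstring and descriptions are illustrative.

```python
import dspy

# Illustrative shape of the Phase 1 signature; the real instruction text
# in context_analyzer.py is not reproduced here.
class ContextDetectionSignature(dspy.Signature):
    """Classify the query (greeting / answerable from history) without answering it."""
    conversation_history: str = dspy.InputField(desc="Last 10 turns formatted as JSON")
    user_query: str = dspy.InputField(desc="Current user query")
    detection_result: str = dspy.OutputField(
        desc="JSON with is_greeting, can_answer_from_context, reasoning, context_snippet"
    )
```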

---

## Decision Summary Table

| Scenario | Phase 1 LLM Calls | Phase 2 LLM Calls | Outcome |
|----------|--------------------|--------------------|---------|
| Greeting detected | 1 (`detect_context`) | 0 (static response) | Context responds (greeting) |
| Follow-up answerable from last 10 turns | 1 (`detect_context`) | 1 (`generate_context_response` or `stream_context_response`) | Context responds |
| Cannot answer from last 10 turns | 1 (`detect_context`) | 0 | Falls back to RAG |
| Phase 1 LLM error / JSON parse failure | 1 | 0 | Falls back to RAG |
| Phase 2 LLM error or empty answer | 1 | 1 | Falls back to RAG |


---

## File Reference

| File | Purpose |
|------|---------|
| `src/tool_classifier/context_analyzer.py` | Core LLM analysis logic (greeting detection, context detection, and response generation) |
| `src/tool_classifier/workflows/context_workflow.py` | Workflow executor (streaming + non-streaming) |
| `src/tool_classifier/classifier.py` | Classification layer that invokes context analysis |
| `src/tool_classifier/greeting_constants.py` | Static fallback greeting responses (ET/EN) |
| `tests/test_context_analyzer.py` | Unit tests for `ContextAnalyzer` |
| `tests/test_context_workflow.py` | Unit tests for `ContextWorkflowExecutor` |
| `tests/test_context_workflow_integration.py` | Integration tests for the full classify → route → execute chain |

src/llm_orchestration_service.py

Lines changed: 5 additions & 0 deletions

```diff
@@ -639,11 +639,13 @@ async def stream_orchestration_response(
         )

         # Classify query to determine workflow
+        start_time = time.time()
         classification = await self.tool_classifier.classify(
             query=request.message,
             conversation_history=request.conversationHistory,
             language=detected_language,
         )
+        time_metric["classifier.classify"] = time.time() - start_time

         logger.info(
             f"[{request.chatId}] [{stream_ctx.stream_id}] Classification: {classification.workflow.value} "
@@ -652,11 +654,14 @@ async def stream_orchestration_response(

         # Route to appropriate workflow (streaming)
         # route_to_workflow returns AsyncIterator[str] when is_streaming=True
+        start_time = time.time()
         stream_result = await self.tool_classifier.route_to_workflow(
             classification=classification,
             request=request,
             is_streaming=True,
+            time_metric=time_metric,
         )
+        time_metric["classifier.route"] = time.time() - start_time

         async for sse_chunk in stream_result:
             yield sse_chunk
```

src/llm_orchestration_service_api.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -71,7 +71,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     if StreamConfig.RATE_LIMIT_ENABLED:
         app.state.rate_limiter = RateLimiter(
             requests_per_minute=StreamConfig.RATE_LIMIT_REQUESTS_PER_MINUTE,
-            tokens_per_second=StreamConfig.RATE_LIMIT_TOKENS_PER_SECOND,
+            tokens_per_minute=StreamConfig.RATE_LIMIT_TOKENS_PER_MINUTE,
         )
         logger.info("Rate limiter initialized successfully")
     else:
```

src/llm_orchestrator_config/stream_config.py

Lines changed: 3 additions & 4 deletions

```diff
@@ -21,8 +21,7 @@ class StreamConfig:

     # Rate Limiting Configuration
     RATE_LIMIT_ENABLED: bool = True  # Enable/disable rate limiting
-    RATE_LIMIT_REQUESTS_PER_MINUTE: int = 10  # Max requests per user per minute
-    RATE_LIMIT_TOKENS_PER_SECOND: int = (
-        100  # Max tokens per user per second (burst control)
-    )
+    RATE_LIMIT_REQUESTS_PER_MINUTE: int = 20  # Max requests per user per minute
+    RATE_LIMIT_TOKENS_PER_MINUTE: int = 40_000  # Max tokens per user per minute
     RATE_LIMIT_CLEANUP_INTERVAL: int = 300  # Cleanup old entries every 5 minutes
+    RATE_LIMIT_TOKEN_WINDOW_SECONDS: int = 60  # Sliding window size for token tracking
```
