Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions astrbot/core/agent/runners/tool_loop_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ async def step(self):
if self.stats.time_to_first_token == 0:
self.stats.time_to_first_token = time.time() - self.stats.start_time

# Handle usage from providers like MiniMax that send usage in chunk responses
if llm_response.usage:
self.stats.token_usage += llm_response.usage

if llm_response.result_chain:
yield AgentResponse(
type="streaming_delta",
Expand Down
92 changes: 89 additions & 3 deletions astrbot/core/provider/sources/openai_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,18 +528,42 @@ async def _query_stream(
del payloads[key]
self._apply_provider_specific_extra_body_overrides(extra_body)

# Build streaming options - enable stream_usage for providers that need it (e.g. MiniMax)
# Auto-detect MiniMax by api_base URL
api_base = self.provider_config.get("api_base", "") or ""
is_minimax = "minimaxi" in api_base.lower()
stream_options = None
if self.provider_config.get("enable_stream_usage", False) or is_minimax:
stream_options = {"include_usage": True}

stream = await self.client.chat.completions.create(
**payloads,
stream=True,
stream_options=stream_options,
extra_body=extra_body,
)

llm_response = LLMResponse("assistant", is_chunk=True)

state = ChatCompletionStreamState()

# Track partial thinking tags across chunks for MiniMax-style reasoning
thinking_buffer = ""
# Compile regex once outside the loop for efficiency
thinking_pattern = re.compile(r"<think>(.*?)</think>", re.DOTALL)

async for chunk in stream:
# Handle usage in chunks with empty choices (e.g., MiniMax sends usage in final chunk)
if not chunk.choices:
if chunk.usage:
# Create a separate response for usage-only chunks to avoid
# mutating the shared llm_response object that may have
# pending content in result_chain
usage_response = LLMResponse(role="assistant")
usage_response.id = chunk.id
usage_response.usage = self._extract_usage(chunk.usage)
usage_response.is_chunk = True
yield usage_response
continue
choice = chunk.choices[0]
delta = choice.delta
Expand Down Expand Up @@ -568,10 +592,28 @@ async def _query_stream(
if delta and delta.content:
# Don't strip streaming chunks to preserve spaces between words
completion_text = self._normalize_content(delta.content, strip=False)
llm_response.result_chain = MessageChain(
chain=[Comp.Plain(completion_text)],

# Handle partial <think>...</think> tags that may span multiple chunks (MiniMax)
# Prepend any leftover thinking content from previous chunk
if thinking_buffer:
completion_text = thinking_buffer + completion_text
thinking_buffer = ""

completion_text, thinking_buffer, llm_response.reasoning_content, _y = (
self._extract_thinking_blocks(
completion_text,
thinking_buffer,
llm_response.reasoning_content,
thinking_pattern,
_y,
)
)
_y = True

if completion_text:
llm_response.result_chain = MessageChain(
chain=[Comp.Plain(completion_text)],
)
_y = True
if chunk.usage:
llm_response.usage = self._extract_usage(chunk.usage)
elif choice_usage := getattr(choice, "usage", None):
Expand All @@ -587,6 +629,50 @@ async def _query_stream(

yield llm_response

def _extract_thinking_blocks(
self,
completion_text: str,
thinking_buffer: str,
reasoning_content: str | None,
thinking_pattern: re.Pattern,
has_content: bool,
) -> tuple[str, str, str | None, bool]:
"""
Extract thinking blocks from completion text and handle partial blocks across chunks.

Returns:
tuple of (cleaned_text, new_thinking_buffer, updated_reasoning_content, found_content)
"""
# Extract complete thinking blocks
for match in thinking_pattern.finditer(completion_text):
think_content = match.group(1).strip()
if think_content:
if reasoning_content:
reasoning_content += "\n" + think_content
else:
reasoning_content = think_content
has_content = True

# Remove all complete thinking blocks from completion_text
completion_text = thinking_pattern.sub("", completion_text)

# Handle case where partial thinking tags span chunks
think_start = completion_text.rfind("<think>")
think_end = completion_text.rfind("</think>")

if think_start != -1 and (think_end == -1 or think_end < think_start):
# Buffer incomplete thinking block
thinking_buffer = completion_text[think_start:]
completion_text = completion_text[:think_start]
elif think_end != -1 and think_end > think_start:
# Clear buffer when thinking block closes
thinking_buffer = ""

# Strip leading whitespace
completion_text = completion_text.lstrip()

return completion_text, thinking_buffer, reasoning_content, has_content

def _extract_reasoning_content(
self,
completion: ChatCompletion | ChatCompletionChunk,
Expand Down