feat: support and flush chunks in the chat stream helper #1809

zimeg merged 11 commits into feat-ai-apps-thinking-steps
Conversation
Codecov Report ✅ All modified and coverable lines are covered by tests. Additional details and impacted files:

@@               Coverage Diff                @@
##   feat-ai-apps-thinking-steps   #1809   +/- ##
=================================================
  Coverage         ?   83.90%
=================================================
  Files            ?      116
  Lines            ?    13192
  Branches         ?        0
=================================================
  Hits             ?    11069
  Misses           ?     2123
  Partials         ?        0

☔ View full report in Codecov by Sentry.
srtaalej
left a comment
Thanks for adding this ⭐ ⭐ ⭐ left some thoughts on the empty append - would love to hear your thoughts
      channel: str,
      ts: str,
-     markdown_text: str,
+     markdown_text: Optional[str] = None,
🌟 it is much easier to have both markdown and chunks optional!
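For reference, a minimal sketch of the two calling styles this makes possible, assuming the signatures in this diff; the token and IDs below are placeholders:

from slack_sdk import WebClient
from slack_sdk.models.messages.chunk import MarkdownTextChunk

client = WebClient(token="xoxb-...")
streamer = client.chat_stream(
    channel="C0123456789",
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
    thread_ts="123.000",
)
# Plain markdown is buffered until the helper decides to flush
streamer.append(markdown_text="Partial markdown output... ")
# Structured chunks are flushed right away, along with anything buffered
streamer.append(chunks=[MarkdownTextChunk(text="A structured chunk.")])
streamer.stop(chunks=[MarkdownTextChunk(text="Done.")])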
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
    thread_ts="123.000",
)
i tested this with an empty call to .append() and i think we should raise an error/warning for when append is sent empty 🤔
A warning is a nice idea. I wouldn't consider it a blocker for the PR, if we want to move forward and add the warning later.
mwbrooks
left a comment
✅ Nice, thanks for updating the stream helper! 🙇🏻
🧪 Manual testing works! Below is the modified listeners/assistant/message.py that I used based on the PR test steps.
bolt-python-assistant-template/listeners/assistant/message.py:
from logging import Logger
from typing import Dict, List
from slack_bolt import BoltContext, Say, SetStatus
from slack_sdk import WebClient
from ai.llm_caller import call_llm
from ..views.feedback_block import create_feedback_block
import time
from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk, URLSource
def message(
    client: WebClient,
    context: BoltContext,
    logger: Logger,
    payload: dict,
    say: Say,
    set_status: SetStatus,
):
    """
    Handles when users send messages or select a prompt in an assistant thread and generates AI responses.

    Args:
        client: Slack WebClient for making API calls
        context: Bolt context containing channel and thread information
        logger: Logger instance for error tracking
        payload: Event payload with message details (channel, user, text, etc.)
        say: Function to send messages to the thread
        set_status: Function to update the assistant's status
    """
    try:
        channel_id = payload["channel"]
        team_id = context.team_id
        thread_ts = payload["thread_ts"]
        user_id = context.user_id

        set_status(
            status="thinking...",
            loading_messages=[
                "Teaching the hamsters to type faster…",
                "Untangling the internet cables…",
                "Consulting the office goldfish…",
                "Polishing up the response just for you…",
                "Convincing the AI to stop overthinking…",
            ],
        )

        replies = client.conversations_replies(
            channel=context.channel_id,
            ts=context.thread_ts,
            oldest=context.thread_ts,
            limit=10,
        )
        messages_in_thread: List[Dict[str, str]] = []
        for message in replies["messages"]:
            role = "user" if message.get("bot_id") is None else "assistant"
            messages_in_thread.append({"role": role, "content": message["text"]})

        streamer = client.chat_stream(
            channel=channel_id,
            recipient_team_id=team_id,
            recipient_user_id=user_id,
            thread_ts=thread_ts,
        )
        streamer.append(
            chunks=[
                MarkdownTextChunk(
                    text="Hello.\nI have received the task. ",
                ),
                MarkdownTextChunk(
                    text="This task appears manageable.\nThat is good.",
                ),
                TaskUpdateChunk(
                    id="001",
                    title="Understanding the task...",
                    status="in_progress",
                    details="- Identifying the goal\n- Identifying constraints",
                ),
                TaskUpdateChunk(
                    id="002",
                    title="Performing acrobatics...",
                    status="pending",
                ),
            ],
        )
        time.sleep(4)
        streamer.append(
            chunks=[
                TaskUpdateChunk(
                    id="001",
                    title="Understanding the task...",
                    status="complete",
                    details="\n- Pretending this was obvious",
                    output="We'll continue to ramble now",
                ),
                TaskUpdateChunk(
                    id="002",
                    title="Performing acrobatics...",
                    status="in_progress",
                ),
            ],
        )
        time.sleep(4)
        streamer.stop(
            chunks=[
                TaskUpdateChunk(
                    id="12",
                    title="solved equation!",
                    status="complete",
                    sources=[
                        URLSource(
                            url="https://oeis.org",
                            text="The On-Line Encyclopedia of Integer Sequences (OEIS)",
                        ),
                    ],
                ),
                MarkdownTextChunk(text="that computes."),
            ],
        )
    except Exception as e:
        logger.exception(f"Failed to handle a user message event: {e}")
say(f":warning: Something went wrong! ({e})")| raise e.SlackRequestError("Failed to stop stream: stream not started") | ||
| self._stream_ts = str(response["ts"]) | ||
| self._state = "in_progress" | ||
| flushings: List[Union[Dict, Chunk]] = [] |
😉 quite the visual name! 💩 Would chunks_to_flush also work? Perhaps too long.
WilliamBergamin
left a comment
These changes look good to me. I left a few comments to make sure I understand the behavior properly, but they should be non-blocking 💯
flushings: List[Union[Dict, Chunk]] = []
if len(self._buffer) != 0:
    flushings.append(MarkdownTextChunk(text=self._buffer))
if chunks is not None:
    flushings.extend(chunks)
Nice! This allows the markdown_text and chunks to be accepted and passed to the method using the chunks param 💯
@WilliamBergamin The chunks argument might be preferred IIRC but I think markdown_text is still so useful for streaming LLM responses directly 👾
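For example, a hedged sketch of that direct-streaming case; fetch_llm_deltas() is a hypothetical stand-in for an LLM streaming response, and the token and IDs are placeholders:

from slack_sdk import WebClient
from slack_sdk.models.messages.chunk import MarkdownTextChunk

def fetch_llm_deltas():
    # Hypothetical generator yielding partial markdown from an LLM stream
    yield "Working through the question... "
    yield "here is the streamed answer."

client = WebClient(token="xoxb-...")
streamer = client.chat_stream(
    channel="C0123456789",
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
    thread_ts="123.000",
)
for delta in fetch_llm_deltas():
    streamer.append(markdown_text=delta)  # buffered and flushed by the helper
streamer.stop(chunks=[MarkdownTextChunk(text="Done.")])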
    markdown_text: Optional[str] = None,
    chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
Since both arguments are now optional what happens if a developer does not pass any of these arguments when using the method?
@WilliamBergamin A markdown_text_or_chunks_required error should appear from the API, but I think @srtaalej and @mwbrooks mentioned that a warning might be useful to include with the SDK helper...
📝 Noting that we should follow up with this change!
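For that follow-up, a minimal sketch of what a client-side warning could look like; the helper name and exact placement are assumptions rather than the merged implementation:

import logging
from typing import Optional, Sequence

logger = logging.getLogger(__name__)

def _warn_if_nothing_to_send(markdown_text: Optional[str], chunks: Optional[Sequence]) -> bool:
    # True when neither argument was provided, which the API would otherwise
    # reject with a markdown_text_or_chunks_required error.
    if markdown_text is None and chunks is None:
        logger.warning("No markdown_text or chunks were given; nothing will be sent to Slack.")
        return True
    return False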
if markdown_text is not None:
    self._buffer += markdown_text
if len(self._buffer) >= self._buffer_size or chunks is not None:
    return self._flush_buffer(chunks=chunks, **kwargs)
If I'm understanding this correctly, if I do

streamer = client.chat_stream(
    channel="C0123456789",
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
    thread_ts="10101010101.010101",
)
streamer.append(markdown_text="\n")
streamer.append(
    chunks=[
        MarkdownTextChunk(
            text="Hello.\nI have received the task. ",
        ),
    ],
)

- The first append with markdown will not be sent to Slack but rather added to the buffer
- The second append with the chunks will be sent to Slack along with whatever may be in the buffer, regardless of whether the buffer has exceeded the buffer_size?
@WilliamBergamin Correct! Providing chunks is almost like a forced flush of the buffer 🪬
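To make that concrete, here is an illustrative model of the buffering behavior described above, not the SDK's actual internals: markdown_text accumulates until the buffer-size threshold, while any call carrying chunks flushes immediately.

from typing import List, Optional

class BufferModel:
    def __init__(self, buffer_size: int = 256) -> None:
        self._buffer = ""
        self._buffer_size = buffer_size
        self.flushed: List[str] = []

    def append(self, markdown_text: Optional[str] = None, chunks: Optional[List[str]] = None) -> None:
        if markdown_text is not None:
            self._buffer += markdown_text
        if len(self._buffer) >= self._buffer_size or chunks is not None:
            if self._buffer:
                self.flushed.append(self._buffer)  # buffered markdown goes out first
                self._buffer = ""
            if chunks is not None:
                self.flushed.extend(chunks)

model = BufferModel()
model.append(markdown_text="\n")  # stays in the buffer: below the size threshold, no chunks
model.append(chunks=["Hello."])   # chunks force a flush: "\n" is sent along with the chunk
print(model.flushed)              # ['\n', 'Hello.']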
@srtaalej @mwbrooks @WilliamBergamin Once more, the kind reviews are so appreciated! 🙏 ✨ I'm holding off on more changes to this PR so testing can happen soonest, but I promise these will be included in upcoming commits.
Summary
This PR updates the chat_stream helper to support and flush "chunks".

Testing

The following snippet might be useful in testing:
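(A condensed sketch adapted from the manual-test script above; the token, channel, team, user, and thread IDs are placeholders.)

import time
from slack_sdk import WebClient
from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk

client = WebClient(token="xoxb-...")
streamer = client.chat_stream(
    channel="C0123456789",
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
    thread_ts="123.000",
)
streamer.append(
    chunks=[
        MarkdownTextChunk(text="Hello.\nI have received the task. "),
        TaskUpdateChunk(id="001", title="Understanding the task...", status="in_progress"),
    ],
)
time.sleep(4)
streamer.stop(
    chunks=[
        TaskUpdateChunk(id="001", title="Understanding the task...", status="complete"),
        MarkdownTextChunk(text="that computes."),
    ],
)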
Category
- /docs (Documents)
- tests/integration_tests (Automated tests for this library)

Requirements

python3 -m venv .venv && source .venv/bin/activate && ./scripts/run_validation.sh after making the changes.