Skip to content

Implement user input handling in Flows#4490

Merged
joaomdmoura merged 8 commits intomainfrom
joaomdmoura/conversation-flow
Feb 16, 2026
Merged

Implement user input handling in Flows#4490
joaomdmoura merged 8 commits intomainfrom
joaomdmoura/conversation-flow

Conversation

@joaomdmoura
Copy link
Collaborator

Add self.ask() for conversational flows

Flows can now request user input mid-execution via self.ask(). This enables conversational patterns like gathering requirements, interactive troubleshooting, multi-turn dialogs, without breaking the flow lifecycle and requiring HITL.

What it does

class ResearchFlow(Flow):
    @start()
    def gather_and_research(self):
        topic = self.ask("What topic should we research?")
        depth = self.ask("How deep? (brief/detailed/exhaustive)")
        return crew.kickoff(inputs={"topic": topic, "depth": depth})
  • self.ask(message, timeout=None, metadata=None) -- pauses the method, prompts the user, returns their response as a string. Works in both sync and async flow methods.
  • Timeout -- returns None when expired, enabling while (msg := self.ask("You:", timeout=300)) is not None: loop patterns that always terminate.
  • Metadata -- bidirectional. Send context to the provider (metadata={"user_id": "u123"}), receive context back from the provider via InputResponse(text="answer", metadata={"responded_by": "u456"}). Thread-safe by design (no shared mutable state).
  • Auto-checkpoint -- persists self.state before each ask() call when persistence is configured, so previously gathered data survives crashes.
  • Non-blocking -- the flow framework runs sync methods in a thread pool (asyncio.to_thread), so ask() never blocks the event loop. Parallel listeners execute concurrently even when one is waiting for input.

How it differs from @human_feedback

self.ask() @human_feedback
When Mid-method, inline Post-method, wraps return value
Purpose Gather input Review/approve output
Can be conditional, looped, called N times Yes No (once per decorated method)
Routes to listeners via emit No Yes
Persist and reload flow execution No Yes

They coexist -- same flow can use both.

joaomdmoura and others added 6 commits February 16, 2026 11:50
- Introduced `FlowInputRequestedEvent` and `FlowInputReceivedEvent` to manage user input requests and responses during flow execution.
- Added `InputProvider` protocol and `InputResponse` dataclass for customizable input handling.
- Enhanced `Flow` class with `ask()` method to request user input, including timeout handling and state checkpointing.
- Updated `FlowConfig` to support custom input providers.
- Created `input_provider.py` for default input provider implementations, including a console-based provider.
- Added comprehensive tests for `ask()` functionality, covering basic usage, timeout behavior, and integration with flow machinery.
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
- Removed unnecessary variable assignments for the result of `flow.kickoff()` in two test cases, improving code clarity.
- Updated assertions to ensure the expected execution log entries are present after the flow kickoff, enhancing test reliability.
- Introduced a new context variable, `current_flow_method_name`, to store the name of the currently executing flow method, defaulting to "unknown".
- Updated the Flow class to set and reset this context variable during method execution, enhancing the ability to track method calls without stack inspection.
- Removed the obsolete `_resolve_calling_method_name` method, streamlining the code and improving clarity.
- Introduced a new `InputHistoryEntry` TypedDict to structure user input history for the `ask()` method, capturing details such as the question, user response, method name, timestamp, and associated metadata.
- Updated the `_input_history` attribute in the Flow class to utilize the new `InputHistoryEntry` type, improving type safety and clarity in input history management.
- Updated the `ask()` method to improve timeout management by manually managing the `ThreadPoolExecutor`, preventing potential deadlocks when the provider call exceeds the timeout duration.
- Added clarifications in the documentation regarding the behavior of the timeout and the underlying request handling, ensuring better understanding for users.
@joaomdmoura joaomdmoura force-pushed the joaomdmoura/conversation-flow branch from 9223c32 to c6e7d70 Compare February 16, 2026 19:50
- Introduced flow memory reset capabilities in the `reset_memories_command`, allowing for both crew and flow memory resets.
- Added a new utility function `_reset_flow_memory` to handle memory resets for individual flow instances, improving modularity and clarity.
- Updated the `get_flows` utility to discover flow instances from project files, enhancing the CLI's ability to manage flow states.
- Expanded test coverage to validate the new flow memory reset features, ensuring robust functionality and error handling.
@joaomdmoura joaomdmoura merged commit 84d57c7 into main Feb 16, 2026
44 checks passed
@joaomdmoura joaomdmoura deleted the joaomdmoura/conversation-flow branch February 16, 2026 21:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants