Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions lib/crewai/src/crewai/cli/reset_memories_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,30 @@

import click

from crewai.cli.utils import get_crews
from crewai.cli.utils import get_crews, get_flows
from crewai.flow import Flow


def _reset_flow_memory(flow: Flow) -> None:
"""Reset memory for a single flow instance.

Handles Memory, MemoryScope (both have .reset()), and MemorySlice
(delegates to the underlying ._memory). Silently succeeds when the
storage directory does not exist yet (nothing to reset).

Args:
flow: The flow instance whose memory should be reset.
"""
mem = flow.memory
if mem is None:
return
try:
if hasattr(mem, "reset"):
mem.reset()
elif hasattr(mem, "_memory") and hasattr(mem._memory, "reset"):
mem._memory.reset()
except (FileNotFoundError, OSError):
Comment thread
joaomdmoura marked this conversation as resolved.
Dismissed
pass


def reset_memories_command(
Expand All @@ -12,7 +35,7 @@ def reset_memories_command(
kickoff_outputs: bool,
all: bool,
) -> None:
"""Reset the crew memories.
"""Reset the crew and flow memories.

Args:
memory: Whether to reset the unified memory.
Expand All @@ -29,8 +52,11 @@ def reset_memories_command(
return

crews = get_crews()
if not crews:
raise ValueError("No crew found.")
flows = get_flows()

if not crews and not flows:
raise ValueError("No crew or flow found.")

for crew in crews:
if all:
crew.reset_memories(command_type="all")
Expand Down Expand Up @@ -59,6 +85,20 @@ def reset_memories_command(
f"[Crew ({crew.name if crew.name else crew.id})] Agents knowledge has been reset."
)

for flow in flows:
flow_name = flow.name or flow.__class__.__name__
if all:
_reset_flow_memory(flow)
click.echo(
f"[Flow ({flow_name})] Reset memories command has been completed."
)
continue
if memory:
_reset_flow_memory(flow)
click.echo(
f"[Flow ({flow_name})] Memory has been reset."
)

except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while resetting the memories: {e}", err=True)
click.echo(e.output, err=True)
Expand Down
103 changes: 103 additions & 0 deletions lib/crewai/src/crewai/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,109 @@ def fetch_crews(module_attr: Any) -> list[Crew]:
return crew_instances


def get_flow_instance(module_attr: Any) -> Flow | None:
"""Check if a module attribute is a user-defined Flow subclass and return an instance.

Args:
module_attr: An attribute from a loaded module.

Returns:
A Flow instance if the attribute is a valid user-defined Flow subclass,
None otherwise.
"""
if (
isinstance(module_attr, type)
and issubclass(module_attr, Flow)
and module_attr is not Flow
):
try:
return module_attr()
except Exception:
return None
return None


_SKIP_DIRS = frozenset(
{".venv", "venv", ".git", "__pycache__", "node_modules", ".tox", ".nox"}
)


def get_flows(flow_path: str = "main.py") -> list[Flow]:
"""Get the flow instances from project files.

Walks the project directory looking for files matching ``flow_path``
(default ``main.py``), loads each module, and extracts Flow subclass
instances. Directories that are clearly not user source code (virtual
environments, ``.git``, etc.) are pruned to avoid noisy import errors.

Args:
flow_path: Filename to search for (default ``main.py``).

Returns:
A list of discovered Flow instances.
"""
flow_instances: list[Flow] = []
try:
current_dir = os.getcwd()
if current_dir not in sys.path:
sys.path.insert(0, current_dir)

src_dir = os.path.join(current_dir, "src")
if os.path.isdir(src_dir) and src_dir not in sys.path:
sys.path.insert(0, src_dir)

search_paths = [".", "src"] if os.path.isdir("src") else ["."]

for search_path in search_paths:
for root, dirs, files in os.walk(search_path):
dirs[:] = [
d
for d in dirs
if d not in _SKIP_DIRS and not d.startswith(".")
]
if flow_path in files and "cli/templates" not in root:
file_os_path = os.path.join(root, flow_path)
try:
spec = importlib.util.spec_from_file_location(
"flow_module", file_os_path
)
if not spec or not spec.loader:
continue

module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module

try:
spec.loader.exec_module(module)

for attr_name in dir(module):
module_attr = getattr(module, attr_name)
try:
if flow_instance := get_flow_instance(
module_attr
):
flow_instances.append(flow_instance)
except Exception: # noqa: S112
continue

if flow_instances:
break

except Exception: # noqa: S112
continue

except (ImportError, AttributeError):
continue

if flow_instances:
break

except Exception: # noqa: S110
pass

return flow_instances


def is_valid_tool(obj: Any) -> bool:
from crewai.tools.base_tool import Tool

Expand Down
46 changes: 46 additions & 0 deletions lib/crewai/src/crewai/events/types/flow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,52 @@ class FlowPlotEvent(FlowEvent):
type: str = "flow_plot"


class FlowInputRequestedEvent(FlowEvent):
"""Event emitted when a flow requests user input via ``Flow.ask()``.

This event is emitted before the flow suspends waiting for user input,
allowing UI frameworks and observability tools to know when a flow
needs user interaction.

Attributes:
flow_name: Name of the flow requesting input.
method_name: Name of the flow method that called ``ask()``.
message: The question or prompt being shown to the user.
metadata: Optional metadata sent with the question (e.g., user ID,
channel, session context).
"""

method_name: str
message: str
metadata: dict[str, Any] | None = None
type: str = "flow_input_requested"


class FlowInputReceivedEvent(FlowEvent):
"""Event emitted when user input is received after ``Flow.ask()``.

This event is emitted after the user provides input (or the request
times out), allowing UI frameworks and observability tools to track
input collection.

Attributes:
flow_name: Name of the flow that received input.
method_name: Name of the flow method that called ``ask()``.
message: The original question or prompt.
response: The user's response, or None if timed out / unavailable.
metadata: Optional metadata sent with the question.
response_metadata: Optional metadata from the provider about the
response (e.g., who responded, thread ID, timestamps).
"""

method_name: str
message: str
response: str | None = None
metadata: dict[str, Any] | None = None
response_metadata: dict[str, Any] | None = None
type: str = "flow_input_received"


class HumanFeedbackRequestedEvent(FlowEvent):
"""Event emitted when human feedback is requested.

Expand Down
3 changes: 3 additions & 0 deletions lib/crewai/src/crewai/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_config import flow_config
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
from crewai.flow.input_provider import InputProvider, InputResponse
from crewai.flow.persistence import persist
from crewai.flow.visualization import (
FlowStructure,
Expand All @@ -22,6 +23,8 @@
"HumanFeedbackPending",
"HumanFeedbackProvider",
"HumanFeedbackResult",
"InputProvider",
"InputResponse",
"PendingFeedbackContext",
"and_",
"build_flow_structure",
Expand Down
85 changes: 76 additions & 9 deletions lib/crewai/src/crewai/flow/async_feedback/providers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Default provider implementations for human feedback.
"""Default provider implementations for human feedback and user input.

This module provides the ConsoleProvider, which is the default synchronous
provider that collects feedback via console input.
provider that collects both feedback (for ``@human_feedback``) and user input
(for ``Flow.ask()``) via console.
"""

from __future__ import annotations
Expand All @@ -16,30 +17,44 @@


class ConsoleProvider:
"""Default synchronous console-based feedback provider.
"""Default synchronous console-based provider for feedback and input.

This provider blocks execution and waits for console input from the user.
It displays the method output with formatting and prompts for feedback.
It serves two purposes:

- **Feedback** (``request_feedback``): Used by ``@human_feedback`` to
display method output and collect review feedback.
- **Input** (``request_input``): Used by ``Flow.ask()`` to prompt the
user with a question and collect a response.

This is the default provider used when no custom provider is specified
in the @human_feedback decorator.
in the ``@human_feedback`` decorator or on the Flow's ``input_provider``.

Example:
Example (feedback):
```python
from crewai.flow.async_feedback import ConsoleProvider


# Explicitly use console provider
@human_feedback(
message="Review this:",
provider=ConsoleProvider(),
)
def my_method(self):
return "Content to review"
```

Example (input):
```python
from crewai.flow import Flow, start

class MyFlow(Flow):
@start()
def gather_info(self):
topic = self.ask("What topic should we research?")
return topic
```
"""

def __init__(self, verbose: bool = True):
def __init__(self, verbose: bool = True) -> None:
"""Initialize the console provider.

Args:
Expand Down Expand Up @@ -124,3 +139,55 @@ def request_feedback(
finally:
# Resume live updates
formatter.resume_live_updates()

def request_input(
self,
message: str,
flow: Flow[Any],
metadata: dict[str, Any] | None = None,
) -> str | None:
"""Request user input via console (blocking).

Displays the prompt message with formatting and waits for the user
to type their response. Used by ``Flow.ask()``.

Unlike ``request_feedback``, this method does not display an
"OUTPUT FOR REVIEW" panel or emit feedback-specific events (those
are handled by ``ask()`` itself).

Args:
message: The question or prompt to display to the user.
flow: The Flow instance requesting input.
metadata: Optional metadata from the caller. Ignored by the
console provider (console has no concept of user routing).

Returns:
The user's input as a stripped string. Returns empty string
if user presses Enter without input. Never returns None
(console input is always available).
"""
from crewai.events.event_listener import event_listener

# Pause live updates during human input
formatter = event_listener.formatter
formatter.pause_live_updates()

try:
console = formatter.console

if self.verbose:
console.print()
console.print(message, style="yellow")
console.print()

response = input(">>> \n").strip()
else:
response = input(f"{message} ").strip()

# Add line break after input so formatter output starts clean
console.print()

return response
finally:
# Resume live updates
formatter.resume_live_updates()
Loading