1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -81,6 +81,7 @@ repos:
--disable=C3001,
--disable=R1702,
--disable=R0912,
--disable=R0915,
--max-line-length=120,
--max-statements=75,
]
9 changes: 9 additions & 0 deletions docs/todo.md
@@ -0,0 +1,9 @@
1. Remove the complex save_load_cache
2. Remove the input and output parameters from tool
3. Create dedicated test functions

4. Add a dict function to base_context
5. self.llm before token
6. Add the ability to reset the cache path

7. Improve the documentation
3 changes: 0 additions & 3 deletions docs/zh/guide/async_op_llm_guide.md
@@ -50,8 +50,6 @@ import json

@C.register_op()
class QAOp(BaseAsyncOp):
file_path: str = __file__ # Required: used to locate the prompt file automatically

async def async_execute(self):
"""执行问答逻辑"""
# 1. 读取输入
@@ -156,7 +154,6 @@ if __name__ == "__main__":
6. **Call the LLM**: use `await self.llm.achat(messages=messages, ...)`
7. **Handle the response**: use `callback_fn` to process or transform the response and return the processed result
8. **Application context**: must be called inside a `FlowLLMApp()` context
9. **file_path**: the Op class must set `file_path = __file__` so the prompt file can be located automatically

---

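Every guide loses the same two lines because `file_path = __file__` is no longer required on Op subclasses; the base class now locates the prompt file itself (see the `inspect.getfile` change in `base_op.py` below). A minimal sketch of the updated pattern — the `C` import path is assumed from the guides:

```python
from flowllm.core.context import C  # assumed import path for the op registry
from flowllm.core.op import BaseAsyncOp


@C.register_op()
class EchoOp(BaseAsyncOp):
    # No file_path = __file__ here anymore; BaseOp resolves the
    # prompt file from this module's location automatically.

    async def async_execute(self):
        text = self.context.get("text", "")
        self.context.response.answer = f"echo: {text}"
```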
2 changes: 0 additions & 2 deletions docs/zh/guide/cmd_service_guide.md
@@ -15,8 +15,6 @@ from flowllm.core.op import BaseAsyncOp

@C.register_op()
class EchoOp(BaseAsyncOp):
file_path: str = __file__

async def async_execute(self):
text = self.context.get("text", "")
self.context.response.answer = f"echo: {text}"
2 changes: 0 additions & 2 deletions docs/zh/guide/http_service_guide.md
@@ -14,8 +14,6 @@ from flowllm.core.op import BaseAsyncOp

@C.register_op()
class EchoOp(BaseAsyncOp):
file_path: str = __file__

async def async_execute(self):
text = self.context.get("text", "")
self.context.response.answer = f"echo: {text}"
2 changes: 0 additions & 2 deletions docs/zh/guide/http_stream_guide.md
@@ -16,8 +16,6 @@ from flowllm.core.schema import FlowStreamChunk, Message

@C.register_op()
class StreamChatOp(BaseAsyncOp):
file_path: str = __file__

async def async_execute(self):
messages = self.context.messages
system_prompt = self.context.system_prompt
1 change: 0 additions & 1 deletion docs/zh/guide/mcp_service_guide.md
@@ -15,7 +15,6 @@ from flowllm.core.op import BaseAsyncOp
@C.register_op()
class MockSearchOp(BaseAsyncOp):
"""Mock search operation that uses LLM to generate realistic search results."""
file_path: str = __file__

async def async_execute(self):
query = self.context.query
14 changes: 12 additions & 2 deletions flowllm/core/llm/lite_llm.py
@@ -110,7 +110,12 @@ def stream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"LiteLLM.stream_chat: {log_kwargs}")

for i in range(self.max_retries):
@@ -217,7 +222,12 @@ async def astream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"LiteLLM.astream_chat: {log_kwargs}")

for i in range(self.max_retries):
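The same logging change appears in four call sites across `lite_llm.py` and `openai_compatible_llm.py`: instead of dropping `messages` from the log entirely, the new block records the *count* of `messages` and `tools` and passes every other kwarg through unchanged. A standalone illustration with made-up values:

```python
chat_kwargs = {
    "model": "gpt-4o-mini",          # illustrative values
    "messages": ["m1", "m2", "m3"],  # stand-ins for Message objects
    "tools": None,
    "temperature": 0.7,
}

log_kwargs = {}
for k, v in chat_kwargs.items():
    if k in ["messages", "tools"]:
        # Log only the count, never the potentially large content.
        log_kwargs[k] = len(v) if v is not None else 0
    else:
        log_kwargs[k] = v

assert log_kwargs == {"model": "gpt-4o-mini", "messages": 3, "tools": 0, "temperature": 0.7}
```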
14 changes: 12 additions & 2 deletions flowllm/core/llm/openai_compatible_llm.py
@@ -106,7 +106,12 @@ def stream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"OpenAICompatibleLLM.stream_chat: {log_kwargs}")

for i in range(self.max_retries):
@@ -208,7 +213,12 @@ async def astream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"OpenAICompatibleLLM.astream_chat: {log_kwargs}")

for i in range(self.max_retries):
70 changes: 6 additions & 64 deletions flowllm/core/op/base_op.py
@@ -6,6 +6,7 @@
"""

import copy
import inspect
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, List, Union
@@ -78,8 +79,6 @@ def execute(self):
```
"""

file_path: str = __file__

def __new__(cls, *args, **kwargs):
"""Create a new instance and save initialization arguments for copying.

@@ -149,7 +148,9 @@ def __init__(
self.raise_exception: bool = raise_exception
self.enable_multithread: bool = enable_multithread
self.language: str = language or C.language
default_prompt_path: str = self.file_path.replace("op.py", "prompt.yaml")

subclass_file_path: str = inspect.getfile(self.__class__)
default_prompt_path: str = subclass_file_path.replace("op.py", "prompt.yaml")
self.prompt_path: Path = Path(prompt_path if prompt_path else default_prompt_path)
self.prompt = PromptHandler(language=self.language).load_prompt_by_file(self.prompt_path)
self._llm: BaseLLM | str = llm
@@ -188,46 +189,12 @@ def cache(self):
self._cache = CacheHandler(self.cache_path.format(op_name=self.short_name))
return self._cache

def save_load_cache(self, key: str, fn: Callable, **kwargs):
"""Save or load from cache.

If caching is enabled, checks cache first. If not found, executes the
function and saves the result. Otherwise, executes the function directly.

Args:
key: Cache key for storing/retrieving the result
fn: Function to execute if cache miss
**kwargs: Additional arguments for cache load operation

Returns:
Cached result if available, otherwise result from function execution
"""
if self.enable_cache:
result = self.cache.load(key, **kwargs)
if result is None:
result = fn()
self.cache.save(key, result, expire_hours=self.cache_expire_hours)
else:
logger.info(f"load {key} from cache")
else:
result = fn()

return result

def before_execute(self):
"""Hook method called before execute(). Override in subclasses.

This method is called automatically by `call()` before executing
the main `execute()` method. Use this to perform any setup,
validation, or preprocessing needed before execution.

Example:
```python
def before_execute(self):
# Validate inputs
if not self.context.get("input"):
raise ValueError("Input is required")
```
"""

def after_execute(self):
@@ -236,22 +203,11 @@ def after_execute(self):
This method is called automatically by `call()` after successfully
executing the main `execute()` method. Use this to perform any
cleanup, post-processing, or result transformation.

Example:
```python
def after_execute(self):
# Post-process results
if self.context.response:
self.context.response.answer = self.context.response.answer.upper()
```
"""

@abstractmethod
def execute(self):
"""Main execution method. Must be implemented in subclasses.

Returns:
Execution result
"""

def default_execute(self, e: Exception = None, **kwargs):
@@ -260,24 +216,10 @@ def default_execute(self, e: Exception = None, **kwargs):
This method is called when `execute()` fails and `raise_exception`
is False. It provides a fallback mechanism to return a default result
instead of raising an exception.

Args:
e: The exception that was raised during execution (if any)
**kwargs: Additional keyword arguments

Returns:
Default execution result

Example:
```python
def default_execute(self, e: Exception = None, **kwargs):
logger.warning(f"Execution failed: {e}, returning default result")
return {"status": "error", "message": str(e)}
```
"""

@staticmethod
def build_context(context: FlowContext = None, **kwargs):
def build_context(context: FlowContext = None, **kwargs) -> FlowContext:
"""Build or update a flow context.

Args:
@@ -592,7 +534,7 @@ def vector_store(self) -> BaseVectorStore:
return self._vector_store

@property
def service_config_metadata(self) -> dict:
def service_metadata(self) -> dict:
"""Get the service config metadata for this operation.

Returns:
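The core mechanism behind the guide changes above: `__init__` now asks `inspect` for the source file of the concrete subclass instead of reading a `file_path` class attribute, so subclasses get correct prompt resolution for free. A self-contained sketch of the idea (file names illustrative):

```python
import inspect
from pathlib import Path


class BaseOp:
    def __init__(self, prompt_path: str = None):
        # inspect.getfile returns the file where the *subclass* is
        # defined (e.g. ".../qa_op.py"), even though this code runs
        # in BaseOp — unlike a class attribute set in the base module.
        subclass_file_path = inspect.getfile(self.__class__)
        default_prompt_path = subclass_file_path.replace("op.py", "prompt.yaml")
        self.prompt_path = Path(prompt_path if prompt_path else default_prompt_path)
```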
32 changes: 32 additions & 0 deletions flowllm/core/schema/message.py
@@ -95,6 +95,38 @@ def simple_dump(self, add_reasoning: bool = True) -> dict:

return result

def format_message(
self,
i: int | None = None,
add_time_created: bool = False,
use_name_first: bool = False,
add_reasoning_content: bool = True,
add_tool_calls: bool = True
) -> str:
content = ""
if i is not None:
content += f"round{i} "

if add_time_created:
content += f"[{self.time_created}] "

if use_name_first:
content += f"{self.name or self.role.value}:\n"
else:
content += f"{self.role.value}:\n"

if add_reasoning_content and self.reasoning_content:
content += self.reasoning_content + "\n"

if self.content:
content += self.content + "\n"

if add_tool_calls and self.tool_calls:
for tool_call in self.tool_calls:
content += f" - tool_call={tool_call.name} params={tool_call.arguments}\n"

return content.strip()


class Trajectory(BaseModel):
"""Represents a conversation trajectory with messages and optional scoring."""
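A quick usage sketch of the new `Message.format_message` helper; the field values are illustrative and assume `Message` coerces `role` from a string as elsewhere in the schema:

```python
from flowllm.core.schema import Message

msg = Message(role="assistant", content="Paris is the capital of France.")

# "assistant:\nParis is the capital of France."
print(msg.format_message())

# Prefix with the round index and creation time:
# "round0 [<time_created>] assistant:\nParis is the capital of France."
print(msg.format_message(i=0, add_time_created=True))
```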
19 changes: 18 additions & 1 deletion flowllm/core/storage/cache_handler.py
@@ -23,13 +23,30 @@ class CacheHandler:
- Recording and managing update timestamps
"""

def __init__(self, cache_dir: str = "cache"):
def __init__(self, cache_dir: Union[str, Path] = "cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.metadata_file = self.cache_dir / "metadata.json"
self.metadata = {}
self._load_metadata()

def set_cache_dir(self, cache_dir: Union[str, Path]):
"""
Set a new cache directory and reload metadata

Args:
cache_dir: New cache directory path
"""
new_cache_dir = Path(cache_dir)
new_cache_dir.mkdir(parents=True, exist_ok=True)

self.cache_dir = new_cache_dir
self.metadata_file = self.cache_dir / "metadata.json"
self.metadata = {}
self._load_metadata()

logger.info(f"Cache directory changed to: {self.cache_dir}")

def _load_metadata(self):
"""Load metadata"""
if self.metadata_file.exists():
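This implements todo item 6 ("add the ability to reset the cache path"). A usage sketch — the import path is assumed:

```python
from flowllm.core.storage import CacheHandler  # assumed import path

handler = CacheHandler(cache_dir="cache")
handler.save("greeting", "hello")  # written under ./cache

# Re-point the handler; metadata is reloaded from the new directory,
# so entries cached under the old path are no longer visible.
handler.set_cache_dir("/tmp/flowllm_cache")
assert handler.load("greeting") is None
```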
21 changes: 16 additions & 5 deletions flowllm/core/vector_store/pgvector_vector_store.py
@@ -168,12 +168,23 @@ def _build_sql_filters(
# Handle special keys that are stored as direct columns
if key == "unique_id":
# unique_id is a direct column, not in metadata JSONB
if use_async:
conditions.append(f"unique_id = ${param_idx}")
# Support both single value and list of values
if isinstance(filter_value, list):
if use_async:
placeholders = ", ".join(f"${param_idx + i}" for i in range(len(filter_value)))
conditions.append(f"unique_id IN ({placeholders})")
else:
placeholders = ", ".join(["%s"] * len(filter_value))
conditions.append(f"unique_id IN ({placeholders})")
params.extend([str(v) for v in filter_value])
param_idx += len(filter_value)
else:
conditions.append("unique_id = %s")
params.append(str(filter_value))
param_idx += 1
if use_async:
conditions.append(f"unique_id = ${param_idx}")
else:
conditions.append("unique_id = %s")
params.append(str(filter_value))
param_idx += 1
continue

# Strip "metadata." prefix if present (since we're already accessing metadata column)
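With this change a `unique_id` filter may be a list, which expands into a SQL `IN` clause with one placeholder per value. For the synchronous (`%s`) path the expansion looks roughly like:

```python
# filter_dict = {"unique_id": ["id-1", "id-2", "id-3"]}  (illustrative values)
filter_value = ["id-1", "id-2", "id-3"]

placeholders = ", ".join(["%s"] * len(filter_value))
condition = f"unique_id IN ({placeholders})"
params = [str(v) for v in filter_value]

assert condition == "unique_id IN (%s, %s, %s)"
assert params == ["id-1", "id-2", "id-3"]
```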
21 changes: 19 additions & 2 deletions flowllm/core/vector_store/qdrant_vector_store.py
@@ -190,9 +190,26 @@ def _build_qdrant_filters(filter_dict: Optional[Dict[str, Any]] = None):
for key, filter_value in filter_dict.items():
# Handle special keys that are stored at payload root level
if key == "unique_id":
qdrant_key = "original_id"
# unique_id is stored as original_id in Qdrant payload
# Support both single value and list of values
if isinstance(filter_value, list):
conditions.append(
FieldCondition(
key="original_id",
match=MatchAny(any=filter_value),
),
)
else:
conditions.append(
FieldCondition(
key="original_id",
match=MatchValue(value=filter_value),
),
)
continue

# Handle nested keys by prefixing with metadata.
elif not key.startswith("metadata."):
if not key.startswith("metadata."):
qdrant_key = f"metadata.{key}"
else:
qdrant_key = key
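The Qdrant store mirrors the pgvector change: a list becomes a `MatchAny` condition on the `original_id` payload field, while a scalar stays a `MatchValue`, as before. A sketch of the two resulting conditions, using the qdrant-client models this module already imports:

```python
from qdrant_client.models import FieldCondition, MatchAny, MatchValue

# {"unique_id": ["a", "b"]} -> any-of match on original_id
list_condition = FieldCondition(key="original_id", match=MatchAny(any=["a", "b"]))

# {"unique_id": "a"} -> exact match
scalar_condition = FieldCondition(key="original_id", match=MatchValue(value="a"))
```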
2 changes: 0 additions & 2 deletions flowllm/extensions/file_tool/edit_op.py
@@ -21,8 +21,6 @@ class EditOp(BaseAsyncToolOp):
is specified. Supports creating new files when old_string is empty.
"""

file_path = __file__

def __init__(self, **kwargs):
kwargs.setdefault("raise_exception", False)
super().__init__(**kwargs)