Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Thumbs.db
.env
.env.local
.env.*.local
data-agent-backend/src/main/resources/application.properties

# Logs
logs
Expand All @@ -76,16 +77,18 @@ log/
*.bak
*.cache

# AI
.claude
.spec-workflow

# ==============================
# Frontend (Node/Vue)
# ==============================
node_modules/
dist/
dist-ssr/
*.local

# Editor directories and files
*.suo
*.ntvs*
*.njsproj
*.sln
Expand All @@ -103,3 +106,6 @@ yarn-error.log
yarn-debug.log
yarn-integrity
.yarn-integrity

.codegraph

Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class AgentService {
private final ModelProperties modelProperties;
private final SessionService sessionService;
private final SkillLoaderService skillLoaderService;

private Toolkit toolkit;
private SkillBox skillBox;

Expand All @@ -69,14 +70,12 @@ public String chat(String sessionId, String userInput) {
ReActAgent agent = createAgent();

Session session = sessionService.getOrCreateSession(sessionId);
agent.loadIfExists(session, sessionId);
agent.loadIfExists(session, sessionService.buildNamespacedSessionId(sessionId));

Msg userMsg = Msg.builder().textContent(userInput).build();

Msg response = agent.call(userMsg).block();

agent.saveTo(session, sessionId);

agent.saveTo(session, sessionService.buildNamespacedSessionId(sessionId));
return MsgUtils.getTextContent(response);
}

Expand All @@ -85,7 +84,7 @@ public Flux<ChatStreamEvent> chatStream(
ReActAgent agent = createAgent();

Session session = sessionService.getOrCreateSession(sessionId);
agent.loadIfExists(session, sessionId);
agent.loadIfExists(session, sessionService.buildNamespacedSessionId(sessionId));

Msg userMsg;
if (toolResults != null && !toolResults.isEmpty()) {
Expand Down Expand Up @@ -117,19 +116,50 @@ public Flux<ChatStreamEvent> chatStream(

return agent.stream(userMsg, streamOptions)
.subscribeOn(Schedulers.boundedElastic())
.doFinally(signalType -> agent.saveTo(session, sessionId))
.doFinally(
signalType ->
agent.saveTo(
session,
sessionService.buildNamespacedSessionId(sessionId)))
.flatMapIterable(EventConverter::map);
}

private ReActAgent createAgent() {
return ReActAgent.builder()
.name("DataAgent")
.sysPrompt("你是一个数据助手,可以帮助用户查询数据库中的数据。")
.sysPrompt(
"""
你是一个数据分析助手,负责通过工具查询数据库并回答用户问题。

工作流程:
1. 先调用 get_tables,获取当前可见表的结构化列表。每个表已附带它与其他表的 relations(关系),不要再到其他工具找关系。
2. get_tables 返回分页数据时,必须检查 data.hasNext、data.totalPages 和 data.items。
如果当前页不足以覆盖候选表,必须继续翻页,直到拿到足够的表与关系信息,再决定下一步。
3. 根据用户问题选择相关表,再调用 get_table_schema 获取目标表的列结构(columns)。
4. get_table_schema 中的 columns 是分页数据。
必须检查 columns.hasNext、columns.totalPages,必要时继续翻页,直到拿到足够的列信息。
5. 基于 get_tables 中的 relations 与 get_table_schema 中的 columns 生成合适的 SELECT SQL。
6. 调用 execute_sql 执行 SQL。
7. 根据查询结果回答用户问题。

工具返回协议:
1. 所有工具都返回 success/data/error 三段结构。
2. 必须先检查 success。
3. 只有 success=true 时,才能使用 data 中的内容继续推理。
4. 如果 success=false,必须读取 error.code 和 error.message,先修正问题、换表、重试或向用户解释原因,不能把 error 当成正常数据继续生成 SQL。
5. 如果 data 是分页结构,必须优先检查 items、hasNext、totalPages、page、pageSize,不能把单页数据误当成全量数据。
6. 当分页数据不足以支撑结论时,必须显式继续翻页,不能基于不完整 schema 猜测表、列或关系。

SQL 约束:
1. 只允许 SELECT。
2. 生成 SQL 之前,必须先查看相关表结构,确认表名、列名、类型与可见范围。
3. 如果工具返回失败,不要跳过失败直接猜测表结构或继续执行 SQL。
""")
.model(modelFactory.getInstance(modelProperties))
.toolkit(toolkit)
.skillBox(skillBox)
.memory(new InMemoryMemory())
.maxIters(10)
.maxIters(16)
.enablePendingToolRecovery(true)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
@Service
public class SessionService {

private static final String SESSION_DATABASE_NAME = "data_agent";
private static final String SESSION_TABLE_NAME = "agentscope_sessions";
private static final String SESSION_KEY_PREFIX = "data-agent:";

private final DataSource dataSource;
private final Map<String, Session> sessionCache = new ConcurrentHashMap<>();

Expand All @@ -62,26 +66,36 @@ public SessionService(DataSource dataSource) {
public Session getOrCreateSession(String sessionId) {
return sessionCache.computeIfAbsent(
sessionId,
k -> new MysqlSession(dataSource, "data_agent", "agentscope_sessions", false));
k ->
new MysqlSession(
dataSource, SESSION_DATABASE_NAME, SESSION_TABLE_NAME, false));
}

public String buildNamespacedSessionId(String sessionId) {
return SESSION_KEY_PREFIX + sessionId;
}

public SimpleSessionKey namespacedSessionKey(String sessionId) {
return SimpleSessionKey.of(buildNamespacedSessionId(sessionId));
}

public List<Msg> getSessionDebug(String sessionId) {
Session session = getOrCreateSession(sessionId);
if (!session.exists(SimpleSessionKey.of(sessionId))) {
if (!session.exists(namespacedSessionKey(sessionId))) {
return Collections.emptyList();
}
return session.getList(SimpleSessionKey.of(sessionId), "memory_messages", Msg.class);
return session.getList(namespacedSessionKey(sessionId), "memory_messages", Msg.class);
}

public List<TurnItem> getSessionHistory(String sessionId) {
Session session = getOrCreateSession(sessionId);

if (!session.exists(SimpleSessionKey.of(sessionId))) {
if (!session.exists(namespacedSessionKey(sessionId))) {
return Collections.emptyList();
}

List<Msg> messages =
session.getList(SimpleSessionKey.of(sessionId), "memory_messages", Msg.class);
session.getList(namespacedSessionKey(sessionId), "memory_messages", Msg.class);

List<TurnItem> turns = new ArrayList<>();
List<ContentBlock> agentBlocks = null;
Expand Down Expand Up @@ -195,7 +209,7 @@ private static ChatStreamEvent thinkingEvent(String thinking) {
public void clearSession(String sessionId) {
Session session = sessionCache.remove(sessionId);
if (session != null) {
session.delete(SimpleSessionKey.of(sessionId));
session.delete(namespacedSessionKey(sessionId));
}
}

Expand All @@ -205,7 +219,7 @@ public void clearAllSessions() {

public List<SessionInfo> listSessions() {
MysqlSession session =
new MysqlSession(dataSource, "data_agent", "agentscope_sessions", false);
new MysqlSession(dataSource, SESSION_DATABASE_NAME, SESSION_TABLE_NAME, false);
Set<SessionKey> keys = session.listSessionKeys();

if (keys == null || keys.isEmpty()) {
Expand All @@ -229,7 +243,11 @@ public List<SessionInfo> listSessions() {
List<SessionInfo> result = new ArrayList<>();
for (SessionKey key : keys) {
String sid = key.toIdentifier();
if (!sid.startsWith(SESSION_KEY_PREFIX)) {
continue;
}
List<Msg> messages = session.getList(key, "memory_messages", Msg.class);
String sessionId = sid.substring(SESSION_KEY_PREFIX.length());

String title = "";
if (messages != null) {
Expand All @@ -244,11 +262,11 @@ public List<SessionInfo> listSessions() {
}
}
if (title.isEmpty()) {
title = sid.length() > 20 ? sid.substring(0, 20) : sid;
title = sessionId.length() > 20 ? sessionId.substring(0, 20) : sessionId;
}

String[] times = timestamps.getOrDefault(sid, new String[] {"", ""});
result.add(new SessionInfo(sid, title, times[0], times[1]));
result.add(new SessionInfo(sessionId, title, times[0], times[1]));
}

result.sort((a, b) -> b.lastActiveAt().compareTo(a.lastActiveAt()));
Expand Down
Loading
Loading