Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 223 additions & 27 deletions apps/memos-local-plugin/adapters/openclaw/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import type {

const TOOL_RESULT_ROLES = new Set([
"toolResult", // pi-ai canonical
"toolresult", // lower-case gateway/UI normalizer variants
"tool", // OpenAI legacy
"tool_result", // some Anthropic SDKs / older bridges
"tool_response", // older variants
Expand Down Expand Up @@ -156,13 +157,13 @@ export function flattenMessages(input: unknown[] | undefined): FlatMessage[] {
textBuf += (textBuf ? "\n" : "") + b.text;
} else if (type === "thinking" && typeof b.thinking === "string") {
thinkingBuf += (thinkingBuf ? "\n\n" : "") + b.thinking;
} else if (type === "toolCall") {
} else if (isToolCallBlockType(type)) {
inlineToolCalls.push({
role: "tool_call",
content: "",
toolName: typeof b.name === "string" ? b.name : "unknown",
toolCallId: typeof b.id === "string" ? b.id : undefined,
toolInput: b.arguments,
toolCallId: pickToolCallId(b, m),
toolInput: pickToolInput(b),
ts,
});
} else if (!type && typeof b.text === "string") {
Expand Down Expand Up @@ -246,6 +247,63 @@ export function flattenMessages(input: unknown[] | undefined): FlatMessage[] {
return out;
}

function isToolCallBlockType(type: string): boolean {
const normalized = type.trim().toLowerCase();
return (
normalized === "toolcall" ||
normalized === "tool_call" ||
normalized === "tooluse" ||
normalized === "tool_use" ||
normalized === "functioncall" ||
normalized === "function_call"
);
}

function pickToolCallId(
block: Record<string, unknown>,
message?: Record<string, unknown>,
): string | undefined {
return firstString(
block.id,
block.toolCallId,
block.tool_call_id,
block.callId,
block.call_id,
block.toolUseId,
block.tool_use_id,
message?.toolCallId,
message?.tool_call_id,
);
}

function firstString(...values: unknown[]): string | undefined {
for (const value of values) {
if (typeof value === "string" && value.trim()) return value;
}
return undefined;
}

function pickToolInput(block: Record<string, unknown>): unknown {
if ("arguments" in block) return block.arguments;
if ("args" in block) return block.args;
if ("input" in block) return block.input;
if (typeof block.partialJson === "string") {
try {
return JSON.parse(block.partialJson);
} catch {
return block.partialJson;
}
}
if (typeof block.partialArgs === "string") {
try {
return JSON.parse(block.partialArgs);
} catch {
return block.partialArgs;
}
}
return undefined;
}

/**
* Extract the visible text from a `Message.content` value, supporting
* both the pi-ai shapes (string OR `(TextContent|ImageContent)[]`) and
Expand Down Expand Up @@ -526,9 +584,26 @@ export function extractTurn(messages: FlatMessage[], now: number): CapturedTurn
const userText = messages[lastUserIdx].content.trim();
const tail = messages.slice(lastUserIdx + 1);

const pendingCalls = new Map<string, Partial<ToolCallDTO> & { _id?: string }>();
type PendingToolCall = Partial<ToolCallDTO> & { _id?: string };
const pendingCalls = new Map<string, PendingToolCall[]>();
const toolCalls: ToolCallDTO[] = [];

const enqueuePendingCall = (key: string, stub: PendingToolCall): void => {
const queue = pendingCalls.get(key);
if (queue) {
queue.push(stub);
} else {
pendingCalls.set(key, [stub]);
}
};
const takePendingCall = (key: string): PendingToolCall | undefined => {
const queue = pendingCalls.get(key);
if (!queue || queue.length === 0) return undefined;
const stub = queue.shift();
if (queue.length === 0) pendingCalls.delete(key);
return stub;
};

// Two separate buffers accumulate content not yet assigned to a tool.
//
// `pendingThinking`: Claude extended-thinking blocks (`ThinkingContent`)
Expand Down Expand Up @@ -561,7 +636,7 @@ export function extractTurn(messages: FlatMessage[], now: number): CapturedTurn
pendingAssistant = [];

const key = m.toolCallId ?? m.toolName;
pendingCalls.set(key, {
enqueuePendingCall(key, {
_id: m.toolCallId,
name: m.toolName,
input: m.toolInput,
Expand All @@ -572,7 +647,7 @@ export function extractTurn(messages: FlatMessage[], now: number): CapturedTurn
}
if (m.role === "tool_result") {
const key = m.toolCallId ?? m.toolName ?? "";
const stub = pendingCalls.get(key);
const stub = key ? takePendingCall(key) : undefined;
const errorCode = stub
? m.errorCode ?? (m.isError ? "tool_error" : undefined)
: m.errorCode ?? (m.isError ? "tool_error" : undefined);
Expand All @@ -581,25 +656,28 @@ export function extractTurn(messages: FlatMessage[], now: number): CapturedTurn
input: stub?.input,
output: m.content || undefined,
errorCode,
toolCallId: stub?._id ?? m.toolCallId,
startedAt: stub?.startedAt ?? (m.ts ?? now),
endedAt: m.ts ?? now,
thinkingBefore: stub?.thinkingBefore,
});
if (key) pendingCalls.delete(key);
continue;
}
}

for (const stub of pendingCalls.values()) {
if (!stub.name) continue;
toolCalls.push({
name: stub.name,
input: stub.input,
output: undefined,
startedAt: stub.startedAt ?? now,
endedAt: now,
thinkingBefore: stub.thinkingBefore,
});
for (const queue of pendingCalls.values()) {
for (const stub of queue) {
if (!stub.name) continue;
toolCalls.push({
name: stub.name,
input: stub.input,
output: undefined,
toolCallId: stub._id,
startedAt: stub.startedAt ?? now,
endedAt: now,
thinkingBefore: stub.thinkingBefore,
});
}
}

const agentThinking = pendingThinking.join("\n\n").trim();
Expand All @@ -611,6 +689,56 @@ export function extractTurn(messages: FlatMessage[], now: number): CapturedTurn
};
}

function mergeToolCalls(
captured: readonly ToolCallDTO[],
observed: readonly ToolCallDTO[],
): ToolCallDTO[] {
if (observed.length === 0) return [...captured];
const out = captured.map((tc) => ({ ...tc }));
for (const obs of observed) {
const idx = out.findIndex((existing) => toolCallsMatch(existing, obs));
if (idx >= 0) {
out[idx] = mergeToolCall(out[idx]!, obs);
} else {
out.push({ ...obs });
}
}
return out.sort((a, b) => {
const at = a.startedAt ?? a.endedAt ?? 0;
const bt = b.startedAt ?? b.endedAt ?? 0;
return at - bt;
});
}

function mergeToolCall(existing: ToolCallDTO, observed: ToolCallDTO): ToolCallDTO {
return {
...observed,
...existing,
input: existing.input ?? observed.input,
output: existing.output ?? observed.output,
errorCode: existing.errorCode ?? observed.errorCode,
toolCallId: existing.toolCallId ?? observed.toolCallId,
startedAt: existing.startedAt ?? observed.startedAt,
endedAt: existing.endedAt ?? observed.endedAt,
thinkingBefore: existing.thinkingBefore ?? observed.thinkingBefore,
assistantTextBefore: existing.assistantTextBefore ?? observed.assistantTextBefore,
};
}

function toolCallsMatch(a: ToolCallDTO, b: ToolCallDTO): boolean {
if (a.toolCallId && b.toolCallId) return a.toolCallId === b.toolCallId;
if (a.toolCallId || b.toolCallId) return false;
return a.name === b.name && stableStringify(a.input) === stableStringify(b.input);
}

function stableStringify(value: unknown): string {
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}

// ─── Session identity ──────────────────────────────────────────────────────

/**
Expand Down Expand Up @@ -781,7 +909,16 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
let episodeBindingSeq = 0;
// Per-toolCallId start timestamps so `after_tool_call` can compute duration
// when the host doesn't populate `durationMs`.
const toolCallStartedAt = new Map<string, { ts: number; sessionId: SessionId }>();
const toolCallStartedAt = new Map<string, {
ts: number;
sessionId: SessionId;
runId?: string;
toolName?: string;
params?: Record<string, unknown>;
}>();
type ObservedToolCall = ToolCallDTO & { runId?: string; order: number };
const observedToolCallsBySession = new Map<SessionId, ObservedToolCall[]>();
let observedToolCallSeq = 0;
const spawnedSubagents = new Map<string, {
event: SubagentSpawnedEvent;
ctx: PluginHookSubagentContext;
Expand All @@ -791,6 +928,40 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
}>();
const pendingSubagentSessions = new Set<SessionId>();

function rememberObservedToolCall(
sessionId: SessionId,
runId: string | undefined,
tc: ToolCallDTO,
): void {
const list = observedToolCallsBySession.get(sessionId) ?? [];
list.push({ ...tc, runId, order: ++observedToolCallSeq });
observedToolCallsBySession.set(sessionId, list.slice(-200));
}

function takeObservedToolCalls(
sessionId: SessionId,
runId: string | undefined,
): ToolCallDTO[] {
const list = observedToolCallsBySession.get(sessionId) ?? [];
if (list.length === 0) return [];

const matched: ObservedToolCall[] = [];
const rest: ObservedToolCall[] = [];
for (const tc of list) {
const sameRun = runId ? tc.runId === runId || !tc.runId : true;
if (sameRun) matched.push(tc);
else rest.push(tc);
}

if (rest.length > 0) observedToolCallsBySession.set(sessionId, rest);
else observedToolCallsBySession.delete(sessionId);

return matched
.slice()
.sort((a, b) => (a.startedAt ?? a.order) - (b.startedAt ?? b.order))
.map(({ runId: _runId, order: _order, ...tc }) => tc);
}

async function ensureSession(
agentId: string | undefined,
sessionKey: string | undefined,
Expand Down Expand Up @@ -1052,8 +1223,12 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
});
return;
}
const toolCalls = mergeToolCalls(
turn.toolCalls,
takeObservedToolCalls(sessionId, ctx.runId),
);
const isSubagentAnnouncement = isOpenClawSubagentAnnouncementPrompt(turn.userText);
const hasSubagentSpawn = turn.toolCalls.some((tc) => tc.name === "sessions_spawn");
const hasSubagentSpawn = toolCalls.some((tc) => tc.name === "sessions_spawn");

// Resolve (or lazily open) the target episode. Three cases:
// 1. `before_prompt_build` already ran this turn → we have the
Expand Down Expand Up @@ -1088,7 +1263,7 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
episodeId,
agentText: turn.agentText,
agentThinking: turn.agentThinking,
toolCalls: turn.toolCalls,
toolCalls,
reflection: turn.reflection,
contextHints: { namespace },
ts: now(),
Expand All @@ -1101,7 +1276,7 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
sessionId,
traceId: res.traceId,
episodeId: res.episodeId,
tools: turn.toolCalls.length,
tools: toolCalls.length,
success: event.success,
durationMs: event.durationMs,
});
Expand All @@ -1120,6 +1295,7 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
await opts.core.closeSession(sessionId);
messageCursor.delete(sessionId);
forgetSessionBindings(sessionId);
observedToolCallsBySession.delete(sessionId);
lastUserTextBySession.delete(sessionId);
}
} catch (err) {
Expand All @@ -1131,13 +1307,20 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
}

function handleBeforeToolCall(
_event: BeforeToolCallEvent,
event: BeforeToolCallEvent,
ctx: PluginHookToolContext,
): void {
if (!ctx.toolCallId) return;
const toolCallId = ctx.toolCallId ?? event.toolCallId;
if (!toolCallId) return;
if (isEphemeralSessionKey(ctx.sessionKey)) return;
const sessionId = bridgeSessionId(ctx.agentId ?? "main", ctx.sessionKey ?? "default");
toolCallStartedAt.set(ctx.toolCallId, { ts: now(), sessionId });
toolCallStartedAt.set(toolCallId, {
ts: now(),
sessionId,
runId: ctx.runId ?? event.runId,
toolName: ctx.toolName ?? event.toolName,
params: event.params,
});
}

async function handleAfterToolCall(
Expand All @@ -1147,8 +1330,9 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
if (isEphemeralSessionKey(ctx.sessionKey)) return;
try {
const sessionId = bridgeSessionId(ctx.agentId ?? "main", ctx.sessionKey ?? "default");
const started = ctx.toolCallId ? toolCallStartedAt.get(ctx.toolCallId) : undefined;
if (ctx.toolCallId) toolCallStartedAt.delete(ctx.toolCallId);
const toolCallId = ctx.toolCallId ?? event.toolCallId;
const started = toolCallId ? toolCallStartedAt.get(toolCallId) : undefined;
if (toolCallId) toolCallStartedAt.delete(toolCallId);

const endedAt = now();
const durationMs =
Expand All @@ -1157,11 +1341,22 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
: started
? Math.max(0, endedAt - started.ts)
: 0;
const toolName = event.toolName || started?.toolName || ctx.toolName || "unknown";
const startedAt = started?.ts;
rememberObservedToolCall(sessionId, ctx.runId ?? event.runId ?? started?.runId, {
name: toolName,
input: event.params ?? started?.params,
output: event.result,
errorCode: event.error,
toolCallId,
startedAt,
endedAt,
});

opts.core.recordToolOutcome({
sessionId,
episodeId: currentEpisodeId(sessionId),
tool: event.toolName,
tool: toolName,
success: !event.error,
errorCode: event.error,
durationMs,
Expand Down Expand Up @@ -1211,6 +1406,7 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
await opts.core.closeSession(sessionId);
messageCursor.delete(sessionId);
forgetSessionBindings(sessionId);
observedToolCallsBySession.delete(sessionId);
lastUserTextBySession.delete(sessionId);
opts.log.debug("memos.session.ended", {
sessionId: event.sessionId,
Expand Down
Loading