274 changes: 200 additions & 74 deletions cli.ts
@@ -488,7 +488,7 @@ async function sleep(ms: number): Promise<void> {
// ============================================================================

export async function runImportMarkdown(
ctx: { embedder?: import("./src/embedder.js").Embedder; store: MemoryStore },
ctx: { embedder?: import("./src/embedder.js").Embedder; store: MemoryStore; retriever: MemoryRetriever },
workspaceGlob: string | undefined,
options: {
dryRun?: boolean;
@@ -497,8 +497,9 @@ export async function runImportMarkdown(
dedup?: boolean;
minTextLength?: string;
importance?: string;
batchSize?: string;
}
): Promise<{ imported: number; skipped: number; foundFiles: number }> {
): Promise<{ imported: number; skipped: number; foundFiles: number; skippedShort: number; skippedDedup: number; errorCount: number; elapsedMs: number }> {
const openclawHome = options.openclawHome
? path.resolve(options.openclawHome)
: path.join(homedir(), ".openclaw");
@@ -507,6 +508,9 @@ export async function runImportMarkdown(
let imported = 0;
let skipped = 0;
let foundFiles = 0;
let skippedShort = 0;
let skippedDedup = 0;
let errorCount = 0;

if (!ctx.embedder) {
// [FIXED P1] Throw instead of process.exit(1) so CLI handler can catch it
@@ -649,99 +653,213 @@ export async function runImportMarkdown(
}

if (mdFiles.length === 0) {
return { imported: 0, skipped: 0, foundFiles: 0 };
return { imported: 0, skipped: 0, foundFiles: 0, skippedShort: 0, skippedDedup: 0, errorCount: 0, elapsedMs: 0 };
}

// NaN-safe parsing with bounds — invalid input falls back to defaults instead of
// silently passing NaN (e.g. "--min-text-length abc" would otherwise make every
// length check behave unexpectedly).
// ── Phase 1: parse all files → flat entry list ─────────────────────────────
// NaN-safe parsing with bounds
const minTextLength = clampInt(parseInt(options.minTextLength ?? "5", 10), 1, 10000);
const importanceDefault = Number.isFinite(parseFloat(options.importance ?? "0.7"))
? Math.max(0, Math.min(1, parseFloat(options.importance ?? "0.7")))
: 0.7;
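// Note: clampInt is defined elsewhere in cli.ts and is not shown in this diff. A
// hypothetical sketch of the contract assumed here (illustration only, not this PR's code):
//   function clampInt(n: number, min: number, max: number): number {
//     if (!Number.isFinite(n)) return min; // assumption: NaN falls back to the lower bound
//     return Math.min(max, Math.max(min, Math.trunc(n)));
//   }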
const dedupEnabled = !!options.dedup;
// Commander.js --dedup=false is rejected as "unknown option", but the test helper
// passes string "false" directly. Both !== false (boolean) and !== "false" (string)
// must be checked so dedup is correctly disabled in both CLI and test contexts.
const dedupEnabled = (options.dedup as unknown as string | undefined | boolean) !== false
&& (options.dedup as unknown as string | undefined | boolean) !== "false";
Comment on lines +668 to +669
P1: Keep dedup off when --dedup is not provided

This condition enables deduplication when options.dedup is undefined, because only false and "false" are treated as disabled. The command defines --dedup as an optional boolean flag, so omitted values can be unset; with the current check, dedup runs by default and changes import behavior (unexpected duplicate skipping and much heavier retrieval work) even when users did not ask for dedup.

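A sketch of one possible fix (a suggestion only, not code from this PR): require an explicit opt-in so an omitted flag leaves dedup off; the string comparison is kept only for the test helper that passes "true"/"false" directly.

const dedupRaw = options.dedup as unknown as boolean | string | undefined;
// Only an explicit --dedup (boolean true) or a test-helper "true" enables dedup;
// undefined, false, and "false" all leave it disabled.
const dedupEnabled = dedupRaw === true || dedupRaw === "true";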

const batchSize = clampInt(parseInt(options.batchSize ?? "10", 10), 1, Infinity);
const FLUSH_THRESHOLD = 100; // bulkStore flush interval

type ParsedEntry = {
text: string;
effectiveScope: string;
filePath: string;
discoveredScope: string;
};

// Parse each file for memory entries (lines starting with "- ")
for (const { filePath, scope: discoveredScope } of mdFiles) {
let content: string;
try {
// Already filtered with withFileTypes: true at collection time, so read directly
foundFiles++;
content = await fsPromises.readFile(filePath, "utf-8");
} catch (err) {
// I/O errors (permissions, corruption, etc.)
console.warn(` [skip] read failed: ${filePath}: ${(err as Error).message}`);
skipped++;
continue;
}
// (fix(import-markdown): CI test registration + skip protection for .md directories)
// Strip UTF-8 BOM (e.g. from Windows Notepad-saved files)
content = content.replace(/^\uFEFF/, "");
// Normalize line endings: handle both CRLF (\r\n) and LF (\n)
const lines = content.split(/\r?\n/);
const allEntries: ParsedEntry[] = [];
let parseErrors = 0;

// ── Phase 1a: parallel file reads ─────────────────────────────────────────
const fileContents = await Promise.all(
mdFiles.map(async ({ filePath, scope: discoveredScope }) => {
try {
const content = await fsPromises.readFile(filePath, "utf-8");
foundFiles++; // only count files we successfully read
return { filePath, discoveredScope, content, error: null };
} catch (err) {
console.warn(` [skip] read failed: ${filePath}: ${(err as Error).message}`);
parseErrors++;
return { filePath, discoveredScope, content: "", error: err };
}
})
);

// ── Phase 1b: extract bullet lines from each file ─────────────────────────
for (const { filePath, discoveredScope, content, error } of fileContents) {
if (error) { continue; } // parse error already counted in Phase 1a
// Strip UTF-8 BOM (e.g. from Windows Notepad-saved files)
const cleaned = content.replace(/^\uFEFF/, "");
const lines = cleaned.split(/\r?\n/);
for (const line of lines) {
// Skip non-memory lines
// Supports: "- text", "* text", "+ text" (standard Markdown bullet formats)
if (!/^[-*+]\s/.test(line)) continue;
const text = line.slice(2).trim();
if (text.length < minTextLength) { skipped++; continue; }

// Use --scope if provided, otherwise fall back to per-file discovered scope.
// This prevents cross-workspace leakage: without --scope, each workspace
// writes to its own scope instead of collapsing everything into "global".
if (text.length < minTextLength) { skipped++; skippedShort++; continue; }
const effectiveScope = options.scope || discoveredScope;
allEntries.push({ text, effectiveScope, filePath, discoveredScope });
}
}

// ── Deduplication check (scope-aware exact match) ───────────────────
// Run even in dry-run so --dry-run --dedup reports accurate counts
if (dedupEnabled) {
try {
const existing = await ctx.store.bm25Search(text, 5, [effectiveScope]);
if (existing.length > 0 && existing[0].entry.text === text) {
skipped++;
if (!options.dryRun) {
console.log(` [skip] already imported: ${text.slice(0, 60)}${text.length > 60 ? "..." : ""}`);
}
continue;
}
} catch (err) {
// [FIXED P2] Log warning so dedup failure is visible instead of silent
console.warn(` [import-markdown] dedup check failed (${err}), proceeding with import: ${text.slice(0, 60)}...`);
}
}
console.log(`[import] parsed ${allEntries.length} entries from ${mdFiles.length} file(s)`);

if (options.dryRun) {
console.log(` [dry-run] would import: ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`);
imported++;
continue;
}
if (allEntries.length === 0) {
return { imported: 0, skipped, foundFiles, skippedShort, skippedDedup: 0, errorCount: parseErrors, elapsedMs: 0 };
}

try {
const vector = await ctx.embedder!.embedPassage(text);
await ctx.store.store({
text,
vector,
importance: importanceDefault,
category: "other",
scope: effectiveScope,
metadata: JSON.stringify({ importedFrom: filePath, sourceScope: discoveredScope }),
});
imported++;
} catch (err) {
console.warn(` Failed to import: ${text.slice(0, 60)}... — ${err}`);
skipped++;
const t0 = Date.now();

// ── Phase 2a: dedup check — parallel retrieve() in chunks ──────────────────
// Uses retriever.retrieve() (vector + bm25 hybrid) instead of raw bm25Search.
// CHUNK=50 prevents overwhelming the retriever with too many parallel requests.
// On dedup hit: skip + count. On miss or error: proceed with import.
// [P2 fix] Runs even in dry-run so --dry-run --dedup shows accurate preview.
console.log(`[import] dedup check: ${dedupEnabled ? "enabled" : "disabled"}`);
const pendingEntries: ParsedEntry[] = [];
const dryRunDedupSkipped: ParsedEntry[] = [];

if (dedupEnabled) {
const CHUNK = 50;
for (let c = 0; c < allEntries.length; c += CHUNK) {
const chunk = allEntries.slice(c, c + CHUNK);
const results = await Promise.all(
chunk.map((e) =>
ctx.retriever
.retrieve({ query: e.text, limit: 20, scopeFilter: [e.effectiveScope] })
.then((hits) => ({ e, hits, ok: true as const }))
.catch((err) => {
console.warn(` [dedup] check failed: ${err}`);
return { e, hits: [] as any[], ok: false as const };
})
)
);
for (const { e, hits, ok } of results) {
if (!ok || hits.length === 0 || hits[0].entry.text !== e.text) {
pendingEntries.push(e);
} else {
// dedup hit — skippedDedup tracks it; do NOT count toward skipped (Phase 1b short entries only)
skippedDedup++;
dryRunDedupSkipped.push(e);
console.log(` [skip] already imported: ${e.text.slice(0, 60)}${e.text.length > 60 ? "..." : ""}`);
}
}
if (allEntries.length > 10) {
console.log(`[import] dedup check: ${Math.min(c + CHUNK, allEntries.length)}/${allEntries.length}`);
}
}
} else {
pendingEntries.push(...allEntries);
}

// ── Dry-run shortcut ───────────────────────────────────────────────────────
// [P2 fix] dedup has already run above; here we just report without importing.
if (options.dryRun) {
console.log(`\nDRY RUN — found ${foundFiles} files, ${imported} entries would be imported, ${skipped} skipped${dedupEnabled ? " [dedup enabled]" : ""}`);
} else {
console.log(`\nImport complete: ${imported} imported, ${skipped} skipped (scanned ${foundFiles} files)${dedupEnabled ? " [dedup enabled]" : ""}`);
const elapsed = Date.now() - t0;
for (const e of pendingEntries) {
console.log(` [dry-run] would import: ${e.text.slice(0, 80)}${e.text.length > 80 ? "..." : ""}`);
imported++;
}
for (const e of dryRunDedupSkipped) {
console.log(` [dry-run] would skip [dedup]: ${e.text.slice(0, 80)}${e.text.length > 80 ? "..." : ""}`);
}
console.log(`\nDRY RUN — found ${foundFiles} files, ${imported} entries would be imported, ${dryRunDedupSkipped.length} skipped${dedupEnabled ? " [dedup enabled]" : ""}`);
console.log(`\u2022 Skipped (short): ${skippedShort}`);
console.log(`\u2022 Skipped (dedup): ${dryRunDedupSkipped.length}`);
console.log(`\u2022 Elapsed: ${elapsed}ms`);
return { imported, skipped, foundFiles, skippedShort, skippedDedup: dryRunDedupSkipped.length, errorCount: parseErrors, elapsedMs: elapsed };
}

console.log(`[import] ${pendingEntries.length} entries need embedding (${skippedDedup} dedup hits)`);
if (pendingEntries.length === 0) {
const elapsed = Date.now() - t0;
console.log(`\nImport complete: 0 imported, ${skipped} skipped (scanned ${foundFiles} files)`);
return { imported: 0, skipped, foundFiles, skippedShort, skippedDedup, errorCount: parseErrors, elapsedMs: elapsed };
}
return { imported, skipped, foundFiles };

// ── Phase 2b: batch embed + bulkStore pipeline ────────────────────────────
// batchSize: number of texts sent to embedBatchPassage() per call (default: 10).
// Lower = less memory pressure on embedder; higher = fewer API round-trips.
// FLUSH_THRESHOLD=100: accumulated entries before calling bulkStore() once.
// Single lock acquisition per bulkStore call reduces lock contention.
let flushCount = 0;
const pendingFlush: Array<Omit<import("./src/store.js").MemoryEntry, "id" | "timestamp">> = [];

async function flushPending(): Promise<void> {
// splice out current batch; remaining entries stay in queue for next flush
if (pendingFlush.length === 0) return;
const batch = pendingFlush.splice(0, pendingFlush.length);
try {
await ctx.store.bulkStore(batch);
flushCount++;
imported += batch.length;
console.log(`[import] stored batch ${flushCount} (${batch.length} entries, total: ${imported})`);
} catch (err) {
// [P1 fix] count failed batch and continue so remaining batches are processed
errorCount += batch.length;
console.warn(`[import] bulkStore batch ${flushCount + 1} failed (${err}), ${batch.length} entries not stored`);
}
}

for (let i = 0; i < pendingEntries.length; i += batchSize) {
const batch = pendingEntries.slice(i, i + batchSize);
const batchIdx = Math.floor(i / batchSize) + 1;
const totalBatches = Math.ceil(pendingEntries.length / batchSize);

// Embed batch
const texts = batch.map((e) => e.text);
let vectors: number[][];
try {
vectors = await ctx.embedder!.embedBatchPassage(texts);
} catch (err) {
console.warn(`[import] batch ${batchIdx} embed failed (${err}), skipping ${batch.length} entries`);
skipped += batch.length;
errorCount += batch.length;
continue;
}

// Build entries and queue for bulkStore
for (let j = 0; j < batch.length; j++) {
const e = batch[j];
pendingFlush.push({
text: e.text,
vector: vectors[j],
importance: importanceDefault,
category: "other",
scope: e.effectiveScope,
metadata: JSON.stringify({ importedFrom: e.filePath, sourceScope: e.discoveredScope }),
});
}

// Flush when threshold reached or last batch
const isLastBatch = i + batchSize >= pendingEntries.length;
if (pendingFlush.length >= FLUSH_THRESHOLD || isLastBatch) {
await flushPending();
} else {
console.log(`[import] embedded batch ${batchIdx}/${totalBatches} (${batch.length} entries)`);
}
}

const elapsed = Date.now() - t0;

console.log(`\nImport complete: ${imported} imported, ${skipped} skipped (scanned ${foundFiles} files)${dedupEnabled ? " [dedup enabled]" : ""}`);
console.log(`\u2022 Skipped (short): ${skippedShort}`);
console.log(`\u2022 Skipped (dedup): ${skippedDedup}`);
console.log(`\u2022 Embed batches: ${Math.ceil(pendingEntries.length / batchSize)}`);
console.log(`\u2022 bulkStore calls: ${flushCount}`);
console.log(`\u2022 Elapsed: ${elapsed}ms`);

return { imported, skipped, foundFiles, skippedShort, skippedDedup, errorCount: errorCount + parseErrors, elapsedMs: elapsed };
}


export function registerMemoryCLI(program: Command, context: CLIContext): void {
let lastSearchDiagnostics: ReturnType<MemoryRetriever["getLastDiagnostics"]> =
Expand Down Expand Up @@ -1438,7 +1556,11 @@ export function registerMemoryCLI(program: Command, context: CLIContext): void {
)
.option(
"--dedup",
"Skip entries already in store (scope-aware exact match, requires store.bm25Search)",
"Enable dedup (default: enabled)",
)
.option(
"--no-dedup",
"Disable dedup",
)
.option(
"--min-text-length <n>",
@@ -1450,14 +1572,18 @@ export function registerMemoryCLI(program: Command, context: CLIContext): void {
"Importance score for imported entries, 0.0-1.0 (default: 0.7)",
"0.7",
)
.option(
"--batch-size <n>",
"Embedding batch size for batch import (default: 10)",
"10",
)
.action(async (workspaceGlob, options) => {
// [FIXED P1] Wrap with try/catch — runImportMarkdown now throws instead of process.exit(1)
try {
const result = await runImportMarkdown(context, workspaceGlob, options);
if (result.foundFiles === 0) {
console.log("No Markdown memory files found.");
}
// Summary is printed inside runImportMarkdown (removed duplicate output)
} catch (err) {
console.error(`import-markdown failed: ${err}`);
process.exit(1);
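For illustration only, outside the diff: a hypothetical direct call to runImportMarkdown, e.g. from a test helper, assuming stub embedder/store/retriever objects; the glob and option values are placeholders. It shows the extended return shape introduced above.

const result = await runImportMarkdown(
  { embedder, store, retriever },        // stubs or real instances
  "~/projects/*/memory",                  // hypothetical workspace glob
  { dryRun: true, dedup: false, minTextLength: "5", importance: "0.7", batchSize: "10" },
);
console.log(
  `imported=${result.imported} skippedShort=${result.skippedShort} ` +
  `skippedDedup=${result.skippedDedup} errors=${result.errorCount} in ${result.elapsedMs}ms`,
);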