Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/core/src/__tests__/schemas.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,23 @@ describe("runMetaSchema", () => {
};
expect(() => runMetaSchema.parse(valid)).not.toThrow();
});

it("accepts partial process run meta with failed file count", () => {
const valid = {
runId: "20260401-a1b2",
projectId: "test",
rootPath: "/path",
createdAt: "2026-04-01T14:30:52.000Z",
completedAt: "2026-04-01T15:00:00.000Z",
type: "process",
phase: "partial",
processorConfig: {
agentType: "claude-agent-sdk",
model: "claude-opus-4-6",
modelConfig: { maxTurns: 50 },
},
stats: { filesProcessed: 3, filesFailed: 2, findingsCount: 1 },
};
expect(() => runMetaSchema.parse(valid)).not.toThrow();
});
});
2 changes: 1 addition & 1 deletion packages/core/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export function readRunMeta(projectId: string, runId: string): RunMeta {
export function completeRun(
projectId: string,
runId: string,
phase: "done" | "error",
phase: "done" | "partial" | "error",
stats?: Partial<RunMeta["stats"]>,
): void {
const meta = readRunMeta(projectId, runId);
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export const runMetaSchema = z.object({
createdAt: z.string(),
completedAt: z.string().optional(),
type: z.enum(["scan", "process", "revalidate"]),
phase: z.enum(["running", "done", "error"]),
phase: z.enum(["running", "done", "partial", "error"]),
scannerConfig: z.object({ matcherSlugs: z.array(z.string()) }).optional(),
processorConfig: z
.object({
Expand All @@ -174,6 +174,7 @@ export const runMetaSchema = z.object({
filesScanned: z.number().optional(),
candidatesFound: z.number().optional(),
filesProcessed: z.number().optional(),
filesFailed: z.number().optional(),
findingsCount: z.number().optional(),
totalCostUsd: z.number().optional(),
totalInputTokens: z.number().optional(),
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export interface RunMeta {
createdAt: string;
completedAt?: string;
type: "scan" | "process" | "revalidate";
phase: "running" | "done" | "error";
phase: "running" | "done" | "partial" | "error";
scannerConfig?: {
matcherSlugs: string[];
};
Expand All @@ -20,6 +20,7 @@ export interface RunMeta {
filesScanned?: number;
candidatesFound?: number;
filesProcessed?: number;
filesFailed?: number;
findingsCount?: number;
totalCostUsd?: number;
totalInputTokens?: number;
Expand Down
7 changes: 6 additions & 1 deletion packages/deepsec/src/commands/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,13 @@ export async function processCommand(opts: {
onProgress: logProgress,
});

console.log(`${GREEN}Processing complete.${RESET} Run: ${BOLD}${result.runId}${RESET}`);
const phaseColor = result.phase === "partial" ? YELLOW : GREEN;
const phaseLabel = result.phase === "partial" ? "Processing partial." : "Processing complete.";
console.log(`${phaseColor}${phaseLabel}${RESET} Run: ${BOLD}${result.runId}${RESET}`);
console.log(` Analyses: ${result.analysisCount}`);
if (result.failedCount > 0) {
console.log(` Failed/Retryable: ${result.failedCount}`);
}
console.log(` Findings: ${result.findingCount}`);
console.log();
console.log(`Next:`);
Expand Down
45 changes: 45 additions & 0 deletions packages/processor/src/__tests__/process-revalidate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ describe("processor with stub agent", () => {

expect(result.findingCount).toBe(1);
expect(result.analysisCount).toBe(1);
expect(result.failedCount).toBe(0);
expect(result.phase).toBe("done");
expect(stub.calls.investigateCalls).toHaveLength(1);
expect(stub.calls.investigateCalls[0].batch).toHaveLength(1);

Expand Down Expand Up @@ -252,6 +254,49 @@ describe("processor with stub agent", () => {
expect(rec.analysisHistory[0].refusal?.reason).toBe("stub refusal");
});

it("process() marks batch retryable and run partial on agent error with zero output", async () => {
const fx = setupProject({ files: ["a.ts", "b.ts"] });
fx.writeRecord(pendingRecord(fx.projectId, "a.ts"));
fx.writeRecord(pendingRecord(fx.projectId, "b.ts"));

const stub = new StubAgent({
async *investigateImpl(params) {
return {
results: params.batch.map((r) => ({ filePath: r.filePath, findings: [] })),
meta: {
durationMs: 1,
hadErrors: true,
usage: {
inputTokens: 0,
outputTokens: 0,
cacheReadInputTokens: 0,
cacheCreationInputTokens: 0,
},
},
};
},
});
setLoadedConfig(
defineConfig({
projects: [{ id: fx.projectId, root: fx.targetRoot }],
plugins: [{ name: "stub-plugin", agents: [stub] }],
}),
);

const result = await processProject({
projectId: fx.projectId,
agentType: "stub",
concurrency: 1,
});

expect(result.analysisCount).toBe(0);
expect(result.findingCount).toBe(0);
expect(result.failedCount).toBe(2);
expect(result.phase).toBe("partial");
expect(fx.readRecord("a.ts").status).toBe("error");
expect(fx.readRecord("b.ts").status).toBe("error");
});

it("revalidate() attaches verdicts to existing findings", async () => {
const fx = setupProject({ files: ["app.ts"] });
const rec = pendingRecord(fx.projectId, "app.ts");
Expand Down
5 changes: 5 additions & 0 deletions packages/processor/src/agents/claude-agent-sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin {
let toolUseCount = 0;
let sdkMeta: Partial<BatchMeta> = {};
let lastError = "";
let hadErrors = false;

for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
if (attempt > 1) {
Expand All @@ -89,6 +90,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin {
toolUseCount = 0;
sdkMeta = {};
lastError = "";
hadErrors = false;
}

try {
Expand Down Expand Up @@ -173,6 +175,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin {
}
} else {
lastError = String(msg.error ?? "unknown");
hadErrors = true;
yield {
type: "error" as const,
message: `Agent error: ${lastError.slice(0, 300)}`,
Expand All @@ -189,6 +192,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin {
}
} catch (sdkErr) {
lastError = sdkErr instanceof Error ? sdkErr.message : String(sdkErr);
hadErrors = true;
yield {
type: "error" as const,
message: `Agent SDK error: ${lastError.slice(0, 300)}`,
Expand Down Expand Up @@ -225,6 +229,7 @@ export class ClaudeAgentSdkPlugin implements AgentPlugin {
durationMs,
...sdkMeta,
refusal,
hadErrors,
},
};
}
Expand Down
7 changes: 7 additions & 0 deletions packages/processor/src/agents/codex-sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin {
let toolUseCount = 0;
let sdkMeta: Partial<BatchMeta> = {};
let lastError = "";
let hadErrors = false;

for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
if (attempt > 1) {
Expand All @@ -573,6 +574,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin {
toolUseCount = 0;
sdkMeta = {};
lastError = "";
hadErrors = false;
}

try {
Expand Down Expand Up @@ -632,6 +634,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin {

case "turn.failed":
lastError = event.error?.message ?? "turn.failed";
hadErrors = true;
yield {
type: "error" as const,
message: `Codex turn failed: ${lastError.slice(0, 300)}`,
Expand All @@ -640,6 +643,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin {

case "error":
lastError = event.message;
hadErrors = true;
yield {
type: "error" as const,
message: `Codex stream error: ${lastError.slice(0, 300)}`,
Expand All @@ -649,6 +653,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin {
}
} catch (sdkErr) {
lastError = sdkErr instanceof Error ? sdkErr.message : String(sdkErr);
hadErrors = true;
yield {
type: "error" as const,
message: `Codex SDK error: ${lastError.slice(0, 300)}`,
Expand Down Expand Up @@ -696,6 +701,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin {
const wasSilent = (sdkMeta.usage?.outputTokens ?? 0) === 0;
const stderrTail = wasSilent ? readStderrTail(invocation.stderrLog) : undefined;
if (wasSilent && stderrTail) {
hadErrors = true;
yield {
type: "error" as const,
message: `Codex silent-exit stderr: ${stderrTail.slice(0, 1500)}`,
Expand Down Expand Up @@ -737,6 +743,7 @@ export class CodexAgentSdkPlugin implements AgentPlugin {
durationMs,
...sdkMeta,
refusal,
hadErrors,
},
};
}
Expand Down
5 changes: 5 additions & 0 deletions packages/processor/src/agents/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ export interface BatchMeta {
* Empty/undefined for non-codex backends and successful codex runs.
*/
codexStderr?: string;
/**
* True when the agent backend emitted one or more runtime errors while
* handling the batch (quota/rate-limit/provider failures etc).
*/
hadErrors?: boolean;
}

export interface InvestigateOutput {
Expand Down
56 changes: 50 additions & 6 deletions packages/processor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ export async function process(params: {
/** Skip files whose candidate slugs are ALL in this set (files with any other slug still get processed) */
skipSlugs?: string[];
onProgress?: (progress: ProcessProgress) => void;
}): Promise<{ runId: string; analysisCount: number; findingCount: number }> {
}): Promise<{
runId: string;
analysisCount: number;
findingCount: number;
failedCount: number;
phase: "done" | "partial";
}> {
const { projectId, agentType = "claude-agent-sdk", config = {}, reinvestigate = false } = params;
// We deliberately don't default `promptTemplate` to DEFAULT_PROMPT_TEMPLATE
// here — when the caller doesn't pass one, we use the modular assembler
Expand Down Expand Up @@ -214,7 +220,7 @@ export async function process(params: {
type: "all_complete",
message: `Run ${runId} already completed`,
});
return { runId, analysisCount: 0, findingCount: 0 };
return { runId, analysisCount: 0, findingCount: 0, failedCount: 0, phase: "done" };
}
} else {
// Create new run
Expand Down Expand Up @@ -317,7 +323,7 @@ export async function process(params: {
message: "No files to process",
});
completeRun(projectId, runId, "done", { filesProcessed: 0 });
return { runId, analysisCount: 0, findingCount: 0 };
return { runId, analysisCount: 0, findingCount: 0, failedCount: 0, phase: "done" };
}

// Apply path filter
Expand All @@ -344,8 +350,10 @@ export async function process(params: {
let totalInputTokens = 0;
let totalOutputTokens = 0;
let totalDurationMs = 0;
let totalFailedAnalyses = 0;
let batchesCompleted = 0;
let batchesInFlight = 0;
let hadPartialFailures = false;
const concurrency = params.concurrency ?? defaultConcurrency();

async function processBatch(batch: FileRecord[], i: number) {
Expand Down Expand Up @@ -387,13 +395,37 @@ export async function process(params: {

const output = result.value as InvestigateOutput;
const { results, meta: batchMeta } = output;
const zeroOutput = (batchMeta.usage?.outputTokens ?? 0) === 0;
const degradedBatch = Boolean(batchMeta.hadErrors && zeroOutput);

// Accumulate run-level stats
totalCostUsd += batchMeta.costUsd ?? 0;
totalInputTokens += batchMeta.usage?.inputTokens ?? 0;
totalOutputTokens += batchMeta.usage?.outputTokens ?? 0;
totalDurationMs += batchMeta.durationMs;

// If the agent reported runtime failures and produced zero output
// tokens, treat this batch as retryable failure instead of a clean
// "analyzed with 0 findings" pass.
if (degradedBatch) {
hadPartialFailures = true;
for (const record of batch) {
record.status = "error";
record.lockedByRunId = undefined;
writeFileRecord(record);
totalFailedAnalyses++;
}
batchesInFlight--;
batchesCompleted++;
emitProgress({
type: "batch_complete",
message: `Batch ${i + 1}/${batches.length} partial: agent error with empty output; marked ${batch.length} file(s) retryable (${batchesInFlight} in flight, ${batchesCompleted}/${batches.length} done)`,
batchIndex: i,
totalBatches: batches.length,
});
return;
}

// Update file records with results + metadata.
//
// Re-investigation always *merges* — existing findings are preserved
Expand Down Expand Up @@ -451,6 +483,8 @@ export async function process(params: {
record.status = "error";
record.lockedByRunId = undefined;
writeFileRecord(record);
totalFailedAnalyses++;
hadPartialFailures = true;
}
}

Expand All @@ -469,7 +503,9 @@ export async function process(params: {
record.status = "error";
record.lockedByRunId = undefined;
writeFileRecord(record);
totalFailedAnalyses++;
}
hadPartialFailures = true;
emitProgress({
type: "batch_complete",
message: `Batch ${i + 1}/${batches.length} failed: ${err instanceof Error ? err.message : String(err)} (${batchesInFlight} in flight, ${batchesCompleted}/${batches.length} done)`,
Expand Down Expand Up @@ -497,8 +533,10 @@ export async function process(params: {
await Promise.all(workers);
}

completeRun(projectId, runId, "done", {
const finalPhase = hadPartialFailures ? "partial" : "done";
completeRun(projectId, runId, finalPhase, {
filesProcessed: totalAnalyses,
filesFailed: totalFailedAnalyses,
findingsCount: totalFindings,
totalCostUsd,
totalInputTokens,
Expand All @@ -508,10 +546,16 @@ export async function process(params: {

emitProgress({
type: "all_complete",
message: `Processing complete: ${totalAnalyses} analyses, ${totalFindings} findings`,
message: `Processing complete: ${totalAnalyses} analyses, ${totalFindings} findings${totalFailedAnalyses > 0 ? `, ${totalFailedAnalyses} failed/retryable` : ""}`,
});

return { runId, analysisCount: totalAnalyses, findingCount: totalFindings };
return {
runId,
analysisCount: totalAnalyses,
findingCount: totalFindings,
failedCount: totalFailedAnalyses,
phase: finalPhase,
};
}

// --- Revalidation ---
Expand Down