Skip to content

Commit c226c2a

Browse files
committed
Harden watcher error isolation and retry cursor behavior
1 parent 32ad510 commit c226c2a

File tree

3 files changed

+303
-20
lines changed

3 files changed

+303
-20
lines changed

packages/stackflow-agent/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,6 @@ const result = await agent.acceptIncomingTransfer({
133133
- `getPipeState` (recommended): per-pipe read-only polling (`get-pipe`)
134134
- `listClosureEvents`: event scan mode
135135
4. Watcher retries are idempotent for already-disputed closures (same closure txid is skipped on later polls).
136-
5. For production hardening, add alerting, signer balance checks, and idempotency audit logs.
136+
5. Read-only polling isolates per-pipe failures (`getPipeState` errors on one pipe do not stop others).
137+
6. Event scan mode intentionally holds the cursor when any dispute submission errors occur, so failed disputes are retried on next run.
138+
7. For production hardening, add alerting, signer balance checks, and idempotency audit logs.

packages/stackflow-agent/src/watcher.js

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,24 @@ export class HourlyClosureWatcher {
145145
this.running = false;
146146
}
147147

148+
reportError(error, context = null) {
149+
if (!error) {
150+
return;
151+
}
152+
if (this.onError) {
153+
if (context && error instanceof Error && !error.context) {
154+
error.context = context;
155+
}
156+
this.onError(error);
157+
return;
158+
}
159+
console.error(
160+
`[stackflow-agent] watcher error${
161+
context ? ` (${context})` : ""
162+
}: ${error instanceof Error ? error.message : String(error)}`,
163+
);
164+
}
165+
148166
start() {
149167
if (this.timer) {
150168
return;
@@ -200,23 +218,38 @@ export class HourlyClosureWatcher {
200218
pipesScanned: 0,
201219
closuresFound: 0,
202220
disputesSubmitted: 0,
221+
skippedAlreadyDisputed: 0,
222+
fetchErrors: 0,
223+
disputeErrors: 0,
203224
};
204225
}
205226

206227
let closuresFound = 0;
207228
let disputesSubmitted = 0;
208229
let skippedAlreadyDisputed = 0;
230+
let fetchErrors = 0;
231+
let disputeErrors = 0;
209232
let pipesScanned = 0;
210233
for (const trackedPipe of trackedPipes) {
211234
pipesScanned += 1;
212-
const pipeState = await this.getPipeState({
213-
contractId: trackedPipe.contractId,
214-
token: trackedPipe.token ?? null,
215-
pipeKey: trackedPipe.pipeKey,
216-
forPrincipal: trackedPipe.localPrincipal,
217-
withPrincipal: trackedPipe.counterpartyPrincipal,
218-
pipeId: trackedPipe.pipeId,
219-
});
235+
let pipeState;
236+
try {
237+
pipeState = await this.getPipeState({
238+
contractId: trackedPipe.contractId,
239+
token: trackedPipe.token ?? null,
240+
pipeKey: trackedPipe.pipeKey,
241+
forPrincipal: trackedPipe.localPrincipal,
242+
withPrincipal: trackedPipe.counterpartyPrincipal,
243+
pipeId: trackedPipe.pipeId,
244+
});
245+
} catch (error) {
246+
fetchErrors += 1;
247+
this.reportError(
248+
error,
249+
`getPipeState:${trackedPipe.contractId}:${trackedPipe.pipeId}`,
250+
);
251+
continue;
252+
}
220253
const rawClosure = toClosureFromPipeState({
221254
trackedPipe,
222255
pipeState,
@@ -239,10 +272,17 @@ export class HourlyClosureWatcher {
239272
continue;
240273
}
241274

242-
const disputeResult = await this.agentService.disputeClosure({
243-
closureEvent: closure,
244-
walletPassword: this.walletPassword,
245-
});
275+
let disputeResult;
276+
try {
277+
disputeResult = await this.agentService.disputeClosure({
278+
closureEvent: closure,
279+
walletPassword: this.walletPassword,
280+
});
281+
} catch (error) {
282+
disputeErrors += 1;
283+
this.reportError(error, `disputeClosure:${closure.txid}`);
284+
continue;
285+
}
246286
if (disputeResult.submitted) {
247287
disputesSubmitted += 1;
248288
}
@@ -255,6 +295,8 @@ export class HourlyClosureWatcher {
255295
closuresFound,
256296
disputesSubmitted,
257297
skippedAlreadyDisputed,
298+
fetchErrors,
299+
disputeErrors,
258300
};
259301
} finally {
260302
this.running = false;
@@ -281,6 +323,8 @@ export class HourlyClosureWatcher {
281323
ok: true,
282324
scanned: 0,
283325
disputesSubmitted: 0,
326+
skippedAlreadyDisputed: 0,
327+
disputeErrors: 0,
284328
fromBlockHeight,
285329
toBlockHeight: fromBlockHeight,
286330
};
@@ -289,6 +333,8 @@ export class HourlyClosureWatcher {
289333
let highestBlock = parseUnsignedBigInt(fromBlockHeight, "fromBlockHeight");
290334
let disputesSubmitted = 0;
291335
let skippedAlreadyDisputed = 0;
336+
let disputeErrors = 0;
337+
let hasDisputeErrors = false;
292338
let scanned = 0;
293339

294340
for (const rawEvent of events) {
@@ -304,11 +350,18 @@ export class HourlyClosureWatcher {
304350
if (existingClosure?.disputed) {
305351
skippedAlreadyDisputed += 1;
306352
} else {
307-
const disputeResult = await this.agentService.disputeClosure({
308-
closureEvent: closure,
309-
walletPassword: this.walletPassword,
310-
});
311-
if (disputeResult.submitted) {
353+
let disputeResult;
354+
try {
355+
disputeResult = await this.agentService.disputeClosure({
356+
closureEvent: closure,
357+
walletPassword: this.walletPassword,
358+
});
359+
} catch (error) {
360+
disputeErrors += 1;
361+
hasDisputeErrors = true;
362+
this.reportError(error, `disputeClosure:${closure.txid}`);
363+
}
364+
if (disputeResult?.submitted) {
312365
disputesSubmitted += 1;
313366
}
314367
}
@@ -319,14 +372,21 @@ export class HourlyClosureWatcher {
319372
}
320373
}
321374

322-
this.agentService.stateStore.setWatcherCursor(highestBlock.toString(10));
375+
const toBlockHeight = hasDisputeErrors
376+
? fromBlockHeight
377+
: highestBlock.toString(10);
378+
if (!hasDisputeErrors) {
379+
this.agentService.stateStore.setWatcherCursor(toBlockHeight);
380+
}
381+
323382
return {
324383
ok: true,
325384
scanned,
326385
disputesSubmitted,
327386
skippedAlreadyDisputed,
387+
disputeErrors,
328388
fromBlockHeight,
329-
toBlockHeight: highestBlock.toString(10),
389+
toBlockHeight,
330390
};
331391
} finally {
332392
this.running = false;

0 commit comments

Comments
 (0)