Skip to content

Commit 32ad510

Browse files
committed
Improve watcher idempotency for duplicate closure polls
1 parent f499953 commit 32ad510

File tree

4 files changed

+130
-8
lines changed

4 files changed

+130
-8
lines changed

packages/stackflow-agent/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,5 @@ const result = await agent.acceptIncomingTransfer({
132132
3. `HourlyClosureWatcher` supports two sources:
133133
- `getPipeState` (recommended): per-pipe read-only polling (`get-pipe`)
134134
- `listClosureEvents`: event scan mode
135-
4. For production hardening, add alerting, signer balance checks, and idempotency audit logs.
135+
4. Watcher retries are idempotent for already-disputed closures (same closure txid is skipped on later polls).
136+
5. For production hardening, add alerting, signer balance checks, and idempotency audit logs.

packages/stackflow-agent/src/db.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ export class AgentStateStore {
197197
dispute_txid = ?
198198
WHERE txid = ?
199199
`);
200+
this.getClosureStmt = this.db.prepare(`
201+
SELECT * FROM closures WHERE txid = ?
202+
`);
200203

201204
this.getCursorStmt = this.db.prepare(`
202205
SELECT last_block_height FROM watcher_cursor WHERE id = 1
@@ -386,6 +389,29 @@ export class AgentStateStore {
386389
);
387390
}
388391

392+
getClosure(txid) {
393+
this.assertOpen();
394+
const row = this.getClosureStmt.get(assertNonEmptyString(txid, "txid"));
395+
if (!row) {
396+
return null;
397+
}
398+
return {
399+
txid: row.txid,
400+
contractId: row.contract_id,
401+
pipeId: row.pipe_id,
402+
pipeKey: JSON.parse(row.pipe_key_json),
403+
eventName: row.event_name,
404+
nonce: row.nonce,
405+
closer: row.closer,
406+
blockHeight: row.block_height,
407+
expiresAt: row.expires_at,
408+
closureMyBalance: row.closure_my_balance,
409+
disputed: row.disputed === 1,
410+
disputeTxid: row.dispute_txid,
411+
createdAt: row.created_at,
412+
};
413+
}
414+
389415
getWatcherCursor() {
390416
this.assertOpen();
391417
const row = this.getCursorStmt.get();

packages/stackflow-agent/src/watcher.js

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ export class HourlyClosureWatcher {
205205

206206
let closuresFound = 0;
207207
let disputesSubmitted = 0;
208+
let skippedAlreadyDisputed = 0;
208209
let pipesScanned = 0;
209210
for (const trackedPipe of trackedPipes) {
210211
pipesScanned += 1;
@@ -231,7 +232,13 @@ export class HourlyClosureWatcher {
231232
}
232233

233234
closuresFound += 1;
235+
const existingClosure = this.agentService.stateStore.getClosure(closure.txid);
234236
this.agentService.stateStore.recordClosure(closure);
237+
if (existingClosure?.disputed) {
238+
skippedAlreadyDisputed += 1;
239+
continue;
240+
}
241+
235242
const disputeResult = await this.agentService.disputeClosure({
236243
closureEvent: closure,
237244
walletPassword: this.walletPassword,
@@ -247,6 +254,7 @@ export class HourlyClosureWatcher {
247254
pipesScanned,
248255
closuresFound,
249256
disputesSubmitted,
257+
skippedAlreadyDisputed,
250258
};
251259
} finally {
252260
this.running = false;
@@ -280,6 +288,7 @@ export class HourlyClosureWatcher {
280288

281289
let highestBlock = parseUnsignedBigInt(fromBlockHeight, "fromBlockHeight");
282290
let disputesSubmitted = 0;
291+
let skippedAlreadyDisputed = 0;
283292
let scanned = 0;
284293

285294
for (const rawEvent of events) {
@@ -290,14 +299,18 @@ export class HourlyClosureWatcher {
290299
continue;
291300
}
292301
scanned += 1;
302+
const existingClosure = this.agentService.stateStore.getClosure(closure.txid);
293303
this.agentService.stateStore.recordClosure(closure);
294-
295-
const disputeResult = await this.agentService.disputeClosure({
296-
closureEvent: closure,
297-
walletPassword: this.walletPassword,
298-
});
299-
if (disputeResult.submitted) {
300-
disputesSubmitted += 1;
304+
if (existingClosure?.disputed) {
305+
skippedAlreadyDisputed += 1;
306+
} else {
307+
const disputeResult = await this.agentService.disputeClosure({
308+
closureEvent: closure,
309+
walletPassword: this.walletPassword,
310+
});
311+
if (disputeResult.submitted) {
312+
disputesSubmitted += 1;
313+
}
301314
}
302315

303316
const block = parseUnsignedBigInt(closure.blockHeight, "blockHeight");
@@ -311,6 +324,7 @@ export class HourlyClosureWatcher {
311324
ok: true,
312325
scanned,
313326
disputesSubmitted,
327+
skippedAlreadyDisputed,
314328
fromBlockHeight,
315329
toBlockHeight: highestBlock.toString(10),
316330
};

tests/stackflow-agent.test.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,87 @@ describe("stackflow agent", () => {
208208
store.close();
209209
});
210210

211+
it("skips duplicate disputes for closures already marked disputed", async () => {
212+
const dbFile = tempDbFile("agent-duplicate-dispute");
213+
const store = new AgentStateStore({ dbFile });
214+
215+
const contractId = "ST1TESTABC.contract";
216+
const pipeKey = {
217+
"principal-1": "ST1LOCAL",
218+
"principal-2": "ST1OTHER",
219+
token: null,
220+
};
221+
const pipeId = buildPipeId({ contractId, pipeKey });
222+
223+
store.upsertTrackedPipe({
224+
pipeId,
225+
contractId,
226+
pipeKey,
227+
localPrincipal: "ST1LOCAL",
228+
counterpartyPrincipal: "ST1OTHER",
229+
token: null,
230+
});
231+
store.upsertSignatureState({
232+
contractId,
233+
pipeKey,
234+
forPrincipal: "ST1LOCAL",
235+
withPrincipal: "ST1OTHER",
236+
token: null,
237+
myBalance: "90",
238+
theirBalance: "10",
239+
nonce: "8",
240+
action: "1",
241+
actor: "ST1LOCAL",
242+
mySignature: "0x" + "11".repeat(65),
243+
theirSignature: "0x" + "22".repeat(65),
244+
secret: null,
245+
validAfter: null,
246+
beneficialOnly: false,
247+
});
248+
249+
let disputeCalls = 0;
250+
const agent = new StackflowAgentService({
251+
stateStore: store,
252+
signer: {
253+
async submitDispute() {
254+
disputeCalls += 1;
255+
return { txid: "0xdispute-dup" };
256+
},
257+
async sip018Sign() {
258+
return "0x" + "44".repeat(65);
259+
},
260+
async callContract() {
261+
return { ok: true };
262+
},
263+
},
264+
network: "devnet",
265+
disputeOnlyBeneficial: true,
266+
});
267+
268+
const watcher = new HourlyClosureWatcher({
269+
agentService: agent,
270+
getPipeState: async () => ({
271+
"balance-1": "20",
272+
"balance-2": "80",
273+
"expires-at": "200",
274+
nonce: "5",
275+
closer: "ST1OTHER",
276+
}),
277+
});
278+
279+
const first = await watcher.runOnce();
280+
expect(first.disputesSubmitted).toBe(1);
281+
expect(first.skippedAlreadyDisputed).toBe(0);
282+
283+
const second = await watcher.runOnce();
284+
expect(second.disputesSubmitted).toBe(0);
285+
expect(second.skippedAlreadyDisputed).toBe(1);
286+
expect(disputeCalls).toBe(1);
287+
288+
watcher.stop();
289+
store.close();
290+
});
291+
211292
it("validates and signs incoming transfer requests", async () => {
212293
const dbFile = tempDbFile("agent-sign");
213294
const store = new AgentStateStore({ dbFile });

0 commit comments

Comments
 (0)