Skip to content

Commit abbf62d

Browse files
author
StackMemory Bot (CLI)
committed
fix(sync): batch push queries + implement force flag
W2: Replace N+1 push handler with batch UNNEST upsert — single conflict check query + single upsert query instead of 2N round trips. Added 500-entity batch size limit. W4: Wire force flag through stack (MCP handler → manager → engine). Force push resets cloud_sync_state and cursors, re-pushes all data.
1 parent 79cd25e commit abbf62d

6 files changed

Lines changed: 85 additions & 39 deletions

File tree

packages/provenant-api/src/index.js

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,34 +79,70 @@ async function handlePush(request, sql, projectId, clientId) {
7979
});
8080
}
8181

82-
const rejected = [];
83-
let accepted = 0;
82+
if (entities.length > 500) {
83+
return json({ error: 'Batch too large (max 500)' }, 400);
84+
}
85+
86+
// Batch conflict check — single query instead of N+1
87+
const incomingIds = entities.map((e) => e.id);
88+
const incomingTables = entities.map((e) => e.table);
89+
const existingRows = await sql`
90+
SELECT id, table_name, version FROM sync_entities
91+
WHERE project_id = ${projectId}
92+
AND (id, table_name) IN (
93+
SELECT UNNEST(${incomingIds}::text[]), UNNEST(${incomingTables}::text[])
94+
)
95+
`;
96+
97+
const existingMap = new Map();
98+
for (const row of existingRows) {
99+
existingMap.set(`${row.table_name}:${row.id}`, Number(row.version));
100+
}
101+
102+
// Separate conflicts from upsertable entities
84103
const conflicts = [];
104+
const toUpsert = [];
85105

86106
for (const entity of entities) {
87-
try {
88-
// Check for conflict (newest_wins)
89-
const existing = await sql`
90-
SELECT version FROM sync_entities
91-
WHERE project_id = ${projectId}
92-
AND table_name = ${entity.table}
93-
AND id = ${entity.id}
94-
`;
107+
const key = `${entity.table}:${entity.id}`;
108+
const serverVersion = existingMap.get(key);
109+
if (serverVersion !== undefined && serverVersion > entity.version) {
110+
conflicts.push({
111+
id: entity.id,
112+
table: entity.table,
113+
serverVersion,
114+
clientVersion: entity.version,
115+
});
116+
} else {
117+
toUpsert.push(entity);
118+
}
119+
}
95120

96-
if (existing.length > 0 && existing[0].version > entity.version) {
97-
conflicts.push({
98-
id: entity.id,
99-
table: entity.table,
100-
serverVersion: Number(existing[0].version),
101-
clientVersion: entity.version,
102-
});
103-
continue;
104-
}
121+
// Batch upsert — single query
122+
const rejected = [];
123+
let accepted = 0;
124+
125+
if (toUpsert.length > 0) {
126+
try {
127+
const ids = toUpsert.map((e) => e.id);
128+
const tableNames = toUpsert.map((e) => e.table);
129+
const versions = toUpsert.map((e) => e.version);
130+
const tiers = toUpsert.map((e) => e.tier);
131+
const dataArr = toUpsert.map((e) => JSON.stringify(e.data));
132+
const clientIds = toUpsert.map(() => clientId);
133+
const projectIds = toUpsert.map(() => projectId);
105134

106-
// Upsert
107135
await sql`
108136
INSERT INTO sync_entities (id, project_id, table_name, version, tier, data, client_id)
109-
VALUES (${entity.id}, ${projectId}, ${entity.table}, ${entity.version}, ${entity.tier}, ${JSON.stringify(entity.data)}, ${clientId})
137+
SELECT * FROM UNNEST(
138+
${ids}::text[],
139+
${projectIds}::text[],
140+
${tableNames}::text[],
141+
${versions}::bigint[],
142+
${tiers}::text[],
143+
${dataArr}::jsonb[],
144+
${clientIds}::text[]
145+
)
110146
ON CONFLICT (project_id, table_name, id)
111147
DO UPDATE SET
112148
version = EXCLUDED.version,
@@ -115,9 +151,12 @@ async function handlePush(request, sql, projectId, clientId) {
115151
client_id = EXCLUDED.client_id,
116152
pushed_at = NOW()
117153
`;
118-
accepted++;
154+
accepted = toUpsert.length;
119155
} catch (err) {
120-
rejected.push({ id: entity.id, reason: String(err) });
156+
// If batch fails, report all as rejected
157+
for (const entity of toUpsert) {
158+
rejected.push({ id: entity.id, reason: String(err) });
159+
}
121160
}
122161
}
123162

scripts/gepa/.before-optimize.md

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,13 @@ When adding or renaming GitHub Actions workflows that should be triggerable via
111111

112112
| Workflow | Script path | Category |
113113
|---|---|---|
114-
| `weekly-start.yml` | `voyager/scripts/content-brief.mjs` + `voyager/scripts/content-audit.mjs` + `ops/fathom-social-content.mjs` + `ops/fathom-testimonial-scan.mjs` + `ops/perplexity-citation-audit.mjs` + `commit/profound-aeo-pulse.mjs` + `commit/citation-rank-tracker.mjs` + `diag/ranking-snapshot.mjs` + `voyager/scripts/generate-blog-scaffold.mjs` + `ops/ahrefs-firehose-digest.mjs` + `ops/export-dripify.mjs` + `commit/prospect-discovery.mjs` + `commit/growth-signal-leads.mjs` + `ops/repush-clay-leads.mjs` + `ops/snitcher-outreach.mjs` | GHA cron (Mon) |
115-
| `weekly-end.yml` | `diag/fathom-demo-scorecard.mjs` + `commit/feedback/collect-*.mjs` + `commit/feedback/collect-ops-feedback.mjs` + `diag/weekly-retro.mjs` | GHA cron (Fri) |
114+
| `weekly-start.yml` | `voyager/scripts/content-brief.mjs` + `voyager/scripts/content-audit.mjs` + `ops/fathom-social-content.mjs` + `ops/fathom-testimonial-scan.mjs` + `ops/perplexity-citation-audit.mjs` + `commit/profound-aeo-pulse.mjs` + `commit/citation-rank-tracker.mjs` + `diag/ranking-snapshot.mjs` + `voyager/scripts/generate-blog-scaffold.mjs` + `ops/ahrefs-firehose-digest.mjs` + `ops/export-dripify.mjs` + `commit/prospect-discovery.mjs` + `commit/growth-signal-leads.mjs` + `ops/repush-clay-leads.mjs` + `ops/snitcher-outreach.mjs` | GHA cron (1st Mon) |
115+
| `weekly-end.yml` | `diag/fathom-demo-scorecard.mjs` + `commit/feedback/collect-*.mjs` + `commit/feedback/collect-ops-feedback.mjs` + `diag/weekly-retro.mjs` + `commit/sync-llms-txt.mjs` | GHA cron (Fri) |
116116
| `anneal-keywords.yml` | `commit/anneal-keywords.mjs` | GHA cron (Sun) |
117-
| `g2-review-monitor.yml` | `ops/g2-to-senja.mjs` | GHA cron (Daily) |
118-
| `testimonial-pipeline.yml` | `commit/testimonial-pipeline.mjs` | Manual |
117+
| `daily-ops.yml` | `ops/slack-digest.mjs` + `ops/fathom-meeting-digest.mjs` + `ops/ops-daily-briefing.mjs` + `ops/g2-to-senja.mjs` + `ops/review-intercept.mjs` | GHA cron (weekdays) |
118+
| `midweek-ops.yml` | `ops/sequence-orchestrator.mjs` + `ops/push-drafts-to-instantly.mjs` | GHA cron (Tue/Thu) |
119+
| `monthly-ops.yml` | `diag/pagespeed-audit.mjs` + `commit/pagespeed-improvements.mjs` + `commit/icp-tune.mjs` | GHA cron (1st of month) |
119120
| `video-pipeline.yml` | `ops/video-clips.mjs` | Manual |
120-
| `pagespeed-audit.yml` | `diag/pagespeed-audit.mjs` + `commit/pagespeed-improvements.mjs` | GHA cron (1st of month) |
121-
| `daily-ops.yml` | `ops/slack-digest.mjs` + `ops/fathom-meeting-digest.mjs` + `ops/ops-daily-briefing.mjs` | GHA cron (weekdays) |
122121
| `indexnow-submit.yml` | (inline curl) | Push to master (voyager) / Manual |
123122

124123
## GitHub Actions (`.github/workflows/`)
@@ -133,12 +132,12 @@ When adding or renaming GitHub Actions workflows that should be triggerable via
133132
In `actions/github-script@v7`, `github.rest.issues.createComment` posts plain issue comments on PRs (PRs are issues in GitHub's API). For inline code suggestions on specific files/lines, use `github.rest.pulls.createReview` or `github.rest.pulls.createReviewComment` instead.
134133

135134
### Scheduled (cron)
136-
- `weekly-start.yml` — Mon 9am ET (content review, social content, testimonial scan, Perplexity audit, AEO pulse → blog scaffold, Ahrefs digest, Dripify export, prospect discovery, growth-signal leads → snitcher outreach)
137-
- `weekly-end.yml` — Fri 9am ET (demo scorecard + pipeline health)
135+
- `weekly-start.yml` — 1st Mon 9am EDT (content review, social content, testimonial scan, Perplexity audit, AEO pulse → blog scaffold, Ahrefs digest, Dripify export, prospect discovery, growth-signal leads → snitcher outreach)
136+
- `weekly-end.yml` — Fri 10am EDT / 9am EST (demo scorecard, pipeline health, llms.txt sync, freshness audit, SEO/AEO snapshot)
138137
- `anneal-keywords.yml` — Sun 11am ET (keyword annealing + kill pattern updates)
139-
- `g2-review-monitor.yml` — Daily 10am ET
140-
- `pagespeed-audit.yml` — 1st of month 9am ET (PSI audit → Claude recommendations → PR)
141-
- `daily-ops.yml` — Weekdays 10am ET (signal monitor, G2 reviews, review intercept, Slack digest → meeting digest → daily briefing)
138+
- `daily-ops.yml` — Weekdays 10am EDT (signal monitor, G2 reviews, review intercept, Slack digest → meeting digest → daily briefing)
139+
- `midweek-ops.yml` — Tue/Thu (sequence orchestrator + push drafts to Instantly)
140+
- `monthly-ops.yml` — 1st of month 10am EDT (PSI audit → Claude recommendations → PR, ICP tuning)
142141
- `indexnow-submit.yml` — On push to master (voyager pages) + manual (`/run indexnow urls=...`)
143142

144143
## Deployments

scripts/gepa/generations/current

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
/Users/jwu/Dev/stackmemory/scripts/gepa/generations/gen-002/baseline.md
1+
/Users/jwu/Dev/stackmemory/scripts/gepa/generations/gen-001/baseline.md

src/core/storage/cloud-sync-manager.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,10 @@ export class CloudSyncManager extends EventEmitter {
9999
* Perform a push
100100
*/
101101
async performPush(
102-
trigger: 'manual' | 'frame-close' | 'session-end' | 'periodic'
102+
trigger: 'manual' | 'frame-close' | 'session-end' | 'periodic',
103+
force = false
103104
): Promise<CloudSyncPushResult> {
104-
const result = await this.engine.push();
105+
const result = await this.engine.push(force);
105106
this.emit('push', { trigger, result });
106107
return result;
107108
}

src/core/storage/cloud-sync.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,16 @@ export class CloudSyncEngine {
177177
/**
178178
* Push local changes to cloud
179179
*/
180-
async push(): Promise<CloudSyncPushResult> {
180+
async push(force = false): Promise<CloudSyncPushResult> {
181181
return this.mutex.withLock(async () => {
182182
try {
183+
if (force) {
184+
// Reset all sync state — re-push everything
185+
this.db.prepare(`DELETE FROM cloud_sync_state`).run();
186+
this.db
187+
.prepare(`DELETE FROM cloud_sync_cursors WHERE direction = 'push'`)
188+
.run();
189+
}
183190
const pending = this.collectPendingEntities();
184191
if (pending.length === 0) {
185192
return { success: true, pushed: 0, rejected: 0, conflicts: 0 };

src/integrations/mcp/handlers/cloud-sync-handlers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export class CloudSyncHandlers {
3838
force?: boolean;
3939
}): Promise<{ content: Array<{ type: string; text: string }> }> {
4040
const manager = this.ensureManager();
41-
const result = await manager.performPush('manual');
41+
const result = await manager.performPush('manual', args.force);
4242

4343
return {
4444
content: [

0 commit comments

Comments
 (0)