Skip to content

Commit 32a04dc

Browse files
committed
fix(knowledge): enqueue connector docs per-batch to survive sync timeouts
1 parent 7a1a460 commit 32a04dc

File tree

1 file changed

+23
-21
lines changed

1 file changed

+23
-21
lines changed

apps/sim/lib/knowledge/connectors/sync-engine.ts

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -401,8 +401,6 @@ export async function executeSync(
401401

402402
const seenExternalIds = new Set<string>()
403403

404-
const pendingProcessing: DocumentData[] = []
405-
406404
const pendingOps: DocOp[] = []
407405
for (const extDoc of externalDocs) {
408406
seenExternalIds.add(extDoc.externalId)
@@ -508,10 +506,11 @@ export async function executeSync(
508506
})
509507
)
510508

509+
const batchDocs: DocumentData[] = []
511510
for (let j = 0; j < settled.length; j++) {
512511
const outcome = settled[j]
513512
if (outcome.status === 'fulfilled') {
514-
pendingProcessing.push(outcome.value)
513+
batchDocs.push(outcome.value)
515514
if (batch[j].type === 'add') result.docsAdded++
516515
else result.docsUpdated++
517516
} else {
@@ -524,6 +523,27 @@ export async function executeSync(
524523
})
525524
}
526525
}
526+
527+
// Enqueue each batch for processing immediately after insertion.
528+
// This ensures that if the sync task times out (MAX_DURATION_EXCEEDED),
529+
// all documents inserted up to this point are already queued as independent
530+
// processing tasks and won't be orphaned.
531+
if (batchDocs.length > 0) {
532+
try {
533+
await processDocumentsWithQueue(
534+
batchDocs,
535+
connector.knowledgeBaseId,
536+
{},
537+
crypto.randomUUID()
538+
)
539+
} catch (error) {
540+
logger.warn('Failed to enqueue batch for processing — will retry on next sync', {
541+
connectorId,
542+
count: batchDocs.length,
543+
error: error instanceof Error ? error.message : String(error),
544+
})
545+
}
546+
}
527547
}
528548

529549
// Skip deletion reconciliation during incremental syncs — results only contain changed docs
@@ -608,24 +628,6 @@ export async function executeSync(
608628
}
609629
}
610630

611-
// Enqueue all added/updated documents for processing in a single batch
612-
if (pendingProcessing.length > 0) {
613-
try {
614-
await processDocumentsWithQueue(
615-
pendingProcessing,
616-
connector.knowledgeBaseId,
617-
{},
618-
crypto.randomUUID()
619-
)
620-
} catch (error) {
621-
logger.warn('Failed to enqueue documents for processing — will retry on next sync', {
622-
connectorId,
623-
count: pendingProcessing.length,
624-
error: error instanceof Error ? error.message : String(error),
625-
})
626-
}
627-
}
628-
629631
await completeSyncLog(syncLogId, 'completed', result)
630632

631633
const [{ count: actualDocCount }] = await db

0 commit comments

Comments
 (0)