Skip to content

Commit 0e8a2d7

Browse files
committed
fix(knowledge): route connector doc processing through queue instead of fire-and-forget
1 parent cdea240 commit 0e8a2d7

File tree

2 files changed

+120
-69
lines changed

2 files changed

+120
-69
lines changed

apps/sim/lib/knowledge/connectors/sync-engine.ts

Lines changed: 82 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import {
66
knowledgeConnectorSyncLog,
77
} from '@sim/db/schema'
88
import { createLogger } from '@sim/logger'
9-
import { and, eq, inArray, isNull, lt, ne, sql } from 'drizzle-orm'
9+
import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm'
1010
import { decryptApiKey } from '@/lib/api-key/crypto'
1111
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
12+
import type { DocumentData } from '@/lib/knowledge/documents/service'
1213
import {
1314
hardDeleteDocuments,
1415
isTriggerAvailable,
15-
processDocumentAsync,
16+
processDocumentsWithQueue,
1617
} from '@/lib/knowledge/documents/service'
1718
import { StorageService } from '@/lib/uploads'
1819
import { deleteFile } from '@/lib/uploads/core/storage-service'
@@ -39,6 +40,8 @@ class ConnectorDeletedException extends Error {
3940
const SYNC_BATCH_SIZE = 5
4041
const MAX_PAGES = 500
4142
const MAX_SAFE_TITLE_LENGTH = 200
43+
const STALE_PROCESSING_MINUTES = 45
44+
const RETRY_WINDOW_DAYS = 7
4245

4346
/** Sanitizes a document title for use in S3 storage keys. */
4447
function sanitizeStorageTitle(title: string): string {
@@ -395,6 +398,8 @@ export async function executeSync(
395398

396399
const seenExternalIds = new Set<string>()
397400

401+
const pendingProcessing: DocumentData[] = []
402+
398403
const pendingOps: DocOp[] = []
399404
for (const extDoc of externalDocs) {
400405
seenExternalIds.add(extDoc.externalId)
@@ -503,6 +508,7 @@ export async function executeSync(
503508
for (let j = 0; j < settled.length; j++) {
504509
const outcome = settled[j]
505510
if (outcome.status === 'fulfilled') {
511+
pendingProcessing.push(outcome.value)
506512
if (batch[j].type === 'add') result.docsAdded++
507513
else result.docsUpdated++
508514
} else {
@@ -537,9 +543,14 @@ export async function executeSync(
537543
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
538544
}
539545

540-
// Retry stuck documents that failed or never completed processing.
546+
// Retry stuck documents that failed, never started, or were abandoned mid-processing.
541547
// Only retry docs uploaded BEFORE this sync — docs added in the current sync
542548
// are still processing asynchronously and would cause a duplicate processing race.
549+
// Documents stuck in 'processing' beyond STALE_PROCESSING_MINUTES are considered
550+
// abandoned (e.g. the Trigger.dev task process exited before processing completed).
551+
// Documents uploaded more than RETRY_WINDOW_DAYS ago are not retried.
552+
const staleProcessingCutoff = new Date(Date.now() - STALE_PROCESSING_MINUTES * 60 * 1000)
553+
const retryCutoff = new Date(Date.now() - RETRY_WINDOW_DAYS * 24 * 60 * 60 * 1000)
543554
const stuckDocs = await db
544555
.select({
545556
id: document.id,
@@ -552,8 +563,15 @@ export async function executeSync(
552563
.where(
553564
and(
554565
eq(document.connectorId, connectorId),
555-
inArray(document.processingStatus, ['pending', 'failed']),
566+
or(
567+
inArray(document.processingStatus, ['pending', 'failed']),
568+
and(
569+
eq(document.processingStatus, 'processing'),
570+
lt(document.processingStartedAt, staleProcessingCutoff)
571+
)
572+
),
556573
lt(document.uploadedAt, syncStartedAt),
574+
gt(document.uploadedAt, retryCutoff),
557575
eq(document.userExcluded, false),
558576
isNull(document.archivedAt),
559577
isNull(document.deletedAt)
@@ -562,36 +580,68 @@ export async function executeSync(
562580

563581
if (stuckDocs.length > 0) {
564582
logger.info(`Retrying ${stuckDocs.length} stuck documents`, { connectorId })
565-
for (const doc of stuckDocs) {
566-
processDocumentAsync(
567-
connector.knowledgeBaseId,
568-
doc.id,
569-
{
583+
try {
584+
await processDocumentsWithQueue(
585+
stuckDocs.map((doc) => ({
586+
documentId: doc.id,
570587
filename: doc.filename ?? 'document.txt',
571588
fileUrl: doc.fileUrl ?? '',
572589
fileSize: doc.fileSize ?? 0,
573590
mimeType: doc.mimeType ?? 'text/plain',
574-
},
575-
{}
576-
).catch((error) => {
577-
logger.warn('Failed to retry stuck document', {
578-
documentId: doc.id,
579-
error: error instanceof Error ? error.message : String(error),
580-
})
591+
})),
592+
connector.knowledgeBaseId,
593+
{},
594+
crypto.randomUUID()
595+
)
596+
} catch (error) {
597+
logger.warn('Failed to enqueue stuck documents for reprocessing', {
598+
connectorId,
599+
count: stuckDocs.length,
600+
error: error instanceof Error ? error.message : String(error),
581601
})
582602
}
583603
}
584604

585605
await completeSyncLog(syncLogId, 'completed', result)
586606

607+
// Enqueue all added/updated documents for processing in a single batch
608+
if (pendingProcessing.length > 0) {
609+
try {
610+
await processDocumentsWithQueue(
611+
pendingProcessing,
612+
connector.knowledgeBaseId,
613+
{},
614+
crypto.randomUUID()
615+
)
616+
} catch (error) {
617+
logger.warn('Failed to enqueue documents for processing', {
618+
connectorId,
619+
count: pendingProcessing.length,
620+
error: error instanceof Error ? error.message : String(error),
621+
})
622+
}
623+
}
624+
625+
const [{ count: actualDocCount }] = await db
626+
.select({ count: sql<number>`count(*)::int` })
627+
.from(document)
628+
.where(
629+
and(
630+
eq(document.connectorId, connectorId),
631+
eq(document.userExcluded, false),
632+
isNull(document.archivedAt),
633+
isNull(document.deletedAt)
634+
)
635+
)
636+
587637
const now = new Date()
588638
await db
589639
.update(knowledgeConnector)
590640
.set({
591641
status: 'active',
592642
lastSyncAt: now,
593643
lastSyncError: null,
594-
lastSyncDocCount: externalDocs.length,
644+
lastSyncDocCount: actualDocCount,
595645
nextSyncAt: calculateNextSyncTime(connector.syncIntervalMinutes),
596646
consecutiveFailures: 0,
597647
updatedAt: now,
@@ -711,7 +761,7 @@ async function addDocument(
711761
connectorType: string,
712762
extDoc: ExternalDocument,
713763
sourceConfig?: Record<string, unknown>
714-
): Promise<void> {
764+
): Promise<DocumentData> {
715765
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
716766
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
717767
}
@@ -773,23 +823,13 @@ async function addDocument(
773823
throw error
774824
}
775825

776-
processDocumentAsync(
777-
knowledgeBaseId,
826+
return {
778827
documentId,
779-
{
780-
filename: processingFilename,
781-
fileUrl,
782-
fileSize: contentBuffer.length,
783-
mimeType: 'text/plain',
784-
},
785-
{}
786-
).catch((error) => {
787-
logger.error('Failed to process connector document', {
788-
documentId,
789-
connectorId,
790-
error: error instanceof Error ? error.message : String(error),
791-
})
792-
})
828+
filename: processingFilename,
829+
fileUrl,
830+
fileSize: contentBuffer.length,
831+
mimeType: 'text/plain',
832+
}
793833
}
794834

795835
/**
@@ -803,7 +843,7 @@ async function updateDocument(
803843
connectorType: string,
804844
extDoc: ExternalDocument,
805845
sourceConfig?: Record<string, unknown>
806-
): Promise<void> {
846+
): Promise<DocumentData> {
807847
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
808848
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
809849
}
@@ -894,21 +934,11 @@ async function updateDocument(
894934
}
895935
}
896936

897-
processDocumentAsync(
898-
knowledgeBaseId,
899-
existingDocId,
900-
{
901-
filename: processingFilename,
902-
fileUrl,
903-
fileSize: contentBuffer.length,
904-
mimeType: 'text/plain',
905-
},
906-
{}
907-
).catch((error) => {
908-
logger.error('Failed to re-process updated connector document', {
909-
documentId: existingDocId,
910-
connectorId,
911-
error: error instanceof Error ? error.message : String(error),
912-
})
913-
})
937+
return {
938+
documentId: existingDocId,
939+
filename: processingFilename,
940+
fileUrl,
941+
fileSize: contentBuffer.length,
942+
mimeType: 'text/plain',
943+
}
914944
}

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ export interface DocumentData {
114114
}
115115

116116
export interface ProcessingOptions {
117-
chunkSize: number
118-
minCharactersPerChunk: number
119-
recipe: string
120-
lang: string
121-
chunkOverlap: number
117+
chunkSize?: number
118+
minCharactersPerChunk?: number
119+
recipe?: string
120+
lang?: string
121+
chunkOverlap?: number
122122
}
123123

124124
export interface DocumentJobData {
@@ -676,19 +676,31 @@ export async function processDocumentsWithTrigger(
676676
try {
677677
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
678678

679-
const jobPromises = documents.map(async (document) => {
680-
const job = await tasks.trigger('knowledge-process-document', document)
681-
return job.id
682-
})
683-
684-
const jobIds = await Promise.all(jobPromises)
679+
const MAX_BATCH_SIZE = 1000
680+
const batchIds: string[] = []
681+
682+
for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) {
683+
const chunk = documents.slice(i, i + MAX_BATCH_SIZE)
684+
const batchResult = await tasks.batchTrigger(
685+
'knowledge-process-document',
686+
chunk.map((doc) => ({
687+
payload: doc,
688+
options: {
689+
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
690+
},
691+
}))
692+
)
693+
batchIds.push(batchResult.batchId)
694+
}
685695

686-
logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)
696+
logger.info(
697+
`[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)`
698+
)
687699

688700
return {
689701
success: true,
690702
message: `${documents.length} document processing jobs triggered`,
691-
jobIds,
703+
jobIds: batchIds,
692704
}
693705
} catch (error) {
694706
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
@@ -1590,10 +1602,19 @@ export async function retryDocumentProcessing(
15901602
chunkOverlap: kbConfig.overlap,
15911603
}
15921604

1593-
processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions).catch(
1594-
(error: unknown) => {
1595-
logger.error(`[${requestId}] Background retry processing error:`, error)
1596-
}
1605+
await processDocumentsWithQueue(
1606+
[
1607+
{
1608+
documentId,
1609+
filename: docData.filename,
1610+
fileUrl: docData.fileUrl,
1611+
fileSize: docData.fileSize,
1612+
mimeType: docData.mimeType,
1613+
},
1614+
],
1615+
knowledgeBaseId,
1616+
processingOptions,
1617+
requestId
15971618
)
15981619

15991620
logger.info(`[${requestId}] Document retry initiated: ${documentId}`)

0 commit comments

Comments (0)