Skip to content

Commit 7af7a22

Browse files
authored
fix(knowledge): route connector doc processing through queue instead of fire-and-forget (#3754)
* fix(knowledge): route connector doc processing through queue instead of fire-and-forget
* fix(knowledge): rename jobIds to batchIds in processDocumentsWithTrigger return type
* improvement(knowledge): add Trigger.dev tags for connector sync and document processing tasks
* fix(knowledge): move completeSyncLog after doc enqueue, handle NULL processingStartedAt in stuck doc query
1 parent 228578e commit 7af7a22

File tree

2 files changed

+133
-75
lines changed

2 files changed

+133
-75
lines changed

apps/sim/lib/knowledge/connectors/sync-engine.ts

Lines changed: 93 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import {
66
knowledgeConnectorSyncLog,
77
} from '@sim/db/schema'
88
import { createLogger } from '@sim/logger'
9-
import { and, eq, inArray, isNull, lt, ne, sql } from 'drizzle-orm'
9+
import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm'
1010
import { decryptApiKey } from '@/lib/api-key/crypto'
1111
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
12+
import type { DocumentData } from '@/lib/knowledge/documents/service'
1213
import {
1314
hardDeleteDocuments,
1415
isTriggerAvailable,
15-
processDocumentAsync,
16+
processDocumentsWithQueue,
1617
} from '@/lib/knowledge/documents/service'
1718
import { StorageService } from '@/lib/uploads'
1819
import { deleteFile } from '@/lib/uploads/core/storage-service'
@@ -39,6 +40,8 @@ class ConnectorDeletedException extends Error {
3940
const SYNC_BATCH_SIZE = 5
4041
const MAX_PAGES = 500
4142
const MAX_SAFE_TITLE_LENGTH = 200
43+
const STALE_PROCESSING_MINUTES = 45
44+
const RETRY_WINDOW_DAYS = 7
4245

4346
/** Sanitizes a document title for use in S3 storage keys. */
4447
function sanitizeStorageTitle(title: string): string {
@@ -147,11 +150,14 @@ export async function dispatchSync(
147150
const requestId = options?.requestId ?? crypto.randomUUID()
148151

149152
if (isTriggerAvailable()) {
150-
await knowledgeConnectorSync.trigger({
151-
connectorId,
152-
fullSync: options?.fullSync,
153-
requestId,
154-
})
153+
await knowledgeConnectorSync.trigger(
154+
{
155+
connectorId,
156+
fullSync: options?.fullSync,
157+
requestId,
158+
},
159+
{ tags: [`connector:${connectorId}`] }
160+
)
155161
logger.info(`Dispatched connector sync to Trigger.dev`, { connectorId, requestId })
156162
} else {
157163
executeSync(connectorId, { fullSync: options?.fullSync }).catch((error) => {
@@ -395,6 +401,8 @@ export async function executeSync(
395401

396402
const seenExternalIds = new Set<string>()
397403

404+
const pendingProcessing: DocumentData[] = []
405+
398406
const pendingOps: DocOp[] = []
399407
for (const extDoc of externalDocs) {
400408
seenExternalIds.add(extDoc.externalId)
@@ -503,6 +511,7 @@ export async function executeSync(
503511
for (let j = 0; j < settled.length; j++) {
504512
const outcome = settled[j]
505513
if (outcome.status === 'fulfilled') {
514+
pendingProcessing.push(outcome.value)
506515
if (batch[j].type === 'add') result.docsAdded++
507516
else result.docsUpdated++
508517
} else {
@@ -537,9 +546,14 @@ export async function executeSync(
537546
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
538547
}
539548

540-
// Retry stuck documents that failed or never completed processing.
549+
// Retry stuck documents that failed, never started, or were abandoned mid-processing.
541550
// Only retry docs uploaded BEFORE this sync — docs added in the current sync
542551
// are still processing asynchronously and would cause a duplicate processing race.
552+
// Documents stuck in 'processing' beyond STALE_PROCESSING_MINUTES are considered
553+
// abandoned (e.g. the Trigger.dev task process exited before processing completed).
554+
// Documents uploaded more than RETRY_WINDOW_DAYS ago are not retried.
555+
const staleProcessingCutoff = new Date(Date.now() - STALE_PROCESSING_MINUTES * 60 * 1000)
556+
const retryCutoff = new Date(Date.now() - RETRY_WINDOW_DAYS * 24 * 60 * 60 * 1000)
543557
const stuckDocs = await db
544558
.select({
545559
id: document.id,
@@ -552,8 +566,18 @@ export async function executeSync(
552566
.where(
553567
and(
554568
eq(document.connectorId, connectorId),
555-
inArray(document.processingStatus, ['pending', 'failed']),
569+
or(
570+
inArray(document.processingStatus, ['pending', 'failed']),
571+
and(
572+
eq(document.processingStatus, 'processing'),
573+
or(
574+
isNull(document.processingStartedAt),
575+
lt(document.processingStartedAt, staleProcessingCutoff)
576+
)
577+
)
578+
),
556579
lt(document.uploadedAt, syncStartedAt),
580+
gt(document.uploadedAt, retryCutoff),
557581
eq(document.userExcluded, false),
558582
isNull(document.archivedAt),
559583
isNull(document.deletedAt)
@@ -562,36 +586,68 @@ export async function executeSync(
562586

563587
if (stuckDocs.length > 0) {
564588
logger.info(`Retrying ${stuckDocs.length} stuck documents`, { connectorId })
565-
for (const doc of stuckDocs) {
566-
processDocumentAsync(
567-
connector.knowledgeBaseId,
568-
doc.id,
569-
{
589+
try {
590+
await processDocumentsWithQueue(
591+
stuckDocs.map((doc) => ({
592+
documentId: doc.id,
570593
filename: doc.filename ?? 'document.txt',
571594
fileUrl: doc.fileUrl ?? '',
572595
fileSize: doc.fileSize ?? 0,
573596
mimeType: doc.mimeType ?? 'text/plain',
574-
},
575-
{}
576-
).catch((error) => {
577-
logger.warn('Failed to retry stuck document', {
578-
documentId: doc.id,
579-
error: error instanceof Error ? error.message : String(error),
580-
})
597+
})),
598+
connector.knowledgeBaseId,
599+
{},
600+
crypto.randomUUID()
601+
)
602+
} catch (error) {
603+
logger.warn('Failed to enqueue stuck documents for reprocessing', {
604+
connectorId,
605+
count: stuckDocs.length,
606+
error: error instanceof Error ? error.message : String(error),
607+
})
608+
}
609+
}
610+
611+
// Enqueue all added/updated documents for processing in a single batch
612+
if (pendingProcessing.length > 0) {
613+
try {
614+
await processDocumentsWithQueue(
615+
pendingProcessing,
616+
connector.knowledgeBaseId,
617+
{},
618+
crypto.randomUUID()
619+
)
620+
} catch (error) {
621+
logger.warn('Failed to enqueue documents for processing — will retry on next sync', {
622+
connectorId,
623+
count: pendingProcessing.length,
624+
error: error instanceof Error ? error.message : String(error),
581625
})
582626
}
583627
}
584628

585629
await completeSyncLog(syncLogId, 'completed', result)
586630

631+
const [{ count: actualDocCount }] = await db
632+
.select({ count: sql<number>`count(*)::int` })
633+
.from(document)
634+
.where(
635+
and(
636+
eq(document.connectorId, connectorId),
637+
eq(document.userExcluded, false),
638+
isNull(document.archivedAt),
639+
isNull(document.deletedAt)
640+
)
641+
)
642+
587643
const now = new Date()
588644
await db
589645
.update(knowledgeConnector)
590646
.set({
591647
status: 'active',
592648
lastSyncAt: now,
593649
lastSyncError: null,
594-
lastSyncDocCount: externalDocs.length,
650+
lastSyncDocCount: actualDocCount,
595651
nextSyncAt: calculateNextSyncTime(connector.syncIntervalMinutes),
596652
consecutiveFailures: 0,
597653
updatedAt: now,
@@ -711,7 +767,7 @@ async function addDocument(
711767
connectorType: string,
712768
extDoc: ExternalDocument,
713769
sourceConfig?: Record<string, unknown>
714-
): Promise<void> {
770+
): Promise<DocumentData> {
715771
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
716772
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
717773
}
@@ -773,23 +829,13 @@ async function addDocument(
773829
throw error
774830
}
775831

776-
processDocumentAsync(
777-
knowledgeBaseId,
832+
return {
778833
documentId,
779-
{
780-
filename: processingFilename,
781-
fileUrl,
782-
fileSize: contentBuffer.length,
783-
mimeType: 'text/plain',
784-
},
785-
{}
786-
).catch((error) => {
787-
logger.error('Failed to process connector document', {
788-
documentId,
789-
connectorId,
790-
error: error instanceof Error ? error.message : String(error),
791-
})
792-
})
834+
filename: processingFilename,
835+
fileUrl,
836+
fileSize: contentBuffer.length,
837+
mimeType: 'text/plain',
838+
}
793839
}
794840

795841
/**
@@ -803,7 +849,7 @@ async function updateDocument(
803849
connectorType: string,
804850
extDoc: ExternalDocument,
805851
sourceConfig?: Record<string, unknown>
806-
): Promise<void> {
852+
): Promise<DocumentData> {
807853
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
808854
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
809855
}
@@ -894,21 +940,11 @@ async function updateDocument(
894940
}
895941
}
896942

897-
processDocumentAsync(
898-
knowledgeBaseId,
899-
existingDocId,
900-
{
901-
filename: processingFilename,
902-
fileUrl,
903-
fileSize: contentBuffer.length,
904-
mimeType: 'text/plain',
905-
},
906-
{}
907-
).catch((error) => {
908-
logger.error('Failed to re-process updated connector document', {
909-
documentId: existingDocId,
910-
connectorId,
911-
error: error instanceof Error ? error.message : String(error),
912-
})
913-
})
943+
return {
944+
documentId: existingDocId,
945+
filename: processingFilename,
946+
fileUrl,
947+
fileSize: contentBuffer.length,
948+
mimeType: 'text/plain',
949+
}
914950
}

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ export interface DocumentData {
114114
}
115115

116116
export interface ProcessingOptions {
117-
chunkSize: number
118-
minCharactersPerChunk: number
119-
recipe: string
120-
lang: string
121-
chunkOverlap: number
117+
chunkSize?: number
118+
minCharactersPerChunk?: number
119+
recipe?: string
120+
lang?: string
121+
chunkOverlap?: number
122122
}
123123

124124
export interface DocumentJobData {
@@ -668,27 +668,40 @@ export function isTriggerAvailable(): boolean {
668668
export async function processDocumentsWithTrigger(
669669
documents: DocumentProcessingPayload[],
670670
requestId: string
671-
): Promise<{ success: boolean; message: string; jobIds?: string[] }> {
671+
): Promise<{ success: boolean; message: string; batchIds?: string[] }> {
672672
if (!isTriggerAvailable()) {
673673
throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
674674
}
675675

676676
try {
677677
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
678678

679-
const jobPromises = documents.map(async (document) => {
680-
const job = await tasks.trigger('knowledge-process-document', document)
681-
return job.id
682-
})
683-
684-
const jobIds = await Promise.all(jobPromises)
679+
const MAX_BATCH_SIZE = 1000
680+
const batchIds: string[] = []
681+
682+
for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) {
683+
const chunk = documents.slice(i, i + MAX_BATCH_SIZE)
684+
const batchResult = await tasks.batchTrigger(
685+
'knowledge-process-document',
686+
chunk.map((doc) => ({
687+
payload: doc,
688+
options: {
689+
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
690+
tags: [`kb:${doc.knowledgeBaseId}`, `doc:${doc.documentId}`],
691+
},
692+
}))
693+
)
694+
batchIds.push(batchResult.batchId)
695+
}
685696

686-
logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)
697+
logger.info(
698+
`[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)`
699+
)
687700

688701
return {
689702
success: true,
690703
message: `${documents.length} document processing jobs triggered`,
691-
jobIds,
704+
batchIds,
692705
}
693706
} catch (error) {
694707
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
@@ -1590,10 +1603,19 @@ export async function retryDocumentProcessing(
15901603
chunkOverlap: kbConfig.overlap,
15911604
}
15921605

1593-
processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions).catch(
1594-
(error: unknown) => {
1595-
logger.error(`[${requestId}] Background retry processing error:`, error)
1596-
}
1606+
await processDocumentsWithQueue(
1607+
[
1608+
{
1609+
documentId,
1610+
filename: docData.filename,
1611+
fileUrl: docData.fileUrl,
1612+
fileSize: docData.fileSize,
1613+
mimeType: docData.mimeType,
1614+
},
1615+
],
1616+
knowledgeBaseId,
1617+
processingOptions,
1618+
requestId
15971619
)
15981620

15991621
logger.info(`[${requestId}] Document retry initiated: ${documentId}`)

0 commit comments

Comments (0)