Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 93 additions & 57 deletions apps/sim/lib/knowledge/connectors/sync-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import {
knowledgeConnectorSyncLog,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull, lt, ne, sql } from 'drizzle-orm'
import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm'
import { decryptApiKey } from '@/lib/api-key/crypto'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import type { DocumentData } from '@/lib/knowledge/documents/service'
import {
hardDeleteDocuments,
isTriggerAvailable,
processDocumentAsync,
processDocumentsWithQueue,
} from '@/lib/knowledge/documents/service'
import { StorageService } from '@/lib/uploads'
import { deleteFile } from '@/lib/uploads/core/storage-service'
Expand All @@ -39,6 +40,8 @@ class ConnectorDeletedException extends Error {
const SYNC_BATCH_SIZE = 5
const MAX_PAGES = 500
const MAX_SAFE_TITLE_LENGTH = 200
const STALE_PROCESSING_MINUTES = 45
const RETRY_WINDOW_DAYS = 7

/** Sanitizes a document title for use in S3 storage keys. */
function sanitizeStorageTitle(title: string): string {
Expand Down Expand Up @@ -147,11 +150,14 @@ export async function dispatchSync(
const requestId = options?.requestId ?? crypto.randomUUID()

if (isTriggerAvailable()) {
await knowledgeConnectorSync.trigger({
connectorId,
fullSync: options?.fullSync,
requestId,
})
await knowledgeConnectorSync.trigger(
{
connectorId,
fullSync: options?.fullSync,
requestId,
},
{ tags: [`connector:${connectorId}`] }
)
logger.info(`Dispatched connector sync to Trigger.dev`, { connectorId, requestId })
} else {
executeSync(connectorId, { fullSync: options?.fullSync }).catch((error) => {
Expand Down Expand Up @@ -395,6 +401,8 @@ export async function executeSync(

const seenExternalIds = new Set<string>()

const pendingProcessing: DocumentData[] = []

const pendingOps: DocOp[] = []
for (const extDoc of externalDocs) {
seenExternalIds.add(extDoc.externalId)
Expand Down Expand Up @@ -503,6 +511,7 @@ export async function executeSync(
for (let j = 0; j < settled.length; j++) {
const outcome = settled[j]
if (outcome.status === 'fulfilled') {
pendingProcessing.push(outcome.value)
if (batch[j].type === 'add') result.docsAdded++
else result.docsUpdated++
} else {
Expand Down Expand Up @@ -537,9 +546,14 @@ export async function executeSync(
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
}

// Retry stuck documents that failed or never completed processing.
// Retry stuck documents that failed, never started, or were abandoned mid-processing.
// Only retry docs uploaded BEFORE this sync — docs added in the current sync
// are still processing asynchronously and would cause a duplicate processing race.
// Documents stuck in 'processing' beyond STALE_PROCESSING_MINUTES are considered
// abandoned (e.g. the Trigger.dev task process exited before processing completed).
// Documents uploaded more than RETRY_WINDOW_DAYS ago are not retried.
const staleProcessingCutoff = new Date(Date.now() - STALE_PROCESSING_MINUTES * 60 * 1000)
const retryCutoff = new Date(Date.now() - RETRY_WINDOW_DAYS * 24 * 60 * 60 * 1000)
const stuckDocs = await db
.select({
id: document.id,
Expand All @@ -552,8 +566,18 @@ export async function executeSync(
.where(
and(
eq(document.connectorId, connectorId),
inArray(document.processingStatus, ['pending', 'failed']),
or(
inArray(document.processingStatus, ['pending', 'failed']),
and(
eq(document.processingStatus, 'processing'),
or(
isNull(document.processingStartedAt),
lt(document.processingStartedAt, staleProcessingCutoff)
)
)
),
lt(document.uploadedAt, syncStartedAt),
gt(document.uploadedAt, retryCutoff),
eq(document.userExcluded, false),
isNull(document.archivedAt),
isNull(document.deletedAt)
Expand All @@ -562,36 +586,68 @@ export async function executeSync(

if (stuckDocs.length > 0) {
logger.info(`Retrying ${stuckDocs.length} stuck documents`, { connectorId })
for (const doc of stuckDocs) {
processDocumentAsync(
connector.knowledgeBaseId,
doc.id,
{
try {
await processDocumentsWithQueue(
stuckDocs.map((doc) => ({
documentId: doc.id,
filename: doc.filename ?? 'document.txt',
fileUrl: doc.fileUrl ?? '',
fileSize: doc.fileSize ?? 0,
mimeType: doc.mimeType ?? 'text/plain',
},
{}
).catch((error) => {
logger.warn('Failed to retry stuck document', {
documentId: doc.id,
error: error instanceof Error ? error.message : String(error),
})
})),
connector.knowledgeBaseId,
{},
crypto.randomUUID()
)
} catch (error) {
logger.warn('Failed to enqueue stuck documents for reprocessing', {
connectorId,
count: stuckDocs.length,
error: error instanceof Error ? error.message : String(error),
})
}
}

// Enqueue all added/updated documents for processing in a single batch
if (pendingProcessing.length > 0) {
try {
await processDocumentsWithQueue(
pendingProcessing,
connector.knowledgeBaseId,
{},
crypto.randomUUID()
)
} catch (error) {
logger.warn('Failed to enqueue documents for processing — will retry on next sync', {
connectorId,
count: pendingProcessing.length,
error: error instanceof Error ? error.message : String(error),
})
}
}

await completeSyncLog(syncLogId, 'completed', result)

const [{ count: actualDocCount }] = await db
.select({ count: sql<number>`count(*)::int` })
.from(document)
.where(
and(
eq(document.connectorId, connectorId),
eq(document.userExcluded, false),
isNull(document.archivedAt),
isNull(document.deletedAt)
)
)

const now = new Date()
await db
.update(knowledgeConnector)
.set({
status: 'active',
lastSyncAt: now,
lastSyncError: null,
lastSyncDocCount: externalDocs.length,
lastSyncDocCount: actualDocCount,
nextSyncAt: calculateNextSyncTime(connector.syncIntervalMinutes),
consecutiveFailures: 0,
updatedAt: now,
Expand Down Expand Up @@ -711,7 +767,7 @@ async function addDocument(
connectorType: string,
extDoc: ExternalDocument,
sourceConfig?: Record<string, unknown>
): Promise<void> {
): Promise<DocumentData> {
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
}
Expand Down Expand Up @@ -773,23 +829,13 @@ async function addDocument(
throw error
}

processDocumentAsync(
knowledgeBaseId,
return {
documentId,
{
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
},
{}
).catch((error) => {
logger.error('Failed to process connector document', {
documentId,
connectorId,
error: error instanceof Error ? error.message : String(error),
})
})
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
}
}

/**
Expand All @@ -803,7 +849,7 @@ async function updateDocument(
connectorType: string,
extDoc: ExternalDocument,
sourceConfig?: Record<string, unknown>
): Promise<void> {
): Promise<DocumentData> {
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
}
Expand Down Expand Up @@ -894,21 +940,11 @@ async function updateDocument(
}
}

processDocumentAsync(
knowledgeBaseId,
existingDocId,
{
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
},
{}
).catch((error) => {
logger.error('Failed to re-process updated connector document', {
documentId: existingDocId,
connectorId,
error: error instanceof Error ? error.message : String(error),
})
})
return {
documentId: existingDocId,
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
}
}
58 changes: 40 additions & 18 deletions apps/sim/lib/knowledge/documents/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ export interface DocumentData {
}

export interface ProcessingOptions {
chunkSize: number
minCharactersPerChunk: number
recipe: string
lang: string
chunkOverlap: number
chunkSize?: number
minCharactersPerChunk?: number
recipe?: string
lang?: string
chunkOverlap?: number
}

export interface DocumentJobData {
Expand Down Expand Up @@ -668,27 +668,40 @@ export function isTriggerAvailable(): boolean {
export async function processDocumentsWithTrigger(
documents: DocumentProcessingPayload[],
requestId: string
): Promise<{ success: boolean; message: string; jobIds?: string[] }> {
): Promise<{ success: boolean; message: string; batchIds?: string[] }> {
if (!isTriggerAvailable()) {
throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
}

try {
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)

const jobPromises = documents.map(async (document) => {
const job = await tasks.trigger('knowledge-process-document', document)
return job.id
})

const jobIds = await Promise.all(jobPromises)
const MAX_BATCH_SIZE = 1000
const batchIds: string[] = []

for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) {
const chunk = documents.slice(i, i + MAX_BATCH_SIZE)
const batchResult = await tasks.batchTrigger(
'knowledge-process-document',
chunk.map((doc) => ({
payload: doc,
options: {
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
tags: [`kb:${doc.knowledgeBaseId}`, `doc:${doc.documentId}`],
},
}))
)
batchIds.push(batchResult.batchId)
}

logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)
logger.info(
`[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)`
)

return {
success: true,
message: `${documents.length} document processing jobs triggered`,
jobIds,
batchIds,
}
} catch (error) {
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
Expand Down Expand Up @@ -1590,10 +1603,19 @@ export async function retryDocumentProcessing(
chunkOverlap: kbConfig.overlap,
}

processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions).catch(
(error: unknown) => {
logger.error(`[${requestId}] Background retry processing error:`, error)
}
await processDocumentsWithQueue(
[
{
documentId,
filename: docData.filename,
fileUrl: docData.fileUrl,
fileSize: docData.fileSize,
mimeType: docData.mimeType,
},
],
knowledgeBaseId,
processingOptions,
requestId
)

logger.info(`[${requestId}] Document retry initiated: ${documentId}`)
Expand Down
Loading