Skip to content

fix(knowledge): route connector doc processing through queue instead of fire-and-forget#3754

Merged
waleedlatif1 merged 4 commits intostagingfrom
waleedlatif1/fix-connector-doc-processing
Mar 25, 2026
Merged

fix(knowledge): route connector doc processing through queue instead of fire-and-forget#3754
waleedlatif1 merged 4 commits intostagingfrom
waleedlatif1/fix-connector-doc-processing

Conversation

@waleedlatif1
Copy link
Collaborator

Summary

  • Fix connector documents getting stuck in "processing" — fire-and-forget processDocumentAsync calls were killed when the Trigger.dev sync task exited
  • Route all document processing through processDocumentsWithQueue (Trigger.dev → Redis → in-memory) so each doc gets its own task lifecycle
  • Batch all new/updated documents into a single processDocumentsWithQueue call per sync instead of N+1 individual calls
  • Expand stuck doc retry to include docs abandoned mid-processing (>45 min stale), with a 7-day retry cap
  • Fix lastSyncDocCount to use actual DB count instead of API response count
  • Switch processDocumentsWithTrigger to use batchTrigger with idempotency keys
  • Fix retryDocumentProcessing to use processDocumentsWithQueue instead of fire-and-forget

Type of Change

  • Bug fix

Testing

Tested manually

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@cursor
Copy link

cursor bot commented Mar 25, 2026

PR Summary

Medium Risk
Changes the connector sync and document-processing dispatch path to enqueue work via processDocumentsWithQueue/Trigger.dev batching, which can affect background job execution and retry behavior. Moderate risk due to new batching/idempotency semantics and expanded criteria for reprocessing stuck documents.

Overview
Connector sync now enqueues document processing via the queue instead of fire-and-forget async calls. addDocument/updateDocument return DocumentData, the sync collects added/updated docs and submits them in one processDocumentsWithQueue batch after the sync completes.

Stuck document retries are broadened and safer. Retries now include stale processing docs (>45 minutes) in addition to pending/failed, exclude very old uploads (>7 days), and enqueue retries in bulk.

Trigger.dev dispatch is optimized and more observable. Sync triggers are tagged, processDocumentsWithTrigger uses batchTrigger with per-document idempotency keys/tags, ProcessingOptions fields are made optional, and connector lastSyncDocCount is set from an actual DB count rather than the external API list length.

Written by Cursor Bugbot for commit 85d9fcb. Configure here.

@vercel
Copy link

vercel bot commented Mar 25, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
docs Skipped Skipped Mar 25, 2026 3:55am

Request Review

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 25, 2026

Greptile Summary

This PR eliminates the root cause of connector documents getting permanently stuck in 'processing' by replacing all fire-and-forget processDocumentAsync calls with properly awaited processDocumentsWithQueue calls that route through Trigger.dev → Redis → in-memory, ensuring each document gets its own managed task lifecycle. It also tightens the stuck-doc retry heuristic (adds stale-processing detection at 45 min, 7-day retry cap), fixes lastSyncDocCount to reflect the true DB count, and upgrades processDocumentsWithTrigger to use batchTrigger with idempotency keys.

  • addDocument / updateDocument now return DocumentData instead of firing processing inline; the caller (executeSync) collects these into pendingProcessing[] and enqueues them in a single batch after the sync loop.
  • Stuck-doc retry expanded to cover docs stuck in 'processing' for > 45 minutes (handles crashed Trigger.dev tasks), with a 7-day upload-age cap to avoid retrying very old orphaned records.
  • processDocumentsWithTrigger switched from N sequential tasks.trigger calls to chunked tasks.batchTrigger with per-document idempotency keys (doc-process-{documentId}-{requestId}), significantly reducing API round-trips.
  • retryDocumentProcessing upgraded from fire-and-forget to await processDocumentsWithQueue, so errors now surface to the caller rather than being silently swallowed.
  • ProcessingOptions fields made optional to cleanly support the sync engine passing {} (KB config is used as the fallback inside processDocumentAsync).
  • All three issues from the prior review round (sync log ordering, NULL processingStartedAt, jobIdsbatchIds rename) are confirmed resolved in this HEAD.

Confidence Score: 5/5

  • Safe to merge — all prior review concerns are resolved and the logic is sound with proper error handling throughout.
  • All three issues from the previous review round are confirmed fixed at HEAD (sync log ordering, NULL processingStartedAt guard, batchIds rename). The core refactor correctly defers document processing to after the sync loop, eliminates fire-and-forget patterns, and adds proper fallback/error handling at each queue step. The only remaining item is a P2 suggestion about MAX_BATCH_SIZE potentially exceeding Trigger.dev's per-batch limit, which is non-blocking and easily verified/fixed.
  • No files require special attention beyond the minor MAX_BATCH_SIZE note in service.ts.

Important Files Changed

Filename Overview
apps/sim/lib/knowledge/connectors/sync-engine.ts Core sync logic refactored: removes all fire-and-forget processDocumentAsync calls, collects DocumentData returns from addDocument/updateDocument into pendingProcessing, then batches all processing through processDocumentsWithQueue after the sync loop. Stuck-doc retry now covers stale-processing docs (>45 min) with a 7-day cap. lastSyncDocCount fixed to use actual DB count. completeSyncLog correctly called after both queue enqueues. Previous review concerns all addressed.
apps/sim/lib/knowledge/documents/service.ts Switches processDocumentsWithTrigger from N individual tasks.trigger calls to chunked tasks.batchTrigger with idempotency keys; renames return field from jobIds to batchIds. ProcessingOptions fields made optional to support empty-options calls from the sync engine. retryDocumentProcessing upgraded from fire-and-forget to await processDocumentsWithQueue. Minor concern: MAX_BATCH_SIZE = 1000 may exceed Trigger.dev's per-batch limit.

Sequence Diagram

sequenceDiagram
    participant S as executeSync
    participant DB as Database
    participant Q as processDocumentsWithQueue
    participant T as Trigger.dev (batchTrigger)
    participant R as Redis/In-memory Queue
    participant W as processDocumentAsync

    S->>DB: fetch existing docs & external docs
    S->>DB: addDocument / updateDocument (returns DocumentData)
    S->>S: collect into pendingProcessing[]

    S->>DB: query stuckDocs (pending/failed/stale-processing, <7 days old)
    alt stuckDocs.length > 0
        S->>Q: processDocumentsWithQueue(stuckDocs, {})
        alt Trigger.dev available
            Q->>T: batchTrigger (chunks of 500, idempotency keys)
            T-->>W: processDocumentAsync per doc
        else fallback
            Q->>R: addJob per doc
            R-->>W: processDocumentAsync per doc
        end
    end

    alt pendingProcessing.length > 0
        S->>Q: processDocumentsWithQueue(pendingProcessing, {})
        alt Trigger.dev available
            Q->>T: batchTrigger (chunks of 500, idempotency keys)
            T-->>W: processDocumentAsync per doc
        else fallback
            Q->>R: addJob per doc
            R-->>W: processDocumentAsync per doc
        end
    end

    S->>DB: completeSyncLog('completed')
    S->>DB: count actual docs → lastSyncDocCount
    S->>DB: update knowledgeConnector status
Loading

Reviews (3): Last reviewed commit: "fix(knowledge): move completeSyncLog aft..." | Re-trigger Greptile

@waleedlatif1
Copy link
Collaborator Author

@greptile

@waleedlatif1
Copy link
Collaborator Author

@cursor review

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Bugbot reviewed your changes and found no new issues!

Comment @cursor review or bugbot run to trigger another review on this PR

@waleedlatif1
Copy link
Collaborator Author

@greptile

@waleedlatif1 waleedlatif1 merged commit 7af7a22 into staging Mar 25, 2026
11 checks passed
@waleedlatif1 waleedlatif1 deleted the waleedlatif1/fix-connector-doc-processing branch March 25, 2026 04:07
Sg312 pushed a commit that referenced this pull request Mar 25, 2026
…of fire-and-forget (#3754)

* fix(knowledge): route connector doc processing through queue instead of fire-and-forget

* fix(knowledge): rename jobIds to batchIds in processDocumentsWithTrigger return type

* improvement(knowledge): add Trigger.dev tags for connector sync and document processing tasks

* fix(knowledge): move completeSyncLog after doc enqueue, handle NULL processingStartedAt in stuck doc query
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant