CCIP-11513 hydrate Indexer with pending tasks#1130
Conversation
| Message: msg.Message, | ||
| MessageCCVAddresses: msg.MessageCCVAddresses, | ||
| } | ||
| task, err := NewTask(p.logger, vr, p.registry, p.storage, msg.Metadata.IngestionTimestamp.Add(p.scheduler.VerificationVisibilityWindow())) |
There was a problem hiding this comment.
In order to continue same TTL I call IngestionTimestamp + VerificationVisibilityWindow
| } | ||
| p.logger.Infow("Enqueueing new Message", "messageID", message.VerifierResult.MessageID.String()) | ||
| task, err := NewTask(p.logger, message.VerifierResult, p.registry, p.storage, p.scheduler.VerificationVisibilityWindow()) | ||
| task, err := NewTask(p.logger, message.VerifierResult, p.registry, p.storage, message.Metadata.IngestionTimestamp.Add(p.scheduler.VerificationVisibilityWindow())) |
There was a problem hiding this comment.
I think we here also need to use IngestionTimestamp + VerificationVisibilityWindow
IngestionTimestamp is the right timestamp (NTP-provided) for Task lifetime
61d8140 to
55777b8
Compare
55777b8 to
9750311
Compare
There was a problem hiding this comment.
Pull request overview
This PR improves indexer restart reliability by hydrating the worker pool on startup with messages already persisted in PROCESSING state, allowing interrupted work to resume. It also adjusts task TTL handling to preserve the original ingestion-based deadline when tasks are restored or enqueued.
Changes:
- Added storage support to fetch
PROCESSINGmessages and hydrate them into the worker scheduler on startup. - Changed worker task construction to use an absolute TTL timestamp (derived from ingestion time + visibility window) instead of computing TTL from
time.Now(). - Updated tests/mocks and improved worker logging to use structured fields.
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/mocks/mock_IndexerStorage.go | Adds mock support for the new GetProcessingMessages storage method. |
| indexer/pkg/worker/worker.go | Switches verifier summary logging to structured Infow. |
| indexer/pkg/worker/worker_pool.go | Hydrates pending PROCESSING messages from storage into the scheduler on startup; updates TTL handling for newly discovered messages. |
| indexer/pkg/worker/worker_pool_test.go | Updates tests for ingestion-based TTL and adds coverage for hydration behavior. |
| indexer/pkg/worker/task.go | Changes NewTask to accept an absolute TTL (time.Time) and preserves it across retries/restores. |
| indexer/pkg/worker/task_test.go | Updates task tests to use the new absolute TTL signature. |
| indexer/pkg/storage/postgres.go | Implements GetProcessingMessages query for PROCESSING messages. |
| indexer/pkg/common/storage.go | Extends the storage interface with GetProcessingMessages. |
Files not reviewed (1)
- internal/mocks/mock_IndexerStorage.go: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
b5d491b to
6bb8c78
Compare
6bb8c78 to
74cd32f
Compare
|
Code coverage report:
|
Description
Improves indexer restart reliability by hydrating the worker pool with messages that were already in PROCESSING state, so interrupted work can resume on startup.
Also adds storage support for fetching processing messages, updates task TTL handling so restored tasks keep the right deadline, refreshes tests/mocks, and includes a small logging cleanup.
Testing
Checklist
changelogdirectory)just lint fix- no new lint errorsjust generate- mocks and protobufs are up to date