Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions indexer/pkg/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"context"
"time"

"github.com/smartcontractkit/chainlink-ccv/protocol"
)
Expand Down Expand Up @@ -47,6 +48,10 @@ type MessageStorageReader interface {
GetMessage(ctx context.Context, messageID protocol.Bytes32) (MessageWithMetadata, error)
// QueryMessages retrieves all messages that match the filter set.
QueryMessages(ctx context.Context, start, end int64, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) ([]MessageWithMetadata, error)
// GetProcessingMessages returns a page of messages currently in PROCESSING status
// whose ingestion_timestamp is after createdAfter (i.e. still within the visibility window).
// Used at startup to resume tasks that were interrupted before completion.
GetProcessingMessages(ctx context.Context, createdAfter time.Time, limit, offset uint64) ([]MessageWithMetadata, error)
}

// MessageStorageWriter provides the interface to update message state in storage.
Expand Down
6 changes: 6 additions & 0 deletions indexer/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type SchedulerConfig struct {
BaseDelay int `toml:"BaseDelay"`
// MaxDelay defines the maximum number of milliseconds to wait before retrying the message.
MaxDelay int `toml:"MaxDelay"`
// MaxHeapSize is the maximum number of delayed tasks the scheduler heap may hold at once.
// Enqueue blocks until a slot is free; TryEnqueue returns ErrSchedulerFull immediately.
// 0 means unbounded.
MaxHeapSize int `toml:"MaxHeapSize"`
}

type PoolConfig struct {
Expand All @@ -64,6 +68,8 @@ type PoolConfig struct {
// WorkerTimeout is the number of seconds a worker can attempt to retrieve verifications for
// Note: This value should always be higher than the maximum timeout on the slowest configured verifier.
WorkerTimeout int `toml:"WorkerTimeout"`
// HydrationBatchSize controls how many PROCESSING messages are loaded from storage per page during startup hydration.
HydrationBatchSize uint64 `toml:"HydrationBatchSize"`
}

// APIConfig provides all configuration for the API inside the indexer.
Expand Down
53 changes: 53 additions & 0 deletions indexer/pkg/storage/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
opQueryCCVData = "QueryCCVData"
opBatchInsertCCVData = "BatchInsertCCVData"
opQueryMessages = "QueryMessages"
opGetProcessingMessages = "GetProcessingMessages"
opUpdateMessageStatus = "UpdateMessageStatus"
opCreateDiscoveryState = "CreateDiscoveryState"
opPersistDiscoveryBatch = "PersistDiscoveryBatch"
Expand Down Expand Up @@ -553,6 +554,58 @@ func (d *PostgresStorage) QueryMessages(
return results, nil
}

// GetProcessingMessages returns one page of messages whose status is PROCESSING
// and whose ingestion_timestamp is strictly after createdAfter, oldest first.
//
// limit and offset are passed straight through to the SQL LIMIT/OFFSET clause.
// Rows are ordered by ingestion_timestamp and then message_id so that messages
// sharing a timestamp keep a fixed position across successive pages.
//
// The query duration is always recorded; the metric is flagged as failed
// whenever this method returns a non-nil error (query, scan, or iteration).
func (d *PostgresStorage) GetProcessingMessages(ctx context.Context, createdAfter time.Time, limit, offset uint64) ([]common.MessageWithMetadata, error) {
	startQueryMetric := time.Now()
	// err is the variable observed by the deferred metric below. Every failure
	// path must assign to it rather than shadow it with :=, otherwise the
	// failure would be recorded as a success.
	var err error
	defer func() {
		d.monitoring.Metrics().RecordStorageQueryDuration(ctx, time.Since(startQueryMetric), opGetProcessingMessages, err != nil)
	}()

	query := `
		SELECT
			message_id,
			message,
			status,
			lastErr,
			source_chain_selector,
			dest_chain_selector,
			ingestion_timestamp,
			message_ccv_addresses
		FROM indexer.messages
		WHERE status = $1
			AND ingestion_timestamp > $2
		ORDER BY ingestion_timestamp ASC, message_id ASC
		LIMIT $3 OFFSET $4
	`

	// err is already declared in this scope, so := only introduces rows and
	// assigns to the existing err.
	rows, err := d.queryContext(ctx, query, common.MessageProcessingString, createdAfter, limit, offset)
	if err != nil {
		d.lggr.Errorw("Failed to query processing messages", "error", err)
		return nil, fmt.Errorf("failed to query processing messages: %w", err)
	}
	defer func() {
		if cerr := rows.Close(); cerr != nil {
			d.lggr.Errorw("Failed to close rows", "error", cerr)
		}
	}()

	var results []common.MessageWithMetadata
	for rows.Next() {
		message, scanErr := d.scanMessage(rows)
		if scanErr != nil {
			err = fmt.Errorf("failed to scan message: %w", scanErr)
			return nil, err
		}
		results = append(results, message)
	}

	if iterErr := rows.Err(); iterErr != nil {
		err = fmt.Errorf("error iterating over rows: %w", iterErr)
		return nil, err
	}
	return results, nil
}

// UpdateMessageStatus implements common.IndexerStorage.
func (d *PostgresStorage) UpdateMessageStatus(ctx context.Context, messageID protocol.Bytes32, status common.MessageStatus, lastErr string) error {
startUpdateMetric := time.Now()
Expand Down
71 changes: 67 additions & 4 deletions indexer/pkg/worker/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// ErrSchedulerFull is returned by TryEnqueue when the task cannot be accepted
// without blocking: either the delay heap has reached MaxHeapSize or, for a
// task with no delay, the ready channel has no free capacity.
var ErrSchedulerFull = errors.New("scheduler heap is full")

type Scheduler struct {
lggr logger.Logger
config config.SchedulerConfig
Expand All @@ -20,6 +23,9 @@ type Scheduler struct {
delayHeap *DelayHeap
ready chan *Task
dlq chan *Task
// slots is a counting semaphore that bounds the heap size.
// nil means unbounded (MaxHeapSize == 0).
slots chan struct{}
wg sync.WaitGroup
startOnce sync.Once
stopOnce sync.Once
Expand All @@ -35,6 +41,11 @@ func NewScheduler(lggr logger.Logger, config config.SchedulerConfig) (*Scheduler
delayHeap := &DelayHeap{}
heap.Init(delayHeap)

var slots chan struct{}
if config.MaxHeapSize > 0 {
slots = make(chan struct{}, config.MaxHeapSize)
}

return &Scheduler{
lggr: lggr,
config: config,
Expand All @@ -43,6 +54,7 @@ func NewScheduler(lggr logger.Logger, config config.SchedulerConfig) (*Scheduler
stopCh: make(chan struct{}),
ready: make(chan *Task, 1),
dlq: make(chan *Task, 1),
slots: slots,
}, nil
}

Expand Down Expand Up @@ -80,6 +92,11 @@ func (s *Scheduler) run(ctx context.Context) {
tasks := s.delayHeap.PopAllReady()
s.mu.Unlock()
for _, task := range tasks {
// Release the heap slot before blocking on the ready channel so
// that new tasks can be scheduled while we wait for a worker.
if s.slots != nil {
<-s.slots
}
select {
case s.ready <- task:
case <-s.stopCh:
Expand Down Expand Up @@ -137,30 +154,76 @@ func (s *Scheduler) DLQ() <-chan *Task {
return s.dlq
}

// Enqueue schedules t for execution.
//
// shouldEnqueue decides whether the task may still run and with what delay:
//   - not allowed: t is handed to the DLQ channel and an error is returned;
//   - zero delay: t is sent directly on the ready channel;
//   - otherwise: t is pushed onto the delay heap to run at now+delay.
//
// When the heap is bounded (MaxHeapSize > 0, i.e. s.slots is non-nil) a slot is
// acquired before pushing; the run loop releases it when the task is popped.
// Every blocking step (DLQ send, ready send, slot acquisition) also watches
// ctx, and a cancelled ctx yields an error wrapping ctx.Err() so callers can
// use errors.Is with context.Canceled or context.DeadlineExceeded.
//
// Note that t.attempt and t.runAt are updated before any waiting happens, so a
// call that is cancelled while waiting still leaves them modified.
//
// It returns an error if t is nil, if the task was diverted to the DLQ, or if
// ctx was cancelled before the task could be handed off.
func (s *Scheduler) Enqueue(ctx context.Context, t *Task) error {
	if t == nil {
		return errors.New("cannot enqueue nil task")
	}

	shouldEnqueue, delay := s.shouldEnqueue(t)
	if !shouldEnqueue {
		// The DLQ channel can be full; do not block past cancellation.
		select {
		case s.dlq <- t:
			return errors.New("task TTL expired, sent to DLQ")
		case <-ctx.Done():
			return fmt.Errorf("enqueue cancelled sending task to DLQ: %w", ctx.Err())
		}
	}

	t.attempt++
	t.runAt = time.Now().Add(delay)

	// No delay: the task is ready immediately and bypasses the heap (and its
	// slot accounting) entirely.
	if delay == 0 {
		select {
		case s.ready <- t:
			return nil
		case <-ctx.Done():
			return fmt.Errorf("enqueue cancelled: %w", ctx.Err())
		}
	}

	// Bounded heap: wait for a free slot before taking the lock so other
	// goroutines are not blocked on s.mu while we wait.
	if s.slots != nil {
		select {
		case s.slots <- struct{}{}:
		case <-ctx.Done():
			return fmt.Errorf("enqueue cancelled waiting for heap slot: %w", ctx.Err())
		}
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	heap.Push(s.delayHeap, t)
	return nil
}

// TryEnqueue enqueues t for execution. If the heap is at capacity it returns
// ErrSchedulerFull immediately without blocking.
func (s *Scheduler) TryEnqueue(t *Task) error {
if t == nil {
return errors.New("cannot enqueue nil task")
}
shouldEnqueue, delay := s.shouldEnqueue(t)
if !shouldEnqueue {
s.dlq <- t
return errors.New("task TTL expired, sent to DLQ")
}

t.attempt++
t.runAt = time.Now().Add(delay)

if delay == 0 {
select {
case s.ready <- t:
return nil
default:
return ErrSchedulerFull
}
}

if s.slots != nil {
select {
case s.slots <- struct{}{}:
default:
return ErrSchedulerFull
}
}

// Otherwise schedule for future execution on the delay heap
s.mu.Lock()
defer s.mu.Unlock()
heap.Push(s.delayHeap, t)
Expand Down
4 changes: 2 additions & 2 deletions indexer/pkg/worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type TaskResult struct {
UnavailableCCVs int
}

func NewTask(lggr logger.Logger, message protocol.VerifierResult, registry *registry.VerifierRegistry, storage common.IndexerStorage, verificationVisabilityWindow time.Duration) (*Task, error) {
func NewTask(lggr logger.Logger, message protocol.VerifierResult, registry *registry.VerifierRegistry, storage common.IndexerStorage, ttl time.Time) (*Task, error) {
if lggr == nil {
return nil, fmt.Errorf("logger is required")
}
Expand All @@ -53,7 +53,7 @@ func NewTask(lggr logger.Logger, message protocol.VerifierResult, registry *regi
storage: storage,
attempt: 0,
lastErr: nil,
ttl: time.Now().Add(verificationVisabilityWindow),
ttl: ttl,
}, nil
}

Expand Down
14 changes: 7 additions & 7 deletions indexer/pkg/worker/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,25 @@ func TestNewTask_NilValidations(t *testing.T) {
ms := mocks.NewMockIndexerStorage(t)

t.Run("nil logger", func(t *testing.T) {
_, err := NewTask(nil, msg, reg, ms, time.Second)
_, err := NewTask(nil, msg, reg, ms, time.Now().Add(time.Second))
require.Error(t, err)
require.Contains(t, err.Error(), "logger is required")
})

t.Run("nil registry", func(t *testing.T) {
_, err := NewTask(lggr, msg, nil, ms, time.Second)
_, err := NewTask(lggr, msg, nil, ms, time.Now().Add(time.Second))
require.Error(t, err)
require.Contains(t, err.Error(), "registry is required")
})

t.Run("nil storage", func(t *testing.T) {
_, err := NewTask(lggr, msg, reg, nil, time.Second)
_, err := NewTask(lggr, msg, reg, nil, time.Now().Add(time.Second))
require.Error(t, err)
require.Contains(t, err.Error(), "storage is required")
})

t.Run("all valid", func(t *testing.T) {
task, err := NewTask(lggr, msg, reg, ms, time.Second)
task, err := NewTask(lggr, msg, reg, ms, time.Now().Add(time.Second))
require.NoError(t, err)
require.NotNil(t, task)
})
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestCollectVerifierResults(t *testing.T) {
lggr := logger.Test(t)
msg := protocol.VerifierResult{}
ms := mocks.NewMockIndexerStorage(t)
task, err := NewTask(lggr, msg, registry.NewVerifierRegistry(), ms, time.Second)
task, err := NewTask(lggr, msg, registry.NewVerifierRegistry(), ms, time.Now().Add(time.Second))
require.NoError(t, err)

mid := task.messageID
Expand All @@ -257,7 +257,7 @@ func TestCollectVerifierResults(t *testing.T) {
require.NoError(t, reg.AddVerifier(addr, "test-verifier", r))

// create task with our registry
task2, err := NewTask(lggr, msg, reg, mocks.NewMockIndexerStorage(t), time.Second)
task2, err := NewTask(lggr, msg, reg, mocks.NewMockIndexerStorage(t), time.Now().Add(time.Second))
require.NoError(t, err)

res := task2.collectVerifierResults(context.Background(), []*readers.VerifierReader{r})
Expand All @@ -284,7 +284,7 @@ func TestCollectVerifierResults(t *testing.T) {
require.NoError(t, aerr)
require.NoError(t, reg.AddVerifier(addr2, "verifier-2", r1))

task3, err := NewTask(lggr, msg, reg, mocks.NewMockIndexerStorage(t), time.Second)
task3, err := NewTask(lggr, msg, reg, mocks.NewMockIndexerStorage(t), time.Now().Add(time.Second))
require.NoError(t, err)

res := task3.collectVerifierResults(context.Background(), []*readers.VerifierReader{r1, r2})
Expand Down
10 changes: 6 additions & 4 deletions indexer/pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func Execute(ctx context.Context, task *Task) (*TaskResult, error) {
}

// Log out useful information about this run
task.logger.Infof("Source Specified CCVs %s", totalVerifiers)
task.logger.Infof("Exisiting Verifications %s", existingVerifiers)
task.logger.Infof("Attempting to Retrieve %s", attemptingToRetrieve)
task.logger.Infof("Unknown CCVs %s", unknownCCVs)
task.logger.Infow("Task verifier summary",
"sourceCCVs", totalVerifiers,
"existingVerifications", existingVerifiers,
"attemptingToRetrieve", attemptingToRetrieve,
"unknownCCVs", unknownCCVs,
)

// Process all verifier calls concurrently and collect successful results.
// Each verifier reader returns a channel that will emit one result when ready.
Expand Down
Loading
Loading