Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/chainaccessor/config_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func processSourceChainConfigResults(

v, err := sourceChainResults[i].GetResult()
if err != nil {
lggr.Errorw("Failed to get source chain config from result",
lggr.Warnw("Failed to get source chain config from result",
"chain", chain,
"error", err)
Comment on lines +81 to 83
continue
Expand Down
9 changes: 6 additions & 3 deletions pkg/reader/ccip.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,15 @@ func newCCIPChainReaderWithConfigPollerInternal(
lggr.Errorw("failed to sync contracts", "err", err)
}

// After contracts are synced, start the background polling
// After contracts are synced, start the background polling.
// The config poller is load-bearing: without the background refresh, transiently
// missing source chain configs would only recover via the inline-retry backoff
// inside configPollerV2.GetOfframpSourceChainConfigs, which is best-effort. Fail
// reader initialization rather than silently degrade.
lggr.Info("Starting config background polling")
if err := reader.configPoller.Start(ctx); err != nil {
// Log the error but don't fail - we can still function without background polling
// by fetching configs on demand
lggr.Errorw("failed to start config background polling", "err", err)
return nil, fmt.Errorf("start config background polling: %w", err)
}

return reader, nil
Expand Down
38 changes: 31 additions & 7 deletions pkg/reader/config_poller_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@
sourceChainMu sync.RWMutex
staticSourceChainConfigs map[cciptypes.ChainSelector]StaticSourceChainConfig
sourceChainRefresh time.Time // Single timestamp for all source chain configs
// attemptedSourceChains records the wall-clock time of the most recent fetch attempt
// that included each source chain. Chains in this map that are NOT in
// staticSourceChainConfigs were either unconfigured on-chain or had an RPC error.
// Re-fetching inline on every call creates a doom-loop of redundant RPC calls and
// error log spam, so we suppress inline retries when the last attempt was within
// configPollerV2.refreshPeriod. After that backoff elapses, inline retries resume so
// that recovery does not depend solely on the background poller being healthy.
attemptedSourceChains map[cciptypes.ChainSelector]time.Time
}

// newConfigPollerV2 creates a new instance of configPollerV2 with improved batch fetching capabilities.
Expand Down Expand Up @@ -251,7 +259,7 @@
//
// Returns a map of chain selectors to their static source chain configurations,
// or an error if the operation fails.
func (c *configPollerV2) GetOfframpSourceChainConfigs(

Check failure on line 262 in pkg/reader/config_poller_v2.go

View workflow job for this annotation

GitHub Actions / build-lint-test

cyclomatic complexity 14 of func `(*configPollerV2).GetOfframpSourceChainConfigs` is high (> 13) (gocyclo)
ctx context.Context,
destChain cciptypes.ChainSelector,
sourceChains []cciptypes.ChainSelector,
Expand Down Expand Up @@ -297,16 +305,22 @@
staticSourceChainConfig, exists := destChainCache.staticSourceChainConfigs[chain]
if exists {
cachedSourceConfigs[chain] = staticSourceChainConfig
} else {
// This chain isn't in cache yet
continue
}
// Chain isn't in cache. Suppress the inline re-fetch only if we tried to fetch it
// recently (within refreshPeriod). After that backoff has elapsed we resume inline
// retries so recovery still happens even if the background poller is unhealthy or
// has been killed. Chains never attempted always trigger an inline fetch.
lastAttempted, attempted := destChainCache.attemptedSourceChains[chain]
if !attempted || time.Since(lastAttempted) >= c.refreshPeriod {
missingChains = append(missingChains, chain)
}
}

// If all chains are in cache, return them immediately
// If all chains are in cache or were attempted within the suppression window, return immediately
if len(missingChains) == 0 {
destChainCache.sourceChainMu.RUnlock()
c.lggr.Debugw("All source chain configs found in cache",
c.lggr.Debugw("All source chain configs found in cache or attempted within backoff window",
"destChain", c.destChainSelector,
"sourceChains", filteredSourceChains)
return cachedSourceConfigs, nil
Expand Down Expand Up @@ -354,6 +368,7 @@

cache := &chainCache{
staticSourceChainConfigs: make(map[cciptypes.ChainSelector]StaticSourceChainConfig),
attemptedSourceChains: make(map[cciptypes.ChainSelector]time.Time),
}
c.chainCaches[chainSel] = cache
return cache
Expand Down Expand Up @@ -446,14 +461,23 @@
cache.chainConfigMu.Unlock()

// Acquire StaticSourceChainConfigs lock and update
if fetchingForDestChain && len(sourceChainConfigs) > 0 {
if fetchingForDestChain {
now := time.Now()
cache.sourceChainMu.Lock()
for chain, cfg := range sourceChainConfigs {
cache.staticSourceChainConfigs[chain] = staticSourceChainConfigFromSourceChainConfig(cfg)
}
cache.sourceChainRefresh = time.Now()
// Record the time of this fetch attempt for every source chain we asked about.
// Chains that were attempted but not returned (due to RPC errors or being
// unconfigured on-chain) won't trigger inline re-fetches for the next
// refreshPeriod, after which inline retries resume so recovery doesn't depend
// solely on the background poller.
for _, chain := range sourceChainSelectors {
cache.attemptedSourceChains[chain] = now
}
cache.sourceChainRefresh = now
cache.sourceChainMu.Unlock()
} else if !fetchingForDestChain && len(sourceChainConfigs) > 0 {
} else if len(sourceChainConfigs) > 0 {
c.lggr.Errorw("OffRamp SourceChainConfigs were returned when fetching configs from a source chain, "+
"this is not expected",
"destChainSelector", c.destChainSelector,
Expand Down
290 changes: 290 additions & 0 deletions pkg/reader/config_poller_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,3 +907,293 @@ func chainSelectorSliceMatcher(expected []cciptypes.ChainSelector) func([]ccipty
return true
}
}

// TestConfigPollerV2_FailedSourceChainDoesNotRetriggerInlineFetch covers the partial-failure
// case of the inline-retry suppression: the first fetch returns a config for sourceChain1 but
// nothing for sourceChain2. A second GetOfframpSourceChainConfigs call made inside the
// suppression window must be answered without another GetAllConfigsLegacy call, so a
// persistently failing chain does not cause an RPC call (and an error log) on every tick.
func TestConfigPollerV2_FailedSourceChainDoesNotRetriggerInlineFetch(t *testing.T) {
	poller, chainAccessors := setupConfigPollerV2(t)
	// Inline retries are suppressed while time.Since(lastAttempted) < refreshPeriod, so a
	// 30s period keeps the whole test inside the suppression window.
	poller.refreshPeriod = 30 * time.Second
	ctx := context.Background()

	requested := []cciptypes.ChainSelector{sourceChain1, sourceChain2}
	snapshot := createMockChainConfigSnapshot()

	// The response omits sourceChain2, which is how a per-chain failure surfaces to the poller.
	onlyChain1 := createMockSourceChainConfigs([]cciptypes.ChainSelector{sourceChain1})

	destAccessor := chainAccessors[destChain]
	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(requested)),
		).
		Return(snapshot, onlyChain1, nil).
		Once()

	// Cold cache: this call has to fetch inline.
	first, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.NoError(t, err)
	assert.Len(t, first, 1)
	assert.Contains(t, first, sourceChain1)
	assert.NotContains(t, first, sourceChain2)

	// Same request again: sourceChain2 is still missing, but it was attempted a moment ago,
	// so the poller must answer from cache without fetching.
	second, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.NoError(t, err)
	assert.Len(t, second, 1)
	assert.Contains(t, second, sourceChain1)

	// Exactly one fetch in total proves the second call was suppressed.
	destAccessor.AssertNumberOfCalls(t, "GetAllConfigsLegacy", 1)
}

// TestConfigPollerV2_BackgroundPollerRetriesFailedSourceChains exercises recovery of a source
// chain that was missing from the first fetch: after refreshAllKnownChains runs with the
// accessor now returning both chains, GetOfframpSourceChainConfigs must return both.
//
// NOTE(review): the post-failure expectations are registered with Maybe(), so this test does
// not by itself distinguish "recovered by the background refresh" from "recovered by an inline
// retry" — confirm the default refreshPeriod from setupConfigPollerV2 is long enough to keep
// inline retries suppressed here.
func TestConfigPollerV2_BackgroundPollerRetriesFailedSourceChains(t *testing.T) {
	poller, chainAccessors := setupConfigPollerV2(t)
	ctx := context.Background()

	requested := []cciptypes.ChainSelector{sourceChain1, sourceChain2}
	snapshot := createMockChainConfigSnapshot()
	noSourceConfigs := map[cciptypes.ChainSelector]cciptypes.SourceChainConfig{}

	// Initial response omits sourceChain2, standing in for a per-chain failure.
	onlyChain1 := createMockSourceChainConfigs([]cciptypes.ChainSelector{sourceChain1})

	destAccessor := chainAccessors[destChain]
	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(requested)),
		).
		Return(snapshot, onlyChain1, nil).
		Once()

	// Cold-cache call performs the initial inline fetch and sees only sourceChain1.
	first, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.NoError(t, err)
	assert.Len(t, first, 1)
	assert.Contains(t, first, sourceChain1)

	// From here on, the destination accessor answers with configs for both chains.
	bothChains := createMockSourceChainConfigs(requested)
	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(requested)),
		).
		Return(snapshot, bothChains, nil).
		Maybe()

	// refreshAllKnownChains is also expected to query each source chain's own accessor
	// (with an empty source-chain list), so those calls need expectations as well.
	noChains := []cciptypes.ChainSelector{}
	for _, src := range requested {
		chainAccessors[src].
			On(
				"GetAllConfigsLegacy",
				mock.Anything,
				destChain,
				mock.MatchedBy(chainSelectorSliceMatcher(noChains)),
			).
			Return(createMockChainConfigSnapshot(), noSourceConfigs, nil).
			Maybe()
	}

	// Drive one background refresh cycle by hand.
	poller.refreshAllKnownChains()

	// Both chains must now be available.
	second, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.NoError(t, err)
	assert.Len(t, second, 2)
	assert.Contains(t, second, sourceChain1)
	assert.Contains(t, second, sourceChain2)
}

// TestConfigPollerV2_NewSourceChainStillTriggersInlineFetch guards against over-suppression:
// a source chain that has never been part of a fetch attempt must trigger an inline fetch
// even when other chains for the same destination were attempted (and cached) moments ago.
func TestConfigPollerV2_NewSourceChainStillTriggersInlineFetch(t *testing.T) {
	poller, chainAccessors := setupConfigPollerV2(t)
	ctx := context.Background()
	destAccessor := chainAccessors[destChain]

	firstRequest := []cciptypes.ChainSelector{sourceChain1}
	snapshot := createMockChainConfigSnapshot()
	firstResponse := createMockSourceChainConfigs(firstRequest)

	// Round one: only sourceChain1 is known and requested.
	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(firstRequest)),
		).
		Return(snapshot, firstResponse, nil).
		Once()

	first, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, firstRequest)
	require.NoError(t, err)
	assert.Len(t, first, 1)

	// Round two: the request adds sourceChain3, which has never been attempted. The fetch it
	// triggers is expected to ask about every chain tracked so far.
	secondRequest := []cciptypes.ChainSelector{sourceChain1, sourceChain3}
	trackedChains := []cciptypes.ChainSelector{sourceChain1, sourceChain3}
	secondResponse := createMockSourceChainConfigs(trackedChains)

	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(trackedChains)),
		).
		Return(snapshot, secondResponse, nil).
		Once()

	// sourceChain3 is a true cache miss, so this call must fetch inline.
	second, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, secondRequest)
	require.NoError(t, err)
	assert.Len(t, second, 2)
	assert.Contains(t, second, sourceChain1)
	assert.Contains(t, second, sourceChain3)

	// One fetch per round.
	destAccessor.AssertNumberOfCalls(t, "GetAllConfigsLegacy", 2)
}

// TestConfigPollerV2_AllSourceChainsFail_NoInlineRetry covers the total-failure case of the
// inline-retry suppression: the first fetch succeeds at the batch level but returns no source
// chain configs at all. A follow-up call inside the suppression window must still not fetch.
func TestConfigPollerV2_AllSourceChainsFail_NoInlineRetry(t *testing.T) {
	poller, chainAccessors := setupConfigPollerV2(t)
	// Long enough that both calls below fall inside one suppression window.
	poller.refreshPeriod = 30 * time.Second
	ctx := context.Background()

	requested := []cciptypes.ChainSelector{sourceChain1, sourceChain2}
	snapshot := createMockChainConfigSnapshot()

	// An empty (non-nil) result map stands in for every per-chain lookup failing.
	noSourceConfigs := map[cciptypes.ChainSelector]cciptypes.SourceChainConfig{}

	destAccessor := chainAccessors[destChain]
	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(requested)),
		).
		Return(snapshot, noSourceConfigs, nil).
		Once()

	// Cold cache: fetches inline and comes back empty.
	first, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.NoError(t, err)
	assert.Empty(t, first)

	// Every chain was just attempted, so this call must be served without fetching.
	second, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.NoError(t, err)
	assert.Empty(t, second)

	// A single fetch in total.
	destAccessor.AssertNumberOfCalls(t, "GetAllConfigsLegacy", 1)
}

// TestConfigPollerV2_BatchErrorDoesNotPoisonAttemptedSet separates a batch-level failure from
// a per-chain one. When GetAllConfigsLegacy itself returns an error, no chain may be recorded
// in attemptedSourceChains; otherwise a transient provider outage on the first call would
// suppress inline retries and leave the caller without configs for a full refreshPeriod.
func TestConfigPollerV2_BatchErrorDoesNotPoisonAttemptedSet(t *testing.T) {
	poller, chainAccessors := setupConfigPollerV2(t)
	poller.refreshPeriod = 30 * time.Second
	ctx := context.Background()
	destAccessor := chainAccessors[destChain]

	requested := []cciptypes.ChainSelector{sourceChain1, sourceChain2}
	snapshot := createMockChainConfigSnapshot()
	bothChains := createMockSourceChainConfigs(requested)
	var noResult map[cciptypes.ChainSelector]cciptypes.SourceChainConfig

	// Round one: the whole batch call fails, as if the RPC provider were down.
	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(requested)),
		).
		Return(cciptypes.ChainConfigSnapshot{}, noResult, errors.New("rpc unavailable")).
		Once()

	_, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.Error(t, err)

	// Inspect the cache directly: a batch-level error must leave the attempted set empty so
	// that the next call is a genuine cache miss.
	cache := poller.getOrCreateChainCache(destChain)
	require.NotNil(t, cache)
	cache.sourceChainMu.RLock()
	assert.Empty(t, cache.attemptedSourceChains,
		"attemptedSourceChains must not be populated when GetAllConfigsLegacy returns an error")
	cache.sourceChainMu.RUnlock()

	// Round two: the provider is back and returns both chains.
	destAccessor.
		On(
			"GetAllConfigsLegacy",
			mock.Anything,
			destChain,
			mock.MatchedBy(chainSelectorSliceMatcher(requested)),
		).
		Return(snapshot, bothChains, nil).
		Once()

	recovered, err := poller.GetOfframpSourceChainConfigs(ctx, destChain, requested)
	require.NoError(t, err)
	assert.Len(t, recovered, 2)
	assert.Contains(t, recovered, sourceChain1)
	assert.Contains(t, recovered, sourceChain2)

	// One failed fetch plus one successful fetch.
	destAccessor.AssertNumberOfCalls(t, "GetAllConfigsLegacy", 2)
}

// TestConfigPollerV2_InlineRetryResumesAfterBackoff verifies that once the suppression window
// has elapsed, a still-missing source chain triggers another inline fetch. This guards the
// recovery path used when the background poller is unhealthy or has been killed.
//
// The test does not depend on wall-clock timing. It uses a long refreshPeriod so the
// "still suppressed" assertion cannot race a slow scheduler, and it simulates the window
// elapsing by backdating the recorded attempt timestamp instead of sleeping.
func TestConfigPollerV2_InlineRetryResumesAfterBackoff(t *testing.T) {
	cPollerV2, accessors := setupConfigPollerV2(t)
	// Long backoff: a short one (e.g. 50ms) makes the suppression check below flaky on a
	// loaded CI runner, because the second call can land after the window already closed.
	cPollerV2.refreshPeriod = 30 * time.Second
	ctx := context.Background()

	sourceChains := []cciptypes.ChainSelector{sourceChain1, sourceChain2}
	expectedChainConfig := createMockChainConfigSnapshot()

	// First fetch: sourceChain1 succeeds, sourceChain2 is missing.
	partialSourceConfigs := createMockSourceChainConfigs([]cciptypes.ChainSelector{sourceChain1})
	accessors[destChain].On(
		"GetAllConfigsLegacy",
		mock.Anything,
		destChain,
		mock.MatchedBy(chainSelectorSliceMatcher(sourceChains))).
		Return(expectedChainConfig, partialSourceConfigs, nil).Once()

	configs, err := cPollerV2.GetOfframpSourceChainConfigs(ctx, destChain, sourceChains)
	require.NoError(t, err)
	assert.Len(t, configs, 1)
	assert.Contains(t, configs, sourceChain1)

	// While the suppression window is open, no additional fetch should fire.
	configs2, err := cPollerV2.GetOfframpSourceChainConfigs(ctx, destChain, sourceChains)
	require.NoError(t, err)
	assert.Len(t, configs2, 1)
	accessors[destChain].AssertNumberOfCalls(t, "GetAllConfigsLegacy", 1)

	// Simulate the backoff elapsing: move sourceChain2's last-attempt timestamp well past
	// refreshPeriod into the past. The write happens under sourceChainMu, the lock that
	// guards attemptedSourceChains.
	destCache := cPollerV2.getOrCreateChainCache(destChain)
	require.NotNil(t, destCache)
	destCache.sourceChainMu.Lock()
	_, attempted := destCache.attemptedSourceChains[sourceChain2]
	destCache.attemptedSourceChains[sourceChain2] = time.Now().Add(-2 * cPollerV2.refreshPeriod)
	destCache.sourceChainMu.Unlock()
	// Checked after unlocking so a failure cannot exit the test with the lock held.
	require.True(t, attempted,
		"sourceChain2 must have been recorded as attempted by the first fetch")

	// With the window elapsed, expect an inline retry that recovers sourceChain2.
	fullSourceConfigs := createMockSourceChainConfigs(sourceChains)
	accessors[destChain].On(
		"GetAllConfigsLegacy",
		mock.Anything,
		destChain,
		mock.MatchedBy(chainSelectorSliceMatcher(sourceChains))).
		Return(expectedChainConfig, fullSourceConfigs, nil).Once()

	configs3, err := cPollerV2.GetOfframpSourceChainConfigs(ctx, destChain, sourceChains)
	require.NoError(t, err)
	assert.Len(t, configs3, 2)
	assert.Contains(t, configs3, sourceChain1)
	assert.Contains(t, configs3, sourceChain2)

	accessors[destChain].AssertNumberOfCalls(t, "GetAllConfigsLegacy", 2)
}
Loading