Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions oracle/diviner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
"github.com/bisoncraft/mesh/oracle/sources"
)

const (
errBaseDelay = 10 * time.Second
errMaxDelay = 5 * time.Minute
)

// diviner wraps a Source and handles periodic fetching and emitting of
// price and fee rate updates.
type diviner struct {
Expand All @@ -22,6 +27,8 @@ type diviner struct {
nextFetchInfo atomic.Value // networkSchedule
errorInfo atomic.Value // fetchErrorInfo
getNetworkSchedule func() networkSchedule
errBaseDelay time.Duration
errMaxDelay time.Duration
}

type fetchErrorInfo struct {
Expand All @@ -43,7 +50,18 @@ func newDiviner(
resetTimer: make(chan struct{}),
getNetworkSchedule: getNetworkSchedule,
onScheduleChanged: onScheduleChanged,
errBaseDelay: errBaseDelay,
errMaxDelay: errMaxDelay,
}
}

// calcBackoff computes exponential backoff: min(baseDelay * 2^attempt, maxDelay).
func (d *diviner) calcBackoff(attempt int) time.Duration {
delay := d.errBaseDelay * (1 << uint(attempt))
if delay > d.errMaxDelay {
return d.errMaxDelay
}
return delay
}

// fetchScheduleInfo returns the current fetch schedule info.
Expand Down Expand Up @@ -115,6 +133,7 @@ func (d *diviner) reschedule() {
func (d *diviner) run(ctx context.Context) {
timer := time.NewTimer(0)
defer timer.Stop()
var consecutiveErrors int

for {
select {
Expand All @@ -128,19 +147,20 @@ func (d *diviner) run(ctx context.Context) {
case <-timer.C:
if err := d.fetchUpdates(ctx); err != nil {
d.log.Errorf("Failed to fetch divination: %v", err)
// Retry after 1 minute on errors.
const errPeriod = time.Minute
consecutiveErrors++
errTime := time.Now()
d.errorInfo.Store(fetchErrorInfo{message: err.Error(), stamp: errTime})
info := d.fetchScheduleInfo()
if info.NextFetchTime.IsZero() {
info = d.getNetworkSchedule()
}
info.NextFetchTime = errTime.Add(errPeriod)
backoff := d.calcBackoff(consecutiveErrors - 1)
info.NextFetchTime = errTime.Add(backoff)
d.nextFetchInfo.Store(info)
d.fireScheduleChanged(info)
timer.Reset(errPeriod)
timer.Reset(backoff)
} else {
consecutiveErrors = 0
d.errorInfo.Store(fetchErrorInfo{message: "", stamp: time.Time{}})
info := d.getNetworkSchedule()
timer.Reset(time.Until(info.NextFetchTime))
Expand Down
107 changes: 96 additions & 11 deletions oracle/diviner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ func (m *mockSource) FetchRates(ctx context.Context) (*sources.RateInfo, error)
return m.fetchFunc(ctx)
}

func newTestDiviner(
src sources.Source,
publishUpdate func(ctx context.Context, update *OracleUpdate) error,
log slog.Logger,
getNetworkSchedule func() networkSchedule,
onScheduleChanged func(*OracleSnapshot),
baseDelay, maxDelay time.Duration,
) *diviner {
return &diviner{
source: src,
log: log,
publishUpdate: publishUpdate,
resetTimer: make(chan struct{}),
getNetworkSchedule: getNetworkSchedule,
onScheduleChanged: onScheduleChanged,
errBaseDelay: baseDelay,
errMaxDelay: maxDelay,
}
}

func TestDiviner(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -106,6 +126,15 @@ func TestDiviner(t *testing.T) {
fetchErr: fmt.Errorf("fetch error"),
expectErrorMsg: true,
},
{
name: "consecutive fetch failures",
quota: &sources.QuotaStatus{
FetchesRemaining: 42,
FetchesLimit: 100,
},
fetchErr: fmt.Errorf("fetch error"),
expectErrorMsg: true,
},
}

for _, test := range tests {
Expand All @@ -114,6 +143,7 @@ func TestDiviner(t *testing.T) {
log := slog.NewBackend(os.Stdout).Logger("test")

resetTime := time.Now().Add(10 * time.Minute)
var fetchCount int
src := &mockSource{
name: "test-source",
weight: 0.8,
Expand All @@ -126,6 +156,14 @@ func TestDiviner(t *testing.T) {
},
fetchFunc: func(ctx context.Context) (*sources.RateInfo, error) {
if test.fetchErr != nil {
// For consecutive failures test, allow two failures
if test.name == "consecutive fetch failures" {
fetchCount++
if fetchCount <= 2 {
return nil, test.fetchErr
}
return test.rateInfo, nil
}
return nil, test.fetchErr
}
return test.rateInfo, nil
Expand Down Expand Up @@ -156,27 +194,55 @@ func TestDiviner(t *testing.T) {
scheduleCh <- update
}

div := newDiviner(src, publishUpdate, log, getNetworkSchedule, onScheduleChanged)
var div *diviner
if test.name == "consecutive fetch failures" {
// Use short delays for faster test
div = newTestDiviner(src, publishUpdate, log, getNetworkSchedule, onScheduleChanged, 10*time.Millisecond, 500*time.Millisecond)
} else {
div = newDiviner(src, publishUpdate, log, getNetworkSchedule, onScheduleChanged)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go div.run(ctx)

var (
update *OracleUpdate
scheduleUpdate *OracleSnapshot
update *OracleUpdate
scheduleUpdate *OracleSnapshot
scheduleUpdates []*OracleSnapshot
)

deadline := time.After(10 * time.Second)
for update == nil || scheduleUpdate == nil {
deadline := time.After(500 * time.Millisecond)
isConsecutiveTest := test.name == "consecutive fetch failures"

for {
select {
case update = <-updateCh:
case scheduleUpdate = <-scheduleCh:
case u := <-updateCh:
update = u
case s := <-scheduleCh:
if scheduleUpdate == nil {
scheduleUpdate = s
}
scheduleUpdates = append(scheduleUpdates, s)
case <-deadline:
t.Fatal("Timed out waiting for updates")
}
if test.fetchErr != nil && scheduleUpdate != nil {
break

if isConsecutiveTest {
// For consecutive failures: need two schedule updates
if len(scheduleUpdates) >= 2 {
break
}
} else if test.fetchErr != nil {
// For error cases: just need one schedule update
if scheduleUpdate != nil {
break
}
} else {
// For success cases: need both update and schedule update
if update != nil && scheduleUpdate != nil {
break
}
}
}

Expand Down Expand Up @@ -233,8 +299,27 @@ func TestDiviner(t *testing.T) {
t.Fatalf("Unexpected schedule update source status: %#v", srcStatus)
}

if test.fetchErr != nil && srcStatus.NextFetchTime.Sub(baseTime) < 50*time.Second {
t.Errorf("Expected retry next fetch to be ~1 minute later, got %v", srcStatus.NextFetchTime.Sub(baseTime))
if test.name == "consecutive fetch failures" {
if len(scheduleUpdates) < 2 {
t.Fatalf("Expected 2 schedule updates, got %d", len(scheduleUpdates))
}

firstStatus := scheduleUpdates[0].Sources["test-source"]
if firstStatus == nil {
t.Fatal("Expected first schedule update to contain 'test-source'")
}

secondStatus := scheduleUpdates[1].Sources["test-source"]
if secondStatus == nil {
t.Fatal("Expected second schedule update to contain 'test-source'")
}

delay := secondStatus.NextFetchTime.Sub(*firstStatus.NextFetchTime)
expected := 20 * time.Millisecond
tolerance := 5 * time.Millisecond
if delay < expected-tolerance || delay > expected+tolerance {
t.Errorf("Expected second retry backoff ~20ms, got %v", delay)
}
}
})
}
Expand Down