Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions oracle/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func aggregatePriceSources(sources map[string]*priceUpdate) float64 {
if weight == 0 {
continue
}
// Skip zero or negative prices.
if entry.price <= 0 {
continue
}
totalWeight += weight
weightedSum += weight * entry.price
}
Expand Down
15 changes: 9 additions & 6 deletions oracle/diviner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/bisoncraft/mesh/oracle/sources"
)

const fetchUpdatesTimeout = 30 * time.Second

// diviner wraps a Source and handles periodic fetching and emitting of
// price and fee rate updates.
type diviner struct {
Expand Down Expand Up @@ -96,11 +98,9 @@ func (d *diviner) fetchUpdates(ctx context.Context) error {
}
}

go func() {
if err := d.publishUpdate(ctx, update); err != nil {
d.log.Errorf("Failed to publish oracle update: %v", err)
}
}()
if err := d.publishUpdate(ctx, update); err != nil {
d.log.Errorf("Failed to publish oracle update: %v", err)
}

return nil
}
Expand All @@ -126,7 +126,10 @@ func (d *diviner) run(ctx context.Context) {
d.nextFetchInfo.Store(info)
d.fireScheduleChanged(info)
case <-timer.C:
if err := d.fetchUpdates(ctx); err != nil {
fetchCtx, cancel := context.WithTimeout(ctx, fetchUpdatesTimeout)
err := d.fetchUpdates(fetchCtx)
cancel()
if err != nil {
d.log.Errorf("Failed to fetch divination: %v", err)
// Retry after 1 minute on errors.
const errPeriod = time.Minute
Expand Down
8 changes: 7 additions & 1 deletion oracle/fetch_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package oracle
import (
"sync"
"time"

"github.com/decred/slog"
)

const trackingPeriod = 24 * time.Hour
Expand All @@ -16,6 +18,7 @@ type fetchRecord struct {

// fetchTracker tracks fetch events for the past 24 hours.
type fetchTracker struct {
log slog.Logger
mtx sync.Mutex
records []fetchRecord
// To reduce memory, records store uint16 IDs rather than full strings.
Expand All @@ -31,8 +34,9 @@ type fetchTracker struct {
}

// newFetchTracker creates a new fetchTracker.
func newFetchTracker() *fetchTracker {
func newFetchTracker(log slog.Logger) *fetchTracker {
return &fetchTracker{
log: log,
sourceIDs: make(map[string]uint16),
nodeIDs: make(map[string]uint16),
counts: make(map[uint16]map[uint16]int),
Expand All @@ -46,10 +50,12 @@ func (ft *fetchTracker) recordFetch(source, nodeID string, stamp time.Time) {
defer ft.mtx.Unlock()
sourceID, ok := assignID(source, ft.sourceIDs, &ft.sourceNames, &ft.nextSourceID)
if !ok {
ft.log.Warnf("source ID space exhausted, disabling fetch tracking for source: %s", source)
return
}
nodeIDInt, ok := assignID(nodeID, ft.nodeIDs, &ft.nodeNames, &ft.nextNodeID)
if !ok {
ft.log.Warnf("node ID space exhausted, disabling fetch tracking for node: %s", nodeID)
return
}
r := fetchRecord{
Expand Down
57 changes: 52 additions & 5 deletions oracle/fetch_tracker_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package oracle

import (
"os"
"testing"
"time"

"github.com/decred/slog"
)

func TestFetchTracker_RecordAndCounts(t *testing.T) {
ft := newFetchTracker()
backend := slog.NewBackend(os.Stdout)
log := backend.Logger("test")
ft := newFetchTracker(log)
now := time.Now()

ft.recordFetch("source1", "node-a", now)
Expand All @@ -27,7 +32,9 @@ func TestFetchTracker_RecordAndCounts(t *testing.T) {
}

func TestFetchTracker_LatestPerSource(t *testing.T) {
ft := newFetchTracker()
backend := slog.NewBackend(os.Stdout)
log := backend.Logger("test")
ft := newFetchTracker(log)
now := time.Now()

ft.recordFetch("source1", "node-a", now.Add(-time.Hour))
Expand All @@ -45,7 +52,9 @@ func TestFetchTracker_LatestPerSource(t *testing.T) {
}

func TestFetchTracker_CountsExcludes24hOld(t *testing.T) {
ft := newFetchTracker()
backend := slog.NewBackend(os.Stdout)
log := backend.Logger("test")
ft := newFetchTracker(log)
now := time.Now()

ft.recordFetch("source1", "node-a", now.Add(-25*time.Hour))
Expand All @@ -58,7 +67,9 @@ func TestFetchTracker_CountsExcludes24hOld(t *testing.T) {
}

func TestFetchTracker_OutOfOrderExpiry(t *testing.T) {
ft := newFetchTracker()
backend := slog.NewBackend(os.Stdout)
log := backend.Logger("test")
ft := newFetchTracker(log)
now := time.Now()

// Insert a recent record followed by an expired one (out of order).
Expand All @@ -72,7 +83,9 @@ func TestFetchTracker_OutOfOrderExpiry(t *testing.T) {
}

func TestFetchTracker_Empty(t *testing.T) {
ft := newFetchTracker()
backend := slog.NewBackend(os.Stdout)
log := backend.Logger("test")
ft := newFetchTracker(log)

counts := ft.fetchCounts()
if len(counts) != 0 {
Expand All @@ -84,3 +97,37 @@ func TestFetchTracker_Empty(t *testing.T) {
t.Errorf("expected empty latest, got %d entries", len(latest))
}
}

func TestFetchTracker_IDExhaustionLogging(t *testing.T) {
// This test verifies that when ID space exhausts, a warning is logged.
// We simulate exhaustion by forcing nextSourceID and nextNodeID to their max.
backend := slog.NewBackend(os.Stdout)
log := backend.Logger("test")
ft := newFetchTracker(log)

// Manually exhaust the source ID space
ft.nextSourceID = ^uint16(0) // Max uint16 value (65535)

now := time.Now()
// This should log a warning about source ID space exhaustion
ft.recordFetch("new-source", "node-a", now)

// Verify that no fetch was recorded (since source ID assignment failed)
counts := ft.fetchCounts()
if len(counts) != 0 {
t.Errorf("expected no fetches recorded after source ID exhaustion, got %d sources", len(counts))
}

// Reset source ID space and exhaust node ID space
ft.nextSourceID = 0
ft.nextNodeID = ^uint16(0) // Max uint16 value

// This should log a warning about node ID space exhaustion
ft.recordFetch("source1", "new-node", now)

// Verify that no fetch was recorded (since node ID assignment failed)
counts = ft.fetchCounts()
if len(counts) != 0 {
t.Errorf("expected no fetches recorded after node ID exhaustion, got %d sources", len(counts))
}
}
2 changes: 1 addition & 1 deletion oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func New(cfg *Config) (*Oracle, error) {
publishUpdate: cfg.PublishUpdate,
onStateUpdate: cfg.OnStateUpdate,
quotaManager: quotaManager,
fetchTracker: newFetchTracker(),
fetchTracker: newFetchTracker(cfg.Log),
nodeID: cfg.NodeID,
}

Expand Down
Loading