Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/pushsync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type metrics struct {
ReceiptDepth *prometheus.CounterVec
ShallowReceiptDepth *prometheus.CounterVec
ShallowReceipt prometheus.Counter
OverdraftRefresh prometheus.Counter
}

func newMetrics() metrics {
Expand Down Expand Up @@ -146,6 +147,12 @@ func newMetrics() metrics {
},
[]string{"depth"},
),
OverdraftRefresh: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "overdraft_refresh",
Help: "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.",
}),
}
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
olog "github.com/opentracing/opentracing-go/log"
"golang.org/x/time/rate"
)

// loggerName is the tree path name of the logger for this package.
Expand Down Expand Up @@ -99,6 +100,7 @@ type PushSync struct {
stabilizer stabilization.Subscriber

shallowReceiptTolerance uint8
overDraftRefreshLimiter *rate.Limiter
}

type receiptResult struct {
Expand Down Expand Up @@ -148,6 +150,7 @@ func New(
errSkip: skippeers.NewList(time.Minute),
stabilizer: stabilizer,
shallowReceiptTolerance: shallowReceiptTolerance,
overDraftRefreshLimiter: rate.NewLimiter(rate.Every(time.Second), 1),
}

ps.validStamp = ps.validStampWrapper(validStamp)
Expand Down Expand Up @@ -424,7 +427,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
continue // there is still an inflight request, wait for it's result
}

ps.logger.Debug("sleeping to refresh overdraft balance", "chunk_address", ch.Address())
ps.metrics.OverdraftRefresh.Inc()
if ps.overDraftRefreshLimiter.Allow() {
ps.logger.Debug("sleeping to refresh overdraft balance")
}

select {
case <-time.After(overDraftRefresh):
Expand Down
Loading