Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ env:
REPLICA: 3
RUN_TYPE: "PR RUN"
SETUP_CONTRACT_IMAGE: "ethersphere/bee-localchain"
SETUP_CONTRACT_IMAGE_TAG: "0.9.4"
SETUP_CONTRACT_IMAGE_TAG: "0.9.3-rc4"
BEELOCAL_BRANCH: "main"
BEEKEEPER_BRANCH: "master"
BEEKEEPER_METRICS_ENABLED: false
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/ethereum/go-ethereum v1.15.11
github.com/ethersphere/batch-archive v0.0.4
github.com/ethersphere/go-price-oracle-abi v0.6.9
github.com/ethersphere/go-storage-incentives-abi v0.9.4
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4
github.com/ethersphere/go-sw3-abi v0.6.9
github.com/ethersphere/langos v1.0.0
github.com/go-playground/validator/v10 v10.11.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ github.com/ethersphere/batch-archive v0.0.4 h1:PHmwQfmUEyDJgoX2IqI/R0alQ63+aLPXf
github.com/ethersphere/batch-archive v0.0.4/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU=
github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0=
github.com/ethersphere/go-storage-incentives-abi v0.9.4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc=
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 h1:YK9FpiQz29ctU5V46CuwMt+4X5Xn8FTBwy6E2v/ix8s=
github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc=
github.com/ethersphere/go-sw3-abi v0.6.9 h1:TnWLnYkWE5UvC17mQBdUmdkzhPhO8GcqvWy4wvd1QJQ=
github.com/ethersphere/go-sw3-abi v0.6.9/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU=
github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc=
Expand Down
9 changes: 9 additions & 0 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type metrics struct {
LevelDBStats *prometheus.HistogramVec
ExpiryTriggersCount prometheus.Counter
ExpiryRunsCount prometheus.Counter
WaitChunkRetrievalTime prometheus.Gauge

ReserveMissingBatch prometheus.Gauge
}
Expand Down Expand Up @@ -163,6 +164,14 @@ func newMetrics() metrics {
Help: "Number of times the expiry worker was fired.",
},
),
WaitChunkRetrievalTime: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "wait_chunk_retrieval_time",
Help: "Total time spent waiting for chunk retrieval across all workers.",
},
),
}
}

Expand Down
34 changes: 33 additions & 1 deletion pkg/storer/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,24 @@ func (db *DB) ReserveSample(
g, ctx := errgroup.WithContext(ctx)

allStats := &SampleStats{}
// Track total wait time across all workers
var totalWaitTime int64
var waitTimeLock sync.Mutex

statsLock := sync.Mutex{}
addStats := func(stats SampleStats) {
statsLock.Lock()
allStats.add(stats)
statsLock.Unlock()
}

// Function to safely add worker wait times to the total
addWaitTime := func(waitTime int64) {
waitTimeLock.Lock()
totalWaitTime += waitTime
waitTimeLock.Unlock()
}

t := time.Now()

excludedBatchIDs, err := db.batchesBelowValue(minBatchBalance)
Expand Down Expand Up @@ -125,11 +136,27 @@ func (db *DB) ReserveSample(
g.Go(func() error {
wstat := SampleStats{}
hasher := bmt.NewHasher(prefixHasherFactory)

// Track wait times
var (
lastChunkTime time.Time
waitTime int64 = 0
)

defer func() {
addStats(wstat)
addWaitTime(waitTime)
}()

for chItem := range chunkC {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this loops we want to measure only the waiting for a chunk or the total worker execution time? It seems that it measures the second one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. we need to lower this overall time.

// Measure wait time between chunks
now := time.Now()
if !lastChunkTime.IsZero() {
chunkWaitTime := now.Sub(lastChunkTime).Nanoseconds()
// Simply add to total wait time
waitTime += chunkWaitTime
}

// exclude chunks who's batches balance are below minimum
if _, found := excludedBatchIDs[string(chItem.BatchID)]; found {
wstat.BelowBalanceIgnored++
Expand Down Expand Up @@ -172,6 +199,8 @@ func (db *DB) ReserveSample(
case <-ctx.Done():
return ctx.Err()
}

lastChunkTime = time.Now()
}

return nil
Expand Down Expand Up @@ -259,6 +288,10 @@ func (db *DB) ReserveSample(

allStats.TotalDuration = time.Since(t)

// Set the total wait time metric
db.metrics.WaitChunkRetrievalTime.Set(float64(totalWaitTime))
db.logger.Debug("total chunk retrieval wait time", "wait_time", totalWaitTime)

if err := g.Wait(); err != nil {
db.logger.Info("reserve sampler finished with error", "err", err, "duration", time.Since(t), "storage_radius", committedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats))

Expand Down Expand Up @@ -421,7 +454,6 @@ func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error)
sort.Slice(items, func(i, j int) bool {
return items[i].TransformedAddress.Compare(items[j].TransformedAddress) == -1
})

return Sample{Items: items}, nil
}

Expand Down
Loading